Issue
I have a Scala Spark application running on Amazon EMR. It tries to write data to Kafka in Amazon MSK using IAM authentication.
I have created a topic called hm.motor.avro in Amazon MSK with:
bin/kafka-topics.sh \
  --bootstrap-server=b-1.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098,b-2.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098 \
  --command-config=config/client.properties \
  --create \
  --topic=hm.motor.avro \
  --partitions=3 \
  --replication-factor=2
Here is the relevant Spark code that writes to MSK using IAM authentication:
val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "b-1.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098,b-2.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098"
  )
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "AWS_MSK_IAM")
  .option("kafka.sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
  .option("kafka.sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
  .option("topic", "hm.motor.avro")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
build.sbt (I am using Spark 3.3.1, which is the Spark version on my Amazon EMR cluster):
name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
resolvers += "confluent" at "https://packages.confluent.io/maven/"
val sparkVersion = "3.3.1"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.475" % "provided",
  "org.apache.spark" %% "spark-avro" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "io.delta" %% "delta-core" % "2.3.0",
  "za.co.absa" %% "abris" % "6.3.0",
  "software.amazon.msk" % "aws-msk-iam-auth" % "1.1.6"
)
ThisBuild / assemblyMergeStrategy := {
  // https://stackoverflow.com/a/67937671/2000548
  case PathList("module-info.class") => MergeStrategy.discard
  case x if x.endsWith("/module-info.class") => MergeStrategy.discard
  // https://stackoverflow.com/a/76129963/2000548
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  // https://stackoverflow.com/a/54634225/2000548
  case x if x.contains("io.netty.versions.properties") => MergeStrategy.discard
  case x =>
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(x)
}
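For reference, this is roughly how I build and submit the job (the main class name and jar path below are placeholders based on the sbt-assembly defaults, not the exact command I run):

sbt assembly

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class IngestFromS3ToKafka \
  target/scala-2.12/IngestFromS3ToKafka-assembly-1.0.jar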
However, when I spark-submit the job in both YARN cluster and client modes, I get this error:
Caused by: org.apache.spark.SparkException: Writing job aborted
at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:767)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:409)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:353)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:302)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:313)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3932)
at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3161)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3922)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:554)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3920)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3920)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:3161)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:669)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:664)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:664)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
... 1 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 106) (ip-xxx-xxx-xxx-xxx.xxx.com executor 1): org.apache.kafka.common.errors.TimeoutException: Topic hm.motor.avro not present in metadata after 60000 ms.
Note the last line:
org.apache.kafka.common.errors.TimeoutException: Topic hm.motor.avro not present in metadata after 60000 ms.
Experiment 1
I installed the Kafka CLI on the Amazon EMR master node. I am able to list the topics in Amazon MSK with:
[hadoop@ip-xxx-xxx-xxx-xxx kafka_2.13-3.4.0]$ bin/kafka-topics.sh \
  --bootstrap-server=b-1.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098,b-2.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098 \
  --command-config=config/client.properties \
  --list
__amazon_msk_canary
__consumer_offsets
_schema_encoders
_schemas
hm.motor.avro
config/client.properties:
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
You can see hm.motor.avro is there. This also means that, using the same IAM authentication, the EMR master node is able to access the topics in Amazon MSK.
When I created the EMR cluster, the console showed:
The instance profile assigns a role to every EC2 instance in a cluster.
So I expect both the Spark master and worker nodes in EMR to share the same access to MSK.
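To double-check that assumption, I believe a check like the one below, run on a core (worker) node via SSH rather than on the master, should show the same instance-profile role and confirm that the brokers are reachable on the IAM port (I am assuming these are the right commands for that):

# on a core/worker node, not the master
aws sts get-caller-identity   # should report the same assumed role as on the master node
nc -zv b-1.myemr.xxx.c12.kafka.us-west-2.amazonaws.com 9098   # should show the broker port as reachable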
Any guidance would be appreciated!