Got "org.apache.kafka.common.errors.TimeoutException: Topic xxx not present in metadata after 60000 ms" when writing from EMR to MSK

Issue

I have a Spark application written in Scala running on Amazon EMR. It tries to write data to Kafka in Amazon MSK using IAM authentication.

I have created a topic called hm.motor.avro in Amazon MSK with:

bin/kafka-topics.sh \
    --bootstrap-server=b-1.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098,b-2.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098 \
    --command-config=config/client.properties \
    --create \
    --topic=hm.motor.avro \
    --partitions=3 \
    --replication-factor=2
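
To double-check the topic settings, the same client config works with --describe:

bin/kafka-topics.sh \
    --bootstrap-server=b-1.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098,b-2.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098 \
    --command-config=config/client.properties \
    --describe \
    --topic=hm.motor.avro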

Here is the related Spark code that writes to MSK using IAM authentication:

val query = df.writeStream
  .format("kafka")
  .option(
    "kafka.bootstrap.servers",
    "b-1.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098,b-2.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098",
  )
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "AWS_MSK_IAM")
  .option("kafka.sasl.jaas.config", "software.amazon.msk.auth.iam.IAMLoginModule required;")
  .option("kafka.sasl.client.callback.handler.class", "software.amazon.msk.auth.iam.IAMClientCallbackHandler")
  .option("topic", "hm.motor.avro")
  .option("checkpointLocation", "/tmp/checkpoint")
  .start()
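
(Not shown above: the Kafka sink requires the DataFrame to expose a binary or string value column, plus an optional key. df is prepared roughly along these lines using spark-avro; this is a sketch, not my exact code, and the column handling is illustrative:)

import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.avro.functions.to_avro

// Serialize all columns into a single Avro-encoded "value" column,
// which is the shape the Kafka sink expects.
val df = rawDf.select(to_avro(struct(rawDf.columns.map(col): _*)).as("value"))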

build.sbt (I am using Spark 3.3.1, which is the Spark version in my Amazon EMR release):

name := "IngestFromS3ToKafka"
version := "1.0"
scalaVersion := "2.12.17"
resolvers += "confluent" at "https://packages.confluent.io/maven/"

val sparkVersion = "3.3.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.hadoop" % "hadoop-common" % "3.3.5" % "provided",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.5" % "provided",
  "com.amazonaws" % "aws-java-sdk-bundle" % "1.12.475" % "provided",

  "org.apache.spark" %% "spark-avro" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
  "io.delta" %% "delta-core" % "2.3.0",
  "za.co.absa" %% "abris" % "6.3.0",
  "software.amazon.msk" % "aws-msk-iam-auth" % "1.1.6"
)
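
The merge strategy below relies on sbt-assembly; for reference, it is enabled in project/plugins.sbt roughly like this (the plugin version here is an assumption and may differ from my actual setup):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")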

ThisBuild / assemblyMergeStrategy := {
  // https://stackoverflow.com/a/67937671/2000548
  case PathList("module-info.class") => MergeStrategy.discard
  case x if x.endsWith("/module-info.class") => MergeStrategy.discard
  // https://stackoverflow.com/a/76129963/2000548
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  // https://stackoverflow.com/a/54634225/2000548
  case x if x.contains("io.netty.versions.properties") => MergeStrategy.discard
  case x =>
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(x)
}
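
For context, I submit the assembled jar roughly like this (class name and jar path are placeholders for my actual values; I also try --deploy-mode client):

spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class IngestFromS3ToKafka \
    target/scala-2.12/IngestFromS3ToKafka-assembly-1.0.jar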

However, when I run spark-submit in both YARN cluster and client modes, I get this error:

Caused by: org.apache.spark.SparkException: Writing job aborted
    at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:767)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:409)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:353)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:302)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:313)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3932)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3161)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3922)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:554)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3920)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3920)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:3161)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:669)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:664)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:664)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:256)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:219)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:213)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:307)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:285)
    ... 1 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 4 times, most recent failure: Lost task 0.3 in stage 6.0 (TID 106) (ip-xxx-xxx-xxx-xxx.xxx.com executor 1): org.apache.kafka.common.errors.TimeoutException: Topic hm.motor.avro not present in metadata after 60000 ms.

Note the last line:

org.apache.kafka.common.errors.TimeoutException: Topic hm.motor.avro not present in metadata after 60000 ms.

Experiment 1

I installed the Kafka CLI on the Amazon EMR master node. I am able to list the topics in Amazon MSK with:

[hadoop@ip-xxx-xxx-xxx-xxx kafka_2.13-3.4.0]$ bin/kafka-topics.sh \
    --bootstrap-server=b-1.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098,b-2.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098 \
    --command-config=config/client.properties \
    --list

__amazon_msk_canary
__consumer_offsets
_schema_encoders
_schemas
hm.motor.avro

config/client.properties:

security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

You can see hm.motor.avro is there. It also means that, using the same IAM authentication, the Spark master node is able to access the topics in Amazon MSK.
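
For what it is worth, the same properties file should also let the console producer exercise the produce path directly from the master node (a sketch; I have not captured its output here):

bin/kafka-console-producer.sh \
    --bootstrap-server=b-1.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098,b-2.myemr.xxx.c12.kafka.us-west-2.amazonaws.com:9098 \
    --producer.config=config/client.properties \
    --topic=hm.motor.avro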

When I created the EMR cluster, the console showed:

The instance profile assigns a role to every EC2 instance in a cluster.

So I expect both the Spark master and worker nodes in EMR to share the same access to MSK.

Any guide would be appreciated!
