Spark Kinesis Stream Scala Example trying to follow Spark Documentation but does not produce output
I am trying to process data from a Kinesis stream using Spark Streaming. I am using https://spark.apache.org/docs/latest/streaming-kinesis-integration.html as a reference to write this code:

```scala
import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.JavaConverters._

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp
import org.apache.spark.streaming.kinesis.KinesisInputDStream

object TimeAndLanFixer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[6]").setAppName("LangAndTimeZoneFixer")
    val ssc = new StreamingContext(conf, Seconds(3))

    val time = System.currentTimeMillis()
    println(s"Current time in millis: $time")
    val lang = System.getProperty("user.language")
    println(s"Current language: $lang")

    val initialPositionTimestamp = "2024-06-27 00:00:00"
    val date: Date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(initialPositionTimestamp)

    val kinesisStream = KinesisInputDStream.builder
      .streamName(System.getenv("STREAM_NAME"))
      .streamingContext(ssc)
      .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
      .regionName("us-west-2")
      .checkpointAppName("DebugApp").checkpointInterval(Seconds(10))
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
      .initialPosition(new AtTimestamp(date))
      .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
      .build()

    kinesisStream.start()
    kinesisStream.foreachRDD(rdd => {
      rdd.foreach(println)
    })

    ssc.awaitTermination()
  }
}
```

When I run this, I don't see any output on the console in IntelliJ; I only see the two `println`s that I added to the program. My input Kinesis stream has 5 shards, and based on some reading I did on the internet, when testing locally you need to supply a number greater than the number of shards (`local[6]`) to process data locally. The program still does not move ahead.

pom.xml:

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.5.1</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kinesis-asl_2.12</artifactId>
  <version>3.5.1</version>
</dependency>
```
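On the `local[6]` reasoning: as I read the Spark Kinesis integration guide, each `KinesisInputDStream` runs a single receiver (one KCL worker that can consume all shards), and each receiver occupies one core for the lifetime of the job. The Spark Streaming guide's rule for local mode is `local[n]` with n greater than the number of receivers, not the number of shards, so `local[6]` is more than enough for one stream and is not why nothing is printed. If you do want to receive in parallel, the guide's approach is to create several streams with the same `checkpointAppName` and `union` them. A minimal sketch under those assumptions; the object name, `numReceivers`, and `localMaster` are hypothetical, and the stream settings are taken from the question with `Latest` as the initial position:

```scala
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

object ParallelKinesisReceivers {
  // Hypothetical: number of KinesisInputDStreams (one receiver each).
  val numReceivers = 2

  // Each receiver pins one core, so keep at least one extra core for processing batches.
  def localMaster(receivers: Int): String = s"local[${receivers + 1}]"

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster(localMaster(numReceivers))
      .setAppName("ParallelKinesisReceivers")
    val ssc = new StreamingContext(conf, Seconds(3))

    // Same checkpointAppName for all streams, so their KCL workers share one lease
    // table and split the shards between them.
    val streams = (0 until numReceivers).map { _ =>
      KinesisInputDStream.builder
        .streamingContext(ssc)
        .streamName(System.getenv("STREAM_NAME"))
        .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
        .regionName("us-west-2")
        .initialPosition(new KinesisInitialPositions.Latest())
        .checkpointAppName("DebugApp")
        .checkpointInterval(Seconds(10))
        .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
        .build()
    }

    // Merge into one DStream[Array[Byte]] and register the output before starting.
    val unified = ssc.union(streams)
    unified.map(bytes => new String(bytes, StandardCharsets.UTF_8)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

With 5 shards, more than 5 streams would leave some receivers idle, since each shard is leased to one worker at a time.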
Answer
The missing link was `ssc.start()`. It seems odd that you need to call `ssc.start()` in addition to `kinesisStream.start()`.

I am also facing a lot of young-generation GC issues, but that is for another post.

Regards,
Bhanu
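Applying the answer to the question's code gives roughly the following. As far as I know, `StreamingContext.start()` is what launches the receivers and the batch scheduler, and Spark calls `InputDStream.start()` on each input stream itself as part of that, so the explicit `kinesisStream.start()` should not be needed at all. Output operations such as `foreachRDD` must be registered before `ssc.start()`, because a started context does not accept new ones. One more detail: Kinesis records arrive as `Array[Byte]`, so `rdd.foreach(println)` would only print array references like `[B@1a2b3c`; the sketch decodes them as UTF-8, which is an assumption about the payload. This is a sketch that has not been run against a live stream; `STREAM_NAME` still comes from the environment as in the question.

```scala
import java.nio.charset.StandardCharsets
import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.JavaConverters._

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

object TimeAndLanFixer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[6]").setAppName("LangAndTimeZoneFixer")
    val ssc = new StreamingContext(conf, Seconds(3))

    // Parsed in the JVM's default time zone.
    val date: Date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2024-06-27 00:00:00")

    val kinesisStream = KinesisInputDStream.builder
      .streamName(System.getenv("STREAM_NAME"))
      .streamingContext(ssc)
      .endpointUrl("https://kinesis.us-west-2.amazonaws.com")
      .regionName("us-west-2")
      .checkpointAppName("DebugApp")
      .checkpointInterval(Seconds(10))
      .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
      .initialPosition(new KinesisInitialPositions.AtTimestamp(date))
      .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
      .build()

    // 1. Register the output operation while the context is still stopped.
    //    Decode the raw bytes so the console shows the record payloads.
    kinesisStream.foreachRDD { rdd =>
      rdd.map(bytes => new String(bytes, StandardCharsets.UTF_8)).foreach(println)
    }

    // 2. Start the context; this starts the Kinesis receiver (no kinesisStream.start()).
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Without `ssc.start()`, `ssc.awaitTermination()` simply blocks, which matches the "program is not moving ahead" symptom in the question.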




Related questions
How to flatten a List of different types in Scala?

I have 4 elements:List[List[Object]] (Objects are different in each element) that I want to zip so that I can have a List[List[obj1],List[obj2],List[obj3],List[obj4]] I tried to zip them and I ...

To use or not to use Scala for new Java projects? [closed]

I'm impressed with Twitter and investigating to use Scala for a new large scale web project with Hibernate and Wicket. What do you think about Scala, and should I use it instead of Java? EDIT: And, ...

Why does Scala create a ~/tmp directory when I run a script?

When I execute a Scala script from the command line, a directory named "tmp" is created in my home directory. It is always empty, so I simply deleted it without any apparent problem. Of course, when I ...

Include jar file in Scala interpreter

Is it possible to include a jar file when running the Scala interpreter? My code is working when I compile from scalac: scalac script.scala -classpath *.jar But I would like to be able to include a ...

Scala and tail recursion

There are various answers on Stack Overflow which explain the conditions under which tail recursion is possible in Scala. I understand the limitations and how and where I can take advantage of tail ...
