Issue
I'm getting an error while installing the following dependency: spark-sql-kafka. I'm using a Jupyter notebook inside a Python virtual environment.
Project Configuration
Scala version: 2.12.17
PySpark version: 3.4.0
Python version: 3.11
Dependencies: org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0
OS: Unix (macOS)
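These can be double-checked from the notebook kernel itself (a quick sanity check that the virtual environment really provides the versions listed above):

import sys
import pyspark

print(sys.version)          # expect 3.11.x
print(pyspark.__version__)  # expect 3.4.0, which ships as a Scala 2.12 build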
Error displayed during dependency resolution (when the SparkSession starts)
Ivy Default Cache set to: /Users/diegogallovalenzuela/.ivy2/cache
The jars for the packages stored in: /Users/diegogallovalenzuela/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b695ed4f-3ee2-4af4-bcdb-724f3ed5ce8f;1.0
confs: [default]
found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.0 in local-m2-cache
found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.0 in local-m2-cache
found org.apache.kafka#kafka-clients;3.3.2 in local-m2-cache
found org.lz4#lz4-java;1.8.0 in local-m2-cache
found org.xerial.snappy#snappy-java;1.1.9.1 in local-m2-cache
found org.slf4j#slf4j-api;2.0.6 in local-m2-cache
found org.apache.hadoop#hadoop-client-runtime;3.3.4 in local-m2-cache
found org.apache.hadoop#hadoop-client-api;3.3.4 in local-m2-cache
found commons-logging#commons-logging;1.1.3 in local-m2-cache
found com.google.code.findbugs#jsr305;3.0.0 in local-m2-cache
found org.apache.commons#commons-pool2;2.11.1 in local-m2-cache
:: resolution report :: resolve 713ms :: artifacts dl 29ms
:: modules in use:
com.google.code.findbugs#jsr305;3.0.0 from local-m2-cache in [default]
commons-logging#commons-logging;1.1.3 from local-m2-cache in [default]
org.apache.commons#commons-pool2;2.11.1 from local-m2-cache in [default]
org.apache.hadoop#hadoop-client-api;3.3.4 from local-m2-cache in [default]
org.apache.hadoop#hadoop-client-runtime;3.3.4 from local-m2-cache in [default]
org.apache.kafka#kafka-clients;3.3.2 from local-m2-cache in [default]
org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.0 from local-m2-cache in [default]
org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.0 from local-m2-cache in [default]
org.lz4#lz4-java;1.8.0 from local-m2-cache in [default]
org.slf4j#slf4j-api;2.0.6 from local-m2-cache in [default]
org.xerial.snappy#snappy-java;1.1.9.1 from local-m2-cache in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 11 | 0 | 0 | 0 || 11 | 0 |
---------------------------------------------------------------------
:: problems summary ::
:::: WARNINGS
[NOT FOUND ] org.apache.kafka#kafka-clients;3.3.2!kafka-clients.jar (2ms)
==== local-m2-cache: tried
file:/Users/diegogallovalenzuela/.m2/repository/org/apache/kafka/kafka-clients/3.3.2/kafka-clients-3.3.2.jar
[NOT FOUND ] org.lz4#lz4-java;1.8.0!lz4-java.jar (1ms)
==== local-m2-cache: tried
file:/Users/diegogallovalenzuela/.m2/repository/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar
[NOT FOUND ] org.xerial.snappy#snappy-java;1.1.9.1!snappy-java.jar(bundle) (0ms)
==== local-m2-cache: tried
file:/Users/diegogallovalenzuela/.m2/repository/org/xerial/snappy/snappy-java/1.1.9.1/snappy-java-1.1.9.1.jar
[NOT FOUND ] org.slf4j#slf4j-api;2.0.6!slf4j-api.jar (0ms)
==== local-m2-cache: tried
file:/Users/diegogallovalenzuela/.m2/repository/org/slf4j/slf4j-api/2.0.6/slf4j-api-2.0.6.jar
::::::::::::::::::::::::::::::::::::::::::::::
:: FAILED DOWNLOADS ::
:: ^ see resolution messages for details ^ ::
::::::::::::::::::::::::::::::::::::::::::::::
:: org.apache.kafka#kafka-clients;3.3.2!kafka-clients.jar
:: org.lz4#lz4-java;1.8.0!lz4-java.jar
:: org.xerial.snappy#snappy-java;1.1.9.1!snappy-java.jar(bundle)
:: org.slf4j#slf4j-api;2.0.6!slf4j-api.jar
::::::::::::::::::::::::::::::::::::::::::::::
:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
Exception in thread "main" java.lang.RuntimeException: [download failed: org.apache.kafka#kafka-clients;3.3.2!kafka-clients.jar, download failed: org.lz4#lz4-java;1.8.0!lz4-java.jar, download failed: org.xerial.snappy#snappy-java;1.1.9.1!snappy-java.jar(bundle), download failed: org.slf4j#slf4j-api;2.0.6!slf4j-api.jar]
at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1528)
at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:185)
at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:332)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
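The NOT FOUND warnings above show Ivy resolving from local-m2-cache: the descriptors are present under ~/.m2/repository, but the .jar files themselves are missing, and once a module resolves from that cache Ivy does not fall back to Maven Central. A minimal cleanup sketch, assuming the paths from the log above (it removes only the four stale artifact directories so the next run re-downloads them):

import shutil
from pathlib import Path

home = Path.home()

# The four artifacts the log reports as NOT FOUND in local-m2-cache
stale_dirs = [
    home / ".m2/repository/org/apache/kafka/kafka-clients/3.3.2",
    home / ".m2/repository/org/lz4/lz4-java/1.8.0",
    home / ".m2/repository/org/xerial/snappy/snappy-java/1.1.9.1",
    home / ".m2/repository/org/slf4j/slf4j-api/2.0.6",
]

for d in stale_dirs:
    if d.exists():
        shutil.rmtree(d)  # force Ivy to re-resolve from Maven Central
        print(f"removed {d}")

# If that is not enough, removing ~/.ivy2/cache entirely also forces
# a clean resolve, at the cost of re-downloading everything.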
Project Code
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
import os

scala_version = "2.12"  # TODO: Ensure this is correct
spark_version = "3.4.0"
packages = [
    f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}",
    "org.apache.kafka:kafka-clients:3.3.2"
]

spark = (SparkSession.builder
    .master("local")
    .appName("kafka-example")
    .config("spark.jars.packages", ",".join(packages))
    .getOrCreate())
spark

# Define the Kafka broker(s) and topic(s)
kafka_brokers = "localhost:9093"
kafka_topic = "topic1"

# Define the Kafka source options
kafka_options = {
    "kafka.bootstrap.servers": kafka_brokers,
    "subscribe": kafka_topic,
    "startingOffsets": "latest"
}

# Read from Kafka as a streaming DataFrame
streaming_df = (spark
    .readStream
    .format("kafka")
    .options(**kafka_options)
    .load())

# Define the schema for the value column. Schema inference with
# spark.read.json(...) does not work on a streaming DataFrame, so the
# schema is declared explicitly; the StringType fields below are an
# assumption -- adjust them to match the actual JSON payload.
value_schema = StructType([
    StructField("column1", StringType()),
    StructField("column2", StringType()),
    StructField("column3", StringType()),
])

# Parse the value column as JSON and select the required fields
parsed_df = (streaming_df
    .select(from_json(col("value").cast("string"), value_schema).alias("data"))
    .select("data.column1", "data.column2", "data.column3"))

# Define the sink for the parsed data (e.g., console, file, etc.)
sink_query = (parsed_df
    .writeStream
    .format("console")
    .outputMode("append")
    .start())

# Block until the streaming query terminates
sink_query.awaitTermination()
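One variant worth noting: in a Jupyter kernel, spark.jars.packages on the builder is only honored if no JVM has been started yet. A common alternative is to pass the packages through PYSPARK_SUBMIT_ARGS before the session is created; a sketch using the same coordinates as above:

import os

# Must run before the first SparkSession in this kernel; the trailing
# "pyspark-shell" token is required by PySpark.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 "
    "pyspark-shell"
)

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("kafka-example").getOrCreate()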
Resources and similar questions consulted
Update
I'm not sure whether this is a PATH or network-connection error, but the same code works when I run it on Windows. I need to make it work on Unix (macOS).
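Since it works on Windows but not on macOS, a quick way to tell a stale-cache problem from a network problem is to check whether the files Ivy looked for actually exist on disk (path copied from the warnings above):

from pathlib import Path

jar = (Path.home()
       / ".m2/repository/org/apache/kafka/kafka-clients/3.3.2/kafka-clients-3.3.2.jar")
pom = jar.with_suffix(".pom")

# A present .pom with a missing .jar means a stale local-m2-cache entry,
# not a network failure.
print(f"{jar} exists: {jar.exists()}")
print(f"{pom} exists: {pom.exists()}")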