Error during dependencies installation in pyspark 3.4.0 using a virtual environment and jupyter notebook

Issue

I'm getting an error while installing the following dependency: spark-sql-kafka. I'm using a Jupyter notebook inside a Python virtual environment.

Project Configuration

Scala version: 2.12.17
PySpark version: 3.4.0
Python version: 3.11
Dependencies: org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0
OS: Unix (macOS)
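As a quick sanity check (not part of the original setup), the package coordinate can be derived from the installed PySpark version, since the Spark version, the Scala build suffix, and the connector version all have to line up:

import pyspark

# The connector version must match the installed PySpark version, and the
# "_2.12" suffix must match the Scala build that Spark ships with.
print(pyspark.__version__)  # expected: 3.4.0

scala_version = "2.12"  # Spark 3.4.0 prebuilt distributions use Scala 2.12
package = f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{pyspark.__version__}"
print(package)  # org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0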

Error displayed during dependency resolution

Ivy Default Cache set to: /Users/diegogallovalenzuela/.ivy2/cache
The jars for the packages stored in: /Users/diegogallovalenzuela/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b695ed4f-3ee2-4af4-bcdb-724f3ed5ce8f;1.0
    confs: [default]
    found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.0 in local-m2-cache
    found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.0 in local-m2-cache
    found org.apache.kafka#kafka-clients;3.3.2 in local-m2-cache
    found org.lz4#lz4-java;1.8.0 in local-m2-cache
    found org.xerial.snappy#snappy-java;1.1.9.1 in local-m2-cache
    found org.slf4j#slf4j-api;2.0.6 in local-m2-cache
    found org.apache.hadoop#hadoop-client-runtime;3.3.4 in local-m2-cache
    found org.apache.hadoop#hadoop-client-api;3.3.4 in local-m2-cache
    found commons-logging#commons-logging;1.1.3 in local-m2-cache
    found com.google.code.findbugs#jsr305;3.0.0 in local-m2-cache
    found org.apache.commons#commons-pool2;2.11.1 in local-m2-cache
:: resolution report :: resolve 713ms :: artifacts dl 29ms
    :: modules in use:
    com.google.code.findbugs#jsr305;3.0.0 from local-m2-cache in [default]
    commons-logging#commons-logging;1.1.3 from local-m2-cache in [default]
    org.apache.commons#commons-pool2;2.11.1 from local-m2-cache in [default]
    org.apache.hadoop#hadoop-client-api;3.3.4 from local-m2-cache in [default]
    org.apache.hadoop#hadoop-client-runtime;3.3.4 from local-m2-cache in [default]
    org.apache.kafka#kafka-clients;3.3.2 from local-m2-cache in [default]
    org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.0 from local-m2-cache in [default]
    org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.0 from local-m2-cache in [default]
    org.lz4#lz4-java;1.8.0 from local-m2-cache in [default]
    org.slf4j#slf4j-api;2.0.6 from local-m2-cache in [default]
    org.xerial.snappy#snappy-java;1.1.9.1 from local-m2-cache in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   11  |   0   |   0   |   0   ||   11  |   0   |
    ---------------------------------------------------------------------

:: problems summary ::
:::: WARNINGS
        [NOT FOUND  ] org.apache.kafka#kafka-clients;3.3.2!kafka-clients.jar (2ms)

    ==== local-m2-cache: tried

      file:/Users/diegogallovalenzuela/.m2/repository/org/apache/kafka/kafka-clients/3.3.2/kafka-clients-3.3.2.jar

        [NOT FOUND  ] org.lz4#lz4-java;1.8.0!lz4-java.jar (1ms)

    ==== local-m2-cache: tried

      file:/Users/diegogallovalenzuela/.m2/repository/org/lz4/lz4-java/1.8.0/lz4-java-1.8.0.jar

        [NOT FOUND  ] org.xerial.snappy#snappy-java;1.1.9.1!snappy-java.jar(bundle) (0ms)

    ==== local-m2-cache: tried

      file:/Users/diegogallovalenzuela/.m2/repository/org/xerial/snappy/snappy-java/1.1.9.1/snappy-java-1.1.9.1.jar

        [NOT FOUND  ] org.slf4j#slf4j-api;2.0.6!slf4j-api.jar (0ms)

    ==== local-m2-cache: tried

      file:/Users/diegogallovalenzuela/.m2/repository/org/slf4j/slf4j-api/2.0.6/slf4j-api-2.0.6.jar

        ::::::::::::::::::::::::::::::::::::::::::::::

        ::              FAILED DOWNLOADS            ::

        :: ^ see resolution messages for details  ^ ::

        ::::::::::::::::::::::::::::::::::::::::::::::

        :: org.apache.kafka#kafka-clients;3.3.2!kafka-clients.jar

        :: org.lz4#lz4-java;1.8.0!lz4-java.jar

        :: org.xerial.snappy#snappy-java;1.1.9.1!snappy-java.jar(bundle)

        :: org.slf4j#slf4j-api;2.0.6!slf4j-api.jar

        ::::::::::::::::::::::::::::::::::::::::::::::



:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
Exception in thread "main" java.lang.RuntimeException: [download failed: org.apache.kafka#kafka-clients;3.3.2!kafka-clients.jar, download failed: org.lz4#lz4-java;1.8.0!lz4-java.jar, download failed: org.xerial.snappy#snappy-java;1.1.9.1!snappy-java.jar(bundle), download failed: org.slf4j#slf4j-api;2.0.6!slf4j-api.jar]
    at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1528)
    at org.apache.spark.util.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:185)
    at org.apache.spark.deploy.SparkSubmit.prepareSubmitEnvironment(SparkSubmit.scala:332)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Project Code

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
import os

scala_version = "2.12"  # TODO: Ensure this is correct
spark_version = "3.4.0"
packages = [
    f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}",
    "org.apache.kafka:kafka-clients:3.3.2"
]
spark = SparkSession.builder \
    .master("local") \
    .appName("kafka-example") \
    .config("spark.jars.packages", ",".join(packages)) \
    .getOrCreate()
spark

# Define the Kafka broker(s) and topic(s)
kafka_brokers = "localhost:9093"
kafka_topic = "topic1"

# Define the Kafka source options
kafka_options = {
    "kafka.bootstrap.servers": kafka_brokers,
    "subscribe": kafka_topic,
    "startingOffsets": "latest"
}

# Read from Kafka as a streaming DataFrame
streaming_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("subscribe", kafka_topic) \
    .load()

# Define the schema for the value column
# (note: .rdd on a streaming DataFrame raises AnalysisException; see note below)
value_schema = spark.read.json(streaming_df.select("value").limit(1).rdd.map(lambda x: x[0])).schema

# Parse the value column as JSON and select required fields
parsed_df = streaming_df \
    .select(from_json(col("value").cast("string"), value_schema).alias("data")) \
    .select("data.column1", "data.column2", "data.column3")

# Define the sink for the parsed data (e.g., console, file, etc.)
sink_query = parsed_df \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

# Start the streaming query
sink_query.awaitTermination()
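One note on the code above, separate from the dependency error: calling .rdd on a streaming DataFrame (as the schema-inference line does) raises an AnalysisException in Spark, because queries with streaming sources can only be executed via writeStream.start(). A common alternative is to declare the value schema explicitly; the field names below are assumptions taken from the data.column1-style selects in the code, not known from the source:

from pyspark.sql.types import StructType, StructField, StringType

# Explicit schema for the Kafka message value. The column names and types
# are placeholders inferred from the selects above.
value_schema = StructType([
    StructField("column1", StringType(), True),
    StructField("column2", StringType(), True),
    StructField("column3", StringType(), True),
])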

Resources and similar questions consulted

  1. GitHub
  2. Stack Overflow

Update

I'm not sure whether this is a PATH error or a network-connection error, but the same code works when I run it on Windows. I need to make it work on Unix (macOS).
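Not part of the original post, but one hypothesis consistent with the log: all four [NOT FOUND] entries point at local-m2-cache (~/.m2/repository), which suggests Ivy found metadata for those artifacts in the local Maven cache but not the jars themselves, and never fell back to Maven Central. Assuming stale cache entries are the cause, here is a sketch of a cleanup that forces a fresh download on the next run (paths copied from the log above):

import shutil
from pathlib import Path

home = Path.home()

# Artifact directories reported as [NOT FOUND] in the resolution log.
stale_m2 = [
    home / ".m2/repository/org/apache/kafka/kafka-clients/3.3.2",
    home / ".m2/repository/org/lz4/lz4-java/1.8.0",
    home / ".m2/repository/org/xerial/snappy/snappy-java/1.1.9.1",
    home / ".m2/repository/org/slf4j/slf4j-api/2.0.6",
]

# Ivy's own cache for the same organisations, so the failed resolution
# is not replayed from ~/.ivy2/cache.
stale_ivy = [
    home / ".ivy2/cache/org.apache.kafka",
    home / ".ivy2/cache/org.lz4",
    home / ".ivy2/cache/org.xerial.snappy",
    home / ".ivy2/cache/org.slf4j",
]

for path in stale_m2 + stale_ivy:
    if path.exists():
        shutil.rmtree(path)
        print(f"removed {path}")

After clearing the cache, re-running the notebook cell should make Ivy download the jars from Maven Central (network access required), which might also explain why the same code worked on Windows, where these cache entries presumably did not exist.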

Answers

No answers yet.



