We are trying to implement streaming ingestion of CDC messages from Kafka into Delta tables on Databricks. Here is our Spark job:
# Databricks notebook source
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, DoubleType, LongType
# COMMAND ----------
spark_dependencies_jars = [
    "org.apache.hadoop:hadoop-common:3.3.1",
    "org.apache.hadoop:hadoop-client:3.3.1",
    "org.apache.hadoop:hadoop-aws:3.3.1",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1",
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.1",
    "io.delta:delta-core_2.12:2.2.0",
]
spark_dependencies_jars_str = ",".join(spark_dependencies_jars)
# COMMAND ----------
KAFKA_BOOTSTRAP_SERVERS = "10.0.0.91:9094"
BASE_CHECKPOINT_LOCATION = "wasbs://warehouse@miniobucketsphera.blob.core.windows.net/checkpoint/"
BASE_DELTA_DIR_VM_AZURE = "wasbs://warehouse@miniobucketsphera.blob.core.windows.net/delta/inventory/"
spark = (
    SparkSession.builder
    .appName("Kafka Streaming Example")
    .master(SPARK_MASTER_LOCAL)
    .config("spark.jars.packages", spark_dependencies_jars_str)  # dependency list from the cell above
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
    .config("spark.hadoop.hive.metastore.uris", HIVE_METASTORE_URI)
    .config("spark.sql.catalogImplementation", "hive")
    .enableHiveSupport()
    .getOrCreate()
)
# df.write.format("delta").mode("overwrite").save(delta_wasbs_path)  # stray batch-write line: df and delta_wasbs_path are not defined in this notebook
# COMMAND ----------
# Read the CDC stream from Kafka and parse the JSON envelope
def read_kafka_stream(kafka_topic, schema):
    kafka_stream = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
        .option("subscribe", kafka_topic)
        .option("failOnDataLoss", "false")
        .option("startingOffsets", "earliest")
        .load()
    )
    # keep the row image after the change plus the operation type
    data_stream = (
        kafka_stream.selectExpr("cast(value as string) as json")
        .select(from_json("json", schema).alias("cdc"))
        .select("cdc.payload.after.*", "cdc.payload.op")
    )
    data_stream = data_stream.withColumn("curr_timestamp", current_timestamp())
    return data_stream
# COMMAND ----------
# Write the parsed stream to a Delta table, tracking progress in the checkpoint location
def write_delta_table(data_stream, checkpoint_location, delta_dir):
    return (
        data_stream.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .start(delta_dir)
    )
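# COMMAND ----------
# Debug-only sketch (not part of the job): write a few parsed rows to the console
# sink so we can see whether from_json is producing NULLs (e.g. a schema mismatch
# with the CDC envelope). debug_print_stream is just an illustrative name.
def debug_print_stream(data_stream):
    return (
        data_stream.writeStream
        .format("console")
        .outputMode("append")
        .option("truncate", "false")
        .start()
    )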
# COMMAND ----------
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Schema of the table rows (the "before"/"after" images in the CDC payload)
schema_table_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("age", IntegerType(), True),
    StructField("name", StringType(), True),
])
# CDC source metadata block
source_schema = StructType([
    StructField("version", StringType(), False),
    StructField("connector", StringType(), False),
    StructField("name", StringType(), False),
    StructField("ts_ms", IntegerType(), False),
    StructField("snapshot", StringType(), True),
    StructField("db", StringType(), False),
    StructField("sequence", StringType(), True),
    StructField("table", StringType(), True),
    StructField("server_id", IntegerType(), False),
    StructField("gtid", StringType(), True),
    StructField("file", StringType(), False),
    StructField("pos", IntegerType(), False),
    StructField("row", IntegerType(), False),
    StructField("thread", IntegerType(), True),
    StructField("query", StringType(), True),
])
transaction_schema = StructType([
    StructField("id", StringType(), False),
    StructField("total_order", IntegerType(), False),
    StructField("data_collection_order", IntegerType(), False),
])
# CDC envelope payload: before/after images, source metadata, operation type
schema_payload_schema = StructType([
    StructField("before", schema_table_schema, True),
    StructField("after", schema_table_schema, True),
    StructField("source", source_schema, True),
    StructField("op", StringType(), False),
    StructField("ts_ms", IntegerType(), True),
    StructField("transaction", transaction_schema, True),
])
# Full message: "schema" part (kept as a raw string) + "payload"
schema_schema = StructType([
    StructField("schema", StringType(), True),
    StructField("payload", schema_payload_schema, True),
])
# COMMAND ----------
schema_table_topic = "sqlf-eus-sccspoc-dev.schema1.table1"
# COMMAND ----------
schema_table_checkpoint_location = BASE_CHECKPOINT_LOCATION + "schema_table"
# COMMAND ----------
schema_table_delta_dir = BASE_DELTA_DIR_VM_AZURE + "schema_table"
# COMMAND ----------
schema_table_stream = read_kafka_stream(schema_table_topic, schema_schema)
# COMMAND ----------
write_delta_table(schema_table_stream, schema_table_checkpoint_location, schema_table_delta_dir)
The stream starts and we can see Parquet files being created under the Delta path, but they are empty (only metadata, no actual rows).
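For reference, this is roughly how we check the output (same path as in the job above; df_check is just a throwaway name):
df_check = spark.read.format("delta").load(schema_table_delta_dir)
print(df_check.count())   # no rows come back, even though Parquet files exist under the path
df_check.printSchema()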
Has anyone encountered a similar issue? If so, what was the resolution?
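In case it helps with the diagnosis, here is a minimal batch check we could run to see whether the schema matches the messages (a sketch only; sample_json is a placeholder for a real raw value copied from the topic, and test_df/parsed are just illustrative names):
sample_json = '...'  # paste one raw Kafka value (the JSON string) from the topic here
test_df = spark.createDataFrame([(sample_json,)], ["json"])
parsed = (
    test_df
    .select(from_json("json", schema_schema).alias("cdc"))
    .select("cdc.payload.after.*", "cdc.payload.op")
)
parsed.show(truncate=False)  # all-NULL columns here would point at a schema/JSON mismatch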