Databricks Spark streaming from Kafka creates empty delta records

We are trying to implement a streaming ingestion from Kafka using Databricks. Here is our Spark job:

# Databricks notebook source
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, DoubleType, LongType

# COMMAND ----------

# Maven coordinates of the connector and Delta jars required by the job
spark_dependencies_jars = [
    "org.apache.hadoop:hadoop-common:3.3.1",
    "org.apache.hadoop:hadoop-client:3.3.1",
    "org.apache.hadoop:hadoop-aws:3.3.1",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1",
    "org.apache.spark:spark-streaming-kafka-0-10_2.12:3.3.1",
    "io.delta:delta-core_2.12:2.2.0"
]
spark_dependencies_jars_str = ",".join(spark_dependencies_jars)

# COMMAND ----------

KAFKA_BOOTSTRAP_SERVERS = "10.0.0.91:9094"
BASE_CHECKPOINT_LOCATION = "wasbs://warehouse@miniobucketsphera.blob.core.windows.net/checkpoint/"
BASE_DELTA_DIR_VM_AZURE = "wasbs://warehouse@miniobucketsphera.blob.core.windows.net/delta/inventory/"

spark = SparkSession.builder \
    .appName("Kafka Streaming Example") \
    .master(SPARK_MASTER_LOCAL) \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false") \
    .config("spark.hadoop.hive.metastore.uris", HIVE_METASTORE_URI) \
    .config("spark.sql.catalogImplementation", "hive") \
    .enableHiveSupport() \
    .getOrCreate()
   
df.write.format("delta").mode("overwrite").save(delta_wasbs_path)

# COMMAND ----------

# .writeStream  test

def read_kafka_stream(kafka_topic, schema):
    kafka_stream = spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
                .option("subscribe", kafka_topic) \
                .option("failOnDataLoss", "false") \
                .option("startingOffsets", "earliest") \
                .load()
    data_stream = kafka_stream.selectExpr("cast (value as string) as json") \
                            .select(from_json("json", schema).alias("cdc")) \
                            .select("cdc.payload.after.*", "cdc.payload.op")
    data_stream = data_stream.withColumn("curr_timestamp", current_timestamp())
    return data_stream

# COMMAND ----------

# .save() test

def write_delta_table(data_stream, checkpoint_location, delta_dir):
    data_stream.writeStream \
                .format("delta") \
                .outputMode("append") \
                .option("checkpointLocation", checkpoint_location) \
                .start(delta_dir)

# COMMAND ----------

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema_table_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("age", IntegerType(), True),
    StructField("name", StringType(), True),
])

source_schema = StructType([
    StructField("version", StringType(), False),
    StructField("connector", StringType(), False),
    StructField("name", StringType(), False),
    StructField("ts_ms", IntegerType(), False),
    StructField("snapshot", StringType(), True),
    StructField("db", StringType(), False),
    StructField("sequence", StringType(), True),
    StructField("table", StringType(), True),
    StructField("server_id", IntegerType(), False),
    StructField("gtid", StringType(), True),
    StructField("file", StringType(), False),
    StructField("pos", IntegerType(), False),
    StructField("row", IntegerType(), False),
    StructField("thread", IntegerType(), True),
    StructField("query", StringType(), True),
])

transaction_schema = StructType([
    StructField("id", StringType(), False),
    StructField("total_order", IntegerType(), False),
    StructField("data_collection_order", IntegerType(), False),
])

schema_payload_schema = StructType([
    StructField("before", schema_table_schema, True),
    StructField("after", schema_table_schema, True),
    StructField("source", source_schema, True),
    StructField("op", StringType(), False),
    StructField("ts_ms", IntegerType(), True),
    StructField("transaction", transaction_schema, True),
])

schema_schema = StructType([
    StructField("schema", StringType(), True),
    StructField("payload", schema_payload_schema, True),
])


# COMMAND ----------

schema_table_topic = "sqlf-eus-sccspoc-dev.schema1.table1"


# COMMAND ----------

schema_table_checkpoint_location = BASE_CHECKPOINT_LOCATION + "schema_table"


# COMMAND ----------

schema_table_delta_dir = BASE_DELTA_DIR_VM_AZURE + "schema_table"


# COMMAND ----------

schema_table_stream = read_kafka_stream(schema_table_topic, schema_schema)


# COMMAND ----------

write_delta_table(schema_table_stream, schema_table_checkpoint_location, schema_table_delta_dir)

We are able to see Parquet partitions being created, but they are empty (only metadata, no actual data).

(screenshot: empty Delta table)
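A quick way to confirm this from the notebook is to read the Delta location back and count rows (a minimal sketch, using the same path variable as above):

# Read the written Delta output back; the table resolves (the _delta_log exists) but holds no rows
spark.read.format("delta").load(schema_table_delta_dir).count()  # -> 0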

Has anyone encountered a similar issue? If so, what was the resolution?

Answers

I haven't encountered the issue myself, but, intrigued by the question, I found this answer in the Databricks Knowledge Base:

If your streaming application is writing to a target Delta table and your source data is empty on certain micro batches, it can result in writing empty files to your target Delta table. ...

Solution

You should upgrade your clusters to Databricks Runtime 9.1 LTS or above. Databricks Runtime 9.1 LTS and above contains a fix for the issue and no longer creates empty files for empty writes.

If you can't upgrade, there is another workaround:

If you cannot upgrade to Databricks Runtime 9.1 LTS or above, you should periodically run OPTIMIZE (AWS | Azure | GCP) on the affected table to clean up the empty files. This is not a permanent fix and should be considered a workaround until you can upgrade to a newer runtime.
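Since the table in your notebook is written to a path rather than registered in the metastore, the periodic OPTIMIZE can be issued against the path directly. A minimal sketch, assuming the schema_table_delta_dir variable from the question:

# Compact the path-based Delta table; OPTIMIZE folds the small/empty files into larger ones
spark.sql(f"OPTIMIZE delta.`{schema_table_delta_dir}`")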

For a more Spark-based solution, there is a configuration flag, spark.sql.streaming.noDataMicroBatches.enabled. It's used in MicroBatchExecution:

  private def constructNextBatch(noDataBatchesEnabled: Boolean): Boolean = withProgressLocked {
    if (isCurrentBatchConstructed) return true
// ...
    val lastExecutionRequiresAnotherBatch = noDataBatchesEnabled &&
      Option(lastExecution).exists(_.shouldRunAnotherBatch(offsetSeqMetadata))
    val shouldConstructNextBatch = isNewDataAvailable || lastExecutionRequiresAnotherBatch
//..

  protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
// ...
          // Try to construct the next batch. This will return true only if the next batch is
          // ready and runnable. Note that the current batch may be runnable even without
          // new data to process as `constructNextBatch` may decide to run a batch for
          // state cleanup, etc. `isNewDataAvailable` will be updated to reflect whether new data
          // is available or not.
          if (!isCurrentBatchConstructed) {
            isCurrentBatchConstructed = constructNextBatch(noDataBatchesEnabled)
          }

// ...
          currentStatus = currentStatus.copy(isDataAvailable = isNewDataAvailable)
          if (isCurrentBatchConstructed) {
            if (currentBatchHasNewData) updateStatusMessage("Processing new data")
            else updateStatusMessage("No new data but cleaning up state")
            runBatch(sparkSessionForStream)
          } else {
            updateStatusMessage("Waiting for data to arrive")
          }

Of course, it won't help if the empty files are the result of your transformations, e.g. if you filtered out all the input rows. In that case, OPTIMIZE or the runtime upgrade should help.

Please note that disabling this flag impacts stateful operations, but since your pipeline doesn't use any, you should be fine.
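If you decide to experiment with it, the flag is a regular runtime SQL configuration and can be set on the session before the query is started. A minimal sketch in PySpark (the default is true):

# Stop the engine from constructing micro-batches when the source reports no new data;
# this only matters for stateful queries, which this pipeline does not use
spark.conf.set("spark.sql.streaming.noDataMicroBatches.enabled", "false")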




