Summary
I am developing a Spark application in Java for financial analytics. Apache Kafka supplies the streaming data, and the Spark Structured Streaming API reads it. The groupBy() and window() functions are used to calculate the average bid and ask prices every minute. I want to calculate the percentage change of the average bid and ask prices between the current window and the previous one.
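To make the target calculation concrete, here is a minimal plain-Java sketch of it, outside Spark. It is only an illustration: the class and method names are hypothetical, the input is assumed to be a list of per-window averages already ordered by window start, and returning NaN for the first window (which has no predecessor) or for a zero previous value is my own assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Spark: illustrates the desired result only.
public class PercentageChangeSketch {

    // Percentage change from the previous value to the current one.
    // Returns NaN when the previous value is zero (assumption).
    static double percentageChange(double previous, double current) {
        if (previous == 0.0) {
            return Double.NaN;
        }
        return (current - previous) / previous * 100.0;
    }

    // For per-window averages ordered by window start, returns the percentage
    // change of each window relative to the one before it.
    // The first window has no predecessor, so its entry is NaN (assumption).
    static List<Double> changesBetweenWindows(List<Double> windowAverages) {
        List<Double> changes = new ArrayList<>();
        for (int i = 0; i < windowAverages.size(); i++) {
            if (i == 0) {
                changes.add(Double.NaN);
            } else {
                changes.add(percentageChange(windowAverages.get(i - 1), windowAverages.get(i)));
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        // Made-up one-minute average bid prices, for illustration only.
        List<Double> avgBid = Arrays.asList(100.0, 110.0, 99.0);
        System.out.println(changesBetweenWindows(avgBid));
    }
}
```

This is what I would like the streaming query to produce for avg_bid and avg_ask, one row per one-minute window.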
Relevant code block
`public static void test2() throws Exception {
// Create SparkSession
SparkSession spark = SparkSession.builder()
.appName("BidAskPriceChanges")
.master("local")
.getOrCreate();
// Define the schema for the streaming data
StructType schema = new StructType()
.add("book", "string", false)
.add("volume", "string", false)
.add("high", "string",true)
.add("last", "string", false)
.add("low", "string", false)
.add("vwap", "string", false)
.add("ask", "string", false)
.add("bid", "string", false)
.add("created_at", "string",true);
// Read streaming data from Kafka
Dataset<Row> data = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9093")
.option("subscribe", "topic1")
.load()
.selectExpr("CAST(value AS STRING) as json")
.select(functions.from_json(functions.col("json"), schema).as("data"))
.select("data.*");
// Cast the numeric columns to double and parse created_at as a timestamp
data = data
.withColumn("volume", col("volume").cast(DataTypes.DoubleType))
.withColumn("high", col("high").cast(DataTypes.DoubleType))
.withColumn("last", col("last").cast(DataTypes.DoubleType))
.withColumn("low", col("low").cast(DataTypes.DoubleType))
.withColumn("vwap", col("vwap").cast(DataTypes.DoubleType))
.withColumn("ask", col("ask").cast(DataTypes.DoubleType))
.withColumn("bid", col("bid").cast(DataTypes.DoubleType))
.withColumn("created_at", to_timestamp(col("created_at"), "yyyy-MM-dd T HH:mm:ssZ"));
// Define the window duration and sliding interval
String windowDuration = "5 minutes";
String slidingInterval = "1 minute";
WindowSpec windowSpec = Window.partitionBy(window(col("created_at"), windowDuration, slidingInterval))
.orderBy("created_at");
Dataset<Row> windowedData = data
.withColumn("avg_bid", avg(col("bid")).over(windowSpec))
.withColumn("avg_ask", avg(col("ask")).over(windowSpec));
// Compute the lagged average bid/ask prices for comparison
Column prevAvgBid = lag(col("avg_bid"), 1).over(windowSpec);
Column prevAvgAsk = lag(col("avg_ask"), 1).over(windowSpec);
windowedData = windowedData.withColumn("prev_avg_bid", prevAvgBid)
.withColumn("prev_avg_ask", prevAvgAsk);
Dataset<Row> avgPrices = data
.withWatermark("created_at", "1 minute")
.groupBy(window(col("created_at"), "1 minute"))
.agg(avg("bid").alias("avg_bid"), avg("ask").alias("avg_ask"))
.withColumn("next_minute", col("window.end"))
.withColumn("bid_percentage_change", (col("avg_bid").minus(lag("avg_bid", 1).over(Window.orderBy("window")))).divide(lag("avg_bid", 1).over(Window.orderBy("window"))).multiply(100))
.withColumn("ask_percentage_change", (col("avg_ask").minus(lag("avg_ask", 1).over(Window.orderBy("window")))).divide(lag("avg_ask", 1).over(Window.orderBy("window"))).multiply(100));
// Start the streaming query to write the results to the console
StreamingQuery query = avgPrices
.writeStream()
.outputMode("complete")
.format("console")
.trigger(Trigger.ProcessingTime("1 minute"))
.start();
query.awaitTermination();
}`
Errors displayed
When I add the columns bid_percentage_change and ask_percentage_change, I get the error: org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets;
If I remove those two columns, the program runs without a problem.
Additional information
I consulted additional resources before posting my question, but none provided a solution. According to one answer to a similar question, Spark Structured Streaming does not handle multiple aggregation operations. I tried implementing an approach using flatMapGroupsWithState, with no result. Here is a list of resources I have already consulted.
- ChatGPT
- Similar Stack Overflow questions and answers, e.g. "Spark - Non-time-based windows are not supported on streaming DataFrames/Datasets;"
- Official Spark Documentation