Calculating the percentage change between average prices using Java Spark Structured Streaming

Summary

I am developing a Spark application in Java for financial analytics. Apache Kafka streams the data, and the Spark Structured Streaming API reads it. I use the groupBy() and window() functions to calculate the average bid and ask prices every minute. I want to calculate the percentage change of each of those two averages between the current window and the previous one.
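To make the target quantity concrete: the percentage change between two consecutive window averages is (current - previous) / previous * 100. Below is a minimal plain-Java sketch of that formula, independent of Spark. The class name, method name, and sample prices are hypothetical and only serve as illustration.

```java
class PercentageChange {
    /**
     * Returns the percentage change from previous to current.
     * Returns NaN when previous is zero, because the change is undefined there.
     */
    static double percentageChange(double previous, double current) {
        if (previous == 0.0) {
            return Double.NaN; // avoid dividing by zero
        }
        return (current - previous) / previous * 100.0;
    }

    public static void main(String[] args) {
        // Hypothetical average bid in two consecutive one-minute windows
        double prevAvgBid = 200.0;
        double currAvgBid = 250.0;
        System.out.println(percentageChange(prevAvgBid, currAvgBid)); // prints 25.0
    }
}
```

The same function applies unchanged to the average ask price.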

Relevant code block

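`// Imports assumed by this snippet
import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public static void test2() throws Exception {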
    // Create SparkSession
    SparkSession spark = SparkSession.builder()
            .appName("BidAskPriceChanges")
            .master("local")
            .getOrCreate();

    // Define the schema for the streaming data
    StructType schema = new StructType()
            .add("book", "string", false)
            .add("volume", "string", false)
            .add("high", "string",true)
            .add("last", "string", false)
            .add("low", "string", false)
            .add("vwap", "string", false)
            .add("ask", "string", false)
            .add("bid", "string", false)
            .add("created_at", "string",true);

    // Read streaming data from Kafka
    Dataset<Row> data = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9093")
            .option("subscribe", "topic1")
            .load()
            .selectExpr("CAST(value AS STRING) as json")
            .select(functions.from_json(functions.col("json"), schema).as("data"))
            .select("data.*");

    // Cast the numeric columns to double and parse the created_at column as a timestamp
    data = data
            .withColumn("volume", col("volume").cast(DataTypes.DoubleType))
            .withColumn("high", col("high").cast(DataTypes.DoubleType))
            .withColumn("last", col("last").cast(DataTypes.DoubleType))
            .withColumn("low", col("low").cast(DataTypes.DoubleType))
            .withColumn("vwap", col("vwap").cast(DataTypes.DoubleType))
            .withColumn("ask", col("ask").cast(DataTypes.DoubleType))
            .withColumn("bid", col("bid").cast(DataTypes.DoubleType))
            .withColumn("created_at", to_timestamp(col("created_at"), "yyyy-MM-dd'T'HH:mm:ssZ"));
    
    
    // Define the window duration and sliding interval
    String windowDuration = "5 minutes";
    String slidingInterval = "1 minute";
    
    WindowSpec windowSpec = Window.partitionBy(window(col("created_at"), windowDuration, slidingInterval))
            .orderBy("created_at");
    Dataset<Row> windowedData = data
            .withColumn("avg_bid", avg(col("bid")).over(windowSpec))
            .withColumn("avg_ask", avg(col("ask")).over(windowSpec));

    // Compute the lagged average bid/ask prices for comparison
    Column prevAvgBid = lag(col("avg_bid"), 1).over(windowSpec);
    Column prevAvgAsk = lag(col("avg_ask"), 1).over(windowSpec);

    windowedData = windowedData.withColumn("prev_avg_bid", prevAvgBid)
            .withColumn("prev_avg_ask", prevAvgAsk);

    
    Dataset<Row> avgPrices = data
            .withWatermark("created_at", "1 minute")
            .groupBy(window(col("created_at"), "1 minute"))
            .agg(avg("bid").alias("avg_bid"), avg("ask").alias("avg_ask"))
            .withColumn("next_minute", col("window.end"))
            // Adding the two lag()-based columns below triggers the AnalysisException
            .withColumn("bid_percentage_change",
                    col("avg_bid").minus(lag("avg_bid", 1).over(Window.orderBy("window")))
                            .divide(lag("avg_bid", 1).over(Window.orderBy("window")))
                            .multiply(100))
            .withColumn("ask_percentage_change",
                    col("avg_ask").minus(lag("avg_ask", 1).over(Window.orderBy("window")))
                            .divide(lag("avg_ask", 1).over(Window.orderBy("window")))
                            .multiply(100));

    // Start the streaming query to write the results to the console
    StreamingQuery query = avgPrices
            .writeStream()
            .outputMode("complete")
            .format("console")
            .trigger(Trigger.ProcessingTime("1 minute"))
            .start();

    query.awaitTermination();
}`

Errors displayed

When I add the columns bid_percentage_change and ask_percentage_change, the program fails with org.apache.spark.sql.AnalysisException ("Non-time-based windows are not supported on streaming DataFrames/Datasets;"). If I remove those two columns, the program runs without a problem.
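The two failing columns both rely on `lag(...).over(Window.orderBy("window"))`. That window is ordered by rows rather than bounded by event time, which is the kind of window the message says streaming Datasets reject. What `lag` is being asked to do is straightforward once the per-window averages exist as a finite, ordered collection, for example in a batch job or, as an assumption not tested here, on each micro-batch handed to `foreachBatch`. Below is a minimal plain-Java sketch of that lag-and-compare step, with no Spark dependency. The class name, method name, and sample values are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

class LagPercentageChange {
    /**
     * Takes average prices keyed by window start (epoch seconds) and returns,
     * for each window, the percentage change relative to the window before it.
     * The first window has no predecessor, so its change is null, as with lag().
     */
    static Map<Long, Double> changes(Map<Long, Double> avgByWindowStart) {
        Map<Long, Double> result = new TreeMap<>();
        Double previous = null;
        // Copying into a TreeMap makes iteration follow ascending window start
        for (Map.Entry<Long, Double> e : new TreeMap<>(avgByWindowStart).entrySet()) {
            Double change = null;
            if (previous != null && previous != 0.0) {
                change = (e.getValue() - previous) / previous * 100.0;
            }
            result.put(e.getKey(), change);
            previous = e.getValue();
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical average bids for three consecutive one-minute windows
        Map<Long, Double> avgBid = new TreeMap<>();
        avgBid.put(0L, 200.0);
        avgBid.put(60L, 250.0);
        avgBid.put(120L, 125.0);
        System.out.println(changes(avgBid)); // prints {0=null, 60=25.0, 120=-50.0}
    }
}
```

The sketch needs every earlier window to be present at once, which is exactly what a row-ordered window requires and what an unbounded stream cannot guarantee.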

Additional information

I consulted several resources before posting this question, but none provided a solution. According to one answer to a similar question, Spark Structured Streaming does not support multiple aggregation operations. I also tried an approach using flatMapGroupsWithState, without success. Here is the list of resources I have already consulted:

  1. ChatGPT
  2. Similar Stack Overflow questions and answers, such as "Spark - Non-time-based windows are not supported on streaming DataFrames/Datasets;"
  3. Official Spark Documentation
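The stateful route mentioned above amounts to remembering the previous window's average per key and emitting the change when the next average arrives. Below is a minimal plain-Java sketch of that update step, with no Spark dependency. `PreviousAverageState` and `update` are hypothetical names, the key is assumed to be the `book` column, and wiring this into `flatMapGroupsWithState` (state encoders, timeouts, output mode) is not shown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalDouble;

class PreviousAverageState {
    // Last average seen for each book; this plays the role of the per-key state
    private final Map<String, Double> lastAvgByBook = new HashMap<>();

    /**
     * Stores currentAvg as the new state for the book and returns the percentage
     * change against the previously stored average, or empty when there is no
     * usable previous value (first window for this book, or previous average of zero).
     */
    OptionalDouble update(String book, double currentAvg) {
        Double previous = lastAvgByBook.put(book, currentAvg);
        if (previous == null || previous == 0.0) {
            return OptionalDouble.empty();
        }
        return OptionalDouble.of((currentAvg - previous) / previous * 100.0);
    }

    public static void main(String[] args) {
        PreviousAverageState state = new PreviousAverageState();
        // "book_a" is a made-up key; averages arrive one window at a time
        System.out.println(state.update("book_a", 200.0).isPresent());   // prints false
        System.out.println(state.update("book_a", 250.0).getAsDouble()); // prints 25.0
    }
}
```

This version assumes window averages reach `update` in window order; late or out-of-order windows would need extra handling.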