Calculating Rolling Sum with Condition in Scala DataFrame

I have a DataFrame in Scala that looks like this:

+---+----------+------------------+-------+-----------+
| id|      date|            amount|ndxDays|ndxDaysDate|
+---+----------+------------------+-------+-----------+
|  1|2023-09-13| 2.976479132636267|      2| 2023-09-11|
|  1|2023-09-14|20.956374354696695|      2| 2023-09-12|
|  1|2023-09-15| 98.13910624090528|      2| 2023-09-13|
|  1|2023-09-16|  84.8245242496161|      2| 2023-09-14|
|  1|2023-09-17|  56.3138752292941|      2| 2023-09-15|
+---+----------+------------------+-------+-----------+
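
For reproduction, the sample above can be built in Scala roughly as follows (a sketch only: the column types are guessed from the printed output, and both date columns are kept as strings):

import org.apache.spark.sql.SparkSession

// Local session just for this example
val spark = SparkSession.builder().master("local[*]").appName("rolling-sum").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, "2023-09-13", 2.976479132636267, 2, "2023-09-11"),
  (1, "2023-09-14", 20.956374354696695, 2, "2023-09-12"),
  (1, "2023-09-15", 98.13910624090528, 2, "2023-09-13"),
  (1, "2023-09-16", 84.8245242496161, 2, "2023-09-14"),
  (1, "2023-09-17", 56.3138752292941, 2, "2023-09-15")
).toDF("id", "date", "amount", "ndxDays", "ndxDaysDate")

df.show(truncate = false)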

I need to calculate a rolling sum of "amount" for each row, where the sum should look back and include only the rows whose date falls within the last "ndxDays" days of the current row, i.e. on or after "ndxDaysDate". In other words, I want the cumulative "amount" up to the current row while respecting that look-back condition.
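
Put differently, for each "current" row the sum should cover exactly the rows of the same id that satisfy a predicate like the one below (a sketch reusing the column names above; cur and prev are hypothetical aliases for the current row and a candidate earlier row):

import org.apache.spark.sql.functions.col

// A candidate row `prev` contributes to the rolling sum of the current row `cur` when
// it belongs to the same id and its date falls inside [ndxDaysDate, date] of `cur`.
val contributes =
  col("prev.id") === col("cur.id") &&
  col("prev.date") >= col("cur.ndxDaysDate") &&
  col("prev.date") <= col("cur.date")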

Answer

One possible way to do this is:

import pyspark.sql.functions as F
from pyspark import SparkContext, SQLContext
from pyspark.sql.window import Window

# Local Spark context / SQL context for the example
sc = SparkContext("local")
sqlContext = SQLContext(sc)

data1 = [
    [1, "2023-09-13", 2.976479132636267, 2, "2023-09-11"],
    [1, "2023-09-14", 20.956374354696695, 2, "2023-09-12"],
    [1, "2023-09-15", 98.13910624090528, 2, "2023-09-13"],
    [1, "2023-09-16", 84.8245242496161, 2, "2023-09-14"],
    [1, "2023-09-17", 56.3138752292941, 2, "2023-09-15"],

]


# Build the sample DataFrame
df1Columns = ["id", "date", "amount", "ndxDays", "ndxDaysDate"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)

df1.show(n=10, truncate=False)

# Running-sum window: all rows from the start of each id's partition up to the current row
window_spec = Window.partitionBy("id").orderBy(F.col("date"), F.col("ndxDaysDate")).rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Cumulative sum of amount over that window
spark_df = df1.withColumn("cum_sum", F.sum(F.col("amount")).over(window_spec))


print("cum sum dataframe")
spark_df.show(n=100, truncate=False)

Output:

+---+----------+------------------+-------+-----------+
|id |date      |amount            |ndxDays|ndxDaysDate|
+---+----------+------------------+-------+-----------+
|1  |2023-09-13|2.976479132636267 |2      |2023-09-11 |
|1  |2023-09-14|20.956374354696695|2      |2023-09-12 |
|1  |2023-09-15|98.13910624090528 |2      |2023-09-13 |
|1  |2023-09-16|84.8245242496161  |2      |2023-09-14 |
|1  |2023-09-17|56.3138752292941  |2      |2023-09-15 |
+---+----------+------------------+-------+-----------+

cum sum dataframe
+---+----------+------------------+-------+-----------+------------------+
|id |date      |amount            |ndxDays|ndxDaysDate|cum_sum           |
+---+----------+------------------+-------+-----------+------------------+
|1  |2023-09-13|2.976479132636267 |2      |2023-09-11 |2.976479132636267 |
|1  |2023-09-14|20.956374354696695|2      |2023-09-12 |23.932853487332963|
|1  |2023-09-15|98.13910624090528 |2      |2023-09-13 |122.07195972823824|
|1  |2023-09-16|84.8245242496161  |2      |2023-09-14 |206.89648397785436|
|1  |2023-09-17|56.3138752292941  |2      |2023-09-15 |263.2103592071485 |
+---+----------+------------------+-------+-----------+------------------+
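
Since the question asks for Scala, a rough Scala counterpart of the window used above is sketched below; it assumes df is the same data loaded as a Scala DataFrame (for example, as built in the snippet after the sample table). The second window is an additional, assumption-laden variant that bounds the frame to a fixed look-back of 2 days (the ndxDays value in the sample) via rangeBetween instead of summing the whole history:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Plain running sum of `amount` per `id`, mirroring the PySpark window above
val runningWindow = Window
  .partitionBy("id")
  .orderBy(col("date"), col("ndxDaysDate"))
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

val cumDf = df.withColumn("cum_sum", sum(col("amount")).over(runningWindow))

// Variant bounded to the last `lookBackDays` days (assumes ndxDays is constant, 2 here).
// rangeBetween needs a numeric ordering expression, so the date is turned into a day count.
val lookBackDays = 2L
val boundedWindow = Window
  .partitionBy("id")
  .orderBy(datediff(col("date"), lit("1970-01-01")))
  .rangeBetween(-lookBackDays, Window.currentRow)

val boundedDf = df.withColumn("rolling_sum", sum(col("amount")).over(boundedWindow))

cumDf.show(truncate = false)
boundedDf.show(truncate = false)

Note that window frame bounds must be literals, so if ndxDays varies from row to row the frame cannot reference it; in that case an approach along the lines of the join predicate sketched in the question (a conditional self-join followed by an aggregation) may be needed.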



