Join two DFs on column name as key

I have a DF produced by df.summary():

+-------+-----------+-----------+
|summary|    col1   |    col2   |
+-------+-----------+-----------+
|  count|       1000|       1000|
|   mean|  45.678923|  67.890123|
| stddev|   7.123456|   9.234567|
|    min|      32.45|      54.23|
|    25%|      40.12|      63.45|
|    50%|      45.67|      67.89|
|    75%|      50.23|      72.34|
|    max|      58.90|      87.65|
+-------+-----------+-----------+
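For reference, a summary DF of this shape can be produced roughly as follows. This is a minimal sketch with synthetic data for illustration only; the values will not match the table above.

# Minimal sketch (synthetic data): a DataFrame with two numeric columns,
# summarised with DataFrame.summary().
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.range(1000).select(
    (F.rand(seed=1) * 20 + 36).alias("col1"),
    (F.rand(seed=2) * 30 + 54).alias("col2"),
)

df_summary = df.summary()  # rows: count, mean, stddev, min, 25%, 50%, 75%, max
df_summary.show()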

Along with that, I have a second DF with some custom metrics:

distinct_col1, complete_col1, distinct_col2, complete_col2, distinct_col3, complete_col3, max_col3, min_col3
989, 1000, 1000, 1000, 540, 1000, '2023-10-01', '2021-01-01'
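The answer below refers to this second DF as df_custom without constructing it; a hypothetical one-row construction matching the example values above could look like this (column names and values taken directly from the row shown):

# Hypothetical construction of df_custom for testing, matching the example row.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df_custom = spark.createDataFrame(
    [(989, 1000, 1000, 1000, 540, 1000, "2023-10-01", "2021-01-01")],
    schema=[
        "distinct_col1", "complete_col1",
        "distinct_col2", "complete_col2",
        "distinct_col3", "complete_col3",
        "max_col3", "min_col3",
    ],
)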

My question is how I can join the two DFs to get the following output:

+-------+-----------+-----------+-----------+
|summary|    col1   |    col2   |   col3    |
+-------+-----------+-----------+-----------+
|  count|       1000|       1000|       1000|
| dstnct|        989|       1000|        540|
| complt|       1000|       1000|       1000|
|   mean|  45.678923|  67.890123|           |
| stddev|   7.123456|   9.234567|           |
|    min|      32.45|      54.23|2021-01-01 | 
|    25%|      40.12|      63.45|           |
|    50%|      45.67|      67.89|           |
|    75%|      50.23|      72.34|           |
|    max|      58.90|      87.65|2023-10-01 |
+-------+-----------+-----------+-----------+

I’ve tried some queries using spark.sql with DESCRIBE but with no success.

Answer

One possibility is to use unpivot. Note that when you unpivot, all of the value columns must have the same type (since they all end up in a single column), so the loop below has a try/except that casts the mixed date and numeric columns for col3 to strings.

# additional imports you may want to have
import pyspark.sql.functions as F
from pyspark.sql.utils import AnalysisException
from functools import reduce

df_custom_list = []

for col_name in ["col1", "col2", "col3"]:
    # gather all custom-metric columns belonging to this base column
    selected_cols = [col for col in df_custom.columns if col_name in col]
    summary_variables = [full_col_name.split(f"_{col_name}")[0] for full_col_name in selected_cols]

    try:
        df_custom_list.append(
            df_custom.select(
                selected_cols
            ).unpivot(
                ids=[], values=selected_cols, variableColumnName="variable", valueColumnName=col_name
            ).withColumn(
                "summary", F.split("variable", "_").getItem(0)
            ).select(
                "summary",
                col_name
            )
        )

    ## AnalysisException when a column has mixed types
    except AnalysisException:
        df_custom_list.append(
            df_custom.select(
                [F.col(c).cast("string") for c in selected_cols]
            ).unpivot(
                ids=[], values=selected_cols, variableColumnName="variable", valueColumnName=col_name
            ).withColumn(
                "summary", F.split("variable", "_").getItem(0)
            ).select(
                "summary",
                col_name
            )
        )

def join_reduce(left, right):
    return left.join(right, on="summary", how="outer")

df_custom_long = reduce(join_reduce, df_custom_list)

This gives us the following "long" version of the custom-metrics DF:

+--------+------+------+----------+
| summary|  col1|  col2|      col3|
+--------+------+------+----------+
|complete|1000.0|1000.0|    1000.0|
|distinct| 989.0|1000.0|     540.0|
|     max|  NULL|  NULL|2023-10-01|
|     min|  NULL|  NULL|2021-01-01|
+--------+------+------+----------+

We can then join it with the summary dataframe:

df_summary.join(
    df_custom_long, on=["summary", "col1", "col2", "col3"], how="outer"
)

+--------+--------------------+--------------------+--------------------+
| summary|                col1|                col2|                col3|
+--------+--------------------+--------------------+--------------------+
|     25%|  0.2316717013834434| 0.24697879907199982| 0.25871164215544096|
|     50%| 0.49836777273947974|  0.5148173539296902| 0.49286458444744663|
|     75%|  0.7527166395576412|  0.7457337827312321|  0.7387231507265664|
|complete|              1000.0|              1000.0|              1000.0|
|   count|                1000|                1000|                1000|
|distinct|               989.0|              1000.0|               540.0|
|     max|                NULL|                NULL|          2023-10-01|
|     min|                NULL|                NULL|          2021-01-01|
|     max|  0.9997176732861306|  0.9994137257706666|  0.9978208556819782|
|    mean|  0.4966566190249577|  0.5027365256998564|  0.4956940165311255|
|     min|0.004632023004602859|0.003218263604278...|1.163475536614111...|
|  stddev| 0.29447076332379996| 0.28861846745815306|  0.2886593597926857|
+--------+--------------------+--------------------+--------------------+

Depending on your use case, you may want to reconsider the resulting types, since mixing data types and casting dates and floats to strings has trade-offs; in the end you may simply treat all of the columns as strings.
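If a uniform schema matters more than the original types, one option (an assumption on my part, not part of the answer above) is to cast every column of both DFs to string before the final join:

# Sketch (assumption, not from the original answer): cast every column of
# both DFs to string so the joined result has a uniform schema.
df_summary_str = df_summary.select(
    [F.col(c).cast("string").alias(c) for c in df_summary.columns]
)
df_custom_str = df_custom_long.select(
    [F.col(c).cast("string").alias(c) for c in df_custom_long.columns]
)

df_result = df_summary_str.join(
    df_custom_str, on=["summary", "col1", "col2", "col3"], how="outer"
)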
