Pyspark: Flattening of Hierarchy table dynamically

I have the below PySpark dataframe:

HierarchyNode       ParentNode
 U1.1               U1
 U1.1.1             U1.1
 24.0.1             24
 24.1.1             24.0.1
 24.1.1.1           24.1.1
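
For reference, a minimal sketch that reproduces this dataframe (assuming both columns are plain strings):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample hierarchy rows from the table above: (HierarchyNode, ParentNode)
data = [
    ("U1.1", "U1"),
    ("U1.1.1", "U1.1"),
    ("24.0.1", "24"),
    ("24.1.1", "24.0.1"),
    ("24.1.1.1", "24.1.1"),
]
df = spark.createDataFrame(data, ["HierarchyNode", "ParentNode"])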

This is more like a hierarchy table. I want to flatten it, and the expected output should be as below:

ParentWBSElementExternalID  LN-1            LN-2              LN-3
 U1                         NULL            NULL              NULL
 U1                         U1.1            NULL              NULL
 U1                         U1.1            U1.1.1            NULL
 24                         NULL            NULL              NULL
 24                         24.0.1          NULL              NULL
 24                         24.0.1          24.1.1            NULL
 24                         24.0.1          24.1.1            24.1.1.1

Using a recursive CTE would be one solution, but unfortunately we cannot use recursive CTEs in Databricks. Also, as you can see, the hierarchy depth is variable: child nodes can be nested 2 levels, 3 levels deep, and so on. So the solution has to be dynamic.

I followed this, but did not get a proper answer.

Any help using PySpark would be appreciated.

Best answer

Yes, you can do this. You are also correct in pointing out that Databricks SQL does not have a recursive CTE. However, a recursive CTE can be simulated in PySpark with a loop; the article originally linked here describes that approach.

I will use that as the basis for solving your problem, using a mix of PySpark and SQL.

First, save your dataframe as a temporary view.

df.createOrReplaceTempView("test1")

Now comes the recursive CTE part, adapted from the article mentioned above.

i = 1
df = spark.sql("""
        SELECT ParentNode,
        HierarchyNode,
        1 AS level
        FROM test1
        """)
# this view is our "CTE" that we reference with each pass
df.createOrReplaceTempView("recursion_df")

print("entering loop")
while True:
    # select data for this recursion level
    new_df = spark.sql("""
        SELECT
            r.ParentNode,
            t.HierarchyNode,
            (level + 1) AS level
            FROM test1 t
            INNER JOIN recursion_df r
            ON r.HierarchyNode = t.ParentNode
        """)

    # this view is our "CTE" that we reference with each pass
    new_df.createOrReplaceTempView("recursion_df")
    # add the results to the main output dataframe
    df = df.union(new_df)
    # if there are no results at this recursion level then break
    print(f"{i} : {new_df.count()}")
    if (new_df.count() == 0 or i == 5) :
        df.createOrReplaceTempView("final_df")
        break
    else:
        i += 1

You can check the view final_df to see what it contains.

It also contains an additional column, level, which records the recursion depth.
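
With the sample data, final_df should look roughly like this (row order may differ):

spark.sql("SELECT * FROM final_df ORDER BY ParentNode, level").show()
# +----------+-------------+-----+
# |ParentNode|HierarchyNode|level|
# +----------+-------------+-----+
# |        24|       24.0.1|    1|
# |        24|       24.1.1|    2|
# |        24|     24.1.1.1|    3|
# |    24.0.1|       24.1.1|    1|
# |    24.0.1|     24.1.1.1|    2|
# |    24.1.1|     24.1.1.1|    1|
# |        U1|         U1.1|    1|
# |        U1|       U1.1.1|    2|
# |      U1.1|       U1.1.1|    1|
# +----------+-------------+-----+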

Now create a temporary view on the result to get the root parent nodes.

%sql
CREATE OR REPLACE TEMPORARY VIEW TEST1 AS
SELECT ParentNode FROM final_df WHERE level = 1 and ParentNode NOT IN (SELECT HierarchyNode FROM final_df);

Since you already know that the hierarchy cannot be deeper than 3 levels, you can create a union statement like this.

%sql
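-- level never equals 0, so this LEFT JOIN returns the root rows with all-NULL level columns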
SELECT c.ParentNode, r.HierarchyNode AS LN_1, NULL as LN_2, NULL as LN_3 FROM TEST1 c LEFT JOIN final_df r ON r.ParentNode = c.ParentNode AND r.level = 0
UNION
SELECT c.ParentNode, r.HierarchyNode AS LN_1, NULL as LN_2, NULL as LN_3 FROM TEST1 c LEFT JOIN final_df r ON r.ParentNode = c.ParentNode AND r.level = 1
UNION
SELECT c.ParentNode, r.HierarchyNode AS LN_1, s.HierarchyNode as LN_2, NULL as LN_3 FROM TEST1 c 
LEFT JOIN final_df r ON r.ParentNode = c.ParentNode AND r.level = 1
LEFT JOIN final_df s ON s.ParentNode = c.ParentNode AND s.level = 2 
UNION
SELECT c.ParentNode, r.HierarchyNode AS LN_1, s.HierarchyNode as LN_2, t.HierarchyNode as LN_3 FROM TEST1 c 
LEFT JOIN final_df r ON r.ParentNode = c.ParentNode AND r.level = 1
LEFT JOIN final_df s ON s.ParentNode = c.ParentNode AND s.level = 2
LEFT JOIN final_df t ON t.ParentNode = c.ParentNode AND t.level = 3 
; 

Your result matches the expected output:

ParentNode  LN_1      LN_2      LN_3
 U1         NULL      NULL      NULL
 U1         U1.1      NULL      NULL
 U1         U1.1      U1.1.1    NULL
 24         NULL      NULL      NULL
 24         24.0.1    NULL      NULL
 24         24.0.1    24.1.1    NULL
 24         24.0.1    24.1.1    24.1.1.1

The above answer assumes that the recursion depth is 3. However, let's say we do not know what the depth is. First execute the recursive-CTE PySpark code. You need to cap the recursion level at some maximum. It can really be anything (say 100), as the loop will break either when there are no more elements or when the maximum recursion level has been reached.

recursion_lvl = 5
i = 1
df = spark.sql("""
        SELECT ParentNode,
        HierarchyNode,
        1 AS level
        FROM test1
        """)
# this view is our "CTE" that we reference with each pass
df.createOrReplaceTempView("recursion_df")

print("entering loop")
while True:
    # select data for this recursion level
    new_df = spark.sql("""
        SELECT
            r.ParentNode,
            t.HierarchyNode,
            (level + 1) AS level
            FROM test1 t
            INNER JOIN recursion_df r
            ON r.HierarchyNode = t.ParentNode
        """)

    # this view is our "CTE" that we reference with each pass
    new_df.createOrReplaceTempView("recursion_df")
    # add the results to the main output dataframe
    df = df.union(new_df)
    # if there are no results at this recursion level then break
    print(f"{i} : {new_df.count()}")
    if (new_df.count() == 0 or i == recursion_lvl) :
        df.createOrReplaceTempView("final_df")
        break
    else:
        i += 1

Now get the unique base roots (as shown above).

%sql
CREATE OR REPLACE TEMPORARY VIEW TEST1 AS
SELECT ParentNode FROM final_df WHERE level = 1 and ParentNode NOT IN (SELECT HierarchyNode FROM final_df);

SELECT * FROM TEST1;
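
With the sample data, this returns the two roots: U1 and 24.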

Now get the maximum recursion level present in your data (3 for the sample data).

lvldf = spark.sql("SELECT MAX(level) AS MAXLVL from final_df;")
lvlmax = lvldf.head().asDict()["MAXLVL"]
print(lvlmax)

Finally comes the complicated piece of code which does the looping:

- Create an empty final table.
- Insert rows with the base parent roots and NULLs for the remaining columns.
- Loop, building insert statements and executing them against the final table.

   unionsql = "SELECT c.ParentNode,"
    col = ""
    for i in range(lvlmax):
        col = col + f"NULL AS LN_{i+1}"
        if (i+1) < lvlmax:
            col = col +  , 
    unionsql = "SELECT c.ParentNode," + col +   FROM TEST1 c LEFT JOIN final_df r ON r.ParentNode = c.ParentNode AND r.level = 0 
    
    print(unionsql)
    
    createsql = "CREATE OR REPLACE TABLE final_table (HierarchyNode STRING, "
    for i in range(lvlmax):
        createsql = createsql + f"LN_{i+1} STRING"
        if (i+1) < lvlmax:
            createsql = createsql + ", "
    createsql = createsql + ");"
    
    print(createsql)
    spark.sql(createsql)   # Execute the create table statement
    spark.sql("INSERT INTO final_table " + unionsql)  # INSERT THE ROOT parent with nulls for remaining cols
    
    #Check the max
    lvldf = spark.sql("SELECT MAX(level) AS MAXLVL from final_df;")
    lvlmax = lvldf.head().asDict()[ MAXLVL ]
    print(lvlmax)
    
    sql = "INSERT INTO final_table SELECT c.ParentNode, "
    jointmp = ""
    for i in range(lvlmax):
        for j in range(i+1):
            sql = sql + f" a{j+1}.HierarchyNode AS LN_{j+1}"
            if (j+1) < (lvlmax):
                sql = sql +  , 
        for k in range(i+1, lvlmax):
            sql = sql + f" NULL as LN_{k+1}"
            if (k+1) < (lvlmax):
                sql = sql +  , 
        jointmp = jointmp + f" LEFT JOIN final_df a{i+1} ON a{i+1}.ParentNode = c.ParentNode AND a{i+1}.level = {i+1}"
        sql = sql + f" FROM TEST1 c " + jointmp
        print(sql)
        spark.sql(sql)
        sql = "INSERT INTO final_table SELECT c.ParentNode, "

The final table will contain duplicates. So to get what you need, just select the distinct rows, like this:

%sql
SELECT DISTINCT * FROM final_table;