Pyspark select after join raises ambiguity but column should only be present in one of the dataframes
I'm doing a join on two DataFrames that come from the same original DataFrame. Each then goes through some aggregations, and the columns selected are not the same except for the ones used in the join.
So, say in df1 we get columns [a, b, max(c), min(d)] and in df2 we select [a, b, e, avg(f)], and I want to join them so that my final df has [a, b, max(c), min(d), e, avg(f)]. Something like
df_final = df1.join(df2, (df1.a == df2.a) & (df1.b == df2.b)) \
    .select('a', 'b', 'max(c)', 'min(d)', 'e', 'avg(f)')
should do the trick.
In my specific case the join conditions also involve an aggregated column, but that's not related.
Here's the real example that raises the error:
df_3 = dfs_dict['trigger'] \
    .groupBy(
        F.col('opportunity_id'),
        F.col('action_status').alias('latest_urgency')) \
    .agg({'user_id': 'max', 'write_date': 'min'}) \
    .withColumnRenamed('min(write_date)', 'latest_urgency_date') \
    .withColumnRenamed('max(user_id)', 'user_id')
window = Window.partitionBy("opportunity_id") \
    .orderBy(F.col("write_date").desc())
df_4 = dfs_dict['trigger'].select(
        'opportunity_id',
        'write_date',
        'action_status',
        'opportunity_type',
        'user_id',
        F.row_number().over(window).alias('row_num')) \
    .where((F.col("action_status").isNotNull()) & (F.col("action_status") != "")) \
    .where(F.col('row_num') == 1)
And the output for the columns:
df_3.columns
# ['opportunity_id', 'latest_urgency', 'latest_urgency_date', 'user_id']
df_4.columns
# ['opportunity_id', 'write_date', 'action_status', 'opportunity_type', 'user_id', 'row_num']
And the join:
df_5 = df_3.join(
    df_4,
    (df_3.opportunity_id == df_4.opportunity_id) &
    (df_3.latest_urgency == df_4.action_status)
).select(
    df_3.opportunity_id,
    df_3.latest_urgency,
    df_4.opportunity_type,
    df_3.user_id,
    df_3.latest_urgency_date)
This will complain that column opportunity_type is ambiguous:
AnalysisException: Column opportunity_type#1559 are ambiguous. It's probably because you joined several Datasets together, and some of these Datasets are the same. This column points to one of the Datasets but Spark is unable to figure out which one. Please alias the Datasets with different names via Dataset.as before joining them, and specify the column using qualified name, e.g. df.as("a").join(df.as("b"), $"a.id" > $"b.id"). You can also set spark.sql.analyzer.failAmbiguousSelfJoin to false to disable this check.
I'd like to understand why. Even if I do a select on df_3 before the aggregation, the error remains. I've checked the query plan, and since both come from the same original df, they both scan the offending column, but in the end it is only selected in one of them...
If I remove the df_4. prefix before opportunity_type and just reference it as 'opportunity_type', the error disappears, which makes me even more confused. (I had also tried using alias on the DataFrames, but got the same error.)
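For the record, the alias approach the message suggests would look something like this in PySpark, with every column referenced through the alias string instead of the original DataFrame object (a sketch using the names above):
# alias both sides and qualify every column through the alias string,
# so the analyzer can tell which side each column comes from
df_5 = df_3.alias('d3') \
    .join(df_4.alias('d4'),
          (F.col('d3.opportunity_id') == F.col('d4.opportunity_id')) &
          (F.col('d3.latest_urgency') == F.col('d4.action_status'))) \
    .select('d3.opportunity_id',
            'd3.latest_urgency',
            'd4.opportunity_type',
            'd3.user_id',
            'd3.latest_urgency_date')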
Anyway, I'd like to understand why this happens, and what the best way is to avoid it / do it correctly.
Edit: these are the columns of the original DF, so there are no repetitions:
(1) Scan ExistingRDD
Output [31]: [opportunity_id#2317, opportunity_type#2318,
account_manager_role#2319, action_reason#2320, action_status#2321,
active_trigger#2322, churn_probability#2323,
churn_threshold_exceeded#2324, company_id#2325, customer_id#2326,
datetime#2327, global_product#2328, has_lost_order_date#2329,
has_taken_action#2330, last_2_orders_margin_percentage#2331,
last_feedback_time#2332, last_order_delivery_date#2333,
last_order_margin_percentage#2334, last_order_placement_date#2335,
last_ordered_by_customer#2336, last_ordered_in_company#2337,
lost_order_date#2338, margin#2339, margin_benchmark_percentage#2340,
material_id#2341, order_cycle_status#2342, reminder#2343,
reminder_date#2344, sales_person_id#2345, user_id#2346,
write_date#2347]
Answer
Please check the columns in your input dataframe, dfs_dict['trigger'].
It could be that your input dataframe has duplicate column names.
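A quick way to check, as a minimal sketch (dfs_dict['trigger'] is the input DataFrame from the question):
# count how often each column name appears in the input DataFrame;
# any name with a count above 1 can make later references ambiguous
from collections import Counter

name_counts = Counter(dfs_dict['trigger'].columns)
duplicates = [name for name, count in name_counts.items() if count > 1]
print(duplicates)  # an empty list means no duplicate column names
If the list is non-empty, rename or drop the duplicated columns before the groupBy/select, and the references after the join should resolve unambiguously.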