[fix](nereids) fix multi window projection issue temporarily (#24912)
Current multi-window plan generation has problem on the project sequence, for example:
+--LogicalWindow ( windowExpressions=[avg(sum_sales#115) WindowSpec(...) AS `avg_monthly_sales`#116, rank() WindowSpec(...) AS `rn`#117], ...)
and correspond physical plan is:
+--PhysicalWindow[6572]@16 ( windowFrameGroup=(Funcs=[avg(sum_sales#115) WindowSpec(...) AS `avg_monthly_sales`#116], ... )
+--PhysicalWindow[6568]@29 ( windowFrameGroup=(Funcs=[rank() WindowSpec(...) AS `rn`#117], ...] )
If the final plan is generated as following:
MultiCastDataSinks
STREAM DATA SINK
EXCHANGE ID: 20
HASH_PARTITIONED: rn[#208], i_brand[#202], cc_name[#203], i_category[#201]
Before we eventually resolve the multi-window issue, we add a projection as following and force a mapping but this will not cover all potential problems.
MultiCastDataSinks
STREAM DATA SINK
EXCHANGE ID: 20
HASH_PARTITIONED: rn[#219], i_brand[#213], cc_name[#214], i_category[#212]
PROJECTIONS: i_category[#184], i_brand[#185], cc_name[#186], d_year[#187], d_moy[#188], sum_sales[#189], avg_monthly_sales[#191], rn[#190]
PROJECTION TUPLE: 20
This commit is contained in:
@ -287,6 +287,23 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
keys.addAll(validOutputIds);
|
||||
validOutputIds = keys;
|
||||
}
|
||||
if (inputFragment instanceof MultiCastPlanFragment) {
|
||||
// TODO: remove this logic when we split to multi-window in logical window to physical window conversion
|
||||
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
|
||||
DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
|
||||
multiCastDataSink.getDataStreamSinks().size() - 1);
|
||||
if (!(distribute.child() instanceof PhysicalProject)) {
|
||||
List<Expr> projectionExprs = new ArrayList<>();
|
||||
PhysicalCTEConsumer consumer = getCTEConsumerChild(distribute);
|
||||
Preconditions.checkState(consumer != null, "consumer not found");
|
||||
for (Slot slot : distribute.getOutput()) {
|
||||
projectionExprs.add(ExpressionTranslator.translate(consumer.getProducerSlot(slot), context));
|
||||
}
|
||||
TupleDescriptor projectionTuple = generateTupleDesc(distribute.getOutput(), null, context);
|
||||
dataStreamSink.setProjections(projectionExprs);
|
||||
dataStreamSink.setOutputTupleDesc(projectionTuple);
|
||||
}
|
||||
}
|
||||
DataPartition dataPartition = toDataPartition(distribute.getDistributionSpec(), validOutputIds, context);
|
||||
PlanFragment parentFragment = new PlanFragment(context.nextFragmentId(), exchangeNode, dataPartition);
|
||||
exchangeNode.setNumInstances(inputFragment.getPlanRoot().getNumInstances());
|
||||
@ -2332,4 +2349,16 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
return dataType instanceof ArrayType || dataType instanceof MapType || dataType instanceof JsonType
|
||||
|| dataType instanceof StructType;
|
||||
}
|
||||
|
||||
private PhysicalCTEConsumer getCTEConsumerChild(PhysicalPlan root) {
|
||||
if (root == null) {
|
||||
return null;
|
||||
} else if (root instanceof PhysicalCTEConsumer) {
|
||||
return (PhysicalCTEConsumer) root;
|
||||
} else if (root.children().size() != 1) {
|
||||
return null;
|
||||
} else {
|
||||
return getCTEConsumerChild((PhysicalPlan) root.child(0));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user