[fix](nereids) lost exchange before global limit merge node sometimes (#19396)
should add exchange node between global and local limit
This commit is contained in:
@ -1602,13 +1602,21 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
}
|
||||
|
||||
PlanNode child = inputFragment.getPlanRoot();
|
||||
|
||||
// This case means GlobalLimit's child isn't gatherNode, which suggests the child is UNPARTITIONED
|
||||
// When there is valid offset, exchangeNode should be added because other node don't support offset
|
||||
if (physicalLimit.isGlobal() && physicalLimit.hasValidOffset()
|
||||
&& !(child instanceof ExchangeNode)) {
|
||||
inputFragment = createParentFragment(inputFragment, DataPartition.UNPARTITIONED, context);
|
||||
child = inputFragment.getPlanRoot();
|
||||
if (physicalLimit.isGlobal()) {
|
||||
if (child instanceof ExchangeNode) {
|
||||
DataPartition outputPartition = DataPartition.UNPARTITIONED;
|
||||
ExchangeNode exchangeNode = (ExchangeNode) inputFragment.getPlanRoot();
|
||||
inputFragment.setOutputPartition(outputPartition);
|
||||
inputFragment.setPlanRoot(exchangeNode.getChild(0));
|
||||
inputFragment.setDestination(exchangeNode);
|
||||
inputFragment = new PlanFragment(context.nextFragmentId(), exchangeNode, DataPartition.UNPARTITIONED);
|
||||
context.addPlanFragment(inputFragment);
|
||||
} else if (physicalLimit.hasValidOffset()) {
|
||||
// This case means GlobalLimit's child isn't gatherNode, which suggests the child is UNPARTITIONED
|
||||
// When there is valid offset, exchangeNode should be added because other node don't support offset
|
||||
inputFragment = createParentFragment(inputFragment, DataPartition.UNPARTITIONED, context);
|
||||
child = inputFragment.getPlanRoot();
|
||||
}
|
||||
}
|
||||
child.setOffset(physicalLimit.getOffset());
|
||||
child.setLimit(physicalLimit.getLimit());
|
||||
|
||||
@ -124,6 +124,10 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
|
||||
public PhysicalProperties visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, PlanContext context) {
|
||||
Preconditions.checkState(childrenOutputProperties.size() == 1);
|
||||
PhysicalProperties childOutputProperty = childrenOutputProperties.get(0);
|
||||
if (limit.getPhase().isLocal()) {
|
||||
return new PhysicalProperties(childOutputProperty.getDistributionSpec(),
|
||||
childOutputProperty.getOrderSpec());
|
||||
}
|
||||
return new PhysicalProperties(DistributionSpecGather.INSTANCE, childOutputProperty.getOrderSpec());
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user