[feature](pipelineX) add local_shuffle in sort partition sort analytic node (#28265)
This commit is contained in:
@ -1800,6 +1800,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
}
|
||||
SortNode sortNode = (SortNode) inputFragment.getPlanRoot().getChild(0);
|
||||
((ExchangeNode) inputFragment.getPlanRoot()).setMergeInfo(sortNode.getSortInfo());
|
||||
sortNode.setMergeByExchange();
|
||||
}
|
||||
return inputFragment;
|
||||
}
|
||||
@ -1854,6 +1855,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
exchangeNode.setMergeInfo(((SortNode) exchangeNode.getChild(0)).getSortInfo());
|
||||
exchangeNode.setLimit(topN.getLimit());
|
||||
exchangeNode.setOffset(topN.getOffset());
|
||||
((SortNode) exchangeNode.getChild(0)).setMergeByExchange();
|
||||
}
|
||||
updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), topN);
|
||||
return inputFragment;
|
||||
@ -2020,6 +2022,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
// TODO: nereids forbid all parallel scan under PhysicalSetOperation temporary
|
||||
if (findOlapScanNodesByPassExchangeAndJoinNode(inputPlanFragment.getPlanRoot())) {
|
||||
inputPlanFragment.setHasColocatePlanNode(true);
|
||||
analyticEvalNode.setColocate(true);
|
||||
}
|
||||
return inputPlanFragment;
|
||||
}
|
||||
|
||||
@ -75,6 +75,8 @@ public class AnalyticEvalNode extends PlanNode {
|
||||
private final Expr orderByEq;
|
||||
private final TupleDescriptor bufferedTupleDesc;
|
||||
|
||||
private boolean isColocate = false;
|
||||
|
||||
public AnalyticEvalNode(
|
||||
PlanNodeId id, PlanNode input, List<Expr> analyticFnCalls,
|
||||
List<Expr> partitionExprs, List<OrderByElement> orderByElements,
|
||||
@ -181,6 +183,10 @@ public class AnalyticEvalNode extends PlanNode {
|
||||
cardinality = getChild(0).cardinality;
|
||||
}
|
||||
|
||||
public void setColocate(boolean colocate) {
|
||||
this.isColocate = colocate;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String debugString() {
|
||||
List<String> orderByElementStrs = Lists.newArrayList();
|
||||
@ -215,7 +221,7 @@ public class AnalyticEvalNode extends PlanNode {
|
||||
msg.analytic_node.setPartitionExprs(Expr.treesToThrift(substitutedPartitionExprs));
|
||||
msg.analytic_node.setOrderByExprs(Expr.treesToThrift(OrderByElement.getOrderByExprs(orderByElements)));
|
||||
msg.analytic_node.setAnalyticFunctions(Expr.treesToThrift(analyticFnCalls));
|
||||
|
||||
msg.analytic_node.setIsColocate(isColocate);
|
||||
if (analyticWindow == null) {
|
||||
if (!orderByElements.isEmpty()) {
|
||||
msg.analytic_node.setWindow(AnalyticWindow.DEFAULT_WINDOW.toThrift());
|
||||
|
||||
@ -1308,6 +1308,7 @@ public class DistributedPlanner {
|
||||
exchNode.setLimit(limit);
|
||||
}
|
||||
exchNode.setMergeInfo(node.getSortInfo());
|
||||
node.setMergeByExchange();
|
||||
exchNode.setOffset(offset);
|
||||
|
||||
// Child nodes should not process the offset. If there is a limit,
|
||||
|
||||
@ -64,7 +64,11 @@ public class SortNode extends PlanNode {
|
||||
private boolean useTopnOpt;
|
||||
private boolean useTwoPhaseReadOpt;
|
||||
|
||||
private boolean isDefaultLimit;
|
||||
// If mergeByexchange is set to true, the sort information is pushed to the
|
||||
// exchange node, and the sort node is used for the ORDER BY .
|
||||
private boolean mergeByexchange = false;
|
||||
|
||||
private boolean isDefaultLimit;
|
||||
// if true, the output of this node feeds an AnalyticNode
|
||||
private boolean isAnalyticSort;
|
||||
private DataPartition inputPartition;
|
||||
@ -134,6 +138,10 @@ public class SortNode extends PlanNode {
|
||||
return info;
|
||||
}
|
||||
|
||||
public void setMergeByExchange() {
|
||||
this.mergeByexchange = true;
|
||||
}
|
||||
|
||||
public boolean getUseTopnOpt() {
|
||||
return useTopnOpt;
|
||||
}
|
||||
@ -309,6 +317,7 @@ public class SortNode extends PlanNode {
|
||||
msg.sort_node = sortNode;
|
||||
msg.sort_node.setOffset(offset);
|
||||
msg.sort_node.setUseTopnOpt(useTopnOpt);
|
||||
msg.sort_node.setMergeByExchange(this.mergeByexchange);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user