[Fix](Nereids) fix pipelineX distribute expr list with child output expr ids (#29621)

This commit is contained in:
Gabriel
2024-01-08 10:46:27 +08:00
committed by GitHub
parent e556536de1
commit 59d7f64360
15 changed files with 212 additions and 15 deletions

View File

@ -270,6 +270,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
public PlanFragment visitPhysicalDistribute(PhysicalDistribute<? extends Plan> distribute,
PlanTranslatorContext context) {
PlanFragment inputFragment = distribute.child().accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(distribute.child());
// TODO: why need set streaming here? should remove this.
if (inputFragment.getPlanRoot() instanceof AggregationNode
&& distribute.child() instanceof PhysicalHashAggregate
@ -315,6 +316,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
}
DataPartition dataPartition = toDataPartition(distribute.getDistributionSpec(), validOutputIds, context);
exchangeNode.setPartitionType(dataPartition.getType());
exchangeNode.setDistributeExprLists(distributeExprLists);
PlanFragment parentFragment = new PlanFragment(context.nextFragmentId(), exchangeNode, dataPartition);
if (distribute.getDistributionSpec() instanceof DistributionSpecGather) {
// gather to one instance
@ -807,6 +809,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
PlanTranslatorContext context) {
PlanFragment inputPlanFragment = aggregate.child(0).accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(aggregate.child(0));
List<Expression> groupByExpressions = aggregate.getGroupByExpressions();
List<NamedExpression> outputExpressions = aggregate.getOutputExpressions();
@ -849,6 +852,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
aggFunOutputIds, isPartial, outputTupleDesc, outputTupleDesc, aggregate.getAggPhase().toExec());
AggregationNode aggregationNode = new AggregationNode(aggregate.translatePlanNodeId(),
inputPlanFragment.getPlanRoot(), aggInfo);
aggregationNode.setDistributeExprLists(distributeExprLists);
if (!aggregate.getAggMode().isFinalPhase) {
aggregationNode.unsetNeedsFinalize();
}
@ -941,10 +947,12 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
public PlanFragment visitPhysicalAssertNumRows(PhysicalAssertNumRows<? extends Plan> assertNumRows,
PlanTranslatorContext context) {
PlanFragment currentFragment = assertNumRows.child().accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(assertNumRows.child());
// create assertNode
AssertNumRowsNode assertNumRowsNode = new AssertNumRowsNode(assertNumRows.translatePlanNodeId(),
currentFragment.getPlanRoot(),
ExpressionTranslator.translateAssert(assertNumRows.getAssertNumRowsElement()));
assertNumRowsNode.setDistributeExprLists(distributeExprLists);
addPlanRoot(currentFragment, assertNumRowsNode, assertNumRows);
return currentFragment;
}
@ -1143,6 +1151,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
// NOTICE: We must visit from right to left, to ensure the last fragment is root fragment
PlanFragment rightFragment = hashJoin.child(1).accept(this, context);
PlanFragment leftFragment = hashJoin.child(0).accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(physicalHashJoin.left(), physicalHashJoin.right());
if (JoinUtils.shouldNestedLoopJoin(hashJoin)) {
throw new RuntimeException("Physical hash join could not execute without equal join condition.");
@ -1161,7 +1170,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
HashJoinNode hashJoinNode = new HashJoinNode(hashJoin.translatePlanNodeId(), leftPlanRoot,
rightPlanRoot, JoinType.toJoinOperator(joinType), execEqConjuncts, Lists.newArrayList(),
null, null, null, hashJoin.isMarkJoin());
hashJoinNode.setDistributeExprLists(distributeExprLists);
PlanFragment currentFragment = connectJoinNode(hashJoinNode, leftFragment, rightFragment, context, hashJoin);
if (joinType == JoinType.NULL_AWARE_LEFT_ANTI_JOIN) {
@ -1183,6 +1192,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
} else {
hashJoinNode.setDistributionMode(DistributionMode.PARTITIONED);
}
// Nereids does not care about output order of join,
// but BE need left child's output must be before right child's output.
// So we need to swap the output order of left and right child if necessary.
@ -1394,6 +1404,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
// PhysicalPlan plan, PlanVisitor visitor, Context context).
PlanFragment rightFragment = nestedLoopJoin.child(1).accept(this, context);
PlanFragment leftFragment = nestedLoopJoin.child(0).accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(nestedLoopJoin.child(0), nestedLoopJoin.child(1));
PlanNode leftFragmentPlanRoot = leftFragment.getPlanRoot();
PlanNode rightFragmentPlanRoot = rightFragment.getPlanRoot();
if (JoinUtils.shouldNestedLoopJoin(nestedLoopJoin)) {
@ -1407,6 +1418,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
NestedLoopJoinNode nestedLoopJoinNode = new NestedLoopJoinNode(nestedLoopJoin.translatePlanNodeId(),
leftFragmentPlanRoot, rightFragmentPlanRoot, tupleIds, JoinType.toJoinOperator(joinType),
null, null, null, nestedLoopJoin.isMarkJoin());
nestedLoopJoinNode.setDistributeExprLists(distributeExprLists);
if (nestedLoopJoin.getStats() != null) {
nestedLoopJoinNode.setCardinality((long) nestedLoopJoin.getStats().getRowCount());
}
@ -1573,8 +1585,10 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
public PlanFragment visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN,
PlanTranslatorContext context) {
PlanFragment inputFragment = partitionTopN.child(0).accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(partitionTopN.child(0));
PartitionSortNode partitionSortNode = translatePartitionSortNode(
partitionTopN, inputFragment.getPlanRoot(), context);
partitionSortNode.setDistributeExprLists(distributeExprLists);
addPlanRoot(inputFragment, partitionSortNode, partitionTopN);
// in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution
// we need turn of parallel scan to ensure to get correct result.
@ -1818,11 +1832,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
public PlanFragment visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort,
PlanTranslatorContext context) {
PlanFragment inputFragment = sort.child(0).accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(sort.child(0));
// 2. According to the type of sort, generate physical plan
if (!sort.getSortPhase().isMerge()) {
// For localSort or Gather->Sort, we just need to add sortNode
SortNode sortNode = translateSortNode(sort, inputFragment.getPlanRoot(), context);
sortNode.setDistributeExprLists(distributeExprLists);
addPlanRoot(inputFragment, sortNode, sort);
} else {
// For mergeSort, we need to push sortInfo to exchangeNode
@ -1835,6 +1851,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
SortNode sortNode = (SortNode) inputFragment.getPlanRoot().getChild(0);
((ExchangeNode) inputFragment.getPlanRoot()).setMergeInfo(sortNode.getSortInfo());
sortNode.setMergeByExchange();
sortNode.setDistributeExprLists(distributeExprLists);
}
return inputFragment;
}
@ -1842,6 +1859,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
@Override
public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTranslatorContext context) {
PlanFragment inputFragment = topN.child(0).accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(topN.child(0));
// 2. According to the type of sort, generate physical plan
if (!topN.getSortPhase().isMerge()) {
@ -1874,6 +1892,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
}
}
}
sortNode.setDistributeExprLists(distributeExprLists);
addPlanRoot(inputFragment, sortNode, topN);
} else {
// For mergeSort, we need to push sortInfo to exchangeNode
@ -1886,6 +1905,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
return inputFragment;
}
ExchangeNode exchangeNode = (ExchangeNode) inputFragment.getPlanRoot();
exchangeNode.setDistributeExprLists(distributeExprLists);
exchangeNode.setMergeInfo(((SortNode) exchangeNode.getChild(0)).getSortInfo());
exchangeNode.setLimit(topN.getLimit());
exchangeNode.setOffset(topN.getOffset());
@ -1918,6 +1938,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
@Override
public PlanFragment visitPhysicalRepeat(PhysicalRepeat<? extends Plan> repeat, PlanTranslatorContext context) {
PlanFragment inputPlanFragment = repeat.child(0).accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(repeat.child(0));
Set<VirtualSlotReference> sortedVirtualSlots = repeat.getSortedVirtualSlots();
TupleDescriptor virtualSlotsTuple =
@ -1965,6 +1986,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
RepeatNode repeatNode = new RepeatNode(repeat.translatePlanNodeId(),
inputPlanFragment.getPlanRoot(), groupingInfo, repeatSlotIdList,
allSlotId, repeat.computeVirtualSlotValues(sortedVirtualSlots));
repeatNode.setDistributeExprLists(distributeExprLists);
addPlanRoot(inputPlanFragment, repeatNode, repeat);
updateLegacyPlanIdToPhysicalPlan(inputPlanFragment.getPlanRoot(), repeat);
return inputPlanFragment;
@ -1974,6 +1996,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
public PlanFragment visitPhysicalWindow(PhysicalWindow<? extends Plan> physicalWindow,
PlanTranslatorContext context) {
PlanFragment inputPlanFragment = physicalWindow.child(0).accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(physicalWindow.child(0));
// 1. translate to old optimizer variable
// variable in Nereids
@ -2049,6 +2072,11 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
orderElementsIsNullableMatched,
bufferedTupleDesc
);
analyticEvalNode.setDistributeExprLists(distributeExprLists);
PlanNode root = inputPlanFragment.getPlanRoot();
if (root instanceof SortNode) {
((SortNode) root).setIsAnalyticSort(true);
}
inputPlanFragment.addPlanRoot(analyticEvalNode);
// in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution
@ -2057,6 +2085,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
if (findOlapScanNodesByPassExchangeAndJoinNode(inputPlanFragment.getPlanRoot())) {
inputPlanFragment.setHasColocatePlanNode(true);
analyticEvalNode.setColocate(true);
if (root instanceof SortNode) {
((SortNode) root).setColocate(true);
}
}
return inputPlanFragment;
}
@ -2447,4 +2478,31 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
}
return false;
}
private List<List<Expr>> getDistributeExprs(Plan ... children) {
List<List<Expr>> distributeExprLists = Lists.newArrayList();
for (Plan child : children) {
DistributionSpec spec = ((PhysicalPlan) child).getPhysicalProperties().getDistributionSpec();
distributeExprLists.add(getDistributeExpr(child.getOutputExprIds(), spec));
}
return distributeExprLists;
}
private List<Expr> getDistributeExpr(List<ExprId> childOutputIds, DistributionSpec spec) {
if (spec instanceof DistributionSpecHash) {
DistributionSpecHash distributionSpecHash = (DistributionSpecHash) spec;
List<Expr> partitionExprs = Lists.newArrayList();
for (int i = 0; i < distributionSpecHash.getEquivalenceExprIds().size(); i++) {
Set<ExprId> equivalenceExprId = distributionSpecHash.getEquivalenceExprIds().get(i);
for (ExprId exprId : equivalenceExprId) {
if (childOutputIds.contains(exprId)) {
partitionExprs.add(context.findSlotRef(exprId));
break;
}
}
}
return partitionExprs;
}
return Lists.newArrayList();
}
}

View File

@ -153,6 +153,8 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
protected List<Expr> projectList;
private List<List<Expr>> distributeExprLists = new ArrayList<>();
protected PlanNode(PlanNodeId id, ArrayList<TupleId> tupleIds, String planNodeName,
StatisticalType statisticalType) {
this.id = id;
@ -526,6 +528,12 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
expBuilder.append(detailPrefix).append("project output tuple id: ")
.append(outputTupleDesc.getId().asInt()).append("\n");
}
if (!CollectionUtils.isEmpty(distributeExprLists)) {
for (List<Expr> distributeExprList : distributeExprLists) {
expBuilder.append(detailPrefix).append("distribute expr lists: ")
.append(getExplainString(distributeExprList)).append("\n");
}
}
// Output Tuple Ids only when explain plan level is set to verbose
if (detailLevel.equals(TExplainLevel.VERBOSE)) {
expBuilder.append(detailPrefix + "tuple ids: ");
@ -618,6 +626,14 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
msg.addToOutputSlotIds(slotId.asInt());
}
}
if (!CollectionUtils.isEmpty(distributeExprLists)) {
for (List<Expr> exprList : distributeExprLists) {
msg.addToDistributeExprLists(new ArrayList<>());
for (Expr expr : exprList) {
msg.distribute_expr_lists.get(msg.distribute_expr_lists.size() - 1).add(expr.treeToThrift());
}
}
}
toThrift(msg);
container.addToNodes(msg);
if (projectList != null) {
@ -1174,6 +1190,10 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
this.pushDownAggNoGroupingOp = pushDownAggNoGroupingOp;
}
public void setDistributeExprLists(List<List<Expr>> distributeExprLists) {
this.distributeExprLists = distributeExprLists;
}
public TPushAggOp getPushDownAggNoGroupingOp() {
return pushDownAggNoGroupingOp;
}

View File

@ -71,6 +71,7 @@ public class SortNode extends PlanNode {
private boolean isDefaultLimit;
// if true, the output of this node feeds an AnalyticNode
private boolean isAnalyticSort;
private boolean isColocate = false;
private DataPartition inputPartition;
private boolean isUnusedExprRemoved = false;
@ -318,6 +319,8 @@ public class SortNode extends PlanNode {
msg.sort_node.setOffset(offset);
msg.sort_node.setUseTopnOpt(useTopnOpt);
msg.sort_node.setMergeByExchange(this.mergeByexchange);
msg.sort_node.setIsAnalyticSort(isAnalyticSort);
msg.sort_node.setIsColocate(isColocate);
}
@Override
@ -339,4 +342,8 @@ public class SortNode extends PlanNode {
Expr.getIds(materializedTupleExprs, null, result);
return new HashSet<>(result);
}
public void setColocate(boolean colocate) {
isColocate = colocate;
}
}