[fix](Nereids)join output order need same with child plan node output when translate (#12130)

In BE, There is an implicit convention that HashJoinNode's left child's output Slot must before right child's output slot in intermediateTuple.

However, after we do commute rule on join plan in Nereids, this convention will be broken and cause core dump in BE.

There are two way to fix this problem:
1. add a project on join after we do commute
2. reorder output of join node when we do translate

Since we cannot translate project yet because BE projection support is on going(#11842). So we use second way to fix it now. After the project translation could work correctly, we should use the first way to fix it.
This commit is contained in:
morrySnow
2022-08-29 15:32:55 +08:00
committed by GitHub
parent 454e21daca
commit 1d9d99c8ec
3 changed files with 69 additions and 32 deletions

View File

@ -73,9 +73,9 @@ public class NereidsPlanner extends Planner {
PlanTranslatorContext planTranslatorContext = new PlanTranslatorContext();
PlanFragment root = physicalPlanTranslator.translatePlan(physicalPlan, planTranslatorContext);
scanNodeList = planTranslatorContext.getScanNodeList();
scanNodeList = planTranslatorContext.getScanNodes();
descTable = planTranslatorContext.getDescTable();
fragments = new ArrayList<>(planTranslatorContext.getPlanFragmentList());
fragments = new ArrayList<>(planTranslatorContext.getPlanFragments());
// set output exprs
logicalPlanAdapter.setResultExprs(root.getOutputExprs());

View File

@ -73,6 +73,7 @@ import org.apache.doris.planner.SortNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
@ -80,8 +81,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Used to translate to physical plan generated by new optimizer to the plan fragments.
@ -136,10 +140,10 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
rootFragment.setOutputExprs(outputExprs);
}
rootFragment.getPlanRoot().convertToVectoriezd();
for (PlanFragment fragment : context.getPlanFragmentList()) {
for (PlanFragment fragment : context.getPlanFragments()) {
fragment.finalize(null);
}
Collections.reverse(context.getPlanFragmentList());
Collections.reverse(context.getPlanFragments());
return rootFragment;
}
@ -363,32 +367,47 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
// NOTICE: We must visit from right to left, to ensure the last fragment is root fragment
PlanFragment rightFragment = hashJoin.child(1).accept(this, context);
PlanFragment leftFragment = hashJoin.child(0).accept(this, context);
PlanNode leftFragmentPlanRoot = leftFragment.getPlanRoot();
PlanNode rightFragmentPlanRoot = rightFragment.getPlanRoot();
JoinType joinType = hashJoin.getJoinType();
if (JoinUtils.shouldNestedLoopJoin(hashJoin)) {
throw new RuntimeException("Physical hash join could not execute without equal join condition.");
}
List<Expr> execEqConjunctList = hashJoin.getHashJoinConjuncts().stream()
PlanNode leftPlanRoot = leftFragment.getPlanRoot();
PlanNode rightPlanRoot = rightFragment.getPlanRoot();
JoinType joinType = hashJoin.getJoinType();
List<Expr> execEqConjuncts = hashJoin.getHashJoinConjuncts().stream()
.map(EqualTo.class::cast)
.map(e -> swapEqualToForChildrenOrder(e, hashJoin.left().getOutput()))
.map(e -> ExpressionTranslator.translate(e, context))
.collect(Collectors.toList());
TupleDescriptor leftTuple = context.getTupleDesc(leftPlanRoot);
TupleDescriptor rightTuple = context.getTupleDesc(rightPlanRoot);
// Nereids does not care about output order of join,
// but BE need left child's output must be before right child's output.
// So we need to swap the output order of left and right child if necessary.
// TODO: revert this after Nereids could ensure the output order is correct.
TupleDescriptor outputDescriptor = context.generateTupleDesc();
List<Expr> srcToOutput = hashJoin.getOutput().stream()
Map<ExprId, SlotReference> slotReferenceMap = Maps.newHashMap();
hashJoin.getOutput().stream()
.map(SlotReference.class::cast)
.forEach(s -> slotReferenceMap.put(s.getExprId(), s));
List<Expr> srcToOutput = Stream.concat(leftTuple.getSlots().stream(), rightTuple.getSlots().stream())
.map(sd -> context.findExprId(sd.getId()))
.map(slotReferenceMap::get)
.filter(Objects::nonNull)
.peek(s -> context.createSlotDesc(outputDescriptor, s))
.map(e -> ExpressionTranslator.translate(e, context))
.collect(Collectors.toList());
HashJoinNode hashJoinNode = new HashJoinNode(context.nextPlanNodeId(), leftFragmentPlanRoot,
rightFragmentPlanRoot, JoinType.toJoinOperator(joinType), execEqConjunctList, Lists.newArrayList(),
HashJoinNode hashJoinNode = new HashJoinNode(context.nextPlanNodeId(), leftPlanRoot,
rightPlanRoot, JoinType.toJoinOperator(joinType), execEqConjuncts, Lists.newArrayList(),
srcToOutput, outputDescriptor, outputDescriptor);
hashJoinNode.setDistributionMode(DistributionMode.BROADCAST);
hashJoinNode.setChild(0, leftFragmentPlanRoot);
hashJoinNode.setChild(0, leftPlanRoot);
connectChildFragment(hashJoinNode, 1, leftFragment, rightFragment, context);
leftFragment.setPlanRoot(hashJoinNode);
return leftFragment;
@ -426,7 +445,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
// TODO: handle p.child(0) is not NamedExpression.
project.getProjects().stream().filter(Alias.class::isInstance).forEach(p -> {
SlotRef ref = context.findSlotRef(((NamedExpression) p.child(0)).getExprId());
context.addExprIdPair(p.getExprId(), ref);
context.addExprIdSlotRefPair(p.getExprId(), ref);
});
List<Expr> execExprList = project.getProjects()

View File

@ -19,9 +19,9 @@ package org.apache.doris.nereids.glue.translator;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.nereids.trees.expressions.ExprId;
@ -29,13 +29,13 @@ import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -43,23 +43,28 @@ import java.util.Map;
* Context of physical plan.
*/
public class PlanTranslatorContext {
private final List<PlanFragment> planFragmentList = Lists.newArrayList();
private final List<PlanFragment> planFragments = Lists.newArrayList();
private final DescriptorTable descTable = new DescriptorTable();
/**
* Map expressions of new optimizer to the stale expr.
* index from Nereids' slot to legacy slot.
*/
private final Map<ExprId, SlotRef> exprIdSlotRefMap = new HashMap<>();
private final Map<ExprId, SlotRef> exprIdToSlotRef = Maps.newHashMap();
private final List<ScanNode> scanNodeList = new ArrayList<>();
/**
* Inverted index from legacy slot to Nereids' slot.
*/
private final Map<SlotId, ExprId> slotIdToExprId = Maps.newHashMap();
private final List<ScanNode> scanNodes = Lists.newArrayList();
private final IdGenerator<PlanFragmentId> fragmentIdGenerator = PlanFragmentId.createGenerator();
private final IdGenerator<PlanNodeId> nodeIdGenerator = PlanNodeId.createGenerator();
public List<PlanFragment> getPlanFragmentList() {
return planFragmentList;
public List<PlanFragment> getPlanFragments() {
return planFragments;
}
public TupleDescriptor generateTupleDesc() {
@ -79,23 +84,29 @@ public class PlanTranslatorContext {
}
public void addPlanFragment(PlanFragment planFragment) {
this.planFragmentList.add(planFragment);
this.planFragments.add(planFragment);
}
public void addExprIdPair(ExprId exprId, SlotRef slotRef) {
exprIdSlotRefMap.put(exprId, slotRef);
public void addExprIdSlotRefPair(ExprId exprId, SlotRef slotRef) {
exprIdToSlotRef.put(exprId, slotRef);
slotIdToExprId.put(slotRef.getDesc().getId(), exprId);
}
public SlotRef findSlotRef(ExprId exprId) {
return exprIdSlotRefMap.get(exprId);
return exprIdToSlotRef.get(exprId);
}
public void addScanNode(ScanNode scanNode) {
scanNodeList.add(scanNode);
scanNodes.add(scanNode);
}
public List<ScanNode> getScanNodeList() {
return scanNodeList;
public ExprId findExprId(SlotId slotId) {
return slotIdToExprId.get(slotId);
}
public List<ScanNode> getScanNodes() {
return scanNodes;
}
/**
@ -110,7 +121,7 @@ public class PlanTranslatorContext {
}
slotDescriptor.setType(slotReference.getDataType().toCatalogDataType());
slotDescriptor.setIsMaterialized(true);
this.addExprIdPair(slotReference.getExprId(), new SlotRef(slotDescriptor));
this.addExprIdSlotRefPair(slotReference.getExprId(), new SlotRef(slotDescriptor));
return slotDescriptor;
}
@ -122,8 +133,15 @@ public class PlanTranslatorContext {
slotDescriptor.setType(expression.getDataType().toCatalogDataType());
}
public TupleDescriptor getTupleDesc(TupleId tupleId) {
return descTable.getTupleDesc(tupleId);
/**
* in Nereids, all node only has one TupleDescriptor, so we can use the first one.
*
* @param planNode the node to get the TupleDescriptor
*
* @return plan node's tuple descriptor
*/
public TupleDescriptor getTupleDesc(PlanNode planNode) {
return descTable.getTupleDesc(planNode.getOutputTupleIds().get(0));
}
public DescriptorTable getDescTable() {