diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index cc7993d6dd..76211137c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -1014,11 +1014,11 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor { // has colocate plan node private boolean hasColocatePlanNode = false; - private boolean isRightChildOfBroadcastHashJoin = false; - /** * C'tor for fragment with specific partition; the output is by default broadcast. */ @@ -434,14 +432,6 @@ public class PlanFragment extends TreeNode { return transferQueryStatisticsWithEveryBatch; } - public boolean isRightChildOfBroadcastHashJoin() { - return isRightChildOfBroadcastHashJoin; - } - - public void setRightChildOfBroadcastHashJoin(boolean value) { - isRightChildOfBroadcastHashJoin = value; - } - public int getFragmentSequenceNum() { if (ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) { return fragmentSequenceNum; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 568975b93d..a279af676e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -958,6 +958,25 @@ public abstract class PlanNode extends TreeNode implements PlanStats { } } + /** + * find planNode recursively based on the planNodeId + */ + public static PlanNode findPlanNodeFromPlanNodeId(PlanNode root, PlanNodeId id) { + if (root == null || root.getId() == null || id == null) { + return null; + } else if (root.getId().equals(id)) { + return root; + } else { + for (PlanNode child : root.getChildren()) { + PlanNode retNode = findPlanNodeFromPlanNodeId(child, id); + if (retNode != null) { + return retNode; + } + } + return null; + } + } + public String getPlanTreeExplainStr() { StringBuilder sb = new StringBuilder(); sb.append("[").append(getId().asInt()).append(": ").append(getPlanNodeName()).append("]"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 7f0abff227..39c94320c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1294,6 +1294,9 @@ public class Coordinator { // compute destinations and # senders per exchange node // (the root fragment doesn't have a destination) for (FragmentExecParams params : fragmentExecParamsMap.values()) { + if (params.fragment instanceof MultiCastPlanFragment) { + continue; + } PlanFragment destFragment = params.fragment.getDestFragment(); if (destFragment == null) { // root plan fragment @@ -1308,6 +1311,10 @@ public class Coordinator { // output at the moment PlanNodeId exchId = sink.getExchNodeId(); + PlanNode exchNode = PlanNode.findPlanNodeFromPlanNodeId(destFragment.getPlanRoot(), exchId); + Preconditions.checkState(exchNode != null, "exchNode is null"); + Preconditions.checkState(exchNode instanceof ExchangeNode, + "exchNode is not ExchangeNode" + exchNode.getId().toString()); // we might have multiple fragments sending to this exchange node // (distributed MERGE), which is why we need to add up the #senders if (destParams.perExchNumSenders.get(exchId.asInt()) == null) { @@ -1357,7 +1364,7 @@ public class Coordinator { } } else { if (enablePipelineEngine && enableShareHashTableForBroadcastJoin - && params.fragment.isRightChildOfBroadcastHashJoin()) { + && ((ExchangeNode) exchNode).isRightChildOfBroadcastHashJoin()) { // here choose the first instance to build hash table. Map destHosts = new HashMap<>(); destParams.instanceExecParams.forEach(param -> { @@ -1412,7 +1419,11 @@ public class Coordinator { multi.getDestFragmentList().get(i).setOutputPartition(params.fragment.getOutputPartition()); PlanNodeId exchId = sink.getExchNodeId(); + PlanNode exchNode = PlanNode.findPlanNodeFromPlanNodeId(destFragment.getPlanRoot(), exchId); Preconditions.checkState(!destParams.perExchNumSenders.containsKey(exchId.asInt())); + Preconditions.checkState(exchNode != null, "exchNode is null"); + Preconditions.checkState(exchNode instanceof ExchangeNode, + "exchNode is not ExchangeNode" + exchNode.getId().toString()); if (destParams.perExchNumSenders.get(exchId.asInt()) == null) { destParams.perExchNumSenders.put(exchId.asInt(), params.instanceExecParams.size()); } else { @@ -1422,7 +1433,7 @@ public class Coordinator { List destinations = multiSink.getDestinations().get(i); if (enablePipelineEngine && enableShareHashTableForBroadcastJoin - && params.fragment.isRightChildOfBroadcastHashJoin()) { + && ((ExchangeNode) exchNode).isRightChildOfBroadcastHashJoin()) { // here choose the first instance to build hash table. Map destHosts = new HashMap<>();