diff --git a/fe/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/src/main/java/org/apache/doris/qe/Coordinator.java index 19042a3e9e..9d6103239b 100644 --- a/fe/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/src/main/java/org/apache/doris/qe/Coordinator.java @@ -833,18 +833,21 @@ public class Coordinator { } PlanNode leftMostNode = findLeftmostNode(fragment.getPlanRoot()); - // When fragment contains UnionNode, because the fragment may has child - // and not all BE will receive the fragment, child fragment's dest must - // be BE that fragment's scannode locates, avoid less data. - // chenhao added - boolean hasUnionNode = containsUnionNode(fragment.getPlanRoot()); - if (!(leftMostNode instanceof ScanNode) && !hasUnionNode) { + /* + * Case A: + * if the left most is ScanNode, which means there is no child fragment, + * we should assign fragment instances on every scan node hosts. + * Case B: + * if not, there should be exchange nodes to collect all data from child fragments(input fragments), + * so we should assign fragment instances corresponding to the child fragments' host + */ + if (!(leftMostNode instanceof ScanNode)) { + // (Case B) // there is no leftmost scan; we assign the same hosts as those of our // leftmost input fragment (so that a partitioned aggregation // fragment runs on the hosts that provide the input data) - PlanFragmentId inputFragmentIdx = - fragments.get(i).getChild(0).getFragmentId(); + PlanFragmentId inputFragmentIdx = fragments.get(i).getChild(0).getFragmentId(); // AddAll() soft copy() int exchangeInstances = -1; if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) { @@ -893,7 +896,7 @@ public class Coordinator { params.instanceExecParams.add(instanceParam); } } else { - // normal fragment + // case A Iterator iter = fragmentExecParamsMap.get(fragment.getFragmentId()).scanRangeAssignment.entrySet().iterator(); int parallelExecInstanceNum = fragment.getParallel_exec_num(); while (iter.hasNext()) {