Optimize the query plan so that UnionNode can be executed distributedly (#2150)

This commit is contained in:
Mingyu Chen
2019-11-07 19:41:06 +08:00
committed by GitHub
parent c25e826dce
commit 2efd9e54ea

View File

@ -833,18 +833,21 @@ public class Coordinator {
}
PlanNode leftMostNode = findLeftmostNode(fragment.getPlanRoot());
// When fragment contains UnionNode, because the fragment may has child
// and not all BE will receive the fragment, child fragment's dest must
// be BE that fragment's scannode locates, avoid less data.
// chenhao added
boolean hasUnionNode = containsUnionNode(fragment.getPlanRoot());
if (!(leftMostNode instanceof ScanNode) && !hasUnionNode) {
/*
* Case A:
* if the left most is ScanNode, which means there is no child fragment,
* we should assign fragment instances on every scan node hosts.
* Case B:
* if not, there should be exchange nodes to collect all data from child fragments(input fragments),
* so we should assign fragment instances corresponding to the child fragments' host
*/
if (!(leftMostNode instanceof ScanNode)) {
// (Case B)
// there is no leftmost scan; we assign the same hosts as those of our
// leftmost input fragment (so that a partitioned aggregation
// fragment runs on the hosts that provide the input data)
PlanFragmentId inputFragmentIdx =
fragments.get(i).getChild(0).getFragmentId();
PlanFragmentId inputFragmentIdx = fragments.get(i).getChild(0).getFragmentId();
// AddAll() soft copy()
int exchangeInstances = -1;
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
@ -893,7 +896,7 @@ public class Coordinator {
params.instanceExecParams.add(instanceParam);
}
} else {
// normal fragment
// case A
Iterator iter = fragmentExecParamsMap.get(fragment.getFragmentId()).scanRangeAssignment.entrySet().iterator();
int parallelExecInstanceNum = fragment.getParallel_exec_num();
while (iter.hasNext()) {