[fix](nereids) fix union all instance number (#40099)
pick from https://github.com/apache/doris/pull/39999 Co-authored-by: xiongzhongjian <xiongzhongjian@selectdb.com>
This commit is contained in:
@ -2043,7 +2043,6 @@ public class Coordinator implements CoordInterface {
|
||||
}
|
||||
|
||||
Pair<PlanNode, PlanNode> pairNodes = findLeftmostNode(fragment.getPlanRoot());
|
||||
PlanNode fatherNode = pairNodes.first;
|
||||
PlanNode leftMostNode = pairNodes.second;
|
||||
|
||||
/*
|
||||
@ -2058,25 +2057,8 @@ public class Coordinator implements CoordInterface {
|
||||
// (Case B)
|
||||
// there is no leftmost scan; we assign the same hosts as those of our
|
||||
// input fragment which has a higher instance_number
|
||||
|
||||
int inputFragmentIndex = 0;
|
||||
int maxParallelism = 0;
|
||||
// If the fragment has three children, then the first child and the second child are
|
||||
// the children(both exchange node) of shuffle HashJoinNode,
|
||||
// and the third child is the right child(ExchangeNode) of broadcast HashJoinNode.
|
||||
// We only need to pay attention to the maximum parallelism among
|
||||
// the two ExchangeNodes of shuffle HashJoinNode.
|
||||
int childrenCount = (fatherNode != null) ? fatherNode.getChildren().size() : 1;
|
||||
for (int j = 0; j < childrenCount; j++) {
|
||||
int currentChildFragmentParallelism
|
||||
= fragmentExecParamsMap.get(fragment.getChild(j).getFragmentId()).instanceExecParams.size();
|
||||
if (currentChildFragmentParallelism > maxParallelism) {
|
||||
maxParallelism = currentChildFragmentParallelism;
|
||||
inputFragmentIndex = j;
|
||||
}
|
||||
}
|
||||
|
||||
PlanFragmentId inputFragmentId = fragment.getChild(inputFragmentIndex).getFragmentId();
|
||||
int maxParallelFragmentIndex = findMaxParallelFragmentIndex(fragment);
|
||||
PlanFragmentId inputFragmentId = fragment.getChild(maxParallelFragmentIndex).getFragmentId();
|
||||
// AddAll() soft copy()
|
||||
int exchangeInstances = -1;
|
||||
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
|
||||
@ -2234,6 +2216,27 @@ public class Coordinator implements CoordInterface {
|
||||
}
|
||||
}
|
||||
|
||||
private int findMaxParallelFragmentIndex(PlanFragment fragment) {
|
||||
Preconditions.checkState(!fragment.getChildren().isEmpty(), "fragment has no children");
|
||||
|
||||
// exclude broadcast join right side's child fragments
|
||||
List<PlanFragment> childFragmentCandidates = fragment.getChildren().stream()
|
||||
.filter(e -> e.getOutputPartition() != DataPartition.UNPARTITIONED)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
int maxParallelism = 0;
|
||||
int maxParaIndex = 0;
|
||||
for (int i = 0; i < childFragmentCandidates.size(); i++) {
|
||||
PlanFragmentId childFragmentId = childFragmentCandidates.get(i).getFragmentId();
|
||||
int currentChildFragmentParallelism = fragmentExecParamsMap.get(childFragmentId).instanceExecParams.size();
|
||||
if (currentChildFragmentParallelism > maxParallelism) {
|
||||
maxParallelism = currentChildFragmentParallelism;
|
||||
maxParaIndex = i;
|
||||
}
|
||||
}
|
||||
return maxParaIndex;
|
||||
}
|
||||
|
||||
private TNetworkAddress getGroupCommitBackend(Map<TNetworkAddress, Long> addressToBackendID) {
|
||||
// Used for Nereids planner Group commit insert BE select.
|
||||
TNetworkAddress execHostport = new TNetworkAddress(groupCommitBackend.getHost(),
|
||||
|
||||
Reference in New Issue
Block a user