[fix](nereids) fix partition dest overwrite bug when cte as bc right (#22177)
In the coordinator's current logic for computing the exec params of a CTE multicast fragment, the number of destinations equals the number of BE hosts when the shared hash table for broadcast join is enabled. However, the check that decides whether to take the shared-hash-table broadcast path is wrong: when a multicast fragment's targets mix a broadcast join with a hash-partitioned exchange, the broadcast target's destination info overwrites the following partitioned target's, i.e. the destinations end up at host level when they should be per instance. This makes the hash-partitioned part of the query hang.
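To make the failure mode concrete, here is a minimal, self-contained sketch of the idea behind the fix. Every name in it (MulticastDestinationSketch, TargetKind, destinationsFor, instancesPerHost) is a hypothetical stand-in for illustration, not a Doris internal: the point is only that destination granularity must be chosen per exchange node (per multicast target), so a broadcast target (one destination per BE host when the shared hash table is enabled) cannot overwrite a hash-partitioned target (one destination per instance).

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    // Hypothetical model, for illustration only: each multicast target decides
    // its own destination granularity from its own exchange kind.
    public class MulticastDestinationSketch {

        enum TargetKind { BROADCAST_RIGHT_CHILD, HASH_PARTITIONED }

        // Before the fix, the equivalent decision was driven by fragment-level
        // state, so the first broadcast target forced host-level destinations
        // onto every later target of the same multicast sink as well.
        static List<String> destinationsFor(TargetKind kind, Map<String, Integer> instancesPerHost) {
            List<String> dests = new ArrayList<>();
            if (kind == TargetKind.BROADCAST_RIGHT_CHILD) {
                // shared hash table for broadcast join: one destination per BE host
                dests.addAll(instancesPerHost.keySet());
            } else {
                // hash partition: one destination per instance; giving this target
                // host-level destinations is what made the hash-partition side hang
                instancesPerHost.forEach((host, n) -> {
                    for (int i = 0; i < n; i++) {
                        dests.add(host + "#" + i);
                    }
                });
            }
            return dests;
        }

        public static void main(String[] args) {
            Map<String, Integer> instancesPerHost = Map.of("be1", 2, "be2", 2);
            // one entry per host
            System.out.println(destinationsFor(TargetKind.BROADCAST_RIGHT_CHILD, instancesPerHost));
            // one entry per instance (four entries here)
            System.out.println(destinationsFor(TargetKind.HASH_PARTITIONED, instancesPerHost));
        }
    }

The actual patch below realizes this by looking up the destination ExchangeNode (via the new PlanNode.findPlanNodeFromPlanNodeId helper) and checking ((ExchangeNode) exchNode).isRightChildOfBroadcastHashJoin() per exchange, instead of the fragment-level PlanFragment.isRightChildOfBroadcastHashJoin(), which is removed.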
@@ -1014,11 +1014,11 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
             hashJoinNode.setColocate(true, "");
             leftFragment.setHasColocatePlanNode(true);
         } else if (JoinUtils.shouldBroadcastJoin(physicalHashJoin)) {
-            Preconditions.checkState(rightFragment.getPlanRoot() instanceof ExchangeNode,
+            Preconditions.checkState(rightPlanRoot instanceof ExchangeNode,
                     "right child of broadcast join must be ExchangeNode but it is " + rightFragment.getPlanRoot());
             Preconditions.checkState(rightFragment.getChildren().size() == 1,
                     "right child of broadcast join must have 1 child, but meet " + rightFragment.getChildren().size());
-            rightFragment.getChild(0).setRightChildOfBroadcastHashJoin(true);
+            ((ExchangeNode) rightPlanRoot).setRightChildOfBroadcastHashJoin(true);
             hashJoinNode.setDistributionMode(DistributionMode.BROADCAST);
         } else if (JoinUtils.shouldBucketShuffleJoin(physicalHashJoin)) {
             hashJoinNode.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);

@@ -388,7 +388,7 @@ public class DistributedPlanner {
             node.setChild(0, leftChildFragment.getPlanRoot());
             connectChildFragment(node, 1, leftChildFragment, rightChildFragment);
             leftChildFragment.setPlanRoot(node);
-            rightChildFragment.setRightChildOfBroadcastHashJoin(true);
+            ((ExchangeNode) node.getChild(1)).setRightChildOfBroadcastHashJoin(true);
             return leftChildFragment;
         } else {
             node.setDistributionMode(HashJoinNode.DistributionMode.PARTITIONED);

@@ -63,6 +63,8 @@ public class ExchangeNode extends PlanNode {
     // exchange node. Null if this exchange does not merge sorted streams
     private SortInfo mergeInfo;
 
+    private boolean isRightChildOfBroadcastHashJoin = false;
+
     /**
      * use for Nereids only.
      */

@@ -184,4 +186,12 @@ public class ExchangeNode extends PlanNode {
     public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
         return prefix + "offset: " + offset + "\n";
     }
+
+    public boolean isRightChildOfBroadcastHashJoin() {
+        return isRightChildOfBroadcastHashJoin;
+    }
+
+    public void setRightChildOfBroadcastHashJoin(boolean value) {
+        isRightChildOfBroadcastHashJoin = value;
+    }
 }

@@ -145,8 +145,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     // has colocate plan node
     private boolean hasColocatePlanNode = false;
 
-    private boolean isRightChildOfBroadcastHashJoin = false;
-
     /**
      * C'tor for fragment with specific partition; the output is by default broadcast.
      */

@@ -434,14 +432,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         return transferQueryStatisticsWithEveryBatch;
     }
 
-    public boolean isRightChildOfBroadcastHashJoin() {
-        return isRightChildOfBroadcastHashJoin;
-    }
-
-    public void setRightChildOfBroadcastHashJoin(boolean value) {
-        isRightChildOfBroadcastHashJoin = value;
-    }
-
     public int getFragmentSequenceNum() {
         if (ConnectContext.get().getSessionVariable().isEnableNereidsPlanner()) {
             return fragmentSequenceNum;

@@ -958,6 +958,25 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
         }
     }
 
+    /**
+     * find planNode recursively based on the planNodeId
+     */
+    public static PlanNode findPlanNodeFromPlanNodeId(PlanNode root, PlanNodeId id) {
+        if (root == null || root.getId() == null || id == null) {
+            return null;
+        } else if (root.getId().equals(id)) {
+            return root;
+        } else {
+            for (PlanNode child : root.getChildren()) {
+                PlanNode retNode = findPlanNodeFromPlanNodeId(child, id);
+                if (retNode != null) {
+                    return retNode;
+                }
+            }
+            return null;
+        }
+    }
+
     public String getPlanTreeExplainStr() {
         StringBuilder sb = new StringBuilder();
         sb.append("[").append(getId().asInt()).append(": ").append(getPlanNodeName()).append("]");

@@ -1294,6 +1294,9 @@ public class Coordinator {
         // compute destinations and # senders per exchange node
         // (the root fragment doesn't have a destination)
         for (FragmentExecParams params : fragmentExecParamsMap.values()) {
+            if (params.fragment instanceof MultiCastPlanFragment) {
+                continue;
+            }
             PlanFragment destFragment = params.fragment.getDestFragment();
             if (destFragment == null) {
                 // root plan fragment

@@ -1308,6 +1311,10 @@
             // output at the moment
 
             PlanNodeId exchId = sink.getExchNodeId();
+            PlanNode exchNode = PlanNode.findPlanNodeFromPlanNodeId(destFragment.getPlanRoot(), exchId);
+            Preconditions.checkState(exchNode != null, "exchNode is null");
+            Preconditions.checkState(exchNode instanceof ExchangeNode,
+                    "exchNode is not ExchangeNode" + exchNode.getId().toString());
             // we might have multiple fragments sending to this exchange node
             // (distributed MERGE), which is why we need to add up the #senders
             if (destParams.perExchNumSenders.get(exchId.asInt()) == null) {

@@ -1357,7 +1364,7 @@
                 }
             } else {
                 if (enablePipelineEngine && enableShareHashTableForBroadcastJoin
-                        && params.fragment.isRightChildOfBroadcastHashJoin()) {
+                        && ((ExchangeNode) exchNode).isRightChildOfBroadcastHashJoin()) {
                     // here choose the first instance to build hash table.
                     Map<TNetworkAddress, FInstanceExecParam> destHosts = new HashMap<>();
                     destParams.instanceExecParams.forEach(param -> {

@@ -1412,7 +1419,11 @@
                 multi.getDestFragmentList().get(i).setOutputPartition(params.fragment.getOutputPartition());
 
                 PlanNodeId exchId = sink.getExchNodeId();
+                PlanNode exchNode = PlanNode.findPlanNodeFromPlanNodeId(destFragment.getPlanRoot(), exchId);
                 Preconditions.checkState(!destParams.perExchNumSenders.containsKey(exchId.asInt()));
+                Preconditions.checkState(exchNode != null, "exchNode is null");
+                Preconditions.checkState(exchNode instanceof ExchangeNode,
+                        "exchNode is not ExchangeNode" + exchNode.getId().toString());
                 if (destParams.perExchNumSenders.get(exchId.asInt()) == null) {
                     destParams.perExchNumSenders.put(exchId.asInt(), params.instanceExecParams.size());
                 } else {

@@ -1422,7 +1433,7 @@
 
                 List<TPlanFragmentDestination> destinations = multiSink.getDestinations().get(i);
                 if (enablePipelineEngine && enableShareHashTableForBroadcastJoin
-                        && params.fragment.isRightChildOfBroadcastHashJoin()) {
+                        && ((ExchangeNode) exchNode).isRightChildOfBroadcastHashJoin()) {
                     // here choose the first instance to build hash table.
                     Map<TNetworkAddress, FInstanceExecParam> destHosts = new HashMap<>();