[pipelineX](localexchange) Adjust local exchange plan rule (#30393)

This commit is contained in:
Gabriel
2024-01-26 10:10:51 +08:00
committed by yiguolei
parent 6f8c133a37
commit 09ae37dcc5
2 changed files with 13 additions and 10 deletions

View File

@ -720,12 +720,13 @@ public abstract class ScanNode extends PlanNode {
|| getShouldColoScan();
}
public boolean ignoreStorageDataDistribution(ConnectContext context) {
public boolean ignoreStorageDataDistribution(ConnectContext context, int numBackends) {
return context != null
&& context.getSessionVariable().isIgnoreStorageDataDistribution()
&& context.getSessionVariable().getEnablePipelineXEngine()
&& !fragment.isHasNullAwareLeftAntiJoin()
&& getScanRangeNum() < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
&& getScanRangeNum()
< ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numBackends;
}
public int getScanRangeNum() {

View File

@ -2036,15 +2036,16 @@ public class Coordinator implements CoordInterface {
/**
* Ignore storage data distribution iff:
* 1. Current fragment is not forced to use data distribution.
* 2. `parallelExecInstanceNum` is larger than scan ranges.
* 3. Use Nereids planner.
* 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges.
* 2. Use Nereids planner.
*/
boolean sharedScan = true;
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
boolean ignoreStorageDataDistribution = scanNodes.stream()
.allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context)) && useNereids;
.allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context,
fragmentExecParamsMap.get(scanNode.getFragment().getFragmentId())
.scanRangeAssignment.size())) && useNereids;
if (node.isPresent() && (!node.get().shouldDisableSharedScan(context)
|| ignoreStorageDataDistribution)) {
expectedInstanceNum = Math.max(expectedInstanceNum, 1);
@ -2913,12 +2914,13 @@ public class Coordinator implements CoordInterface {
/**
* Ignore storage data distribution iff:
* 1. Current fragment is not forced to use data distribution.
* 2. `parallelExecInstanceNum` is larger than scan ranges.
* 3. Use Nereids planner.
* 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges.
* 2. Use Nereids planner.
*/
boolean ignoreStorageDataDistribution = scanNodes.stream()
.allMatch(node -> node.ignoreStorageDataDistribution(context))
.allMatch(node -> node.ignoreStorageDataDistribution(context,
fragmentExecParamsMap.get(node.getFragment().getFragmentId())
.scanRangeAssignment.size()))
&& addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
return addressScanRange.getValue().size() < parallelExecInstanceNum;
}) && useNereids;