diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 9881fe6f6e..3633f2d1fd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -720,12 +720,13 @@ public abstract class ScanNode extends PlanNode { || getShouldColoScan(); } - public boolean ignoreStorageDataDistribution(ConnectContext context) { + public boolean ignoreStorageDataDistribution(ConnectContext context, int numBackends) { return context != null && context.getSessionVariable().isIgnoreStorageDataDistribution() && context.getSessionVariable().getEnablePipelineXEngine() && !fragment.isHasNullAwareLeftAntiJoin() - && getScanRangeNum() < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); + && getScanRangeNum() + < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * numBackends; } public int getScanRangeNum() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 222fc37a42..099aedd7e1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2036,15 +2036,16 @@ public class Coordinator implements CoordInterface { /** * Ignore storage data distribution iff: - * 1. Current fragment is not forced to use data distribution. - * 2. `parallelExecInstanceNum` is larger than scan ranges. - * 3. Use Nereids planner. + * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges. + * 2. Use Nereids planner. */ boolean sharedScan = true; int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); boolean ignoreStorageDataDistribution = scanNodes.stream() - .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context)) && useNereids; + .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context, + fragmentExecParamsMap.get(scanNode.getFragment().getFragmentId()) + .scanRangeAssignment.size())) && useNereids; if (node.isPresent() && (!node.get().shouldDisableSharedScan(context) || ignoreStorageDataDistribution)) { expectedInstanceNum = Math.max(expectedInstanceNum, 1); @@ -2913,12 +2914,13 @@ public class Coordinator implements CoordInterface { /** * Ignore storage data distribution iff: - * 1. Current fragment is not forced to use data distribution. - * 2. `parallelExecInstanceNum` is larger than scan ranges. - * 3. Use Nereids planner. + * 1. `parallelExecInstanceNum * numBackends` is larger than scan ranges. + * 2. Use Nereids planner. */ boolean ignoreStorageDataDistribution = scanNodes.stream() - .allMatch(node -> node.ignoreStorageDataDistribution(context)) + .allMatch(node -> node.ignoreStorageDataDistribution(context, + fragmentExecParamsMap.get(node.getFragment().getFragmentId()) + .scanRangeAssignment.size())) && addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> { return addressScanRange.getValue().size() < parallelExecInstanceNum; }) && useNereids;