diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index d21fab7ac9..885bb335e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -724,7 +724,9 @@ public abstract class ScanNode extends PlanNode { return !isKeySearch() && context != null && context.getSessionVariable().isIgnoreStorageDataDistribution() && context.getSessionVariable().getEnablePipelineXEngine() - && !fragment.isHasNullAwareLeftAntiJoin(); + && !fragment.isHasNullAwareLeftAntiJoin() + && ((this instanceof OlapScanNode) && ((OlapScanNode) this).getScanTabletIds().size() + < ConnectContext.get().getSessionVariable().getParallelExecInstanceNum()); } public boolean haveLimitAndConjunts() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 9367316b20..efee918dfd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -2044,9 +2044,10 @@ public class Coordinator implements CoordInterface { boolean sharedScan = true; int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); + boolean ignoreStorageDataDistribution = scanNodes.stream() + .allMatch(scanNode -> scanNode.ignoreStorageDataDistribution(context)) && useNereids; if (node.isPresent() && (!node.get().shouldDisableSharedScan(context) - || (node.get().ignoreStorageDataDistribution(context) - && expectedInstanceNum > perNodeScanRanges.size() && useNereids))) { + || ignoreStorageDataDistribution)) { expectedInstanceNum = Math.max(expectedInstanceNum, 1); // if have limit and conjunts, only need 1 instance to save cpu and // mem resource @@ -2906,9 +2907,8 @@ public class Coordinator implements CoordInterface { * 2. `parallelExecInstanceNum` is larger than scan ranges. * 3. Use Nereids planner. */ - boolean ignoreStorageDataDistribution = scanNodes.stream().filter(scanNode -> { - return scanNodeIds.contains(scanNode.getId().asInt()); - }).allMatch(node -> node.ignoreStorageDataDistribution(context)) + boolean ignoreStorageDataDistribution = scanNodes.stream() + .allMatch(node -> node.ignoreStorageDataDistribution(context)) && addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> { return addressScanRange.getValue().size() < parallelExecInstanceNum; }) && useNereids;