diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index f11ddd1faf..4bdd96aad6 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1606,9 +1606,6 @@ public class Config extends ConfigBase { @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL) public static boolean enable_cpu_hard_limit = false; - @ConfField(mutable = true) - public static boolean disable_shared_scan = false; - @ConfField(mutable = false, masterOnly = true) public static int backend_rpc_timeout_ms = 60000; // 1 min diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 7167563d8d..45d5910f83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -1267,11 +1267,9 @@ public class OlapScanNode extends ScanNode { public int getNumInstances() { // In pipeline exec engine, the instance num equals be_num * parallel instance. // so here we need count distinct be_num to do the work. 
make sure get right instance - if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()) { - int parallelInstance = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); - long numBackend = scanRangeLocations.stream().flatMap(rangeLoc -> rangeLoc.getLocations().stream()) - .map(loc -> loc.backend_id).distinct().count(); - return (int) (parallelInstance * numBackend); + if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine() + && ConnectContext.get().getSessionVariable().getEnableSharedScan()) { + return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); } return scanRangeLocations.size(); } @@ -1328,7 +1326,7 @@ public class OlapScanNode extends ScanNode { // If scan is key search, should not enable the shared scan opt to prevent the performance problem // 1. where contain the eq or in expr of key column slot // 2. key column slot is distribution column and first column - public boolean isKeySearch() { + protected boolean isKeySearch() { List whereSlot = Lists.newArrayList(); for (Expr conjunct : conjuncts) { if (conjunct instanceof BinaryPredicate) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 72f00e68c9..6a409975ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -43,6 +43,7 @@ import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.external.FederationBackendPolicy; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.spi.Split; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.statistics.query.StatsDelta; @@ -148,6 +149,13 @@ public abstract class ScanNode extends PlanNode { */ public abstract List 
getScanRangeLocations(long maxScanRangeLength); + // If the scan is a key search, the shared scan opt should not be enabled, to prevent a performance problem + // 1. the where clause contains an eq or in expr on a key column slot + // 2. the key column slot is a distribution column and the first column + protected boolean isKeySearch() { + return false; + } + /** * Update required_slots in scan node contexts. This is called after Nereids planner do the projection. * In the projection process, some slots may be removed. So call this to update the slots info. @@ -653,7 +661,14 @@ public abstract class ScanNode extends PlanNode { return scanRangeLocation; } - public boolean isKeySearch() { - return false; + // Some scans should not enable the shared scan opt, to prevent a performance problem: + // 1. the scan is a key search + // 2. the session variable enable_shared_scan is not enabled + public boolean shouldDisableSharedScan(ConnectContext context) { + return isKeySearch() || !context.getSessionVariable().getEnableSharedScan(); + } + + public boolean haveLimitAndConjunts() { + return hasLimit() && !conjuncts.isEmpty(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 56ca362e69..6475230b63 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1955,20 +1955,24 @@ public class Coordinator implements CoordInterface { // disable shared scan optimization if one of conditions below is met: // 1. Use non-pipeline or pipelineX engine - // 2. Number of scan ranges is larger than instances - // 3. This fragment has a colocated scan node - // 4. This fragment has a FileScanNode - // 5. Disable shared scan optimization by session variable - if (!enablePipelineEngine || perNodeScanRanges.size() > parallelExecInstanceNum - || (node.isPresent() && node.get().getShouldColoScan()) + // 2. This fragment has a colocated scan node + // 3. 
This fragment has a FileScanNode + // 4. The scan node is a key search, or the session variable enable_shared_scan is off + if (!enablePipelineEngine || (node.isPresent() && node.get().getShouldColoScan()) || (node.isPresent() && node.get() instanceof FileScanNode) - || (node.isPresent() && node.get().isKeySearch()) - || Config.disable_shared_scan || enablePipelineXEngine) { + || (node.isPresent() && node.get().shouldDisableSharedScan(context)) + || enablePipelineXEngine) { int expectedInstanceNum = 1; if (parallelExecInstanceNum > 1) { //the scan instance num should not larger than the tablets num expectedInstanceNum = Math.min(perNodeScanRanges.size(), parallelExecInstanceNum); } + // if the scan node has both a limit and conjuncts, only 1 instance is needed, to save cpu and + // mem resources + if (node.isPresent() && node.get().haveLimitAndConjunts()) { + expectedInstanceNum = 1; + } + perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges, expectedInstanceNum); sharedScanOpts = Collections.nCopies(perInstanceScanRanges.size(), false); @@ -1976,6 +1980,12 @@ public class Coordinator implements CoordInterface { int expectedInstanceNum = Math.min(parallelExecInstanceNum, leftMostNode.getNumInstances()); expectedInstanceNum = Math.max(expectedInstanceNum, 1); + // if the scan node has both a limit and conjuncts, only 1 instance is needed, to save cpu and + // mem resources + if (node.isPresent() && node.get().haveLimitAndConjunts()) { + expectedInstanceNum = 1; + } + perInstanceScanRanges = Collections.nCopies(expectedInstanceNum, perNodeScanRanges); sharedScanOpts = Collections.nCopies(perInstanceScanRanges.size(), true); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 955450a15c..7a4cef6e40 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -214,6 +214,8 @@ public class SessionVariable implements Serializable, Writable { public 
static final String ENABLE_PIPELINE_X_ENGINE = "enable_pipeline_x_engine"; + public static final String ENABLE_SHARED_SCAN = "enable_shared_scan"; + public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle"; public static final String ENABLE_AGG_STATE = "enable_agg_state"; @@ -738,6 +740,11 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_PIPELINE_X_ENGINE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL) private boolean enablePipelineXEngine = false; + + @VariableMgr.VarAttr(name = ENABLE_SHARED_SCAN, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, + needForward = true) + private boolean enableSharedScan = false; + @VariableMgr.VarAttr(name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL) private boolean enableLocalShuffle = false; @@ -2832,6 +2839,10 @@ public class SessionVariable implements Serializable, Writable { return enablePipelineEngine || enablePipelineXEngine; } + public boolean getEnableSharedScan() { + return enableSharedScan; + } + public boolean getEnablePipelineXEngine() { return enablePipelineXEngine; }