[pipeline](exec) disable shared scan in default and disable shared scan in limit with where scan (#25952)
This commit is contained in:
@ -1606,9 +1606,6 @@ public class Config extends ConfigBase {
|
||||
@ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL)
|
||||
public static boolean enable_cpu_hard_limit = false;
|
||||
|
||||
@ConfField(mutable = true)
|
||||
public static boolean disable_shared_scan = false;
|
||||
|
||||
@ConfField(mutable = false, masterOnly = true)
|
||||
public static int backend_rpc_timeout_ms = 60000; // 1 min
|
||||
|
||||
|
||||
@ -1267,11 +1267,9 @@ public class OlapScanNode extends ScanNode {
|
||||
public int getNumInstances() {
|
||||
// In pipeline exec engine, the instance num equals be_num * parallel instance.
|
||||
// so here we need count distinct be_num to do the work. make sure get right instance
|
||||
if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()) {
|
||||
int parallelInstance = ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
|
||||
long numBackend = scanRangeLocations.stream().flatMap(rangeLoc -> rangeLoc.getLocations().stream())
|
||||
.map(loc -> loc.backend_id).distinct().count();
|
||||
return (int) (parallelInstance * numBackend);
|
||||
if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
|
||||
&& ConnectContext.get().getSessionVariable().getEnableSharedScan()) {
|
||||
return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
|
||||
}
|
||||
return scanRangeLocations.size();
|
||||
}
|
||||
@ -1328,7 +1326,7 @@ public class OlapScanNode extends ScanNode {
|
||||
// If scan is key search, should not enable the shared scan opt to prevent the performance problem
|
||||
// 1. where contain the eq or in expr of key column slot
|
||||
// 2. key column slot is distribution column and first column
|
||||
public boolean isKeySearch() {
|
||||
protected boolean isKeySearch() {
|
||||
List<SlotRef> whereSlot = Lists.newArrayList();
|
||||
for (Expr conjunct : conjuncts) {
|
||||
if (conjunct instanceof BinaryPredicate) {
|
||||
|
||||
@ -43,6 +43,7 @@ import org.apache.doris.common.NotImplementedException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
|
||||
import org.apache.doris.planner.external.FederationBackendPolicy;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.spi.Split;
|
||||
import org.apache.doris.statistics.StatisticalType;
|
||||
import org.apache.doris.statistics.query.StatsDelta;
|
||||
@ -148,6 +149,13 @@ public abstract class ScanNode extends PlanNode {
|
||||
*/
|
||||
public abstract List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength);
|
||||
|
||||
// Reports whether this scan is a key search. For a key search the shared scan opt
// should not be enabled, to prevent the performance problem. A key search means:
|
||||
// 1. the where clause contains an eq or in expr on a key column slot
|
||||
// 2. that key column slot is a distribution column and the first column
|
||||
// This base implementation always returns false.
protected boolean isKeySearch() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update required_slots in scan node contexts. This is called after Nereids planner do the projection.
|
||||
* In the projection process, some slots may be removed. So call this to update the slots info.
|
||||
@ -653,7 +661,14 @@ public abstract class ScanNode extends PlanNode {
|
||||
return scanRangeLocation;
|
||||
}
|
||||
|
||||
public boolean isKeySearch() {
|
||||
return false;
|
||||
// Reports whether the shared scan opt should be disabled for this scan, to prevent
// the performance problem. Returns true when either condition holds:
|
||||
// 1. isKeySearch() returns true
|
||||
// 2. the session variable of the given context does not enable shared scan
//    (getEnableSharedScan() returns false)
|
||||
public boolean shouldDisableSharedScan(ConnectContext context) {
|
||||
return isKeySearch() || !context.getSessionVariable().getEnableSharedScan();
|
||||
}
|
||||
|
||||
// Returns true only when hasLimit() is true and the conjuncts collection is not empty.
public boolean haveLimitAndConjunts() {
|
||||
return hasLimit() && !conjuncts.isEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1955,20 +1955,24 @@ public class Coordinator implements CoordInterface {
|
||||
|
||||
// disable shared scan optimization if one of conditions below is met:
|
||||
// 1. Use non-pipeline or pipelineX engine
|
||||
// 2. Number of scan ranges is larger than instances
|
||||
// 3. This fragment has a colocated scan node
|
||||
// 4. This fragment has a FileScanNode
|
||||
// 5. Disable shared scan optimization by session variable
|
||||
if (!enablePipelineEngine || perNodeScanRanges.size() > parallelExecInstanceNum
|
||||
|| (node.isPresent() && node.get().getShouldColoScan())
|
||||
// 2. This fragment has a colocated scan node
|
||||
// 3. This fragment has a FileScanNode
|
||||
// 4. Disable shared scan optimization by session variable
|
||||
if (!enablePipelineEngine || (node.isPresent() && node.get().getShouldColoScan())
|
||||
|| (node.isPresent() && node.get() instanceof FileScanNode)
|
||||
|| (node.isPresent() && node.get().isKeySearch())
|
||||
|| Config.disable_shared_scan || enablePipelineXEngine) {
|
||||
|| (node.isPresent() && node.get().shouldDisableSharedScan(context))
|
||||
|| enablePipelineXEngine) {
|
||||
int expectedInstanceNum = 1;
|
||||
if (parallelExecInstanceNum > 1) {
|
||||
// the scan instance num should not be larger than the tablets num
|
||||
expectedInstanceNum = Math.min(perNodeScanRanges.size(), parallelExecInstanceNum);
|
||||
}
|
||||
// if the node has both a limit and conjuncts, only 1 instance is needed to save cpu and
|
||||
// mem resource
|
||||
if (node.isPresent() && node.get().haveLimitAndConjunts()) {
|
||||
expectedInstanceNum = 1;
|
||||
}
|
||||
|
||||
perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges,
|
||||
expectedInstanceNum);
|
||||
sharedScanOpts = Collections.nCopies(perInstanceScanRanges.size(), false);
|
||||
@ -1976,6 +1980,12 @@ public class Coordinator implements CoordInterface {
|
||||
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
|
||||
leftMostNode.getNumInstances());
|
||||
expectedInstanceNum = Math.max(expectedInstanceNum, 1);
|
||||
// if the node has both a limit and conjuncts, only 1 instance is needed to save cpu and
|
||||
// mem resource
|
||||
if (node.isPresent() && node.get().haveLimitAndConjunts()) {
|
||||
expectedInstanceNum = 1;
|
||||
}
|
||||
|
||||
perInstanceScanRanges = Collections.nCopies(expectedInstanceNum, perNodeScanRanges);
|
||||
sharedScanOpts = Collections.nCopies(perInstanceScanRanges.size(), true);
|
||||
}
|
||||
|
||||
@ -214,6 +214,8 @@ public class SessionVariable implements Serializable, Writable {
|
||||
|
||||
public static final String ENABLE_PIPELINE_X_ENGINE = "enable_pipeline_x_engine";
|
||||
|
||||
public static final String ENABLE_SHARED_SCAN = "enable_shared_scan";
|
||||
|
||||
public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle";
|
||||
|
||||
public static final String ENABLE_AGG_STATE = "enable_agg_state";
|
||||
@ -738,6 +740,11 @@ public class SessionVariable implements Serializable, Writable {
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_PIPELINE_X_ENGINE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
|
||||
private boolean enablePipelineXEngine = false;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_SHARED_SCAN, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
|
||||
needForward = true)
|
||||
private boolean enableSharedScan = false;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL)
|
||||
private boolean enableLocalShuffle = false;
|
||||
|
||||
@ -2832,6 +2839,10 @@ public class SessionVariable implements Serializable, Writable {
|
||||
return enablePipelineEngine || enablePipelineXEngine;
|
||||
}
|
||||
|
||||
// Returns the current value of the enableSharedScan field.
public boolean getEnableSharedScan() {
|
||||
return enableSharedScan;
|
||||
}
|
||||
|
||||
// Returns the current value of the enablePipelineXEngine field.
public boolean getEnablePipelineXEngine() {
|
||||
return enablePipelineXEngine;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user