[opt](scanner) Control the degree of parallelism of the scanner when only a limit is involved #39927 (#40357)

cherry pick from #39927
This commit is contained in:
zhiqiang
2024-09-09 10:42:19 +08:00
committed by GitHub
parent 1c91fbc167
commit a963709fed
5 changed files with 246 additions and 8 deletions

View File

@ -741,8 +741,20 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator {
}
/**
 * Decides whether this scan should be executed by a single instance (serial read).
 *
 * <p>The answer is {@code true} only when all of the following hold:
 * <ul>
 *   <li>the node has a limit,</li>
 *   <li>the limit is less than or equal to the row threshold, and</li>
 *   <li>the node has no conjuncts.</li>
 * </ul>
 * With a connection context, the threshold is the session variable
 * {@code adaptivePipelineTaskSerialReadOnLimit}; if the session has
 * {@code enableAdaptivePipelineTaskSerialReadOnLimit} turned off, the result is always {@code false}.
 *
 * @param ctx the connection context, or {@code null} when there is none (typically broker load),
 *            in which case the built-in threshold of 10000 rows is used
 * @return {@code true} if the scan should use one instance, {@code false} otherwise
 */
public boolean shouldUseOneInstance(ConnectContext ctx) {
    // Fallback threshold used when there is no connection context, typically for broker load.
    int adaptivePipelineTaskSerialReadOnLimit = 10000;
    if (ctx != null) {
        // The session switch disables the single-instance optimization entirely.
        if (!ctx.getSessionVariable().enableAdaptivePipelineTaskSerialReadOnLimit) {
            return false;
        }
        adaptivePipelineTaskSerialReadOnLimit = ctx.getSessionVariable().adaptivePipelineTaskSerialReadOnLimit;
    }
    // NOTE(review): the previous comment here said UniqueKey tables use multiple instances, but
    // nothing in this method inspects the key type - presumably a subclass override handles it; confirm.
    return hasLimit() && getLimit() <= adaptivePipelineTaskSerialReadOnLimit && conjuncts.isEmpty();
}
public long getSelectedPartitionNum() {

View File

@ -624,6 +624,11 @@ public class SessionVariable implements Serializable, Writable {
// Session variable name bound to the inListValueCountThreshold field.
public static final String IN_LIST_VALUE_COUNT_THRESHOLD = "in_list_value_count_threshold";
// Session variable name of the boolean switch bound to the
// enableAdaptivePipelineTaskSerialReadOnLimit field.
public static final String ENABLE_ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT =
"enable_adaptive_pipeline_task_serial_read_on_limit";
// Session variable name of the row-count threshold bound to the
// adaptivePipelineTaskSerialReadOnLimit field.
public static final String ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT =
"adaptive_pipeline_task_serial_read_on_limit";
/**
* If set false, user couldn't submit analyze SQL and FE won't allocate any related resources.
*/
@ -2025,6 +2030,7 @@ public class SessionVariable implements Serializable, Writable {
})
public boolean enableFallbackOnMissingInvertedIndex = true;
@VariableMgr.VarAttr(name = IN_LIST_VALUE_COUNT_THRESHOLD, description = {
"in条件value数量大于这个threshold后将不会走fast_execute",
"When the number of values in the IN condition exceeds this threshold,"
@ -2032,6 +2038,22 @@ public class SessionVariable implements Serializable, Writable {
})
public int inListValueCountThreshold = 10;
/**
 * Switch for the adaptive serial-read optimization on LIMIT queries. Defaults to {@code true}.
 *
 * <p>The description text states the boundary as "less than or equal to" because the check that
 * consumes this setting compares the limit against the threshold with {@code <=}.
 */
@VariableMgr.VarAttr(name = ENABLE_ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT, needForward = true, description = {
    "开启后将会允许自动调整 pipeline task 的并发数。当 scan 节点没有过滤条件,且 limit 参数小于等于"
        + "adaptive_pipeline_task_serial_read_on_limit 中指定的行数时,scan 的并行度将会被设置为 1",
    "When enabled, the pipeline task concurrency will be adjusted automatically. When the scan node has no filter "
        + "conditions and the limit parameter is less than or equal to the number of rows specified in "
        + "adaptive_pipeline_task_serial_read_on_limit, the parallelism of the scan will be set to 1."
})
public boolean enableAdaptivePipelineTaskSerialReadOnLimit = true;
/**
 * Row-count threshold for the adaptive serial-read optimization. Defaults to 10000.
 *
 * <p>ScanNode.shouldUseOneInstance reads this value when
 * enableAdaptivePipelineTaskSerialReadOnLimit is true and treats it as an inclusive upper bound
 * on the scan's limit.
 */
@VariableMgr.VarAttr(name = ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT, needForward = true, description = {
"当 enable_adaptive_pipeline_task_serial_read_on_limit 开启时,scan 的并行度将会被设置为 1 的行数阈值",
"When enable_adaptive_pipeline_task_serial_read_on_limit is enabled, "
+ "the number of rows at which the parallelism of the scan will be set to 1."
})
public int adaptivePipelineTaskSerialReadOnLimit = 10000;
/**
 * Sets the {@code enableESParallelScroll} field of this session to the given value.
 *
 * @param enableESParallelScroll the new value of the field
 */
public void setEnableEsParallelScroll(boolean enableESParallelScroll) {
this.enableESParallelScroll = enableESParallelScroll;
}
@ -3535,6 +3557,8 @@ public class SessionVariable implements Serializable, Writable {
tResult.setEnableFallbackOnMissingInvertedIndex(enableFallbackOnMissingInvertedIndex);
tResult.setKeepCarriageReturn(keepCarriageReturn);
tResult.setEnableAdaptivePipelineTaskSerialReadOnLimit(enableAdaptivePipelineTaskSerialReadOnLimit);
tResult.setAdaptivePipelineTaskSerialReadOnLimit(adaptivePipelineTaskSerialReadOnLimit);
tResult.setInListValueCountThreshold(inListValueCountThreshold);
return tResult;
}