[feature](executor) using max_instance_num to limit automatically instance (#22521)
This commit is contained in:
@ -19,7 +19,6 @@ package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.UserException;
|
||||
@ -179,22 +178,7 @@ public class SetVar {
|
||||
this.value = new StringLiteral(TimeUtils.checkTimeZoneValidAndStandardize(getResult().getStringValue()));
|
||||
this.result = (LiteralExpr) this.value;
|
||||
}
|
||||
if (getVariable().equalsIgnoreCase(SessionVariable.PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM)) {
|
||||
int instanceNum = Integer.parseInt(getResult().getStringValue());
|
||||
if (instanceNum > Config.max_instance_num) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR,
|
||||
SessionVariable.PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM,
|
||||
instanceNum + "(Should not be set to more than " + Config.max_instance_num + ")");
|
||||
}
|
||||
}
|
||||
if (getVariable().equalsIgnoreCase(SessionVariable.PARALLEL_PIPELINE_TASK_NUM)) {
|
||||
int instanceNum = Integer.parseInt(getValue().getStringValue());
|
||||
if (instanceNum > Config.max_instance_num) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_FOR_VAR,
|
||||
SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
|
||||
instanceNum + "(Should not be set to more than " + Config.max_instance_num + ")");
|
||||
}
|
||||
}
|
||||
|
||||
if (getVariable().equalsIgnoreCase(SessionVariable.EXEC_MEM_LIMIT)) {
|
||||
this.value = new StringLiteral(Long.toString(ParseUtil.analyzeDataVolumn(getResult().getStringValue())));
|
||||
this.result = (LiteralExpr) this.value;
|
||||
|
||||
@ -106,6 +106,7 @@ public class SessionVariable implements Serializable, Writable {
|
||||
public static final String ENABLE_BUCKET_SHUFFLE_JOIN = "enable_bucket_shuffle_join";
|
||||
public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = "parallel_fragment_exec_instance_num";
|
||||
public static final String PARALLEL_PIPELINE_TASK_NUM = "parallel_pipeline_task_num";
|
||||
public static final String MAX_INSTANCE_NUM = "max_instance_num";
|
||||
public static final String ENABLE_INSERT_STRICT = "enable_insert_strict";
|
||||
public static final String ENABLE_SPILLING = "enable_spilling";
|
||||
public static final String ENABLE_EXCHANGE_NODE_PARALLEL_MERGE = "enable_exchange_node_parallel_merge";
|
||||
@ -566,6 +567,9 @@ public class SessionVariable implements Serializable, Writable {
|
||||
@VariableMgr.VarAttr(name = PARALLEL_PIPELINE_TASK_NUM, fuzzy = true)
|
||||
public int parallelPipelineTaskNum = 0;
|
||||
|
||||
@VariableMgr.VarAttr(name = MAX_INSTANCE_NUM)
|
||||
public int maxInstanceNum = 64;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_INSERT_STRICT, needForward = true)
|
||||
public boolean enableInsertStrict = true;
|
||||
|
||||
@ -1476,7 +1480,8 @@ public class SessionVariable implements Serializable, Writable {
|
||||
public int getParallelExecInstanceNum() {
|
||||
if (enablePipelineEngine && parallelPipelineTaskNum == 0) {
|
||||
int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
|
||||
return (size + 1) / 2;
|
||||
int autoInstance = (size + 1) / 2;
|
||||
return Math.min(autoInstance, maxInstanceNum);
|
||||
} else if (enablePipelineEngine) {
|
||||
return parallelPipelineTaskNum;
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user