[fix](meta) set parallel_pipeline_task_num when upgrading from 1.2 to 2.0 (#22618)
This commit is contained in:
@ -1365,6 +1365,18 @@ public class Env {
|
||||
editLog.logAddFirstFrontend(self);
|
||||
|
||||
initLowerCaseTableNames();
|
||||
} else {
|
||||
if (journalVersion <= FeMetaVersion.VERSION_114) {
|
||||
// if journal version is less than 114, which means it is upgraded from version before 2.0.
|
||||
// When upgrading from 1.2 to 2.0, we need to make sure that the parallelism of query remain unchanged
|
||||
// when switch to pipeline engine, otherwise it may impact the load of entire cluster
|
||||
// because the default parallelism of pipeline engine is higher than previous version.
|
||||
// so set parallel_pipeline_task_num to parallel_fragment_exec_instance_num
|
||||
int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum;
|
||||
VariableMgr.setGlobalPipelineTask(newVal);
|
||||
LOG.info("upgrade FE from 1.x to 2.0, set parallel_pipeline_task_num "
|
||||
+ "to parallel_fragment_exec_instance_num: {}", newVal);
|
||||
}
|
||||
}
|
||||
|
||||
getPolicyMgr().createDefaultStoragePolicy();
|
||||
|
||||
@ -367,6 +367,26 @@ public class VariableMgr {
|
||||
}
|
||||
}
|
||||
|
||||
public static void setGlobalPipelineTask(int instance) {
|
||||
wlock.lock();
|
||||
try {
|
||||
VarContext ctx = ctxByVarName.get(SessionVariable.PARALLEL_PIPELINE_TASK_NUM);
|
||||
try {
|
||||
setValue(ctx.getObj(), ctx.getField(), String.valueOf(instance));
|
||||
} catch (DdlException e) {
|
||||
LOG.warn("failed to set global variable: {}", SessionVariable.PARALLEL_PIPELINE_TASK_NUM, e);
|
||||
return;
|
||||
}
|
||||
|
||||
// write edit log
|
||||
GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable,
|
||||
Lists.newArrayList(SessionVariable.PARALLEL_PIPELINE_TASK_NUM));
|
||||
Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
|
||||
} finally {
|
||||
wlock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public static void setLowerCaseTableNames(int mode) throws DdlException {
|
||||
VarContext ctx = ctxByVarName.get(GlobalVariable.LOWER_CASE_TABLE_NAMES);
|
||||
setGlobalVarAndWriteEditLog(ctx, GlobalVariable.LOWER_CASE_TABLE_NAMES, "" + mode);
|
||||
|
||||
Reference in New Issue
Block a user