From 839b4698791e94a998d2269feaa3736c4bb8819e Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Sat, 5 Aug 2023 11:04:39 +0800 Subject: [PATCH] [fix](meta) set parallel_pipeline_task_num when upgrading from 1.2 to 2.0 (#22618) --- .../java/org/apache/doris/catalog/Env.java | 12 +++++++++++ .../java/org/apache/doris/qe/VariableMgr.java | 20 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 50c094beb2..9a286ad98c 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -1365,6 +1365,18 @@ public class Env { editLog.logAddFirstFrontend(self); initLowerCaseTableNames(); + } else { + if (journalVersion <= FeMetaVersion.VERSION_114) { + // if journal version is less than 114, which means it is upgraded from version before 2.0. + // When upgrading from 1.2 to 2.0, we need to make sure that the parallelism of query remain unchanged + // when switch to pipeline engine, otherwise it may impact the load of entire cluster + // because the default parallelism of pipeline engine is higher than previous version. + // so set parallel_pipeline_task_num to parallel_fragment_exec_instance_num + int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum; + VariableMgr.setGlobalPipelineTask(newVal); + LOG.info("upgrade FE from 1.x to 2.0, set parallel_pipeline_task_num " + + "to parallel_fragment_exec_instance_num: {}", newVal); + } } getPolicyMgr().createDefaultStoragePolicy(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java index 24b7468e77..853e0679e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/VariableMgr.java @@ -367,6 +367,26 @@ public class VariableMgr { } } + public static void setGlobalPipelineTask(int instance) { + wlock.lock(); + try { + VarContext ctx = ctxByVarName.get(SessionVariable.PARALLEL_PIPELINE_TASK_NUM); + try { + setValue(ctx.getObj(), ctx.getField(), String.valueOf(instance)); + } catch (DdlException e) { + LOG.warn("failed to set global variable: {}", SessionVariable.PARALLEL_PIPELINE_TASK_NUM, e); + return; + } + + // write edit log + GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable, + Lists.newArrayList(SessionVariable.PARALLEL_PIPELINE_TASK_NUM)); + Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info); + } finally { + wlock.unlock(); + } + } + public static void setLowerCaseTableNames(int mode) throws DdlException { VarContext ctx = ctxByVarName.get(GlobalVariable.LOWER_CASE_TABLE_NAMES); setGlobalVarAndWriteEditLog(ctx, GlobalVariable.LOWER_CASE_TABLE_NAMES, "" + mode);