[exec](pipeline) revert FE pipeline instance num pr (#22617)

* Revert "[fix](executor) only mysql connect to set GlobalPipelineTask (#22205)"
* Revert "[feature](executor) using fe version to set instance_num (#22047)"
This commit is contained in:
HappenLee
2023-08-04 19:07:14 +08:00
committed by GitHub
parent d974af5feb
commit 872280135d
5 changed files with 2 additions and 100 deletions

View File

@ -99,7 +99,6 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.ConfigException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.EnvUtils;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
@ -108,7 +107,6 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.Daemon;
@ -297,13 +295,10 @@ public class Env {
public static final String CLIENT_NODE_HOST_KEY = "CLIENT_NODE_HOST";
public static final String CLIENT_NODE_PORT_KEY = "CLIENT_NODE_PORT";
private static final String VERSION_DIR = "/VERSION";
private String latestFeVersion;
private String previousFeVersion;
private String metaDir;
private String bdbDir;
private String imageDir;
private String versionDir;
private MetaContext metaContext;
private long epoch = 0;
@ -868,7 +863,6 @@ public class Env {
this.metaDir = Config.meta_dir;
this.bdbDir = this.metaDir + BDB_DIR;
this.imageDir = this.metaDir + IMAGE_DIR;
this.versionDir = EnvUtils.getDorisHome() + VERSION_DIR;
// 0. get local node and helper node info
getSelfHostPort();
@ -888,21 +882,12 @@ public class Env {
bdbDir.mkdirs();
}
}
File imageDir = new File(this.imageDir);
if (!imageDir.exists()) {
imageDir.mkdirs();
}
File verDir = new File(this.versionDir);
if (!verDir.exists()) {
verDir.mkdirs();
}
// init plugin manager
initVersionInfo();
pluginMgr.init();
auditEventProcessor.start();
@ -5456,64 +5441,6 @@ public class Env {
}
}
public void writeVersionFile(String version, int seq) {
String versionName = versionDir + "/" + version + "-commitid-" + seq + "-version";
File versionFile = new File(versionName);
try {
versionFile.createNewFile();
} catch (Exception e) {
LOG.error(e.toString());
}
}
public boolean isMajorVersionUpgrade() {
if (previousFeVersion == null) {
// There are two possible scenarios when there is no 'previousFeVersion':
// If 'image' is empty, it indicates a completely new FE.
// If 'image' is not empty, it means an upgrade from a lower version.
File imageDir = new File(this.imageDir);
File[] files = imageDir.listFiles();
if (files == null || files.length == 0) {
return false;
}
return true;
}
return previousFeVersion.charAt(0) != latestFeVersion.charAt(0);
}
private void initVersionInfo() {
latestFeVersion = Version.DORIS_BUILD_VERSION_MAJOR + "_" + Version.DORIS_BUILD_VERSION_MINOR + "_"
+ Version.DORIS_BUILD_VERSION_PATCH;
File folder = new File(versionDir);
File[] files = folder.listFiles();
int previousSeq = 0;
if (files != null) {
// Every part meaning (2_0_0-commitid-1-version)
// [version] - [commitid] - [seq]
// 'VersionFile' can be transformed like this.
// 2_0_0-commitid-1-version -> 2_1_0-commitid-2-version ->
// 2_3_0-commitid-3-version -> 2_0_0-commitid-4-version
// You can observe the process of FE upgrades through these files.
for (File file : files) {
String[] splitArr = file.getName().split("-");
String version = splitArr[0];
int seq = Integer.parseInt(splitArr[2]);
if (seq > previousSeq) {
previousSeq = seq;
previousFeVersion = version;
}
}
}
if (previousFeVersion == null) {
writeVersionFile(latestFeVersion, 1);
} else if (!previousFeVersion.equals(latestFeVersion)) {
writeVersionFile(latestFeVersion, previousSeq + 1);
}
if (isMajorVersionUpgrade()) {
ConnectContext.isMajorVersionUpgrade = true;
}
}
public int getFollowerCount() {
int count = 0;
for (Frontend fe : frontends.values()) {

View File

@ -68,7 +68,7 @@ import java.util.Set;
public class ConnectContext {
private static final Logger LOG = LogManager.getLogger(ConnectContext.class);
protected static ThreadLocal<ConnectContext> threadLocalInfo = new ThreadLocal<>();
public static boolean isMajorVersionUpgrade = false;
private static final String SSL_PROTOCOL = "TLS";
// set this id before analyze
@ -263,10 +263,6 @@ public class ConnectContext {
mysqlChannel = new DummyMysqlChannel();
}
sessionVariable = VariableMgr.newSessionVariable();
if (connection != null && isMajorVersionUpgrade) {
VariableMgr.setGlobalPipelineTask(sessionVariable.parallelExecInstanceNum);
sessionVariable = VariableMgr.newSessionVariable();
}
command = MysqlCommand.COM_SLEEP;
if (Config.use_fuzzy_session_variable) {
sessionVariable.initFuzzyModeVariables();

View File

@ -367,25 +367,6 @@ public class VariableMgr {
}
}
public static void setGlobalPipelineTask(int instance) {
wlock.lock();
try {
String name = "parallel_pipeline_task_num";
String value = instance + "";
VarContext ctx = ctxByVarName.get(name);
try {
setValue(ctx.getObj(), ctx.getField(), value);
} catch (DdlException e) {
LOG.error(e.toString());
}
// write edit log
GlobalVarPersistInfo info = new GlobalVarPersistInfo(defaultSessionVariable, Lists.newArrayList(name));
Env.getCurrentEnv().getEditLog().logGlobalVariableV2(info);
} finally {
wlock.unlock();
}
}
public static void setLowerCaseTableNames(int mode) throws DdlException {
VarContext ctx = ctxByVarName.get(GlobalVariable.LOWER_CASE_TABLE_NAMES);
setGlobalVarAndWriteEditLog(ctx, GlobalVariable.LOWER_CASE_TABLE_NAMES, "" + mode);