[opt](fe) exit FE when transfer to (non)master failed (#34809) (#35158)

bp #34809
This commit is contained in:
Mingyu Chen
2024-05-21 22:31:47 +08:00
committed by GitHub
parent 98f8eb5c43
commit 903ff32021
3 changed files with 157 additions and 141 deletions

View File

@ -1444,137 +1444,147 @@ public class Env {
/**
 * Transfers this FE to the master role.
 *
 * <p>Visible steps, in order: stop the replayer thread, clear {@code isReady} and {@code canRead},
 * open the edit log, run fencing when {@code Config.edit_log_type} is "bdb", replay the remaining
 * journals, roll the edit log, log the meta version if it is older than
 * {@code FeConstants.meta_version}, perform first-start-up or upgrade handling, log the master info,
 * start the daemon threads, initialize metrics and finally set {@code canRead} and {@code isReady}
 * to true. {@code toMasterProgress} is updated along the way to record the current stage.
 *
 * <p>In the try/catch variant of the body, any {@link Throwable} is logged together with
 * {@code toMasterProgress} and the process exits with status -1.
 *
 * <p>NOTE(review): this listing appears to be a rendered commit diff in which the pre-change and
 * post-change lines of the method are interleaved without +/- markers, so most statements show up
 * twice - confirm against the actual file before relying on the statement order shown here.
 */
@SuppressWarnings({"checkstyle:WhitespaceAfter", "checkstyle:LineLength"})
private void transferToMaster() {
// Stop the replayer thread and wait for it to finish.
if (replayer != null) {
replayer.exit();
try {
replayer.join();
} catch (InterruptedException e) {
LOG.warn("got exception when stopping the replayer thread", e);
// Failures inside this try block are handled by the catch (Throwable) at the end of the method.
try {
// Stop the replayer thread and wait for it to finish.
if (replayer != null) {
replayer.exit();
try {
replayer.join();
} catch (InterruptedException e) {
LOG.warn("got exception when stopping the replayer thread", e);
}
replayer = null;
}
replayer = null;
}
// Set these only after the replay thread has stopped, so that the replay thread cannot modify them.
isReady.set(false);
canRead.set(false);
// Set these only after the replay thread has stopped, so that the replay thread cannot modify them.
isReady.set(false);
canRead.set(false);
toMasterProgress = "open editlog";
editLog.open();
toMasterProgress = "open editlog";
editLog.open();
if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
if (!haProtocol.fencing()) {
LOG.error("fencing failed. will exit.");
System.exit(-1);
}
}
toMasterProgress = "replay journal";
long replayStartTime = System.currentTimeMillis();
// Replay journals. -1 means replay all journals whose id is larger than the current journal id.
replayJournal(-1);
long replayEndTime = System.currentTimeMillis();
LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " msec");
checkCurrentNodeExist();
checkBeExecVersion();
toMasterProgress = "roll editlog";
editLog.rollEditLog();
// Log meta_version
long journalVersion = MetaContext.get().getMetaVersion();
if (journalVersion < FeConstants.meta_version) {
toMasterProgress = "log meta version";
editLog.logMetaVersion(FeConstants.meta_version);
MetaContext.get().setMetaVersion(FeConstants.meta_version);
}
// Log the first frontend
if (isFirstTimeStartUp) {
// If isFirstTimeStartUp is true, frontends must contain this node.
Frontend self = frontends.get(nodeName);
Preconditions.checkNotNull(self);
// OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even if canWrite is false
editLog.logAddFirstFrontend(self);
initLowerCaseTableNames();
// Set the initial root password when the master FE is launched for the first time.
auth.setInitialRootPassword(Config.initial_root_password);
} else {
if (journalVersion <= FeMetaVersion.VERSION_114) {
// A journal version not greater than 114 means this FE was upgraded from a version before 2.0.
// When upgrading from 1.2 to 2.0, we need to make sure that the query parallelism remains unchanged
// when switching to the pipeline engine, otherwise it may impact the load of the entire cluster
// because the default parallelism of the pipeline engine is higher than in previous versions.
// So set parallel_pipeline_task_num to parallel_fragment_exec_instance_num.
int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum;
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
String.valueOf(newVal));
// For a similar reason as above, the broadcast scale factor needs to be upgraded during 1.2 to 2.x
// if the default value has been upgraded.
double newBcFactorVal = VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor();
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR,
String.valueOf(newBcFactorVal));
// For a similar reason as above, enable_nereids_planner needs to be upgraded to true.
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.ENABLE_NEREIDS_PLANNER,
"true");
}
if (journalVersion <= FeMetaVersion.VERSION_123) {
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", SessionVariable.ENABLE_NEREIDS_DML, "true");
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none");
if (VariableMgr.newSessionVariable().nereidsTimeoutSecond == 5) {
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.NEREIDS_TIMEOUT_SECOND, "30");
if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
if (!haProtocol.fencing()) {
LOG.error("fencing failed. will exit.");
System.exit(-1);
}
}
}
getPolicyMgr().createDefaultStoragePolicy();
toMasterProgress = "replay journal";
long replayStartTime = System.currentTimeMillis();
// Replay journals. -1 means replay all journals whose id is larger than the current journal id.
replayJournal(-1);
long replayEndTime = System.currentTimeMillis();
LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " msec");
// The master info MUST be set before starting the checkpoint thread,
// because the checkpoint thread needs this info to select a non-master FE to push the image to.
checkCurrentNodeExist();
toMasterProgress = "log master info";
this.masterInfo = new MasterInfo(Env.getCurrentEnv().getSelfNode().getHost(),
Config.http_port,
Config.rpc_port);
editLog.logMasterInfo(masterInfo);
LOG.info("logMasterInfo:{}", masterInfo);
checkBeExecVersion();
// For the master, 'isReady' is only set later in this method,
// but we are sure that all metadata has been replayed if we get here,
// so there is no need to check the 'isReady' flag in this call.
postProcessAfterMetadataReplayed(false);
toMasterProgress = "roll editlog";
editLog.rollEditLog();
insertOverwriteManager.allTaskFail();
// Log meta_version
long journalVersion = MetaContext.get().getMetaVersion();
if (journalVersion < FeConstants.meta_version) {
toMasterProgress = "log meta version";
editLog.logMetaVersion(FeConstants.meta_version);
MetaContext.get().setMetaVersion(FeConstants.meta_version);
}
toMasterProgress = "start daemon threads";
// Log the first frontend
if (isFirstTimeStartUp) {
// If isFirstTimeStartUp is true, frontends must contain this node.
Frontend self = frontends.get(nodeName);
Preconditions.checkNotNull(self);
// OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even if canWrite is false
editLog.logAddFirstFrontend(self);
// Start all daemon threads that only run on the MASTER FE.
startMasterOnlyDaemonThreads();
// Start the other daemon threads that should run on all FEs.
startNonMasterDaemonThreads();
initLowerCaseTableNames();
// Set the initial root password when the master FE is launched for the first time.
auth.setInitialRootPassword(Config.initial_root_password);
} else {
if (journalVersion <= FeMetaVersion.VERSION_114) {
// A journal version not greater than 114 means this FE was upgraded from a version before 2.0.
// When upgrading from 1.2 to 2.0,
// we need to make sure that the query parallelism remains unchanged
// when switching to the pipeline engine, otherwise it may impact the load of the entire cluster
// because the default parallelism of the pipeline engine is higher than in previous versions.
// So set parallel_pipeline_task_num to parallel_fragment_exec_instance_num.
int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum;
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
String.valueOf(newVal));
MetricRepo.init();
// For a similar reason as above, the broadcast scale factor needs to be upgraded during 1.2 to 2.x
// if the default value has been upgraded.
double newBcFactorVal = VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor();
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR,
String.valueOf(newBcFactorVal));
toMasterProgress = "finished";
canRead.set(true);
isReady.set(true);
checkLowerCaseTableNames();
// For a similar reason as above, enable_nereids_planner needs to be upgraded to true.
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.ENABLE_NEREIDS_PLANNER,
"true");
}
if (journalVersion <= FeMetaVersion.VERSION_123) {
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", SessionVariable.ENABLE_NEREIDS_DML,
"true");
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none");
if (VariableMgr.newSessionVariable().nereidsTimeoutSecond == 5) {
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.NEREIDS_TIMEOUT_SECOND, "30");
}
}
}
String msg = "master finished to replay journal, can write now.";
Util.stdoutWithTime(msg);
LOG.info(msg);
// For the master, there are some new thread pools whose metrics need to be registered.
ThreadPoolManager.registerAllThreadPoolMetric();
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
getPolicyMgr().createDefaultStoragePolicy();
// The master info MUST be set before starting the checkpoint thread,
// because the checkpoint thread needs this info to select a non-master FE to push the image to.
toMasterProgress = "log master info";
this.masterInfo = new MasterInfo(Env.getCurrentEnv().getSelfNode().getHost(),
Config.http_port,
Config.rpc_port);
editLog.logMasterInfo(masterInfo);
LOG.info("logMasterInfo:{}", masterInfo);
// For the master, 'isReady' is only set later in this method,
// but we are sure that all metadata has been replayed if we get here,
// so there is no need to check the 'isReady' flag in this call.
postProcessAfterMetadataReplayed(false);
insertOverwriteManager.allTaskFail();
toMasterProgress = "start daemon threads";
// Start all daemon threads that only run on the MASTER FE.
startMasterOnlyDaemonThreads();
// Start the other daemon threads that should run on all FEs.
startNonMasterDaemonThreads();
MetricRepo.init();
toMasterProgress = "finished";
canRead.set(true);
isReady.set(true);
checkLowerCaseTableNames();
String msg = "master finished to replay journal, can write now.";
Util.stdoutWithTime(msg);
LOG.info(msg);
// For the master, there are some new thread pools whose metrics need to be registered.
ThreadPoolManager.registerAllThreadPoolMetric();
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
}
} catch (Throwable e) {
// If the transfer to master fails, the process must exit.
// Otherwise, the process would be left in an unknown state.
LOG.error("failed to transfer to master. progress: {}", toMasterProgress, e);
System.exit(-1);
}
}
@ -1722,36 +1732,43 @@ public class Env {
/**
 * Transfers this FE to a non-master role.
 *
 * <p>{@code isReady} is cleared first. If the current {@code feType} is OBSERVER or FOLLOWER, the
 * only accepted {@code newType} is UNKNOWN: {@code metaReplayState} is marked as transferred to
 * unknown and the method returns. Otherwise the replayer is created and started if it does not exist
 * yet, {@code postProcessAfterMetadataReplayed(true)} is run (returning early if it reports false),
 * and then the lower-case-table-names check, the non-master daemon threads, metric initialization and
 * the statistics cache pre-heat follow.
 *
 * <p>In the try/catch variant of the body, any {@link Throwable} is logged and the process exits with
 * status -1.
 *
 * <p>NOTE(review): this listing appears to be a rendered commit diff in which the pre-change and
 * post-change lines of the method are interleaved without +/- markers, so most statements show up
 * twice - confirm against the actual file.
 *
 * @param newType the node type to transfer to; in the visible code it is only inspected by the
 *                precondition that requires UNKNOWN when the current type is OBSERVER or FOLLOWER
 */
private void transferToNonMaster(FrontendNodeType newType) {
isReady.set(false);
if (feType == FrontendNodeType.OBSERVER || feType == FrontendNodeType.FOLLOWER) {
Preconditions.checkState(newType == FrontendNodeType.UNKNOWN);
LOG.warn("{} to UNKNOWN, still offer read service", feType.name());
// Do not set canRead here; leave canRead as it was.
// If the metadata is out of date, canRead will be set to false in the replayer thread.
metaReplayState.setTransferToUnknown();
return;
}
// Failures inside this try block are handled by the catch (Throwable) at the end of the method.
try {
if (feType == FrontendNodeType.OBSERVER || feType == FrontendNodeType.FOLLOWER) {
Preconditions.checkState(newType == FrontendNodeType.UNKNOWN);
LOG.warn("{} to UNKNOWN, still offer read service", feType.name());
// Do not set canRead here; leave canRead as it was.
// If the metadata is out of date, canRead will be set to false in the replayer thread.
metaReplayState.setTransferToUnknown();
return;
}
// Transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER.
// Transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER.
if (replayer == null) {
createReplayer();
replayer.start();
}
if (replayer == null) {
createReplayer();
replayer.start();
}
// 'isReady' will be set to true in the 'setCanRead()' method.
if (!postProcessAfterMetadataReplayed(true)) {
// The state has changed, exit early.
return;
}
// 'isReady' will be set to true in the 'setCanRead()' method.
if (!postProcessAfterMetadataReplayed(true)) {
// The state has changed, exit early.
return;
}
checkLowerCaseTableNames();
checkLowerCaseTableNames();
startNonMasterDaemonThreads();
startNonMasterDaemonThreads();
MetricRepo.init();
MetricRepo.init();
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
}
} catch (Throwable e) {
// If the transfer to non-master fails, the process must exit.
// Otherwise, the process would be left in an unknown state.
LOG.error("failed to transfer to non-master.", e);
System.exit(-1);
}
}

View File

@ -44,7 +44,7 @@ public class FeServer {
public void start() throws IOException {
FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance());
Logger feServiceLogger = LogManager.getLogger(FrontendServiceImpl.class);
Logger feServiceLogger = LogManager.getLogger(FeServer.class);
FrontendService.Iface instance = (FrontendService.Iface) Proxy.newProxyInstance(
FrontendServiceImpl.class.getClassLoader(),
FrontendServiceImpl.class.getInterfaces(),