[improve](env) Improve catalog not ready tips (#27715)

This commit is contained in:
walter
2023-12-01 22:52:43 +08:00
committed by GitHub
parent b74388c3b1
commit d9bbeca431
8 changed files with 87 additions and 15 deletions

View File

@ -281,11 +281,13 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
@ -381,6 +383,7 @@ public class Env {
// canRead can be true even if isReady is false.
// for example: OBSERVER transfer to UNKNOWN, then isReady will be set to false, but canRead can still be true
private AtomicBoolean canRead = new AtomicBoolean(false);
private String toMasterProgress = "";
private BlockingQueue<FrontendNodeType> typeTransferQueue;
// node name is used for bdbje NodeName.
@ -1386,6 +1389,7 @@ public class Env {
isReady.set(false);
canRead.set(false);
toMasterProgress = "open editlog";
editLog.open();
if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
@ -1395,6 +1399,7 @@ public class Env {
}
}
toMasterProgress = "replay journal";
long replayStartTime = System.currentTimeMillis();
// replay journals. -1 means replay all the journals larger than current journal id.
replayJournal(-1);
@ -1405,11 +1410,13 @@ public class Env {
checkBeExecVersion();
toMasterProgress = "roll editlog";
editLog.rollEditLog();
// Log meta_version
long journalVersion = MetaContext.get().getMetaVersion();
if (journalVersion < FeConstants.meta_version) {
toMasterProgress = "log meta version";
editLog.logMetaVersion(FeConstants.meta_version);
MetaContext.get().setMetaVersion(FeConstants.meta_version);
}
@ -1451,6 +1458,7 @@ public class Env {
// MUST set master ip before starting checkpoint thread.
// because checkpoint thread need this info to select non-master FE to push image
toMasterProgress = "log master info";
this.masterInfo = new MasterInfo(Env.getCurrentEnv().getSelfNode().getHost(),
Config.http_port,
Config.rpc_port);
@ -1464,6 +1472,8 @@ public class Env {
// so no need to check 'isReady' flag in this method
postProcessAfterMetadataReplayed(false);
toMasterProgress = "start daemon threads";
// start all daemon threads that only running on MASTER FE
startMasterOnlyDaemonThreads();
// start other daemon threads that should running on all FE
@ -1471,6 +1481,7 @@ public class Env {
MetricRepo.init();
toMasterProgress = "finished";
canRead.set(true);
isReady.set(true);
checkLowerCaseTableNames();
@ -5814,4 +5825,57 @@ public class Env {
alter.setRelation(relation);
this.alter.processAlterMTMV(alter, false);
}
// Ensure the env is ready, otherwise throw an exception.
public void checkReadyOrThrow() throws Exception {
if (isReady()) {
return;
}
StringBuilder sb = new StringBuilder();
sb.append("Node catalog is not ready, please wait for a while. ")
.append("To master progress: " + toMasterProgress + ".\n")
.append("Frontends: \n");
for (String name : frontends.keySet()) {
Frontend frontend = frontends.get(name);
if (name == null) {
continue;
}
sb.append(frontend.toString()).append("\n");
}
if (haProtocol instanceof BDBHA) {
try {
BDBHA ha = (BDBHA) haProtocol;
List<InetSocketAddress> electableNodes = ha.getElectableNodes(true);
if (!electableNodes.isEmpty()) {
sb.append("Electable nodes: \n");
for (InetSocketAddress addr : electableNodes) {
sb.append(addr.toString()).append("\n");
}
}
List<InetSocketAddress> observerNodes = ha.getObserverNodes();
if (!observerNodes.isEmpty()) {
sb.append("Observer nodes: \n");
for (InetSocketAddress addr : electableNodes) {
sb.append(addr.toString()).append("\n");
}
}
} catch (Exception e) {
// pass
}
}
throw new Exception(sb.toString());
}
public void checkReadyOrThrowTException() throws TException {
try {
checkReadyOrThrow();
} catch (Exception e) {
throw new TException(e);
}
}
}

View File

@ -119,9 +119,7 @@ public class RestBaseController extends BaseController {
if (env.isMaster()) {
return null;
}
if (!env.isReady()) {
throw new Exception("Node catalog is not ready, please wait for a while.");
}
env.checkReadyOrThrow();
return redirectTo(request, new TNetworkAddress(env.getMasterHost(), env.getMasterHttpPort()));
}

View File

@ -36,10 +36,12 @@ import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.LockMode;
import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
import com.sleepycat.je.rep.ReplicaWriteException;
import com.sleepycat.je.rep.ReplicatedEnvironment;
import com.sleepycat.je.rep.RollbackException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -518,4 +520,17 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
public BDBEnvironment getBDBEnvironment() {
return this.bdbEnvironment;
}
public String getBDBStats() {
if (bdbEnvironment == null) {
return "";
}
ReplicatedEnvironment repEnv = bdbEnvironment.getReplicatedEnvironment();
if (repEnv == null) {
return "";
}
return repEnv.getRepStats(StatsConfig.DEFAULT).toString();
}
}

View File

@ -122,9 +122,7 @@ public class TokenManager {
private TNetworkAddress getMasterAddress() throws TException {
if (!Env.getCurrentEnv().isReady()) {
throw new TException("Node catalog is not ready, please wait for a while.");
}
Env.getCurrentEnv().checkReadyOrThrowTException();
String masterHost = Env.getCurrentEnv().getMasterHost();
int masterRpcPort = Env.getCurrentEnv().getMasterRpcPort();
return new TNetworkAddress(masterHost, masterRpcPort);

View File

@ -1183,6 +1183,9 @@ public class EditLog {
} catch (Throwable t) {
// Throwable contains all Exception and Error, such as IOException and
// OutOfMemoryError
if (journal instanceof BDBJEJournal) {
LOG.error("BDBJE stats : {}", ((BDBJEJournal) journal).getBDBStats());
}
LOG.error("Fatal Error : write stream Exception", t);
System.exit(-1);
}

View File

@ -45,9 +45,7 @@ public class MasterCatalogExecutor {
}
public void forward(long catalogId, long dbId) throws Exception {
if (!Env.getCurrentEnv().isReady()) {
throw new Exception("Current catalog is not ready, please wait for a while.");
}
Env.getCurrentEnv().checkReadyOrThrow();
String masterHost = Env.getCurrentEnv().getMasterHost();
int masterRpcPort = Env.getCurrentEnv().getMasterRpcPort();
TNetworkAddress thriftAddress = new TNetworkAddress(masterHost, masterRpcPort);

View File

@ -88,9 +88,7 @@ public class MasterOpExecutor {
// Send request to Master
private TMasterOpResult forward(TMasterOpRequest params) throws Exception {
if (!ctx.getEnv().isReady()) {
throw new Exception("Node catalog is not ready, please wait for a while.");
}
ctx.getEnv().checkReadyOrThrow();
String masterHost = ctx.getEnv().getMasterHost();
int masterRpcPort = ctx.getEnv().getMasterRpcPort();
TNetworkAddress thriftAddress = new TNetworkAddress(masterHost, masterRpcPort);

View File

@ -46,9 +46,7 @@ public class MasterTxnExecutor {
}
private TNetworkAddress getMasterAddress() throws TException {
if (!ctx.getEnv().isReady()) {
throw new TException("Node catalog is not ready, please wait for a while.");
}
ctx.getEnv().checkReadyOrThrowTException();
String masterHost = ctx.getEnv().getMasterHost();
int masterRpcPort = ctx.getEnv().getMasterRpcPort();
return new TNetworkAddress(masterHost, masterRpcPort);