[refact](bdbje) Refactor BDBEnvironment and BDBJEJournal (#27778)
* Add more unit tests for "org.apache.doris.journal.bdbje" * Make a small refactor of "org.apache.doris.journal.bdbje"
This commit is contained in:
@ -152,7 +152,7 @@ public class DorisFE {
|
||||
|
||||
if (Config.enable_bdbje_debug_mode) {
|
||||
// Start in BDB Debug mode
|
||||
BDBDebugger.get().startDebugMode(dorisHomeDir);
|
||||
BDBDebugger.get().startDebugMode(Config.meta_dir + "/bdb");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@ -28,8 +28,10 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
public class BDBStateChangeListener implements StateChangeListener {
|
||||
public static final Logger LOG = LogManager.getLogger(BDBStateChangeListener.class);
|
||||
private final boolean isElectable;
|
||||
|
||||
public BDBStateChangeListener() {
|
||||
public BDBStateChangeListener(boolean isElectable) {
|
||||
this.isElectable = isElectable;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -41,7 +43,7 @@ public class BDBStateChangeListener implements StateChangeListener {
|
||||
break;
|
||||
}
|
||||
case REPLICA: {
|
||||
if (Env.getCurrentEnv().isElectable()) {
|
||||
if (isElectable) {
|
||||
newType = FrontendNodeType.FOLLOWER;
|
||||
} else {
|
||||
newType = FrontendNodeType.OBSERVER;
|
||||
|
||||
@ -58,6 +58,10 @@ public class BDBDebugger {
|
||||
private static final Logger LOG = LogManager.getLogger(BDBDebugger.class);
|
||||
private BDBDebugEnv debugEnv;
|
||||
|
||||
private static class SingletonHolder {
|
||||
private static final BDBDebugger INSTANCE = new BDBDebugger();
|
||||
}
|
||||
|
||||
public static BDBDebugger get() {
|
||||
return SingletonHolder.INSTANCE;
|
||||
}
|
||||
@ -65,10 +69,10 @@ public class BDBDebugger {
|
||||
/**
|
||||
* Start in BDB Debug mode.
|
||||
*/
|
||||
public void startDebugMode(String dorisHomeDir) {
|
||||
public void startDebugMode(String bdbHome) {
|
||||
try {
|
||||
initDebugEnv();
|
||||
startService(dorisHomeDir);
|
||||
initDebugEnv(bdbHome);
|
||||
startService();
|
||||
while (true) {
|
||||
Thread.sleep(2000);
|
||||
}
|
||||
@ -78,8 +82,13 @@ public class BDBDebugger {
|
||||
}
|
||||
}
|
||||
|
||||
private void initDebugEnv(String bdbHome) throws BDBDebugException {
|
||||
debugEnv = new BDBDebugEnv(bdbHome);
|
||||
debugEnv.init();
|
||||
}
|
||||
|
||||
// Only start MySQL and HttpServer
|
||||
private void startService(String dorisHomeDir) throws Exception {
|
||||
private void startService() throws Exception {
|
||||
// HTTP server
|
||||
|
||||
HttpServer httpServer = new HttpServer();
|
||||
@ -94,19 +103,10 @@ public class BDBDebugger {
|
||||
ThreadPoolManager.registerAllThreadPoolMetric();
|
||||
}
|
||||
|
||||
private void initDebugEnv() throws BDBDebugException {
|
||||
debugEnv = new BDBDebugEnv(Config.meta_dir + "/bdb/");
|
||||
debugEnv.init();
|
||||
}
|
||||
|
||||
public BDBDebugEnv getEnv() {
|
||||
return debugEnv;
|
||||
}
|
||||
|
||||
private static class SingletonHolder {
|
||||
private static final BDBDebugger INSTANCE = new BDBDebugger();
|
||||
}
|
||||
|
||||
/**
|
||||
* A wrapper class of the BDBJE environment, used to obtain information in bdbje.
|
||||
*/
|
||||
@ -144,7 +144,9 @@ public class BDBDebugger {
|
||||
dbConfig.setAllowCreate(false);
|
||||
dbConfig.setReadOnly(true);
|
||||
Database db = env.openDatabase(null, dbName, dbConfig);
|
||||
return db.count();
|
||||
long journalNumber = db.count();
|
||||
db.close();
|
||||
return journalNumber;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -172,6 +174,8 @@ public class BDBDebugger {
|
||||
Long id = idBinding.entryToObject(key);
|
||||
journalIds.add(id);
|
||||
}
|
||||
cursor.close();
|
||||
db.close();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("failed to get journal ids of {}", dbName, e);
|
||||
throw new BDBDebugException("failed to get journal ids of database " + dbName, e);
|
||||
@ -205,6 +209,7 @@ public class BDBDebugger {
|
||||
|
||||
// get the journal
|
||||
OperationStatus status = db.get(null, key, value, LockMode.READ_COMMITTED);
|
||||
db.close();
|
||||
if (status == OperationStatus.SUCCESS) {
|
||||
byte[] retData = value.getData();
|
||||
DataInputStream in = new DataInputStream(new ByteArrayInputStream(retData));
|
||||
@ -223,6 +228,14 @@ public class BDBDebugger {
|
||||
MetaContext.remove();
|
||||
return entityWrapper;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
try {
|
||||
env.close();
|
||||
} catch (Exception e) {
|
||||
LOG.warn("exception:", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class JournalEntityWrapper {
|
||||
|
||||
@ -19,7 +19,6 @@ package org.apache.doris.journal.bdbje;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.ha.BDBHA;
|
||||
import org.apache.doris.ha.BDBStateChangeListener;
|
||||
import org.apache.doris.ha.FrontendNodeType;
|
||||
@ -72,10 +71,8 @@ public class BDBEnvironment {
|
||||
private static final int MEMORY_CACHE_PERCENT = 20;
|
||||
private static final List<String> BDBJE_LOG_LEVEL = ImmutableList.of("OFF", "SEVERE", "WARNING",
|
||||
"INFO", "CONFIG", "FINE", "FINER", "FINEST", "ALL");
|
||||
|
||||
public static final String PALO_JOURNAL_GROUP = "PALO_JOURNAL_GROUP";
|
||||
|
||||
|
||||
private ReplicatedEnvironment replicatedEnvironment;
|
||||
private EnvironmentConfig environmentConfig;
|
||||
private ReplicationConfig replicationConfig;
|
||||
@ -84,15 +81,19 @@ public class BDBEnvironment {
|
||||
private ReentrantReadWriteLock lock;
|
||||
private List<Database> openedDatabases;
|
||||
|
||||
public BDBEnvironment() {
|
||||
private final boolean isElectable;
|
||||
private final boolean metadataFailureRecovery;
|
||||
|
||||
public BDBEnvironment(boolean isElectable, boolean metadataFailureRecovery) {
|
||||
openedDatabases = new ArrayList<Database>();
|
||||
this.lock = new ReentrantReadWriteLock(true);
|
||||
this.isElectable = isElectable;
|
||||
this.metadataFailureRecovery = metadataFailureRecovery;
|
||||
}
|
||||
|
||||
// The setup() method opens the environment and database
|
||||
public void setup(File envHome, String selfNodeName, String selfNodeHostPort,
|
||||
String helperHostPort, boolean isElectable) {
|
||||
boolean metadataFailureRecovery = null != System.getProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY);
|
||||
String helperHostPort) {
|
||||
// Almost never used, just in case the master can not restart
|
||||
if (metadataFailureRecovery) {
|
||||
if (!isElectable) {
|
||||
@ -112,7 +113,7 @@ public class BDBEnvironment {
|
||||
replicationConfig.setNodeHostPort(selfNodeHostPort);
|
||||
replicationConfig.setHelperHosts(helperHostPort);
|
||||
replicationConfig.setGroupName(PALO_JOURNAL_GROUP);
|
||||
replicationConfig.setConfigParam(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "10");
|
||||
replicationConfig.setConfigParam(ReplicationConfig.ENV_UNKNOWN_STATE_TIMEOUT, "10 s");
|
||||
replicationConfig.setMaxClockDelta(Config.max_bdbje_clock_delta_ms, TimeUnit.MILLISECONDS);
|
||||
replicationConfig.setConfigParam(ReplicationConfig.TXN_ROLLBACK_LIMIT,
|
||||
String.valueOf(Config.txn_rollback_limit));
|
||||
@ -170,6 +171,9 @@ public class BDBEnvironment {
|
||||
// open environment and epochDB
|
||||
for (int i = 0; i < RETRY_TIME; i++) {
|
||||
try {
|
||||
if (replicatedEnvironment != null) {
|
||||
this.close();
|
||||
}
|
||||
// open the environment
|
||||
replicatedEnvironment = new ReplicatedEnvironment(envHome, replicationConfig, environmentConfig);
|
||||
|
||||
@ -178,12 +182,13 @@ public class BDBEnvironment {
|
||||
Env.getCurrentEnv().setHaProtocol(protocol);
|
||||
|
||||
// start state change listener
|
||||
StateChangeListener listener = new BDBStateChangeListener();
|
||||
StateChangeListener listener = new BDBStateChangeListener(isElectable);
|
||||
replicatedEnvironment.setStateChangeListener(listener);
|
||||
// open epochDB. the first parameter null means auto-commit
|
||||
epochDB = replicatedEnvironment.openDatabase(null, "epochDB", dbConfig);
|
||||
break;
|
||||
} catch (InsufficientLogException insufficientLogEx) {
|
||||
LOG.info("i:{} insufficientLogEx:", i, insufficientLogEx);
|
||||
NetworkRestore restore = new NetworkRestore();
|
||||
NetworkRestoreConfig config = new NetworkRestoreConfig();
|
||||
config.setRetainLogFiles(false); // delete obsolete log files.
|
||||
@ -193,6 +198,7 @@ public class BDBEnvironment {
|
||||
// default selection of providers is not suitable.
|
||||
restore.execute(insufficientLogEx, config);
|
||||
} catch (DatabaseException e) {
|
||||
LOG.info("i:{} exception:", i, e);
|
||||
if (i < RETRY_TIME - 1) {
|
||||
try {
|
||||
Thread.sleep(5 * 1000);
|
||||
@ -381,6 +387,7 @@ public class BDBEnvironment {
|
||||
if (epochDB != null) {
|
||||
try {
|
||||
epochDB.close();
|
||||
epochDB = null;
|
||||
} catch (DatabaseException exception) {
|
||||
LOG.error("Error closing db {} will exit", epochDB.getDatabaseName(), exception);
|
||||
}
|
||||
@ -390,19 +397,7 @@ public class BDBEnvironment {
|
||||
try {
|
||||
// Finally, close the store and environment.
|
||||
replicatedEnvironment.close();
|
||||
} catch (DatabaseException exception) {
|
||||
LOG.error("Error closing replicatedEnvironment", exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Close environment
|
||||
public void closeReplicatedEnvironment() {
|
||||
if (replicatedEnvironment != null) {
|
||||
try {
|
||||
openedDatabases.clear();
|
||||
// Finally, close the store and environment.
|
||||
replicatedEnvironment.close();
|
||||
replicatedEnvironment = null;
|
||||
} catch (DatabaseException exception) {
|
||||
LOG.error("Error closing replicatedEnvironment", exception);
|
||||
}
|
||||
@ -413,18 +408,22 @@ public class BDBEnvironment {
|
||||
public void openReplicatedEnvironment(File envHome) {
|
||||
for (int i = 0; i < RETRY_TIME; i++) {
|
||||
try {
|
||||
if (replicatedEnvironment != null) {
|
||||
this.close();
|
||||
}
|
||||
// open the environment
|
||||
replicatedEnvironment =
|
||||
new ReplicatedEnvironment(envHome, replicationConfig, environmentConfig);
|
||||
|
||||
// start state change listener
|
||||
StateChangeListener listener = new BDBStateChangeListener();
|
||||
StateChangeListener listener = new BDBStateChangeListener(isElectable);
|
||||
replicatedEnvironment.setStateChangeListener(listener);
|
||||
|
||||
// open epochDB. the first parameter null means auto-commit
|
||||
epochDB = replicatedEnvironment.openDatabase(null, "epochDB", dbConfig);
|
||||
break;
|
||||
} catch (DatabaseException e) {
|
||||
LOG.info("i:{} exception:", i, e);
|
||||
if (i < RETRY_TIME - 1) {
|
||||
try {
|
||||
Thread.sleep(5 * 1000);
|
||||
@ -439,7 +438,7 @@ public class BDBEnvironment {
|
||||
}
|
||||
}
|
||||
|
||||
private SyncPolicy getSyncPolicy(String policy) {
|
||||
private static SyncPolicy getSyncPolicy(String policy) {
|
||||
if (policy.equalsIgnoreCase("SYNC")) {
|
||||
return Durability.SyncPolicy.SYNC;
|
||||
}
|
||||
@ -450,7 +449,7 @@ public class BDBEnvironment {
|
||||
return Durability.SyncPolicy.WRITE_NO_SYNC;
|
||||
}
|
||||
|
||||
private ReplicaAckPolicy getAckPolicy(String policy) {
|
||||
private static ReplicaAckPolicy getAckPolicy(String policy) {
|
||||
if (policy.equalsIgnoreCase("ALL")) {
|
||||
return Durability.ReplicaAckPolicy.ALL;
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.journal.bdbje;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.io.DataOutputBuffer;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.NetUtils;
|
||||
@ -75,14 +76,6 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
|
||||
private AtomicLong nextJournalId = new AtomicLong(1);
|
||||
|
||||
public BDBJEJournal(String nodeName) {
|
||||
initBDBEnv(nodeName);
|
||||
}
|
||||
|
||||
/*
|
||||
* Initialize bdb environment.
|
||||
* node name is ip_port (the port is edit_log_port)
|
||||
*/
|
||||
private void initBDBEnv(String nodeName) {
|
||||
environmentPath = Env.getServingEnv().getBdbDir();
|
||||
HostInfo selfNode = Env.getServingEnv().getSelfNode();
|
||||
selfNodeName = nodeName;
|
||||
@ -327,13 +320,14 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
|
||||
public synchronized void open() {
|
||||
if (bdbEnvironment == null) {
|
||||
File dbEnv = new File(environmentPath);
|
||||
bdbEnvironment = new BDBEnvironment();
|
||||
|
||||
boolean metadataFailureRecovery = null != System.getProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY);
|
||||
bdbEnvironment = new BDBEnvironment(Env.getServingEnv().isElectable(), metadataFailureRecovery);
|
||||
|
||||
HostInfo helperNode = Env.getServingEnv().getHelperNode();
|
||||
String helperHostPort = NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), helperNode.getPort());
|
||||
try {
|
||||
bdbEnvironment.setup(dbEnv, selfNodeName, selfNodeHostPort, helperHostPort,
|
||||
Env.getServingEnv().isElectable());
|
||||
bdbEnvironment.setup(dbEnv, selfNodeName, selfNodeHostPort, helperHostPort);
|
||||
} catch (Exception e) {
|
||||
if (e instanceof DatabaseNotFoundException) {
|
||||
LOG.error("It is not allowed to set metadata_failure_recovery"
|
||||
@ -380,7 +374,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
|
||||
reSetupBdbEnvironment(insufficientLogEx);
|
||||
} catch (RollbackException rollbackEx) {
|
||||
LOG.warn("catch rollback log exception. will reopen the ReplicatedEnvironment.", rollbackEx);
|
||||
bdbEnvironment.closeReplicatedEnvironment();
|
||||
bdbEnvironment.close();
|
||||
bdbEnvironment.openReplicatedEnvironment(new File(environmentPath));
|
||||
}
|
||||
}
|
||||
@ -414,8 +408,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
|
||||
|
||||
bdbEnvironment.close();
|
||||
bdbEnvironment.setup(new File(environmentPath), selfNodeName, selfNodeHostPort,
|
||||
NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), helperNode.getPort()),
|
||||
Env.getServingEnv().isElectable());
|
||||
NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), helperNode.getPort()));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -506,7 +499,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B
|
||||
} catch (RollbackException rollbackEx) {
|
||||
if (!Env.isCheckpointThread()) {
|
||||
LOG.warn("catch rollback log exception. will reopen the ReplicatedEnvironment.", rollbackEx);
|
||||
bdbEnvironment.closeReplicatedEnvironment();
|
||||
bdbEnvironment.close();
|
||||
bdbEnvironment.openReplicatedEnvironment(new File(environmentPath));
|
||||
} else {
|
||||
throw rollbackEx;
|
||||
|
||||
Reference in New Issue
Block a user