[fix](env) state listener avoid endless waiting (#27881)
This commit is contained in:
@ -962,8 +962,7 @@ public class Env {
|
||||
createTxnCleaner();
|
||||
|
||||
// 6. start state listener thread
|
||||
createStateListener();
|
||||
listener.start();
|
||||
startStateListener();
|
||||
|
||||
// 7. create fe disk updater
|
||||
createFeDiskUpdater();
|
||||
@ -1504,11 +1503,22 @@ public class Env {
|
||||
* 2. register some hook.
|
||||
* If there is, add them here.
|
||||
*/
|
||||
public void postProcessAfterMetadataReplayed(boolean waitCatalogReady) {
|
||||
public boolean postProcessAfterMetadataReplayed(boolean waitCatalogReady) {
|
||||
if (waitCatalogReady) {
|
||||
while (!isReady()) {
|
||||
// Avoid endless waiting if the state has changed.
|
||||
//
|
||||
// Consider the following situation:
|
||||
// 1. The follower replay journals and is not set to ready because the synchronization internval
|
||||
// exceeds meta delay toleration seconds.
|
||||
// 2. The underlying BEBJE node of this follower is selected as the master, but the state listener
|
||||
// thread is waiting for catalog ready.
|
||||
if (typeTransferQueue.peek() != null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
try {
|
||||
Thread.sleep(10 * 1000);
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("", e);
|
||||
}
|
||||
@ -1517,6 +1527,7 @@ public class Env {
|
||||
|
||||
auth.rectifyPrivs();
|
||||
catalogMgr.registerCatalogRefreshListener(this);
|
||||
return true;
|
||||
}
|
||||
|
||||
// start all daemon threads only running on Master
|
||||
@ -1630,7 +1641,10 @@ public class Env {
|
||||
}
|
||||
|
||||
// 'isReady' will be set to true in 'setCanRead()' method
|
||||
postProcessAfterMetadataReplayed(true);
|
||||
if (!postProcessAfterMetadataReplayed(true)) {
|
||||
// the state has changed, exit early.
|
||||
return;
|
||||
}
|
||||
|
||||
checkLowerCaseTableNames();
|
||||
|
||||
@ -2483,7 +2497,7 @@ public class Env {
|
||||
}
|
||||
}
|
||||
|
||||
public void createStateListener() {
|
||||
public void startStateListener() {
|
||||
listener = new Daemon("stateListener", STATE_CHANGE_CHECK_INTERVAL_MS) {
|
||||
@Override
|
||||
protected synchronized void runOneCycle() {
|
||||
@ -2591,6 +2605,7 @@ public class Env {
|
||||
};
|
||||
|
||||
listener.setMetaContext(metaContext);
|
||||
listener.start();
|
||||
}
|
||||
|
||||
public synchronized boolean replayJournal(long toJournalId) {
|
||||
|
||||
Reference in New Issue
Block a user