diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 0fbc023345..e723f3a759 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -2648,6 +2648,14 @@ public class Env { LOG.warn("replay journal cost too much time: {} replayedJournalId: {}", cost, replayedJournalId); } + if (replayedJournalId.get() != newToJournalId) { + String msg = "replayedJournalId:" + replayedJournalId.get() + " not equal with newToJournalId:" + + newToJournalId + " , will exit"; + LOG.error(msg); + Util.stdoutWithTime(msg); + System.exit(-1); + } + return hasLog; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java index f865bba4a2..dd574ab4d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java @@ -38,6 +38,7 @@ import com.sleepycat.je.OperationStatus; import com.sleepycat.je.rep.InsufficientLogException; import com.sleepycat.je.rep.NetworkRestore; import com.sleepycat.je.rep.NetworkRestoreConfig; +import com.sleepycat.je.rep.ReplicaWriteException; import com.sleepycat.je.rep.RollbackException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -155,6 +156,26 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B } break; } + } catch (ReplicaWriteException e) { + /** + * This exception indicates that an update operation or transaction commit + * or abort was attempted while in the + * {@link ReplicatedEnvironment.State#REPLICA} state. The transaction is marked + * as being invalid. + *

+ * The exception is the result of either an error in the application logic or + * the result of a transition of the node from Master to Replica while a + * transaction was in progress. + *

+ * The application must abort the current transaction and redirect all + * subsequent update operations to the Master. + */ + LOG.error("catch ReplicaWriteException when writing to database, will exit. journal id {}", id, e); + String msg = "write bdb failed. will exit. journalId: " + id + ", bdb database Name: " + + currentJournalDB.getDatabaseName(); + LOG.error(msg); + Util.stdoutWithTime(msg); + System.exit(-1); } catch (DatabaseException e) { LOG.error("catch an exception when writing to database. sleep and retry. journal id {}", id, e); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 4bf92a24ae..e3830eb678 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1131,6 +1131,15 @@ public class FrontendServiceImpl implements FrontendService.Iface { TLoadTxnBeginResult result = new TLoadTxnBeginResult(); TStatus status = new TStatus(TStatusCode.OK); result.setStatus(status); + + if (!Env.getCurrentEnv().isMaster()) { + status.setStatusCode(TStatusCode.NOT_MASTER); + status.addToErrorMsgs(NOT_MASTER_ERR_MSG); + LOG.error("failed to loadTxnBegin:{}, request:{}, backend:{}", + NOT_MASTER_ERR_MSG, request, clientAddr); + return result; + } + try { TLoadTxnBeginResult tmpRes = loadTxnBeginImpl(request, clientAddr); result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId());