From eaa35649bc1d6eeed50c5bab2d8e5cac308dd278 Mon Sep 17 00:00:00 2001 From: Lei Zhang <27994433+SWJTU-ZhangLei@users.noreply.github.com> Date: Thu, 14 Sep 2023 17:49:28 +0800 Subject: [PATCH] [fix](bdbje) handle `ReplicaWriteException` in `BDBJEJournal.write` (#24259) * When BDBJEJournal.write meet `ReplicaWriteException`, we should not retry. Because at the monment the bdbje node state is `REPLICA` (not `MASTER`) if we still retry write, at the same time trigger election, the orgin `REPLICA` node may transfer to `MASTER` and will cause incorrect journalId Co-authored-by: yiguolei <676222867@qq.com> --- .../java/org/apache/doris/catalog/Env.java | 8 +++++++ .../doris/journal/bdbje/BDBJEJournal.java | 21 +++++++++++++++++++ .../doris/service/FrontendServiceImpl.java | 9 ++++++++ 3 files changed, 38 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 0fbc023345..e723f3a759 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -2648,6 +2648,14 @@ public class Env { LOG.warn("replay journal cost too much time: {} replayedJournalId: {}", cost, replayedJournalId); } + if (replayedJournalId.get() != newToJournalId) { + String msg = "replayedJournalId:" + replayedJournalId.get() + " not equal with newToJournalId:" + + newToJournalId + " , will exit"; + LOG.error(msg); + Util.stdoutWithTime(msg); + System.exit(-1); + } + return hasLog; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java index f865bba4a2..dd574ab4d6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java @@ -38,6 +38,7 @@ import com.sleepycat.je.OperationStatus; import com.sleepycat.je.rep.InsufficientLogException; import com.sleepycat.je.rep.NetworkRestore; import com.sleepycat.je.rep.NetworkRestoreConfig; +import com.sleepycat.je.rep.ReplicaWriteException; import com.sleepycat.je.rep.RollbackException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -155,6 +156,26 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B } break; } + } catch (ReplicaWriteException e) { + /** + * This exception indicates that an update operation or transaction commit + * or abort was attempted while in the + * {@link ReplicatedEnvironment.State#REPLICA} state. The transaction is marked + * as being invalid. + *
+ * The exception is the result of either an error in the application logic or + * the result of a transition of the node from Master to Replica while a + * transaction was in progress. + *
+ * The application must abort the current transaction and redirect all + * subsequent update operations to the Master. + */ + LOG.error("catch ReplicaWriteException when writing to database, will exit. journal id {}", id, e); + String msg = "write bdb failed. will exit. journalId: " + id + ", bdb database Name: " + + currentJournalDB.getDatabaseName(); + LOG.error(msg); + Util.stdoutWithTime(msg); + System.exit(-1); } catch (DatabaseException e) { LOG.error("catch an exception when writing to database. sleep and retry. journal id {}", id, e); try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 4bf92a24ae..e3830eb678 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1131,6 +1131,15 @@ public class FrontendServiceImpl implements FrontendService.Iface { TLoadTxnBeginResult result = new TLoadTxnBeginResult(); TStatus status = new TStatus(TStatusCode.OK); result.setStatus(status); + + if (!Env.getCurrentEnv().isMaster()) { + status.setStatusCode(TStatusCode.NOT_MASTER); + status.addToErrorMsgs(NOT_MASTER_ERR_MSG); + LOG.error("failed to loadTxnBegin:{}, request:{}, backend:{}", + NOT_MASTER_ERR_MSG, request, clientAddr); + return result; + } + try { TLoadTxnBeginResult tmpRes = loadTxnBeginImpl(request, clientAddr); result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId());