diff --git a/src/logservice/ob_log_handler.cpp b/src/logservice/ob_log_handler.cpp index 957d16dc4a..e5cfc2bcd8 100755 --- a/src/logservice/ob_log_handler.cpp +++ b/src/logservice/ob_log_handler.cpp @@ -290,6 +290,28 @@ int ObLogHandler::get_access_mode(int64_t &mode_version, palf::AccessMode &acces return ret; } +int ObLogHandler::get_append_mode_initial_scn(share::SCN &ref_scn) const +{ + int ret = OB_SUCCESS; + int64_t mode_version = INVALID_PROPOSAL_ID; + AccessMode access_mode = AccessMode::INVALID_ACCESS_MODE; + share::SCN curr_ref_scn; + ref_scn.reset(); + RLockGuard guard(lock_); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + } else if (is_in_stop_state_) { + ret = OB_NOT_RUNNING; + } else if (OB_FAIL(palf_handle_.get_access_mode_ref_scn(mode_version, access_mode, curr_ref_scn))) { + CLOG_LOG(WARN, "get_access_mode_ref_scn failed", K(ret), K_(id)); + } else if (AccessMode::APPEND == access_mode) { + ref_scn = curr_ref_scn; + } else { + ret = OB_STATE_NOT_MATCH; + } + return ret; +} + int ObLogHandler::change_access_mode(const int64_t mode_version, const palf::AccessMode &access_mode, const SCN &ref_scn) diff --git a/src/logservice/ob_log_handler.h b/src/logservice/ob_log_handler.h index c2e912c9ce..89b4b017a4 100755 --- a/src/logservice/ob_log_handler.h +++ b/src/logservice/ob_log_handler.h @@ -82,6 +82,7 @@ public: const palf::AccessMode &access_mode, const share::SCN &ref_scn) = 0; virtual int get_access_mode(int64_t &mode_version, palf::AccessMode &access_mode) const = 0; + virtual int get_append_mode_initial_scn(SCN &ref_scn) const = 0; virtual int seek(const palf::LSN &lsn, palf::PalfBufferIterator &iter) = 0; virtual int seek(const palf::LSN &lsn, palf::PalfGroupBufferIterator &iter) = 0; virtual int seek(const share::SCN &scn, palf::PalfGroupBufferIterator &iter) = 0; @@ -245,6 +246,11 @@ public: // @retval // OB_SUCCESS int get_access_mode(int64_t &mode_version, palf::AccessMode &access_mode) const override final; + // @description: get ref_scn of APPEND mode + // @return + // - OB_SUCCESS + // - OB_STATE_NOT_MATCH: current access mode is not APPEND + int get_append_mode_initial_scn(share::SCN &initial_scn) const override final; // @brief change AccessMode of palf. // @param[in] const int64_t &mode_version: mode_version corresponding to AccessMode, // can be gotted by get_access_mode diff --git a/src/logservice/palf/log_mode_mgr.cpp b/src/logservice/palf/log_mode_mgr.cpp index 1403495d7d..2a953ecc9d 100644 --- a/src/logservice/palf/log_mode_mgr.cpp +++ b/src/logservice/palf/log_mode_mgr.cpp @@ -146,7 +146,9 @@ int LogModeMgr::get_access_mode(int64_t &mode_version, AccessMode &access_mode) return ret; } -int LogModeMgr::get_ref_scn(int64_t &mode_version, SCN &ref_scn) const +int LogModeMgr::get_access_mode_ref_scn(int64_t &mode_version, + AccessMode &access_mode, + SCN &ref_scn) const { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -154,6 +156,7 @@ int LogModeMgr::get_ref_scn(int64_t &mode_version, SCN &ref_scn) const PALF_LOG(WARN, "LogModeMgr has inited", K(ret)); } else { mode_version = applied_mode_meta_.mode_version_; + access_mode = applied_mode_meta_.access_mode_; ref_scn = applied_mode_meta_.ref_scn_; } return ret; diff --git a/src/logservice/palf/log_mode_mgr.h b/src/logservice/palf/log_mode_mgr.h index eb24289fab..5107c640e5 100644 --- a/src/logservice/palf/log_mode_mgr.h +++ b/src/logservice/palf/log_mode_mgr.h @@ -58,7 +58,9 @@ public: virtual int get_access_mode(AccessMode &access_mode) const; virtual int get_mode_version(int64_t &mode_version) const; virtual int get_access_mode(int64_t &mode_version, AccessMode &access_mode) const; - virtual int get_ref_scn(int64_t &mode_version, share::SCN &ref_scn) const; + virtual int get_access_mode_ref_scn(int64_t &mode_version, + AccessMode &access_mode, + share::SCN &ref_scn) const; bool can_append() const; bool can_raw_write() const; bool can_receive_log() const; diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index 02ee79fbc0..b06ea053e4 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -2551,22 +2551,17 @@ int LogSlidingWindow::to_leader_active() // Resize group_buffer int ret = OB_SUCCESS; SCN ref_scn; - int64_t mode_version1 = INVALID_PROPOSAL_ID, mode_version2 = INVALID_PROPOSAL_ID; + int64_t mode_version = INVALID_PROPOSAL_ID; AccessMode access_mode = AccessMode::INVALID_ACCESS_MODE; const int64_t curr_proposal_id = state_mgr_->get_proposal_id(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; - } else if (OB_FAIL(mode_mgr_->get_ref_scn(mode_version1, ref_scn))) { - PALF_LOG(INFO, "get_ref_scn failed", K(ret), K_(palf_id), K_(self)); - } else if (OB_FAIL(mode_mgr_->get_access_mode(mode_version2, access_mode))) { - PALF_LOG(INFO, "get_ref_scn failed", K(ret), K_(palf_id), K_(self)); - } else if (mode_version1 != mode_version2) { - ret = OB_ERR_UNEXPECTED; - PALF_LOG(ERROR, "mode_version has been changed", K(ret), K(mode_version1), K(mode_version2)); - } else if (curr_proposal_id < mode_version1) { + } else if (OB_FAIL(mode_mgr_->get_access_mode_ref_scn(mode_version, access_mode, ref_scn))) { + PALF_LOG(INFO, "get_access_mode_ref_scn failed", K(ret), K_(palf_id), K_(self)); + } else if (curr_proposal_id < mode_version) { ret = OB_ERR_UNEXPECTED; PALF_LOG(ERROR, "curr_proposal_id is less than proposal_id in ModeMeta", K(ret), - K_(palf_id), K_(self), K(mode_version1), K(curr_proposal_id)); + K_(palf_id), K_(self), K(mode_version), K(curr_proposal_id)); } else if (!is_all_log_flushed_()) { ret = OB_EAGAIN; PALF_LOG(WARN, "to_leader_active need retry, because there is some log has not been flushed", K(ret), diff --git a/src/logservice/palf/palf_handle.cpp b/src/logservice/palf/palf_handle.cpp index c89615b9d7..154efb43e8 100755 --- a/src/logservice/palf/palf_handle.cpp +++ b/src/logservice/palf/palf_handle.cpp @@ -495,6 +495,16 @@ int PalfHandle::get_access_mode(AccessMode &access_mode) const return ret; } +int PalfHandle::get_access_mode_ref_scn(int64_t &mode_version, + AccessMode &access_mode, + SCN &ref_scn) const +{ + int ret = OB_SUCCESS; + CHECK_VALID; + ret = palf_handle_impl_->get_access_mode_ref_scn(mode_version, access_mode, ref_scn); + return ret; +} + int PalfHandle::disable_vote(const bool need_check_log_missing) { int ret = OB_SUCCESS; diff --git a/src/logservice/palf/palf_handle.h b/src/logservice/palf/palf_handle.h index bff9f7092b..d97a22ecfe 100755 --- a/src/logservice/palf/palf_handle.h +++ b/src/logservice/palf/palf_handle.h @@ -447,6 +447,9 @@ public: // OB_SUCCESS int get_access_mode(int64_t &mode_version, AccessMode &access_mode) const; int get_access_mode(AccessMode &access_mode) const; + int get_access_mode_ref_scn(int64_t &mode_version, + AccessMode &access_mode, + SCN &ref_scn) const; // @brief: check whether the palf instance is allowed to vote for logs // By default, return true; diff --git a/src/logservice/palf/palf_handle_impl.cpp b/src/logservice/palf/palf_handle_impl.cpp index 4b8a1e33ef..4fa13f7c92 100755 --- a/src/logservice/palf/palf_handle_impl.cpp +++ b/src/logservice/palf/palf_handle_impl.cpp @@ -2218,6 +2218,21 @@ int PalfHandleImpl::get_access_mode(int64_t &mode_version, AccessMode &access_mo return ret; } +int PalfHandleImpl::get_access_mode_ref_scn(int64_t &mode_version, + AccessMode &access_mode, + SCN &ref_scn) const +{ + int ret = OB_SUCCESS; + RLockGuard guard(lock_); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + PALF_LOG(WARN, "PalfHandleImpl is not inited", K(ret), KPC(this)); + } else if (OB_FAIL(mode_mgr_.get_access_mode_ref_scn(mode_version, access_mode, ref_scn))) { + PALF_LOG(WARN, "get_access_mode_ref_scn failed", K(ret), KPC(this)); + } + return ret; +} + int PalfHandleImpl::alloc_palf_buffer_iterator(const LSN &offset, PalfBufferIterator &iterator) { diff --git a/src/logservice/palf/palf_handle_impl.h b/src/logservice/palf/palf_handle_impl.h index e52fa94e8c..0ec8ed436e 100755 --- a/src/logservice/palf/palf_handle_impl.h +++ b/src/logservice/palf/palf_handle_impl.h @@ -752,6 +752,9 @@ public: const share::SCN &ref_scn) = 0; virtual int get_access_mode(int64_t &mode_version, AccessMode &access_mode) const = 0; virtual int get_access_mode(AccessMode &access_mode) const = 0; + virtual int get_access_mode_ref_scn(int64_t &mode_version, + AccessMode &access_mode, + SCN &ref_scn) const = 0; virtual int handle_committed_info(const common::ObAddr &server, const int64_t &msg_proposal_id, const int64_t prev_log_id, @@ -969,6 +972,9 @@ public: int get_access_mode(int64_t &mode_version, AccessMode &access_mode) const override final; int get_access_mode(AccessMode &access_mode) const override final; int get_access_mode_version(int64_t &mode_version) const; + int get_access_mode_ref_scn(int64_t &mode_version, + AccessMode &access_mode, + SCN &ref_scn) const override final; // =========================== Iterator start ============================ int alloc_palf_buffer_iterator(const LSN &offset, PalfBufferIterator &iterator) override final; int alloc_palf_group_buffer_iterator(const LSN &offset, PalfGroupBufferIterator &iterator) override final; diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 7b1399699e..79aa8c518e 100755 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -5440,6 +5440,9 @@ int ObPartTransCtx::switch_to_leader(const SCN &start_working_ts) { int ret = OB_SUCCESS; + share::SCN append_mode_initial_scn; + append_mode_initial_scn.set_invalid(); + common::ObTimeGuard timeguard("switch to leader", 10 * 1000); CtxLockGuard guard(lock_); TxCtxStateHelper state_helper(role_state_); @@ -5449,23 +5452,39 @@ int ObPartTransCtx::switch_to_leader(const SCN &start_working_ts) ret = OB_NOT_INIT; } else if (OB_UNLIKELY(is_exiting_)) { TRANS_LOG(DEBUG, "transaction is exiting", "context", *this); + } else if (OB_FAIL(ls_tx_ctx_mgr_->get_ls_log_adapter()->get_append_mode_initial_scn( + append_mode_initial_scn))) { + /* We can not ensure whether there are some redo logs after the append_mode_initial_scn. + * All running trans must be be killed by the append_mode_initial_scn.*/ + TRANS_LOG(WARN, "get append mode initial scn from the palf failed", K(ret), KPC(this)); } else if (OB_FAIL(state_helper.switch_state(TxCtxOps::TAKEOVER))) { TRANS_LOG(WARN, "switch role state error", KR(ret), K(*this)); } else { - const bool need_kill_tx = is_contain_mds_type_(ObTxDataSourceType::TABLE_LOCK) - || is_contain_mds_type_(ObTxDataSourceType::START_TRANSFER_OUT); + const bool contain_mds_table_lock = is_contain_mds_type_(ObTxDataSourceType::TABLE_LOCK); + const bool contain_mds_transfer_out = + is_contain_mds_type_(ObTxDataSourceType::START_TRANSFER_OUT); + bool kill_by_append_mode_initial_scn = false; + if (append_mode_initial_scn.is_valid()) { + kill_by_append_mode_initial_scn = exec_info_.max_applying_log_ts_ <= append_mode_initial_scn; + } + if (ObTxState::INIT == exec_info_.state_) { - if (exec_info_.data_complete_ && !need_kill_tx) { + if (exec_info_.data_complete_ && !contain_mds_table_lock && !contain_mds_transfer_out + && !kill_by_append_mode_initial_scn) { if (OB_FAIL(mt_ctx_.replay_to_commit(false /*is_resume*/))) { TRANS_LOG(WARN, "replay to commit failed", KR(ret), K(*this)); } } else { - TRANS_LOG(WARN, "txn data incomplete, will be aborted", K(need_kill_tx), KPC(this)); + TRANS_LOG(WARN, "txn data incomplete, will be aborted", K(contain_mds_table_lock), + K(contain_mds_transfer_out), K(kill_by_append_mode_initial_scn), + K(append_mode_initial_scn), KPC(this)); if (has_persisted_log_()) { - if (ObPartTransAction::COMMIT == part_trans_action_ || get_upstream_state() >= ObTxState::REDO_COMPLETE) { + if (ObPartTransAction::COMMIT == part_trans_action_ + || get_upstream_state() >= ObTxState::REDO_COMPLETE) { - TRANS_LOG(WARN, "abort self instantly with a tx_commit request", K(need_kill_tx), - KPC(this)); + TRANS_LOG(WARN, "abort self instantly with a tx_commit request", + K(contain_mds_table_lock), K(contain_mds_transfer_out), + K(kill_by_append_mode_initial_scn), K(append_mode_initial_scn), KPC(this)); if (OB_FAIL(do_local_tx_end_(TxEndAction::ABORT_TX))) { TRANS_LOG(WARN, "abort tx failed", KR(ret), KPC(this)); } @@ -5525,10 +5544,8 @@ int ObPartTransCtx::switch_to_leader(const SCN &start_working_ts) TRANS_LOG(INFO, "switch to leader succeed", KPC(this)); #endif } - REC_TRANS_TRACE_EXT2(tlog_, switch_to_leader, - OB_ID(ret), ret, OB_ID(used), - timeguard.get_diff(), - OB_ID(ctx_ref), get_ref()); + REC_TRANS_TRACE_EXT2(tlog_, switch_to_leader, OB_ID(ret), ret, OB_ID(used), timeguard.get_diff(), + OB_ID(ctx_ref), get_ref()); return ret; } diff --git a/src/storage/tx/ob_tx_log_adapter.cpp b/src/storage/tx/ob_tx_log_adapter.cpp index 2bde0d4559..9af00b86d1 100644 --- a/src/storage/tx/ob_tx_log_adapter.cpp +++ b/src/storage/tx/ob_tx_log_adapter.cpp @@ -169,6 +169,18 @@ int ObLSTxLogAdapter::get_max_decided_scn(SCN &scn) return ret; } +int ObLSTxLogAdapter::get_append_mode_initial_scn(share::SCN &ref_scn) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(log_handler_) || !log_handler_->is_valid()) { + ret = OB_INVALID_ARGUMENT; + TRANS_LOG(WARN, "invalid argument", K(ret), KP(log_handler_)); + } else { + ret = log_handler_->get_append_mode_initial_scn(ref_scn); + } + return ret; +} + int ObLSTxLogAdapter::block_confirm_with_dup_tablet_change_snapshot( share::SCN &dup_tablet_change_snapshot) { diff --git a/src/storage/tx/ob_tx_log_adapter.h b/src/storage/tx/ob_tx_log_adapter.h index bc4b0a5867..39ae30ac36 100644 --- a/src/storage/tx/ob_tx_log_adapter.h +++ b/src/storage/tx/ob_tx_log_adapter.h @@ -65,6 +65,8 @@ public: virtual int get_role(bool &is_leader, int64_t &epoch) = 0; virtual int get_max_decided_scn(share::SCN &scn) = 0; + virtual int get_append_mode_initial_scn(share::SCN &ref_scn) = 0; + /** * Dup Table Inerface * */ @@ -104,6 +106,8 @@ public: int get_role(bool &is_leader, int64_t &epoch); int get_max_decided_scn(share::SCN &scn); + int get_append_mode_initial_scn(share::SCN &ref_scn); + /** * Dup Table Inerface * */ diff --git a/unittest/storage/mock_ob_log_handler.h b/unittest/storage/mock_ob_log_handler.h index dc44b49031..7ce9113d01 100644 --- a/unittest/storage/mock_ob_log_handler.h +++ b/unittest/storage/mock_ob_log_handler.h @@ -82,6 +82,12 @@ public: return OB_SUCCESS; } + virtual int get_append_mode_initial_scn(SCN &initial_scn) const + { + UNUSED(initial_scn); + return OB_SUCCESS; + } + virtual int change_access_mode(const int64_t mode_version, const AccessMode &access_mode, const share::SCN &ref_scn) diff --git a/unittest/storage/tx/mock_utils/basic_fake_define.h b/unittest/storage/tx/mock_utils/basic_fake_define.h index 122aee970f..c23851426b 100644 --- a/unittest/storage/tx/mock_utils/basic_fake_define.h +++ b/unittest/storage/tx/mock_utils/basic_fake_define.h @@ -550,6 +550,12 @@ public: return OB_SUCCESS; } + int get_append_mode_initial_scn(share::SCN &ref_scn) { + int ret = OB_SUCCESS; + ref_scn = share::SCN::invalid_scn(); + return ret; + } + int get_inflight_cnt() { return ATOMIC_LOAD(&inflight_cnt_); } diff --git a/unittest/storage/tx/ob_mock_tx_log_adapter.h b/unittest/storage/tx/ob_mock_tx_log_adapter.h index a9fbf98908..391cbac498 100644 --- a/unittest/storage/tx/ob_mock_tx_log_adapter.h +++ b/unittest/storage/tx/ob_mock_tx_log_adapter.h @@ -75,6 +75,11 @@ public: UNUSED(scn); return OB_SUCCESS; } + int get_append_mode_initial_scn(share::SCN &ref_scn) { + int ret = OB_SUCCESS; + ref_scn = share::SCN::invalid_scn(); + return ret; + } void invoke_all_cbs();