From fc690167025a9ea294b422d7df7773d61aa4c79b Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 9 Feb 2023 14:39:44 +0000 Subject: [PATCH] submit clear log with the larger scn than any commit log scn from pariticipants --- src/storage/tx/ob_trans_ctx_mgr_v4.cpp | 19 ++++++++ src/storage/tx/ob_trans_ctx_mgr_v4.h | 4 ++ src/storage/tx/ob_trans_part_ctx.cpp | 8 ++-- src/storage/tx/ob_trans_part_ctx.h | 2 + src/storage/tx/ob_trans_service.cpp | 25 ++++++++++ src/storage/tx/ob_trans_service.h | 1 + src/storage/tx/ob_tx_2pc_msg_handler.cpp | 24 ++++++++-- src/storage/tx/ob_tx_msg.cpp | 22 +++++---- src/storage/tx/ob_tx_msg.h | 10 ++-- .../storage/tx/mock_utils/basic_fake_define.h | 48 +++++++++++++++++-- 10 files changed, 142 insertions(+), 21 deletions(-) diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp index c798c29097..611e6781c6 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp @@ -1028,6 +1028,25 @@ int ObLSTxCtxMgr::check_scheduler_status(SCN &min_start_scn, MinStartScnStatus & return ret; } +int ObLSTxCtxMgr::get_max_decided_scn(share::SCN &scn) /* Copies the log adapter's max decided SCN into scn. Returns OB_NOT_INIT when IS_NOT_INIT holds, OB_STATE_NOT_MATCH when is_stopped_() is true, otherwise the result of tx_log_adapter_->get_max_decided_scn(scn). */ +{ + RLockGuard guard(rwlock_); /* guard on rwlock_ is taken before the checks below and stays in scope until return */ + + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + TRANS_LOG(WARN, "ObLSTxCtxMgr not inited"); + ret = OB_NOT_INIT; + // There is no need to check whether it is master + // this interface is called by leader or follower + } else if (is_stopped_()) { + ret = OB_STATE_NOT_MATCH; + TRANS_LOG(WARN, "this ls has beend stopped", KPC(this)); + } else if (OB_FAIL(tx_log_adapter_->get_max_decided_scn(scn))) { + TRANS_LOG(WARN, "get max decided scn failed", K(ret)); + } + return ret; +} + int ObLSTxCtxMgr::check_modify_schema_elapsed(const ObTabletID &tablet_id, const int64_t schema_version, ObTransID &block_tx_id) diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.h b/src/storage/tx/ob_trans_ctx_mgr_v4.h index 9455e045b5..bc685ba3dc 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.h +++ 
b/src/storage/tx/ob_trans_ctx_mgr_v4.h @@ -480,6 +480,8 @@ public: // ObPartTransCtx -> ObLSTxCtxMgr, It will be a deadlock with normal order. int update_aggre_log_ts_wo_lock(share::SCN rec_log_ts); + int get_max_decided_scn(share::SCN & scn); /* out-param scn receives the result of tx_log_adapter_->get_max_decided_scn() */ + TO_STRING_KV(KP(this), K_(ls_id), K_(tenant_id), @@ -943,6 +945,8 @@ public: // @param [in] ls_id: the specified ls_id int check_scheduler_status(share::ObLSID ls_id); + int get_max_decided_scn(const share::ObLSID &ls_id, share::SCN & scn); /* NOTE(review): the .cpp hunk of this patch defines only the single-argument ObLSTxCtxMgr version; no definition of this (ls_id, scn) overload is visible here - confirm one exists */ + private: int create_ls_(const int64_t tenant_id, const share::ObLSID &ls_id, diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 915a1891df..09369ab977 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -298,6 +298,7 @@ void ObPartTransCtx::default_init_() is_incomplete_replay_ctx_ = false; is_submitting_redo_log_for_freeze_ = false; start_working_log_ts_ = SCN::min_scn(); + max_2pc_commit_scn_.reset(); /* reset together with the neighbouring members of default_init_ */ coord_prepare_info_arr_.reset(); reserve_allocator_.reset(); elr_handler_.reset(); @@ -3126,8 +3127,7 @@ int ObPartTransCtx::submit_clear_log_() ObTxClearLog clear_log(exec_info_.incremental_participants_); ObTxLogCb *log_cb = NULL; const int64_t replay_hint = static_cast(trans_id_.get_id()); - ObTxLogBlockHeader - log_block_header(cluster_id_, exec_info_.next_log_entry_no_, trans_id_); + ObTxLogBlockHeader log_block_header(cluster_id_, exec_info_.next_log_entry_no_, trans_id_); if (OB_FAIL(log_block.init(replay_hint, log_block_header))) { TRANS_LOG(WARN, "init log block failed", KR(ret), K(*this)); @@ -3147,7 +3147,9 @@ int ObPartTransCtx::submit_clear_log_() } else if (OB_FAIL(acquire_ctx_ref_())) { TRANS_LOG(ERROR, "acquire ctx ref failed", KR(ret), K(*this)); } else if (OB_FAIL(ls_tx_ctx_mgr_->get_ls_log_adapter()->submit_log( - log_block.get_buf(), log_block.get_size(), SCN::min_scn(), log_cb, false))) { + log_block.get_buf(), log_block.get_size(), 
share::SCN::max(ctx_tx_data_.get_end_log_ts(), max_2pc_commit_scn_), log_cb, /* the SCN argument to submit_log is now max(end log ts, max_2pc_commit_scn_) where the removed line passed SCN::min_scn() */ + false))) { TRANS_LOG(WARN, "submit log to clog adapter failed", KR(ret), K(*this)); return_log_cb_(log_cb); log_cb = NULL; diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 3db7f19754..de73b5068a 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -608,6 +608,7 @@ private: const ObTxMsg &recv_msg, const common::ObAddr &self_addr, ObITransRpc* rpc); + static int get_max_decided_scn_(const share::ObLSID &ls_id, share::SCN &scn); /* NOTE(review): declared here, but no definition or caller appears in this patch (post_orphan_msg_ goes through MTL(ObTransService*) instead) - confirm it is needed */ int get_2pc_participants_copy(share::ObLSArray ©_participants); // for xa int post_tx_sub_prepare_resp_(const int status); @@ -800,6 +801,7 @@ private: ObTxState upstream_state_; const ObTxMsg * msg_2pc_cache_; + share::SCN max_2pc_commit_scn_; /* running maximum of the commit log SCNs taken from commit responses and clear requests in apply_2pc_msg_ */ ObLSLogInfoArray coord_prepare_info_arr_; TransModulePageAllocator reserve_allocator_; // tmp scheduler addr is used to post response for the second phase of xa commit/rollback diff --git a/src/storage/tx/ob_trans_service.cpp b/src/storage/tx/ob_trans_service.cpp index efb8407d09..8210ede7b7 100644 --- a/src/storage/tx/ob_trans_service.cpp +++ b/src/storage/tx/ob_trans_service.cpp @@ -553,6 +553,31 @@ int ObTransService::get_min_undecided_log_ts(const ObLSID &ls_id, SCN &log_ts) return ret; } +int ObTransService::get_max_decided_scn(const share::ObLSID &ls_id, share::SCN &scn) /* Looks up the ObLSTxCtxMgr for ls_id and forwards to its get_max_decided_scn(scn). Returns OB_NOT_INIT, OB_NOT_RUNNING or OB_INVALID_ARGUMENT for the three precondition checks below, otherwise the lookup's or the forwarded call's result. */ +{ + int ret = OB_SUCCESS; + + ObLSTxCtxMgr *ls_tx_mgr_ptr = nullptr; + if (IS_NOT_INIT) { + TRANS_LOG(WARN, "ObTransService not inited"); + ret = OB_NOT_INIT; + } else if (OB_UNLIKELY(!is_running_)) { + TRANS_LOG(WARN, "ObTransService is not running"); + ret = OB_NOT_RUNNING; + } else if (!ls_id.is_valid()) { + TRANS_LOG(WARN, "invalid argument", K(ls_id)); + ret = OB_INVALID_ARGUMENT; + } else if (OB_FAIL(tx_ctx_mgr_.get_ls_tx_ctx_mgr(ls_id, ls_tx_mgr_ptr))) { + TRANS_LOG(WARN, "get ls tx ctx mgr failed", K(ret)); + } else { + if 
(OB_FAIL(ls_tx_mgr_ptr->get_max_decided_scn(scn))) { + TRANS_LOG(WARN, "get max decided scn failed", K(ret)); + } + tx_ctx_mgr_.revert_ls_tx_ctx_mgr(ls_tx_mgr_ptr); /* runs whether the call above succeeded or failed */ + } + return ret; +} + int ObTransService::handle_redo_sync_task_(ObDupTableRedoSyncTask *task, bool &need_release_task) { UNUSED(task); diff --git a/src/storage/tx/ob_trans_service.h b/src/storage/tx/ob_trans_service.h index 9bbe88ab90..134a96d11e 100644 --- a/src/storage/tx/ob_trans_service.h +++ b/src/storage/tx/ob_trans_service.h @@ -224,6 +224,7 @@ public: const bool is_rollback, const int64_t expire_ts); int get_max_commit_version(share::SCN &commit_version) const; + int get_max_decided_scn(const share::ObLSID &ls_id, share::SCN & scn); #include "ob_trans_service_v4.h" #include "ob_tx_free_route_api.h" private: diff --git a/src/storage/tx/ob_tx_2pc_msg_handler.cpp b/src/storage/tx/ob_tx_2pc_msg_handler.cpp index b3142e3215..4111fb34ec 100644 --- a/src/storage/tx/ob_tx_2pc_msg_handler.cpp +++ b/src/storage/tx/ob_tx_2pc_msg_handler.cpp @@ -10,6 +10,7 @@ * See the Mulan PubL v2 for more details. 
*/ +#include "storage/tx/ob_trans_service.h" /* provides ObTransService::get_max_decided_scn, called through MTL(ObTransService*) in post_orphan_msg_ */ #include "storage/tx/ob_trans_part_ctx.h" namespace oceanbase @@ -147,6 +148,12 @@ int ObPartTransCtx::post_msg_(const ObTwoPhaseCommitMsgType& msg_type, Ob2pcCommitRespMsg commit_resp; build_tx_common_msg_(receiver, commit_resp); commit_resp.commit_version_ = ctx_tx_data_.get_commit_version(); + if (max_2pc_commit_scn_.is_valid()) { /* the response carries max(max_2pc_commit_scn_, end log ts) when max_2pc_commit_scn_ is valid, otherwise just this ctx's end log ts */ + commit_resp.commit_log_scn_ = + share::SCN::max(max_2pc_commit_scn_, ctx_tx_data_.get_end_log_ts()); + } else { + commit_resp.commit_log_scn_ = ctx_tx_data_.get_end_log_ts(); + } if (OB_FAIL(post_msg_(receiver, commit_resp))) { TRANS_LOG(WARN, "rpc post msg failed", K(ret), K(*this), K(receiver), K(msg_type)); } @@ -172,6 +179,7 @@ int ObPartTransCtx::post_msg_(const ObTwoPhaseCommitMsgType& msg_type, case ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ: { Ob2pcClearReqMsg clear_req; build_tx_common_msg_(receiver, clear_req); + clear_req.max_commit_log_scn_ = share::SCN::max(max_2pc_commit_scn_, ctx_tx_data_.get_end_log_ts()); /* NOTE(review): unlike the commit-response branch of this function, max_2pc_commit_scn_ is not checked with is_valid() here - confirm SCN::max tolerates an invalid operand */ if (OB_FAIL(post_msg_(receiver, clear_req))) { TRANS_LOG(WARN, "rpc post msg failed", K(ret), K(*this), K(receiver), K(msg_type)); } @@ -283,7 +291,11 @@ int ObPartTransCtx::post_orphan_msg_(const ObTwoPhaseCommitMsgType &msg_type, build_tx_common_msg_(recv_msg, self_addr, clear_req); - ret = rpc->post_msg(recv_msg.get_sender_addr(), clear_req); + if (OB_FAIL(MTL(ObTransService*)->get_max_decided_scn(clear_req.sender_, clear_req.max_commit_log_scn_))) { /* max_commit_log_scn_ is filled from the max decided SCN of the LS named by clear_req.sender_; the request is not posted when that lookup fails */ + TRANS_LOG(WARN, "get max get_max_decided_scn failed", K(ret), K(clear_req)); + } else { + ret = rpc->post_msg(recv_msg.get_sender_addr(), clear_req); + } break; } @@ -539,7 +551,7 @@ int ObPartTransCtx::apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type) case ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_RESP: { const Ob2pcCommitRespMsg &msg = *(static_cast(msg_2pc_cache_)); - + max_2pc_commit_scn_ = share::SCN::max(msg.commit_log_scn_, max_2pc_commit_scn_); /* folds the commit log SCN of this response into the running maximum */ if (OB_FAIL(set_2pc_commit_version_(msg.commit_version_))) { 
TRANS_LOG(WARN, "set commit version failed", KR(ret), K(msg), K(*this)); } @@ -562,7 +574,13 @@ int ObPartTransCtx::apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type) } case ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ: { const Ob2pcClearReqMsg &msg = *(static_cast(msg_2pc_cache_)); - + if (msg.max_commit_log_scn_ < max_2pc_commit_scn_ /* a clear request whose SCN is below either value already known to this ctx is rejected with OB_ERR_UNEXPECTED */ + || msg.max_commit_log_scn_ < ctx_tx_data_.get_end_log_ts()) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "unexpected max commit log scn in clear request", K(ret), KPC(this)); + } else { + max_2pc_commit_scn_ = share::SCN::max(msg.max_commit_log_scn_, max_2pc_commit_scn_); + } break; } case ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_RESP: { diff --git a/src/storage/tx/ob_tx_msg.cpp b/src/storage/tx/ob_tx_msg.cpp index 6656101f3a..b681cc5ed2 100644 --- a/src/storage/tx/ob_tx_msg.cpp +++ b/src/storage/tx/ob_tx_msg.cpp @@ -44,10 +44,10 @@ OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRespMsg, ObTxMsg, prepare_version_, prep OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPreCommitReqMsg, ObTxMsg, commit_version_); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPreCommitRespMsg, ObTxMsg, commit_version_); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcCommitReqMsg, ObTxMsg, commit_version_, prepare_info_array_); -OB_SERIALIZE_MEMBER_INHERIT(Ob2pcCommitRespMsg, ObTxMsg, commit_version_); +OB_SERIALIZE_MEMBER_INHERIT(Ob2pcCommitRespMsg, ObTxMsg, commit_version_, commit_log_scn_); /* new field is listed after the existing commit_version_ member */ OB_SERIALIZE_MEMBER_INHERIT(Ob2pcAbortReqMsg, ObTxMsg, upstream_); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcAbortRespMsg, ObTxMsg); -OB_SERIALIZE_MEMBER_INHERIT(Ob2pcClearReqMsg, ObTxMsg); +OB_SERIALIZE_MEMBER_INHERIT(Ob2pcClearReqMsg, ObTxMsg, max_commit_log_scn_); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcClearRespMsg, ObTxMsg); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRedoReqMsg, ObTxMsg, xid_, upstream_, app_trace_info_); OB_SERIALIZE_MEMBER_INHERIT(Ob2pcPrepareRedoRespMsg, ObTxMsg); @@ -288,13 +288,16 @@ bool Ob2pcCommitReqMsg::is_valid() const return ret; } -bool Ob2pcCommitRespMsg::is_valid() const -{ +bool 
Ob2pcCommitRespMsg::is_valid() const { bool ret = false; - if (ObTxMsg::is_valid() && type_ == TX_2PC_COMMIT_RESP - && commit_version_.is_valid()) { + if (ObTxMsg::is_valid() && type_ == TX_2PC_COMMIT_RESP && + commit_version_.is_valid()) { ret = true; } + if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_1_0_0 && /* overrides the result above: once the min cluster version reaches CLUSTER_VERSION_4_1_0_0, a response without a valid commit_log_scn_ is reported invalid */ + !commit_log_scn_.is_valid()) { + ret = false; + } return ret; } @@ -316,12 +319,15 @@ bool Ob2pcAbortRespMsg::is_valid() const return ret; } -bool Ob2pcClearReqMsg::is_valid() const -{ +bool Ob2pcClearReqMsg::is_valid() const { bool ret = false; if (ObTxMsg::is_valid() && type_ == TX_2PC_CLEAR_REQ) { ret = true; } + if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_1_0_0 && /* same version gate as the commit response: a clear request without a valid max_commit_log_scn_ is reported invalid */ + !max_commit_log_scn_.is_valid()) { + ret = false; + } return ret; } diff --git a/src/storage/tx/ob_tx_msg.h b/src/storage/tx/ob_tx_msg.h index 2bd59a13ff..4b49e0b733 100644 --- a/src/storage/tx/ob_tx_msg.h +++ b/src/storage/tx/ob_tx_msg.h @@ -358,13 +358,13 @@ namespace transaction struct Ob2pcCommitRespMsg : public ObTxMsg { public: - Ob2pcCommitRespMsg() : - ObTxMsg(TX_2PC_COMMIT_RESP) - {} + Ob2pcCommitRespMsg() : ObTxMsg(TX_2PC_COMMIT_RESP) {} + public: bool is_valid() const; share::SCN commit_version_; - INHERIT_TO_STRING_KV("txMsg", ObTxMsg, K_(commit_version)); + share::SCN commit_log_scn_; /* set by the sender in post_msg_ and folded into max_2pc_commit_scn_ by the receiver in apply_2pc_msg_ */ + INHERIT_TO_STRING_KV("txMsg", ObTxMsg, K_(commit_version), K_(commit_log_scn)); OB_UNIS_VERSION(1); }; @@ -401,6 +401,8 @@ namespace transaction {} public: bool is_valid() const; + share::SCN max_commit_log_scn_; /* set in post_msg_ or post_orphan_msg_ and checked against the receiver's own values in apply_2pc_msg_ */ + INHERIT_TO_STRING_KV("txMsg", ObTxMsg, K_(max_commit_log_scn)); OB_UNIS_VERSION(1); }; diff --git a/unittest/storage/tx/mock_utils/basic_fake_define.h b/unittest/storage/tx/mock_utils/basic_fake_define.h index 4c155e3fdf..0ce56d896c 100644 --- a/unittest/storage/tx/mock_utils/basic_fake_define.h +++ b/unittest/storage/tx/mock_utils/basic_fake_define.h @@ -389,6 +389,7 @@ public: const static int64_t TASK_QUEUE_CNT = 128; ObSpScLinkQueue apply_task_queue_arr[TASK_QUEUE_CNT]; 
ObSpScLinkQueue replay_task_queue_arr[TASK_QUEUE_CNT]; + share::SCN max_submit_scn_ = share::SCN::invalid_scn(); void run1() { while(true) { @@ -466,6 +467,7 @@ public: ApplyCbTask *apply_task = new ApplyCbTask(); apply_task->replay_hint_ = replay_hint; apply_task->cb_ = cb; + max_submit_scn_ = share::SCN::max(max_submit_scn_, scn); apply_task_queue_arr[queue_idx].push(apply_task); ATOMIC_INC(&inflight_cnt_);
@@ -499,9 +501,65 @@ public:
     return OB_SUCCESS;
   }
 
-  int get_max_decided_scn(share::SCN &scn)
-  {
-    UNUSED(scn);
+  // Test double for the log adapter's max-decided-SCN query.
+  // Reports one less than the smallest SCN still queued in the replay or apply
+  // task queues. With both sets of queues empty it reports one less than
+  // max_submit_scn_, and leaves scn invalid while max_submit_scn_ still holds
+  // its initial invalid value. Always returns OB_SUCCESS.
+  int get_max_decided_scn(share::SCN &scn) {
+    // invalid_scn() is used as a value-returning static in max_submit_scn_'s
+    // initializer, so its result is assigned here rather than discarded.
+    share::SCN min_unreplayed_scn = share::SCN::invalid_scn();
+    share::SCN min_unapplyed_scn = share::SCN::invalid_scn();
+
+    for (int64_t i = 0; i < TASK_QUEUE_CNT; ++i) {
+      if (!replay_task_queue_arr[i].empty()) {
+        share::SCN tmp_scn;
+        // NOTE(review): the cast's target type was stripped from this copy of
+        // the patch; ReplayCbTask * is assumed - confirm against the header.
+        tmp_scn.convert_for_gts(
+            static_cast<ReplayCbTask *>(replay_task_queue_arr[i].top())->log_ts_);
+        if (min_unreplayed_scn.is_valid() && tmp_scn.is_valid()) {
+          min_unreplayed_scn = share::SCN::min(tmp_scn, min_unreplayed_scn);
+        } else if (tmp_scn.is_valid()) {
+          min_unreplayed_scn = tmp_scn;
+        }
+      }
+    }
+
+    for (int64_t i = 0; i < TASK_QUEUE_CNT; ++i) {
+      if (!apply_task_queue_arr[i].empty()) {
+        share::SCN tmp_scn;
+        // NOTE(review): the outer cast's target type was stripped as well;
+        // ObTxBaseLogCb * is assumed - confirm against the header.
+        tmp_scn = (static_cast<ObTxBaseLogCb *>(
+                       (static_cast<ApplyCbTask *>(apply_task_queue_arr[i].top()))
+                           ->cb_))
+                      ->get_log_ts();
+        if (min_unapplyed_scn.is_valid() && tmp_scn.is_valid()) {
+          min_unapplyed_scn = share::SCN::min(tmp_scn, min_unapplyed_scn);
+        } else if (tmp_scn.is_valid()) {
+          min_unapplyed_scn = tmp_scn;
+        }
+      }
+    }
+
+    // A pending entry in either queue bounds the decided point, so take the
+    // smaller minimum when both exist and the only one when just one exists.
+    if (min_unreplayed_scn.is_valid() && min_unapplyed_scn.is_valid()) {
+      scn = share::SCN::min(min_unreplayed_scn, min_unapplyed_scn);
+    } else if (min_unreplayed_scn.is_valid()) {
+      scn = min_unreplayed_scn;
+    } else if (min_unapplyed_scn.is_valid()) {
+      scn = min_unapplyed_scn;
+    } else {
+      scn = max_submit_scn_;
+    }
+    // Store the decremented value; the bare call dropped it. The min_scn()
+    // guard keeps the smallest SCN from being decremented any further.
+    if (scn.is_valid() && share::SCN::min_scn() < scn) {
+      scn = share::SCN::minus(scn, 1);
+    }
     return OB_SUCCESS;
   }
 