From bbc7b627852bc78eef9bf1aa40c7571af7735586 Mon Sep 17 00:00:00 2001 From: chinaxing Date: Sun, 24 Dec 2023 15:12:44 +0000 Subject: [PATCH] fix switch to follower forcedly will failed due to no memory --- src/storage/tx/ob_trans_ctx.cpp | 18 ++++++ src/storage/tx/ob_trans_ctx.h | 8 +-- src/storage/tx/ob_trans_ctx_mgr_v4.cpp | 38 ++++++------ src/storage/tx/ob_trans_ctx_mgr_v4.h | 2 +- .../tx/ob_trans_end_trans_callback.cpp | 35 +++++++++++ src/storage/tx/ob_trans_end_trans_callback.h | 59 ++++++++++++++++--- src/storage/tx/ob_trans_functor.h | 23 ++++---- src/storage/tx/ob_trans_part_ctx.cpp | 51 ++++++++-------- src/storage/tx/ob_trans_part_ctx.h | 7 +-- src/storage/tx/ob_tx_2pc_msg_handler.cpp | 13 +++- unittest/storage/tx/it/test_tx.cpp | 9 +-- unittest/storage/tx/it/tx_node.h | 7 ++- 12 files changed, 183 insertions(+), 87 deletions(-) diff --git a/src/storage/tx/ob_trans_ctx.cpp b/src/storage/tx/ob_trans_ctx.cpp index 061cefc13..3ee104554 100644 --- a/src/storage/tx/ob_trans_ctx.cpp +++ b/src/storage/tx/ob_trans_ctx.cpp @@ -214,6 +214,19 @@ int ObTransCtx::defer_callback_scheduler_(const int retcode, const SCN &commit_v return ret; } +int ObTransCtx::prepare_commit_cb_for_role_change_(const int cb_ret, ObTxCommitCallback *&cb_list) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(commit_cb_.init(trans_service_, trans_id_, cb_ret, share::SCN()))) { + TRANS_LOG(WARN, "init commit cb fail", K(ret), KPC(this)); + } else if (OB_FAIL(commit_cb_.link(this, cb_list))) { + TRANS_LOG(WARN, "link commit cb fail", K(ret), KPC(this)); + } else { + cb_list = &commit_cb_; + } + return ret; +} + void ObTransCtx::generate_request_id_() { const int64_t request_id = ObClockGenerator::getClock(); @@ -337,5 +350,10 @@ int ObTransCtx::set_app_trace_id_(const ObString &app_trace_id) return ret; } +void ObTransCtx::release_ctx_ref() +{ + ls_tx_ctx_mgr_->revert_tx_ctx_without_lock(this); +} + } // transaction } // oceanbase diff --git a/src/storage/tx/ob_trans_ctx.h b/src/storage/tx/ob_trans_ctx.h index 14472bbba..10f71e85a 100644 --- a/src/storage/tx/ob_trans_ctx.h +++ b/src/storage/tx/ob_trans_ctx.h @@ -161,14 +161,14 @@ public: const ObAddr &get_addr() const { return addr_; } virtual int64_t get_part_trans_action() const { return part_trans_action_; } int acquire_ctx_ref() { return acquire_ctx_ref_(); } - + void release_ctx_ref(); ObITransRpc *get_trans_rpc() const { return rpc_; } public: virtual bool is_inited() const = 0; virtual int handle_timeout(const int64_t delay) = 0; - virtual int kill(const KillTransArg &arg, ObIArray &cb_array) = 0; + virtual int kill(const KillTransArg &arg, ObTxCommitCallback *&cb_list) = 0; // thread unsafe - VIRTUAL_TO_STRING_KV(KP(this), + VIRTUAL_TO_STRING_KV(KP(this), K_(ref), K_(trans_id), K_(tenant_id), K_(is_exiting), @@ -189,6 +189,7 @@ protected: ObITsMgr *get_ts_mgr_(); bool has_callback_scheduler_(); int defer_callback_scheduler_(const int ret, const share::SCN &commit_version); + int prepare_commit_cb_for_role_change_(const int cb_ret, ObTxCommitCallback *&cb_list); int64_t get_remaining_wait_interval_us_() { return trans_need_wait_wrap_.get_remaining_wait_interval_us(); @@ -210,7 +211,6 @@ protected: ObLightHashLink::dec_ref(1); TRANS_LOG(DEBUG, "dec tx ctx ref", KPC(this)); } - virtual int register_timeout_task_(const int64_t interval_us); virtual int unregister_timeout_task_(); void generate_request_id_(); diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp index feee1d24d..f978a3f53 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp @@ -246,11 +246,13 @@ int ObLSTxCtxMgr::offline() return ret; } -int ObLSTxCtxMgr::process_callback_(ObIArray &cb_array) const +int ObLSTxCtxMgr::process_callback_(ObTxCommitCallback *&cb_list) const { int ret = OB_SUCCESS; - for (int64_t i = 0; i < cb_array.count(); i++) { - cb_array.at(i).callback(); + ObTxCommitCallback *next = NULL; + for (ObTxCommitCallback *iter = cb_list; iter != NULL; iter = next) { + ObTxCommitCallback *next = iter->get_link_next(); + iter->callback(); } return ret; } @@ -727,7 +729,7 @@ int ObLSTxCtxMgr::switch_to_follower_forcedly() { int ret = OB_SUCCESS; ObTimeGuard timeguard("ObLSTxCtxMgr::switch_to_follower_forcedly"); - ObSEArray cb_array; + ObTxCommitCallback *cb_list = NULL; { WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US); StateHelper state_helper(ls_id_, state_); @@ -736,13 +738,10 @@ int ObLSTxCtxMgr::switch_to_follower_forcedly() ret = OB_NOT_INIT; } else if (is_follower_()) { // already follower, do nothing - } else if (OB_FAIL(cb_array.reserve(ls_tx_ctx_map_.count()))) { - TRANS_LOG(ERROR, "reserve callback array error", KR(ret)); - ret = OB_EAGAIN; } else if (OB_FAIL(state_helper.switch_state(Ops::LEADER_REVOKE))) { TRANS_LOG(ERROR, "switch state error", KR(ret), "manager", *this); } else { - SwitchToFollowerForcedlyFunctor fn(cb_array); + SwitchToFollowerForcedlyFunctor fn(cb_list); if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) { TRANS_LOG(ERROR, "for each transaction context error", KR(ret), "manager", *this); } else { @@ -756,7 +755,7 @@ int ObLSTxCtxMgr::switch_to_follower_forcedly() } timeguard.click(); // run callback out of lock, ignore ret - (void)process_callback_(cb_array); + (void)process_callback_(cb_list); if (timeguard.get_diff() > 3 * 1000000) { TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "switch_to_follower_forcedly use too much time", K(timeguard), "manager", *this); } @@ -838,7 +837,7 @@ int ObLSTxCtxMgr::switch_to_follower_gracefully() } timeguard.click(); - ObSEArray cb_array; + ObTxCommitCallback *cb_list = NULL; { WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US); timeguard.click(); @@ -850,14 +849,11 @@ int ObLSTxCtxMgr::switch_to_follower_gracefully() TRANS_LOG(WARN, "not init", KR(ret), K(ls_id_)); } else if (OB_FAIL(state_helper.switch_state(Ops::LEADER_REVOKE))) { TRANS_LOG(WARN, "switch state error", KR(ret), K(tenant_id_), K(ls_id_), K(state_)); - } else if (OB_FAIL(cb_array.reserve(ls_tx_ctx_map_.count()))) { - TRANS_LOG(ERROR, "reserve callback array error", KR(ret)); - ret = OB_EAGAIN; } else { timeguard.click(); // TODO const int64_t abs_expired_time = INT64_MAX; - SwitchToFollowerGracefullyFunctor fn(abs_expired_time, cb_array); + SwitchToFollowerGracefullyFunctor fn(abs_expired_time, cb_list); if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) { TRANS_LOG(WARN, "for each tx ctx error", KR(ret), "manager", *this); ret = fn.get_ret(); @@ -882,7 +878,7 @@ int ObLSTxCtxMgr::switch_to_follower_gracefully() timeguard.click(); } } - (void)process_callback_(cb_array); + (void)process_callback_(cb_list); timeguard.click(); TRANS_LOG(INFO, "[LsTxCtxMgr] switch_to_follower_gracefully", K(ret), KPC(this), K(process_count)); if (timeguard.get_diff() > 1000000) { @@ -941,7 +937,7 @@ int ObLSTxCtxMgr::stop(const bool graceful) { int ret = OB_SUCCESS; StateHelper state_helper(ls_id_, state_); - ObSEArray cb_array; + ObTxCommitCallback *cb_list = NULL; const KillTransArg arg(graceful); ObTimeGuard timeguard("ctxmgr stop"); { @@ -961,7 +957,7 @@ int ObLSTxCtxMgr::stop(const bool graceful) } if (OB_SUCC(ret)) { - KillTxCtxFunctor fn(arg, cb_array); + KillTxCtxFunctor fn(arg, cb_list); fn.set_release_audit_mgr_lock(true); if (OB_FAIL(ls_retain_ctx_mgr_.force_gc_retain_ctx())) { TRANS_LOG(WARN, "force gc retain ctx mgr", K(ret)); @@ -977,7 +973,7 @@ int ObLSTxCtxMgr::stop(const bool graceful) if (timeguard.get_diff() > 3 * 1000000) { TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "stop trans use too much time", K(timeguard), "manager", *this); } - process_callback_(cb_array); + process_callback_(cb_list); TRANS_LOG(INFO, "[LsTxCtxMgr] stop done", K(timeguard), "manager", *this); return ret; } @@ -986,12 +982,12 @@ int ObLSTxCtxMgr::kill_all_tx(const bool graceful, bool &is_all_tx_cleaned_up) { int ret = OB_SUCCESS; ObTimeGuard timeguard("ctxmgr kill_all_tx"); - ObSEArray cb_array; + ObTxCommitCallback *cb_list = NULL; const KillTransArg arg(graceful); { WLockGuardWithRetryInterval guard(rwlock_, TRY_THRESOLD_US, RETRY_INTERVAL_US); const int64_t total_active_readonly_request_count = get_total_active_readonly_request_count(); - KillTxCtxFunctor fn(arg, cb_array); + KillTxCtxFunctor fn(arg, cb_list); if (OB_FAIL(ls_retain_ctx_mgr_.force_gc_retain_ctx())) { TRANS_LOG(WARN, "force gc retain ctx mgr", K(ret)); } else if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) { @@ -1002,7 +998,7 @@ int ObLSTxCtxMgr::kill_all_tx(const bool graceful, bool &is_all_tx_cleaned_up) if (timeguard.get_diff() > 3 * 1000000) { TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "kill_all_tx use too much time", K(timeguard), "manager", *this); } - (void)process_callback_(cb_array); + (void)process_callback_(cb_list); TRANS_LOG(INFO, "[LsTxCtxMgr] kill_all_tx done", K(timeguard), "manager", *this); return ret; } diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.h b/src/storage/tx/ob_trans_ctx_mgr_v4.h index 2cb81aebb..155dcad6a 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.h +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.h @@ -585,7 +585,7 @@ private: static const int64_t RETRY_INTERVAL_US = 10 *1000; private: - int process_callback_(ObIArray &cb_array) const; + int process_callback_(ObTxCommitCallback *&cb_list) const; void print_all_tx_ctx_(const int64_t max_print, const bool verbose); int64_t get_tx_ctx_count_() const { return ATOMIC_LOAD(&total_tx_ctx_count_); } int create_tx_ctx_(const ObTxCreateArg &arg, diff --git a/src/storage/tx/ob_trans_end_trans_callback.cpp b/src/storage/tx/ob_trans_end_trans_callback.cpp index 7fbc2a5d4..d4e80f999 100644 --- a/src/storage/tx/ob_trans_end_trans_callback.cpp +++ b/src/storage/tx/ob_trans_end_trans_callback.cpp @@ -27,10 +27,35 @@ void ObTxCommitCallback::reset() enable_ = false; inited_ = false; callback_count_ = 0; + if (linked_) { + TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "should not be linked", KP(tx_ctx_), K(tx_id_), K(ret_)); + if (tx_ctx_ && tx_ctx_->get_ref() > 0) { + tx_ctx_->release_ctx_ref(); + } + linked_ = false; + } + tx_ctx_ = NULL; txs_ = NULL; tx_id_.reset(); ret_ = OB_ERR_UNEXPECTED; commit_version_.reset(); + link_next_ = NULL; +} + +int ObTxCommitCallback::link(ObTransCtx *tx_ctx, ObTxCommitCallback *link_next) +{ + int ret = OB_SUCCESS; + TRANS_LOG(DEBUG, "", KPC(tx_ctx), KP(link_next)); + if (linked_) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "already linked", KPC(this), KPC(tx_ctx), KP(link_next)); + } else { + tx_ctx->acquire_ctx_ref(); + tx_ctx_ = tx_ctx; + link_next_ = link_next; + linked_ = true; + } + return ret; } int ObTxCommitCallback::callback() @@ -46,6 +71,16 @@ int ObTxCommitCallback::callback() ++callback_count_; txs_->handle_tx_commit_result(tx_id_, ret_, commit_version_); } + if (linked_) { + TRANS_LOG(DEBUG, "linked commit cb", KPC(tx_ctx_), K(ret_)); + if (OB_ISNULL(tx_ctx_)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "tx ctx should not be null for linked commit cb", K(ret), KPC(this)); + } else { + tx_ctx_->release_ctx_ref(); + } + linked_ = false; + } return ret; } diff --git a/src/storage/tx/ob_trans_end_trans_callback.h b/src/storage/tx/ob_trans_end_trans_callback.h index 3d65d6f9c..3174e1952 100644 --- a/src/storage/tx/ob_trans_end_trans_callback.h +++ b/src/storage/tx/ob_trans_end_trans_callback.h @@ -26,33 +26,74 @@ class ObTransService; struct ObTxCommitCallback { public: - ObTxCommitCallback() { reset(); } + ObTxCommitCallback() : + enable_(false), + inited_(false), + linked_(false), + callback_count_(0), + txs_(NULL), + tx_ctx_(NULL), + tx_id_(), + ret_(OB_ERR_UNEXPECTED), + commit_version_(), + link_next_(NULL) + {} ~ObTxCommitCallback() { reset(); } - int init(ObTransService* txs, const ObTransID tx_id, const int ret, const share::SCN commit_version) + int init(ObTransService* txs, const ObTransID tx_id, const int cb_ret, const share::SCN commit_version) { - txs_ = txs; - tx_id_ = tx_id; - ret_ = ret; - commit_version_ = commit_version; - inited_ = true; - return OB_SUCCESS; + int ret = OB_SUCCESS; + if (inited_) { + ret = OB_INIT_TWICE; + } else { + txs_ = txs; + tx_id_ = tx_id; + ret_ = cb_ret; + commit_version_ = commit_version; + inited_ = true; + } + return ret; } + int link(ObTransCtx *tx_ctx, ObTxCommitCallback *link_next); bool is_inited() { return inited_; } void disable() { enable_ = false; } void enable() { enable_ = true; } bool is_enabled() { return enable_; } + int get_cb_ret() const { return ret_; } + ObTxCommitCallback *get_link_next() const { return link_next_; } void reset(); void destroy() { reset(); } int callback(); - TO_STRING_KV(K_(inited), K_(enable), KP_(txs), K_(tx_id), K_(ret), K_(commit_version), K_(callback_count)); + ObTxCommitCallback &operator=(const ObTxCommitCallback &right) + { + enable_ = right.enable_; + inited_ = right.inited_; + callback_count_ = right.callback_count_; + txs_ = right.txs_; + tx_id_ = right.tx_id_; + ret_ = right.ret_; + return *this; + } + TO_STRING_KV(K_(inited), + K_(enable), + K_(linked), + KP_(txs), + KP_(tx_ctx), + K_(tx_id), + K_(ret), + K_(commit_version), + K_(callback_count), + KP_(link_next)); public: bool enable_; bool inited_; + bool linked_; int64_t callback_count_; ObTransService* txs_; + ObTransCtx *tx_ctx_; ObTransID tx_id_; int ret_; share::SCN commit_version_; + ObTxCommitCallback *link_next_; }; class ObTxCommitCallbackTask : public ObTransTask diff --git a/src/storage/tx/ob_trans_functor.h b/src/storage/tx/ob_trans_functor.h index 34ea8d0a6..fba244226 100644 --- a/src/storage/tx/ob_trans_functor.h +++ b/src/storage/tx/ob_trans_functor.h @@ -160,7 +160,7 @@ private: class SwitchToFollowerForcedlyFunctor { public: - SwitchToFollowerForcedlyFunctor(ObIArray &cb_array) : cb_array_(cb_array) + SwitchToFollowerForcedlyFunctor(ObTxCommitCallback *&cb_list) : cb_list_(cb_list) { SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/) } @@ -172,7 +172,7 @@ public: if (!tx_id.is_valid() || OB_ISNULL(tx_ctx)) { tmp_ret = common::OB_INVALID_ARGUMENT; TRANS_LOG_RET(WARN, tmp_ret, "invalid argument", K(tx_id), "ctx", OB_P(tx_ctx)); - } else if (common::OB_SUCCESS != (tmp_ret = tx_ctx->switch_to_follower_forcedly(cb_array_))) { + } else if (common::OB_SUCCESS != (tmp_ret = tx_ctx->switch_to_follower_forcedly(cb_list_))) { TRANS_LOG_RET(ERROR, tmp_ret, "leader revoke failed", K(tx_id), K(*tx_ctx)); } @@ -180,7 +180,7 @@ public: } private: - ObIArray &cb_array_; + ObTxCommitCallback *&cb_list_; }; class SwitchToLeaderFunctor @@ -217,9 +217,8 @@ private: class SwitchToFollowerGracefullyFunctor { public: - SwitchToFollowerGracefullyFunctor(const int64_t abs_expired_time, - ObIArray &cb_array) - : abs_expired_time_(abs_expired_time), count_(0), ret_(OB_SUCCESS), cb_array_(cb_array) + SwitchToFollowerGracefullyFunctor(const int64_t abs_expired_time, ObTxCommitCallback *&cb_list) + : abs_expired_time_(abs_expired_time), count_(0), ret_(OB_SUCCESS), cb_list_(cb_list) { SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/); } @@ -242,7 +241,7 @@ public: } } if (OB_SUCC(ret)) { - if (OB_FAIL(tx_ctx->switch_to_follower_gracefully(cb_array_))) { + if (OB_FAIL(tx_ctx->switch_to_follower_gracefully(cb_list_))) { TRANS_LOG(WARN, "switch to follower gracefully failed", KR(ret), K(*tx_ctx)); ret_ = ret; } else { @@ -259,7 +258,7 @@ private: int64_t abs_expired_time_; int64_t count_; int ret_; - ObIArray &cb_array_; + ObTxCommitCallback *&cb_list_; }; class ResumeLeaderFunctor @@ -320,8 +319,8 @@ private: class KillTxCtxFunctor { public: - KillTxCtxFunctor(const KillTransArg &arg, ObIArray &cb_array) - : arg_(arg), release_audit_mgr_lock_(false), cb_array(cb_array) + KillTxCtxFunctor(const KillTransArg &arg, ObTxCommitCallback *&cb_list) + : arg_(arg), release_audit_mgr_lock_(false), cb_list_(cb_list) { SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/); @@ -340,7 +339,7 @@ public: TRANS_LOG(WARN, "invalid argument", K(tx_id), "ctx", OB_P(tx_ctx)); tmp_ret = common::OB_INVALID_ARGUMENT; } else { - if (OB_SUCC(tx_ctx->kill(arg_, cb_array))) { + if (OB_SUCC(tx_ctx->kill(arg_, cb_list_))) { TRANS_LOG(INFO, "kill transaction success", K(tx_id), K_(arg)); } else if (common::OB_TRANS_CANNOT_BE_KILLED == ret) { TRANS_LOG(INFO, "transaction can not be killed", K(tx_id), "context", *tx_ctx); @@ -355,7 +354,7 @@ public: private: KillTransArg arg_; bool release_audit_mgr_lock_; - ObIArray &cb_array; + ObTxCommitCallback *&cb_list_; }; class TransferOutTxOpFunctor diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index c11db2405..550163ae2 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -694,7 +694,7 @@ int ObPartTransCtx::handle_timeout(const int64_t delay) return ret; } -int ObPartTransCtx::kill(const KillTransArg &arg, ObIArray &cb_array) +int ObPartTransCtx::kill(const KillTransArg &arg, ObTxCommitCallback *&cb_list) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; @@ -743,12 +743,9 @@ int ObPartTransCtx::kill(const KillTransArg &arg, ObIArray & // notify scheduler only if commit callback has not been armed if (commit_cb_.is_enabled() && !commit_cb_.is_inited()) { if (exec_info_.scheduler_ == addr_) { - ObTxCommitCallback cb; - cb.init(trans_service_, trans_id_, cb_param, SCN()); - if (OB_FAIL(cb_array.push_back(cb))) { - TRANS_LOG(WARN, "push commit callback fail", K(ret), KPC(this)); - } else { - commit_cb_.disable(); + if (OB_TMP_FAIL(prepare_commit_cb_for_role_change_(cb_param, cb_list))) { + TRANS_LOG(WARN, "prepare commit cb fail", K(tmp_ret), K(cb_param), KPC(this)); + ret = (ret == OB_SUCCESS) ? tmp_ret : ret; } } else { post_tx_commit_resp_(cb_param); @@ -5813,7 +5810,7 @@ inline bool ObPartTransCtx::need_callback_scheduler_() { // 3. resume_leader is called // 4. resume_leader fails // 5. revoke, and still in follower state -int ObPartTransCtx::switch_to_follower_forcedly(ObIArray &cb_array) +int ObPartTransCtx::switch_to_follower_forcedly(ObTxCommitCallback *&cb_list_head) { int ret = OB_SUCCESS; common::ObTimeGuard timeguard("switch_to_follower_forcely", 10 * 1000); @@ -5842,10 +5839,8 @@ int ObPartTransCtx::switch_to_follower_forcedly(ObIArray &cb if (OB_FAIL(do_local_tx_end_(TxEndAction::ABORT_TX))) { TRANS_LOG(WARN, "do local tx abort failed", K(ret)); } else if (need_cb_scheduler) { - ObTxCommitCallback cb; - cb.init(trans_service_, trans_id_, OB_TRANS_KILLED, SCN()); - if (OB_FAIL(cb_array.push_back(cb))) { - TRANS_LOG(WARN, "push back callback failed", K(ret), "context", *this); + if (OB_FAIL(prepare_commit_cb_for_role_change_(OB_TRANS_KILLED, cb_list_head))) { + TRANS_LOG(WARN, "prepare commit cb fail", K(ret), KPC(this)); } } if (OB_FAIL(ret) && need_cb_scheduler) { @@ -5889,18 +5884,21 @@ int ObPartTransCtx::switch_to_follower_forcedly(ObIArray &cb // special handle commit triggered by local call: coordinator colocate with scheduler // let scheduler retry commit with RPC if required if (need_callback_scheduler_()) { - ObTxCommitCallback cb; + int commit_ret = OB_SUCCESS; // no CommitInfoLog has been submitted, txn must abort if (exec_info_.state_ == ObTxState::INIT && !sub_state_.is_info_log_submitted()) { - cb.init(trans_service_, trans_id_, OB_TRANS_KILLED, SCN()); + commit_ret = OB_TRANS_KILLED; } else { // otherwise, txn either continue commit or abort, need retry to get final result - cb.init(trans_service_, trans_id_, OB_NOT_MASTER, SCN()); + commit_ret = OB_NOT_MASTER; } - TRANS_LOG(INFO, "switch to follower forcely, notify txn commit result to scheduler", - "commit_result", cb.ret_, KPC(this)); - if (OB_FAIL(cb_array.push_back(cb))) { - TRANS_LOG(WARN, "push back callback failed", K(ret), "context", *this); + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(prepare_commit_cb_for_role_change_(commit_ret, cb_list_head))) { + TRANS_LOG(WARN, "prepare commit cb fail", K(tmp_ret), KPC(this)); + ret = (OB_SUCCESS == ret) ? tmp_ret : ret; + } else { + TRANS_LOG(INFO, "switch to follower forcely, notify txn commit result to scheduler", + "commit_result", commit_ret, KPC(this)); } } } @@ -5921,7 +5919,7 @@ int ObPartTransCtx::switch_to_follower_forcedly(ObIArray &cb return ret; } -int ObPartTransCtx::switch_to_follower_gracefully(ObIArray &cb_array) +int ObPartTransCtx::switch_to_follower_gracefully(ObTxCommitCallback *&cb_list_head) { int ret = OB_SUCCESS; bool need_submit_log = false; @@ -5965,11 +5963,10 @@ int ObPartTransCtx::switch_to_follower_gracefully(ObIArray & log_type = ObTxLogType::TX_COMMIT_INFO_LOG; } if (need_callback_scheduler_()) { - ObTxCommitCallback cb; - cb.init(trans_service_, trans_id_, OB_SWITCHING_TO_FOLLOWER_GRACEFULLY, SCN()); - TRANS_LOG(INFO, "swtich to follower gracefully, notify scheduler retry", KPC(this)); - if (OB_FAIL(cb_array.push_back(cb))) { - TRANS_LOG(WARN, "push back callback failed", K(ret), "context", *this); + if (OB_FAIL(prepare_commit_cb_for_role_change_(OB_SWITCHING_TO_FOLLOWER_GRACEFULLY, cb_list_head))) { + TRANS_LOG(WARN, "prepare commit cb fail", K(ret), KPC(this)); + } else { + TRANS_LOG(INFO, "swtich to follower gracefully, notify scheduler retry", KPC(this)); } } } @@ -6025,10 +6022,10 @@ int ObPartTransCtx::switch_to_follower_gracefully(ObIArray & OB_ID(used), timeguard.get_diff(), OB_ID(ref), get_ref()); if (OB_FAIL(ret)) { - TRANS_LOG(WARN, "switch to follower gracefully failed", KR(ret), K(ret), KPC(this), K(cb_array)); + TRANS_LOG(WARN, "switch to follower gracefully failed", KR(ret), KPC(this)); } else { #ifndef NDEBUG - TRANS_LOG(INFO, "switch to follower gracefully succeed", KPC(this), K(cb_array)); + TRANS_LOG(INFO, "switch to follower gracefully succeed", KPC(this)); #endif } diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 22507e8ea..b5939c64d 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -192,7 +192,7 @@ public: /* * graceful kill: wait trx finish logging */ - int kill(const KillTransArg &arg, ObIArray &cb_array); + int kill(const KillTransArg &arg, ObTxCommitCallback *&cb_list); memtable::ObMemtableCtx *get_memtable_ctx() { return &mt_ctx_; } int commit(const ObTxCommitParts &parts, const MonotonicTs &commit_time, @@ -298,7 +298,6 @@ private: "2pc_role", get_2pc_role(), K_(collected), - K_(ref), K_(rec_log_ts), K_(prev_rec_log_ts), K_(lastest_snapshot), @@ -443,9 +442,9 @@ public: // leader switch related bool need_callback_scheduler_(); - int switch_to_follower_forcedly(ObIArray &cb_array); + int switch_to_follower_forcedly(ObTxCommitCallback *&cb_list); int switch_to_leader(const share::SCN &start_working_ts); - int switch_to_follower_gracefully(ObIArray &cb_array); + int switch_to_follower_gracefully(ObTxCommitCallback *&cb_list); int resume_leader(const share::SCN &start_working_ts); int supplement_undo_actions_if_exist_(); diff --git a/src/storage/tx/ob_tx_2pc_msg_handler.cpp b/src/storage/tx/ob_tx_2pc_msg_handler.cpp index 044ebe8f9..2d5d062d4 100644 --- a/src/storage/tx/ob_tx_2pc_msg_handler.cpp +++ b/src/storage/tx/ob_tx_2pc_msg_handler.cpp @@ -1104,8 +1104,17 @@ int ObPartTransCtx::post_tx_commit_resp_(const int status) TRANS_LOG(INFO, "report tx commit result to local scheduler succeed", K(status), KP(this)); #endif } - } else { has_skip = true; } - } else { + } else if (commit_cb_.get_cb_ret() == status) { + has_skip = true; + } else { + // maybe has been callbacked due to switch to follower + // the callback status is not final commit status: + // either OB_NOT_MASTER or OB_SWITCH_TO_FOLLOWER_GRACEFULLY + // in these case should callback the scheduler with final status again + use_rpc = true; + } + } + if (use_rpc) { ObTxCommitRespMsg msg; build_tx_common_msg_(SCHEDULER_LS, msg); msg.commit_version_ = commit_version; diff --git a/unittest/storage/tx/it/test_tx.cpp b/unittest/storage/tx/it/test_tx.cpp index c5eaec2e2..b9c318ee6 100644 --- a/unittest/storage/tx/it/test_tx.cpp +++ b/unittest/storage/tx/it/test_tx.cpp @@ -1953,9 +1953,11 @@ TEST_F(ObTestTx, distributed_tx_coordinator_switch_to_follower_forcedly_in_prepa n1->add_drop_msg_type(TX_2PC_PREPARE_RESP); int commit_ret = OB_SUCCESS; + // async start commit std::thread t(do_async_commit, n1, std::ref(tx), std::ref(commit_ret)); usleep(100 * 1000); + // wait coordinator into prepare state ObPartTransCtx *n1_ctx = NULL; ASSERT_EQ(OB_SUCCESS, n1->get_tx_ctx(n1->ls_id_, tx.tx_id_, n1_ctx)); int i = 0; @@ -1965,28 +1967,27 @@ TEST_F(ObTestTx, distributed_tx_coordinator_switch_to_follower_forcedly_in_prepa ASSERT_NE(i, 1001); ASSERT_EQ(OB_SUCCESS, n1->revert_tx_ctx(n1_ctx)); + // switch coordinator to follower forcedly ObLSTxCtxMgr *ls_tx_ctx_mgr1 = NULL; ASSERT_EQ(OB_SUCCESS, n1->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n1->ls_id_, ls_tx_ctx_mgr1)); ASSERT_EQ(OB_SUCCESS, ls_tx_ctx_mgr1->switch_to_follower_forcedly()); n1->wait_all_redolog_applied(); + // n3 takeover as leader ReplayLogEntryFunctor functor(n3); ASSERT_EQ(OB_SUCCESS, n3->fake_tx_log_adapter_->replay_all(functor)); - ObLSTxCtxMgr *ls_tx_ctx_mgr3 = NULL; ASSERT_EQ(OB_SUCCESS, n3->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n3->ls_id_, ls_tx_ctx_mgr3)); - ObTxNode::get_location_adapter_().update_localtion(n3->ls_id_, n3->addr_); - ASSERT_EQ(OB_SUCCESS, ls_tx_ctx_mgr3->switch_to_leader()); n3->wait_all_redolog_applied(); ASSERT_EQ(OB_SUCCESS, n2->wait_all_tx_ctx_is_destoryed()); + // wait commit complete on scheduler t.join(); ASSERT_EQ(OB_SUCCESS, commit_ret); - n3->del_drop_msg_type(TX_2PC_CLEAR_REQ); ASSERT_EQ(OB_SUCCESS, n3->wait_all_tx_ctx_is_destoryed()); ASSERT_EQ(OB_SUCCESS, n1->release_tx(tx)); diff --git a/unittest/storage/tx/it/tx_node.h b/unittest/storage/tx/it/tx_node.h index 8d5121607..51ae21b7b 100644 --- a/unittest/storage/tx/it/tx_node.h +++ b/unittest/storage/tx/it/tx_node.h @@ -221,13 +221,14 @@ private: ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL; OZ(txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(ls_id_, ls_tx_ctx_mgr)); int i = 0; - for (i = 0; i < 2000; ++i) { - if (0 == ls_tx_ctx_mgr->get_tx_ctx_count()) break; + int tx_count = ls_tx_ctx_mgr->get_tx_ctx_count(); + for (i = 0; tx_count > 0 && i < 2000; ++i) { + tx_count = ls_tx_ctx_mgr->get_tx_ctx_count(); usleep(500); } if (2000 == i) { ret = OB_ERR_UNEXPECTED; - LOG_INFO("print all tx begin", K(ret)); + LOG_INFO("wait all tx ctx destoryed fail, print all tx:", K(tx_count)); const bool verbose = true; ls_tx_ctx_mgr->print_all_tx_ctx(ObLSTxCtxMgr::MAX_HASH_ITEM_PRINT, verbose); LOG_INFO("print all tx end", K(ret));