diff --git a/src/storage/tx/ob_committer_define.h b/src/storage/tx/ob_committer_define.h index afa1253c7..680cc140e 100644 --- a/src/storage/tx/ob_committer_define.h +++ b/src/storage/tx/ob_committer_define.h @@ -57,8 +57,9 @@ enum class ObTwoPhaseCommitMsgType : uint8_t OB_MSG_TX_MAX, }; -enum class Ob2PCRole : uint8_t +enum class Ob2PCRole : int8_t { + UNKNOWN = -1, ROOT = 0, INTERNAL, LEAF, diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 550bb62cb..84fcd40d5 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -456,7 +456,8 @@ int ObPartTransCtx::handle_timeout(const int64_t delay) } // handle commit timeout on root node - if (!is_sub2pc() && !is_follower_() && is_root() && part_trans_action_ == ObPartTransAction::COMMIT) { + if (!is_sub2pc() && !is_follower_() && part_trans_action_ == ObPartTransAction::COMMIT && + (is_local_tx_() || is_root())) { if (tx_expired) { tmp_ret = post_tx_commit_resp_(OB_TRANS_TIMEOUT); TRANS_LOG(INFO, "callback scheduler txn has timeout", K(tmp_ret), KPC(this)); @@ -585,7 +586,8 @@ int ObPartTransCtx::kill(const KillTransArg &arg, ObIArray & // if ctx was killed gracefully or forcely killed // notify scheduler commit result, if in committing - if (is_root() && !is_follower_() && part_trans_action_ == ObPartTransAction::COMMIT) { + if (!is_follower_() && part_trans_action_ == ObPartTransAction::COMMIT && + (is_local_tx_() || is_root())) { // notify scheduler only if commit callback has not been armed if (commit_cb_.is_enabled() && !commit_cb_.is_inited()) { if (exec_info_.scheduler_ == addr_) { @@ -699,6 +701,8 @@ int ObPartTransCtx::commit(const ObLSArray &parts, } } else { exec_info_.trans_type_ = TransType::DIST_TRANS; + // set 2pc upstream to self + set_2pc_upstream_(ls_id_); if (OB_FAIL(two_phase_commit())) { TRANS_LOG(WARN, "start dist coimit fail", K(ret), KPC(this)); } @@ -1273,6 +1277,16 @@ int ObPartTransCtx::recover_tx_ctx_table_info(const ObTxCtxTableInfo &ctx_info) trans_id_ = ctx_info.tx_id_; ls_id_ = ctx_info.ls_id_; exec_info_ = ctx_info.exec_info_; + if (!exec_info_.upstream_.is_valid() && + !is_local_tx_() && + (ObTxState::REDO_COMPLETE == exec_info_.state_ || + ObTxState::PREPARE == exec_info_.state_ || + ObTxState::PRE_COMMIT == exec_info_.state_ || + ObTxState::COMMIT == exec_info_.state_ || + ObTxState::CLEAR == exec_info_.state_)) { + set_2pc_upstream_(ls_id_); + TRANS_LOG(INFO, "set upstream to self", K(*this)); + } if (ObTxState::REDO_COMPLETE == get_downstream_state()) { sub_state_.set_info_log_submitted(); } @@ -3942,7 +3956,12 @@ int ObPartTransCtx::replay_commit_info(const ObTxCommitInfoLog &commit_info_log, } else if (OB_FAIL(set_app_trace_id_(commit_info_log.get_app_trace_id()))) { TRANS_LOG(WARN, "set app trace id error", K(ret), K(commit_info_log), K(*this)); } else { - set_2pc_upstream_(commit_info_log.get_upstream()); + if (!is_local_tx_() && !commit_info_log.get_upstream().is_valid()) { + set_2pc_upstream_(ls_id_); + TRANS_LOG(INFO, "set upstream to self", K(*this), K(commit_info_log)); + } else { + set_2pc_upstream_(commit_info_log.get_upstream()); + } exec_info_.xid_ = commit_info_log.get_xid(); can_elr_ = commit_info_log.is_elr(); cluster_version_ = commit_info_log.get_cluster_version(); @@ -4555,7 +4574,7 @@ int ObPartTransCtx::switch_to_leader(const SCN &start_working_ts) } inline bool ObPartTransCtx::need_callback_scheduler_() { - return is_root() + return (is_local_tx_() || is_root()) && !is_sub2pc() && ObPartTransAction::COMMIT == part_trans_action_ && addr_ == exec_info_.scheduler_ @@ -5468,6 +5487,8 @@ int ObPartTransCtx::sub_prepare(const ObLSArray &parts, } else { set_stc_by_now_(); } + // set 2pc upstream to self + set_2pc_upstream_(ls_id_); exec_info_.trans_type_ = TransType::DIST_TRANS; exec_info_.xid_ = xid; // (void)set_sub2pc_coord_state(Ob2PCPrepareState::REDO_PREPARING); diff --git a/src/storage/tx/ob_two_phase_upstream_committer.cpp b/src/storage/tx/ob_two_phase_upstream_committer.cpp index 16ce45ee7..74760c0b0 100644 --- a/src/storage/tx/ob_two_phase_upstream_committer.cpp +++ b/src/storage/tx/ob_two_phase_upstream_committer.cpp @@ -872,6 +872,9 @@ bool ObTxCycleTwoPhaseCommitter::all_downstream_collected_() all_collected = true; break; } + default: { + break; + } } return all_collected; } diff --git a/src/storage/tx/ob_tx_2pc_ctx_impl.cpp b/src/storage/tx/ob_tx_2pc_ctx_impl.cpp index c4e526f74..2668d2e2e 100644 --- a/src/storage/tx/ob_tx_2pc_ctx_impl.cpp +++ b/src/storage/tx/ob_tx_2pc_ctx_impl.cpp @@ -12,6 +12,7 @@ #include "storage/tx/ob_trans_part_ctx.h" #include "storage/tx/ob_trans_service.h" +#include "storage/tx/ob_committer_define.h" namespace oceanbase { @@ -22,17 +23,19 @@ namespace transaction Ob2PCRole ObPartTransCtx::get_2pc_role() const { - Ob2PCRole role; + Ob2PCRole role = Ob2PCRole::UNKNOWN; - if (!exec_info_.upstream_.is_valid()) { - role = Ob2PCRole::ROOT; - } else if (exec_info_.incremental_participants_.empty()) { - // not root & downstream is empty - // root must not be leaf, because the distributed txn must be composed by - // more than one participants. - role = Ob2PCRole::LEAF; - } else { - role = Ob2PCRole::INTERNAL; + if (exec_info_.upstream_.is_valid()) { + if (exec_info_.upstream_ == ls_id_) { + role = Ob2PCRole::ROOT; + } else if (exec_info_.incremental_participants_.empty()) { + // not root & downstream is empty + // root must not be leaf, because the distributed txn must be composed by + // more than one participants. + role = Ob2PCRole::LEAF; + } else { + role = Ob2PCRole::INTERNAL; + } } return role; diff --git a/src/storage/tx/ob_tx_2pc_msg_handler.cpp b/src/storage/tx/ob_tx_2pc_msg_handler.cpp index b6e3182ca..4fb480023 100644 --- a/src/storage/tx/ob_tx_2pc_msg_handler.cpp +++ b/src/storage/tx/ob_tx_2pc_msg_handler.cpp @@ -473,10 +473,10 @@ int ObPartTransCtx::apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type) const Ob2pcPrepareReqMsg &msg = *(static_cast(msg_2pc_cache_)); if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) { - } else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) { - TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this)); } else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) { TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this)); + } else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) { + TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this)); } break; } @@ -625,12 +625,12 @@ int ObPartTransCtx::handle_tx_2pc_prepare_redo_req(const Ob2pcPrepareRedoReqMsg exec_info_.xid_ = msg.xid_; if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) { + } else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) { + TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this)); } else if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) { TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this)); } else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) { TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this)); - } else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) { - TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this)); } else if (OB_FAIL(handle_2pc_req(msg_type))) { TRANS_LOG(WARN, "handle 2pc request failed", KR(ret), K(msg), K(*this)); } diff --git a/unittest/storage/tx/test_simple_tx_ctx.cpp b/unittest/storage/tx/test_simple_tx_ctx.cpp index bd950cc85..007251724 100644 --- a/unittest/storage/tx/test_simple_tx_ctx.cpp +++ b/unittest/storage/tx/test_simple_tx_ctx.cpp @@ -111,6 +111,8 @@ TEST_F(TestMockObTxCtx, test_simple_tx_ctx1) ctx1.ls_memo_[ctx2.addr_] = ls_id2; ctx1.set_trans_type_(TransType::DIST_TRANS); ctx1.upstream_state_ = ObTxState::INIT; + // set self to root + ctx1.exec_info_.upstream_ = ls_id1; ctx1.set_downstream_state(ObTxState::REDO_COMPLETE); ctx1.exec_info_.participants_.push_back(ls_id1); ctx1.exec_info_.participants_.push_back(ls_id2);