refine is_root defination

This commit is contained in:
obdev 2022-12-29 12:38:05 +00:00 committed by ob-robot
parent 26f2754db2
commit efc06a1b8f
6 changed files with 49 additions and 19 deletions

View File

@ -57,8 +57,9 @@ enum class ObTwoPhaseCommitMsgType : uint8_t
OB_MSG_TX_MAX,
};
enum class Ob2PCRole : uint8_t
enum class Ob2PCRole : int8_t
{
UNKNOWN = -1,
ROOT = 0,
INTERNAL,
LEAF,

View File

@ -456,7 +456,8 @@ int ObPartTransCtx::handle_timeout(const int64_t delay)
}
// handle commit timeout on root node
if (!is_sub2pc() && !is_follower_() && is_root() && part_trans_action_ == ObPartTransAction::COMMIT) {
if (!is_sub2pc() && !is_follower_() && part_trans_action_ == ObPartTransAction::COMMIT &&
(is_local_tx_() || is_root())) {
if (tx_expired) {
tmp_ret = post_tx_commit_resp_(OB_TRANS_TIMEOUT);
TRANS_LOG(INFO, "callback scheduler txn has timeout", K(tmp_ret), KPC(this));
@ -585,7 +586,8 @@ int ObPartTransCtx::kill(const KillTransArg &arg, ObIArray<ObTxCommitCallback> &
// if ctx was killed gracefully or forcely killed
// notify scheduler commit result, if in committing
if (is_root() && !is_follower_() && part_trans_action_ == ObPartTransAction::COMMIT) {
if (!is_follower_() && part_trans_action_ == ObPartTransAction::COMMIT &&
(is_local_tx_() || is_root())) {
// notify scheduler only if commit callback has not been armed
if (commit_cb_.is_enabled() && !commit_cb_.is_inited()) {
if (exec_info_.scheduler_ == addr_) {
@ -699,6 +701,8 @@ int ObPartTransCtx::commit(const ObLSArray &parts,
}
} else {
exec_info_.trans_type_ = TransType::DIST_TRANS;
// set 2pc upstream to self
set_2pc_upstream_(ls_id_);
if (OB_FAIL(two_phase_commit())) {
TRANS_LOG(WARN, "start dist coimit fail", K(ret), KPC(this));
}
@ -1273,6 +1277,16 @@ int ObPartTransCtx::recover_tx_ctx_table_info(const ObTxCtxTableInfo &ctx_info)
trans_id_ = ctx_info.tx_id_;
ls_id_ = ctx_info.ls_id_;
exec_info_ = ctx_info.exec_info_;
if (!exec_info_.upstream_.is_valid() &&
!is_local_tx_() &&
(ObTxState::REDO_COMPLETE == exec_info_.state_ ||
ObTxState::PREPARE == exec_info_.state_ ||
ObTxState::PRE_COMMIT == exec_info_.state_ ||
ObTxState::COMMIT == exec_info_.state_ ||
ObTxState::CLEAR == exec_info_.state_)) {
set_2pc_upstream_(ls_id_);
TRANS_LOG(INFO, "set upstream to self", K(*this));
}
if (ObTxState::REDO_COMPLETE == get_downstream_state()) {
sub_state_.set_info_log_submitted();
}
@ -3942,7 +3956,12 @@ int ObPartTransCtx::replay_commit_info(const ObTxCommitInfoLog &commit_info_log,
} else if (OB_FAIL(set_app_trace_id_(commit_info_log.get_app_trace_id()))) {
TRANS_LOG(WARN, "set app trace id error", K(ret), K(commit_info_log), K(*this));
} else {
set_2pc_upstream_(commit_info_log.get_upstream());
if (!is_local_tx_() && !commit_info_log.get_upstream().is_valid()) {
set_2pc_upstream_(ls_id_);
TRANS_LOG(INFO, "set upstream to self", K(*this), K(commit_info_log));
} else {
set_2pc_upstream_(commit_info_log.get_upstream());
}
exec_info_.xid_ = commit_info_log.get_xid();
can_elr_ = commit_info_log.is_elr();
cluster_version_ = commit_info_log.get_cluster_version();
@ -4555,7 +4574,7 @@ int ObPartTransCtx::switch_to_leader(const SCN &start_working_ts)
}
inline bool ObPartTransCtx::need_callback_scheduler_() {
return is_root()
return (is_local_tx_() || is_root())
&& !is_sub2pc()
&& ObPartTransAction::COMMIT == part_trans_action_
&& addr_ == exec_info_.scheduler_
@ -5468,6 +5487,8 @@ int ObPartTransCtx::sub_prepare(const ObLSArray &parts,
} else {
set_stc_by_now_();
}
// set 2pc upstream to self
set_2pc_upstream_(ls_id_);
exec_info_.trans_type_ = TransType::DIST_TRANS;
exec_info_.xid_ = xid;
// (void)set_sub2pc_coord_state(Ob2PCPrepareState::REDO_PREPARING);

View File

@ -872,6 +872,9 @@ bool ObTxCycleTwoPhaseCommitter::all_downstream_collected_()
all_collected = true;
break;
}
default: {
break;
}
}
return all_collected;
}

View File

@ -12,6 +12,7 @@
#include "storage/tx/ob_trans_part_ctx.h"
#include "storage/tx/ob_trans_service.h"
#include "storage/tx/ob_committer_define.h"
namespace oceanbase
{
@ -22,17 +23,19 @@ namespace transaction
Ob2PCRole ObPartTransCtx::get_2pc_role() const
{
Ob2PCRole role;
Ob2PCRole role = Ob2PCRole::UNKNOWN;
if (!exec_info_.upstream_.is_valid()) {
role = Ob2PCRole::ROOT;
} else if (exec_info_.incremental_participants_.empty()) {
// not root & downstream is empty
// root must not be leaf, because the distributed txn must be composed by
// more than one participants.
role = Ob2PCRole::LEAF;
} else {
role = Ob2PCRole::INTERNAL;
if (exec_info_.upstream_.is_valid()) {
if (exec_info_.upstream_ == ls_id_) {
role = Ob2PCRole::ROOT;
} else if (exec_info_.incremental_participants_.empty()) {
// not root & downstream is empty
// root must not be leaf, because the distributed txn must be composed by
// more than one participants.
role = Ob2PCRole::LEAF;
} else {
role = Ob2PCRole::INTERNAL;
}
}
return role;

View File

@ -473,10 +473,10 @@ int ObPartTransCtx::apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type)
const Ob2pcPrepareReqMsg &msg = *(static_cast<const Ob2pcPrepareReqMsg *>(msg_2pc_cache_));
if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) {
} else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) {
TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) {
TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) {
TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this));
}
break;
}
@ -625,12 +625,12 @@ int ObPartTransCtx::handle_tx_2pc_prepare_redo_req(const Ob2pcPrepareRedoReqMsg
exec_info_.xid_ = msg.xid_;
if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) {
} else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) {
TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) {
TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) {
TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(handle_2pc_req(msg_type))) {
TRANS_LOG(WARN, "handle 2pc request failed", KR(ret), K(msg), K(*this));
}

View File

@ -111,6 +111,8 @@ TEST_F(TestMockObTxCtx, test_simple_tx_ctx1)
ctx1.ls_memo_[ctx2.addr_] = ls_id2;
ctx1.set_trans_type_(TransType::DIST_TRANS);
ctx1.upstream_state_ = ObTxState::INIT;
// set self to root
ctx1.exec_info_.upstream_ = ls_id1;
ctx1.set_downstream_state(ObTxState::REDO_COMPLETE);
ctx1.exec_info_.participants_.push_back(ls_id1);
ctx1.exec_info_.participants_.push_back(ls_id2);