2PC code format about msg_cache and 2pc_role
This commit is contained in:
@ -57,6 +57,13 @@ enum class ObTwoPhaseCommitMsgType : uint8_t
|
||||
OB_MSG_TX_MAX,
|
||||
};
|
||||
|
||||
enum class Ob2PCRole : uint8_t
|
||||
{
|
||||
ROOT = 0,
|
||||
INTERNAL,
|
||||
LEAF,
|
||||
};
|
||||
|
||||
enum class ObTxState : uint8_t
|
||||
{
|
||||
UNKNOWN = 0,
|
||||
@ -70,7 +77,7 @@ enum class ObTxState : uint8_t
|
||||
MAX = 100
|
||||
};
|
||||
|
||||
const uint8_t OB_C2PC_UPSTREAM_ID = UINT8_MAX - 1;
|
||||
const int64_t OB_C2PC_UPSTREAM_ID = INT64_MAX - 1;
|
||||
|
||||
/* // ObITxCommitter provides method to commit the transaction with user provided callbacks. */
|
||||
/* // The interface need guarantee the atomicity of the transaction. */
|
||||
|
||||
@ -267,6 +267,7 @@ void ObPartTransCtx::default_init_()
|
||||
clean_retain_cause_();
|
||||
|
||||
upstream_state_ = ObTxState::INIT;
|
||||
msg_2pc_cache_ = nullptr;
|
||||
exec_info_.reset();
|
||||
ctx_tx_data_.reset();
|
||||
sub_state_.reset();
|
||||
@ -482,11 +483,9 @@ int ObPartTransCtx::handle_timeout(const int64_t delay)
|
||||
bool unused = false;
|
||||
if (is_2pc_logging()) {
|
||||
TRANS_LOG(INFO, "committer is under logging", K(ret), K(*this));
|
||||
} else if (OB_FAIL(do_prepare(unused))) {
|
||||
} else if (OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) {
|
||||
TRANS_LOG(WARN, "do prepare failed", K(ret), K(*this));
|
||||
} else {
|
||||
set_upstream_state(ObTxState::PREPARE);
|
||||
collected_.reset();
|
||||
part_trans_action_ = ObPartTransAction::COMMIT;
|
||||
}
|
||||
}
|
||||
@ -1056,7 +1055,7 @@ int ObPartTransCtx::gts_elapse_callback(const MonotonicTs srr, const SCN >s)
|
||||
KR(ret), KPC(this));
|
||||
} else if (is_follower_()) {
|
||||
TRANS_LOG(INFO, "current state is follower, do nothing", KPC(this));
|
||||
} else if (OB_FAIL(ObTxCycleTwoPhaseCommitter::enter_pre_commit_state())) {
|
||||
} else if (OB_FAIL(ObTxCycleTwoPhaseCommitter::try_enter_pre_commit_state())) {
|
||||
TRANS_LOG(WARN, "enter_pre_commit_state failed", K(ret), KPC(this));
|
||||
} else {
|
||||
// TODO, refine in 4.1
|
||||
@ -1066,26 +1065,6 @@ int ObPartTransCtx::gts_elapse_callback(const MonotonicTs srr, const SCN >s)
|
||||
}
|
||||
TRANS_LOG(INFO, "apply prepare log for sub trans", K(ret), K(*this));
|
||||
}
|
||||
// TODO, currently, if a trans only has one participant,
|
||||
// the state can not be drived from pre commit to commit.
|
||||
// Therefore, enter commit state directly.
|
||||
const int64_t SINGLE_COUNT = 1;
|
||||
if (SINGLE_COUNT == get_participants_size()) {
|
||||
upstream_state_ = ObTxState::COMMIT;
|
||||
collected_.reset();
|
||||
// TODO, drive it and submit log via msg
|
||||
if (OB_FAIL(do_commit())) {
|
||||
TRANS_LOG(WARN, "do commit failed", K(ret), K(*this));
|
||||
} else {
|
||||
if (OB_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_REQ))) {
|
||||
TRANS_LOG(WARN, "post commit request failed", K(ret), K(*this));
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
if (OB_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_COMMIT))) {
|
||||
TRANS_LOG(WARN, "submit commit log failed", K(ret), K(*this));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
sub_state_.clear_gts_waiting();
|
||||
@ -3472,23 +3451,23 @@ int ObPartTransCtx::try_submit_next_log()
|
||||
|
||||
bool ObPartTransCtx::is_2pc_logging() const { return is_2pc_logging_(); }
|
||||
|
||||
uint64_t ObPartTransCtx::get_participant_id()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
uint64_t participant_id = UINT64_MAX;
|
||||
// uint64_t ObPartTransCtx::get_participant_id()
|
||||
// {
|
||||
// int ret = OB_SUCCESS;
|
||||
// uint64_t participant_id = UINT64_MAX;
|
||||
//
|
||||
// if (OB_FAIL(find_participant_id_(ls_id_, participant_id))) {
|
||||
// TRANS_LOG(ERROR, "find participant id failed", K(*this));
|
||||
// }
|
||||
//
|
||||
// return participant_id;
|
||||
// }
|
||||
|
||||
if (OB_FAIL(find_participant_id_(ls_id_, participant_id))) {
|
||||
TRANS_LOG(ERROR, "find participant id failed", K(*this));
|
||||
}
|
||||
|
||||
return participant_id;
|
||||
}
|
||||
|
||||
int ObPartTransCtx::find_participant_id_(const ObLSID &participant, uint64_t &participant_id)
|
||||
int ObPartTransCtx::find_participant_id_(const ObLSID &participant, int64_t &participant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool found = false;
|
||||
participant_id = UINT64_MAX;
|
||||
participant_id = INT64_MAX;
|
||||
|
||||
for (int64_t i = 0; !found && i < exec_info_.participants_.count(); i++) {
|
||||
if (participant == exec_info_.participants_[i]) {
|
||||
@ -3504,16 +3483,6 @@ int ObPartTransCtx::find_participant_id_(const ObLSID &participant, uint64_t &pa
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool ObPartTransCtx::is_root() const { return !exec_info_.upstream_.is_valid(); }
|
||||
|
||||
bool ObPartTransCtx::is_leaf() const
|
||||
{
|
||||
return exec_info_.participants_.empty()
|
||||
// root must not be leaf, because the distributed txn must be composed by
|
||||
// more than one participants.
|
||||
&& !is_root();
|
||||
}
|
||||
|
||||
//***************************** for 4.0
|
||||
int ObPartTransCtx::check_replay_avaliable_(const palf::LSN &offset,
|
||||
const SCN ×tamp,
|
||||
@ -3967,7 +3936,6 @@ int ObPartTransCtx::replay_commit_info(const ObTxCommitInfoLog &commit_info_log,
|
||||
} else if (OB_FAIL(set_app_trace_id_(commit_info_log.get_app_trace_id()))) {
|
||||
TRANS_LOG(WARN, "set app trace id error", K(ret), K(commit_info_log), K(*this));
|
||||
} else {
|
||||
set_durable_state_(ObTxState::REDO_COMPLETE);
|
||||
set_2pc_upstream_(commit_info_log.get_upstream());
|
||||
exec_info_.xid_ = commit_info_log.get_xid();
|
||||
can_elr_ = commit_info_log.is_elr();
|
||||
@ -3993,6 +3961,9 @@ int ObPartTransCtx::replay_commit_info(const ObTxCommitInfoLog &commit_info_log,
|
||||
ObTwoPhaseCommitLogType two_phase_log_type = ObTwoPhaseCommitLogType::OB_LOG_TX_MAX;
|
||||
if (is_incomplete_replay_ctx_) {
|
||||
// incomplete replay ctx will exiting by replay commit/abort/clear, no need to depend on 2PC
|
||||
} else if (is_local_tx_()) {
|
||||
set_durable_state_(ObTxState::REDO_COMPLETE);
|
||||
set_upstream_state(ObTxState::REDO_COMPLETE);
|
||||
} else if (OB_FAIL(switch_log_type_(commit_info_log.LOG_TYPE, two_phase_log_type))) {
|
||||
TRANS_LOG(WARN, "switch log type failed", KR(ret), KPC(this));
|
||||
} else if (OB_FAIL(ObTxCycleTwoPhaseCommitter::replay_log(two_phase_log_type))) {
|
||||
|
||||
@ -54,6 +54,7 @@ class ObTxCommitLog;
|
||||
class ObTxAbortLog;
|
||||
class ObTxClearLog;
|
||||
class ObIRetainCtxCheckFunctor;
|
||||
struct ObTxMsg;
|
||||
}
|
||||
namespace palf
|
||||
{
|
||||
@ -226,6 +227,9 @@ private:
|
||||
K(coord_prepare_info_arr_),
|
||||
K_(upstream_state),
|
||||
K_(retain_cause),
|
||||
"2pc_role",
|
||||
get_2pc_role(),
|
||||
KPC(msg_2pc_cache_),
|
||||
K_(collected),
|
||||
K_(ref),
|
||||
K_(rec_log_ts),
|
||||
@ -486,7 +490,7 @@ protected:
|
||||
virtual int wait_gts_elapse_commit_version_(bool &need_wait);
|
||||
virtual int get_local_max_read_version_(share::SCN &local_max_read_version);
|
||||
virtual int update_local_max_commit_version_(const share::SCN &commit_version);
|
||||
virtual int check_and_response_scheduler_(int result);
|
||||
virtual int check_and_response_scheduler_(ObTxState next_phase, int result);
|
||||
private:
|
||||
|
||||
int init_log_cbs_(const share::ObLSID&ls_id, const ObTransID &tx_id);
|
||||
@ -537,10 +541,11 @@ public:
|
||||
int handle_tx_2pc_prepare_version_req(const Ob2pcPrepareVersionReqMsg &msg);
|
||||
int handle_tx_2pc_prepare_version_resp(const Ob2pcPrepareVersionRespMsg &msg);
|
||||
protected:
|
||||
virtual int post_msg(const ObTwoPhaseCommitMsgType &msg_type) override;
|
||||
// virtual int post_msg(const ObTwoPhaseCommitMsgType &msg_type);
|
||||
virtual int post_msg(const ObTwoPhaseCommitMsgType& msg_type,
|
||||
const uint8_t participant_id) override;
|
||||
const int64_t participant_id) override;
|
||||
private:
|
||||
int apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type);
|
||||
int set_2pc_upstream_(const share::ObLSID&upstream);
|
||||
int set_2pc_participants_(const share::ObLSArray &participants);
|
||||
int set_2pc_incremental_participants_(const share::ObLSArray &participants);
|
||||
@ -550,7 +555,7 @@ private:
|
||||
int merge_prepare_log_info_(const ObLSLogInfo &prepare_info);
|
||||
int set_2pc_commit_version_(const share::SCN &commit_version);
|
||||
int find_participant_id_(const share::ObLSID&participant,
|
||||
uint64_t &participant_id);
|
||||
int64_t &participant_id);
|
||||
int post_tx_commit_resp_(const int status);
|
||||
int post_msg_(const ObTwoPhaseCommitMsgType& msg_type,
|
||||
const share::ObLSID&ls);
|
||||
@ -586,11 +591,12 @@ private:
|
||||
|
||||
// ========================== TX COMMITTER BEGIN ==========================
|
||||
protected:
|
||||
virtual bool is_root() const override;
|
||||
virtual bool is_leaf() const override;
|
||||
virtual int64_t get_participants_size() override
|
||||
{ return exec_info_.participants_.count(); }
|
||||
virtual uint64_t get_participant_id() override;
|
||||
virtual Ob2PCRole get_2pc_role() const override;
|
||||
virtual int64_t get_downstream_size() const override
|
||||
{
|
||||
return exec_info_.participants_.count();
|
||||
};
|
||||
virtual int64_t get_self_id();
|
||||
|
||||
virtual bool is_2pc_logging() const override;
|
||||
virtual ObTxState get_downstream_state() const override
|
||||
@ -615,7 +621,7 @@ protected:
|
||||
// size of the participants array is larger or equal than one.
|
||||
virtual int do_prepare(bool &no_need_submit_log) override;
|
||||
virtual int on_prepare() override;
|
||||
virtual int do_pre_commit(bool& need_wait) override;
|
||||
virtual int do_pre_commit(bool &need_wait) override;
|
||||
virtual int do_commit() override;
|
||||
virtual int on_commit() override;
|
||||
virtual int do_abort() override;
|
||||
@ -750,6 +756,7 @@ private:
|
||||
int16_t retain_cause_;
|
||||
|
||||
ObTxState upstream_state_;
|
||||
const ObTxMsg * msg_2pc_cache_;
|
||||
ObLSLogInfoArray coord_prepare_info_arr_;
|
||||
TransModulePageAllocator reserve_allocator_;
|
||||
// tmp scheduler addr is used to post response for the second phase of xa commit/rollback
|
||||
|
||||
@ -48,12 +48,12 @@ namespace transaction
|
||||
class ObTxCycleTwoPhaseCommitter
|
||||
{
|
||||
public:
|
||||
ObTxCycleTwoPhaseCommitter() : collected_() {}
|
||||
ObTxCycleTwoPhaseCommitter() : collected_(), self_id_(-1) {}
|
||||
~ObTxCycleTwoPhaseCommitter() {}
|
||||
void reset()
|
||||
{
|
||||
collected_.reset();
|
||||
set_upstream_state(ObTxState::INIT);
|
||||
self_id_ = -1;
|
||||
}
|
||||
// two_phase_commit triggers the underlying two phase commit progress.
|
||||
//
|
||||
@ -82,7 +82,7 @@ public:
|
||||
// was before invoking the method.
|
||||
int handle_2pc_req(const ObTwoPhaseCommitMsgType msg_type);
|
||||
int handle_2pc_resp(const ObTwoPhaseCommitMsgType msg_type,
|
||||
const uint8_t participant_id);
|
||||
const int64_t participant_id);
|
||||
|
||||
static int handle_orphan_2pc_req(const ObTwoPhaseCommitMsgType recv_msg_type,
|
||||
ObTwoPhaseCommitMsgType& send_msg_type,
|
||||
@ -121,25 +121,25 @@ public:
|
||||
//
|
||||
// two phase commit normal message handler.
|
||||
int handle_2pc_prepare_request();
|
||||
int handle_2pc_prepare_response(const uint8_t participant_id);
|
||||
int handle_2pc_prepare_response(const int64_t participant_id);
|
||||
int handle_2pc_commit_request();
|
||||
int handle_2pc_abort_request();
|
||||
int handle_2pc_commit_response(const uint8_t participant_id);
|
||||
int handle_2pc_abort_response(const uint8_t participant_id);
|
||||
int handle_2pc_commit_response(const int64_t participant_id);
|
||||
int handle_2pc_abort_response(const int64_t participant_id);
|
||||
|
||||
// Oceanbase's optimized two phase commit protocol
|
||||
//
|
||||
// In our optimized protocol, we use precommit msg to reduce single machine
|
||||
// read latency and clear msg to reduce SQL commit latency.
|
||||
int handle_2pc_pre_commit_request();
|
||||
int handle_2pc_pre_commit_response(const uint8_t participant_id);
|
||||
int handle_2pc_pre_commit_response(const int64_t participant_id);
|
||||
int handle_2pc_clear_request();
|
||||
int handle_2pc_clear_response(const uint8_t participant_id);
|
||||
int handle_2pc_clear_response(const int64_t participant_id);
|
||||
|
||||
// message handler for special usage
|
||||
// only persist redo and commit info
|
||||
int handle_2pc_prepare_redo_request();
|
||||
int handle_2pc_prepare_redo_response(const uint8_t participant_id);
|
||||
int handle_2pc_prepare_redo_response(const int64_t participant_id);
|
||||
|
||||
// two phase commit orphan message handler message handler
|
||||
//
|
||||
@ -186,7 +186,7 @@ public:
|
||||
int recover_from_tx_table();
|
||||
|
||||
int try_enter_pre_commit_state();
|
||||
int enter_pre_commit_state();
|
||||
int on_pre_commit();
|
||||
|
||||
// Two phase committer user should implement its own state handler on its own.
|
||||
// For ObTxCtx, we use the handler to implements the concurrent control
|
||||
@ -208,6 +208,12 @@ public:
|
||||
virtual int on_abort() = 0;
|
||||
virtual int on_clear() = 0;
|
||||
|
||||
// 1. when recive a new request msg, the participant will enter into the next phase.
|
||||
// 2. invoke do_xxx to execute all in-memory operation with the next phase
|
||||
// 3. set upstream_state at last
|
||||
// 4. retry submit log if upstream_state is larger than downstream_state
|
||||
int drive_self_2pc_phase(ObTxState next_phase);
|
||||
|
||||
// for xa
|
||||
virtual int reply_to_scheduler_for_sub2pc(int64_t msg_type) = 0;
|
||||
|
||||
@ -262,14 +268,16 @@ public:
|
||||
// state to his parent node.
|
||||
//
|
||||
// Detailed design is in https://yuque.antfin.com/ob/transaction/tkcto4
|
||||
virtual int64_t get_participants_size() = 0;
|
||||
virtual uint64_t get_participant_id() = 0;
|
||||
virtual int64_t get_downstream_size() const = 0;
|
||||
virtual int64_t get_self_id() = 0;
|
||||
// is_root returns whether it is the root participant in the cycle two phase
|
||||
// commit
|
||||
virtual bool is_root() const = 0;
|
||||
bool is_root() const { return Ob2PCRole::ROOT == get_2pc_role(); }
|
||||
// is_leaf returns whether it is the leaf participant in the cycle two phase
|
||||
// commit
|
||||
virtual bool is_leaf() const = 0;
|
||||
bool is_leaf() const { return Ob2PCRole::LEAF == get_2pc_role(); }
|
||||
virtual Ob2PCRole get_2pc_role() const = 0;
|
||||
|
||||
// is_2pc_logging returns whether it is waiting for the success of two phase
|
||||
// commit asynchronous logging. Because of the asynchronization of the
|
||||
// logging, We almost view the unfinished logging as the state transition all
|
||||
@ -284,11 +292,14 @@ public:
|
||||
// If you are interested in the application, see handle_2pc_prepare_request
|
||||
// and apply_prepare_log.
|
||||
virtual bool is_2pc_logging() const = 0;
|
||||
//
|
||||
|
||||
//durable state, set by applying log
|
||||
virtual ObTxState get_downstream_state() const = 0;
|
||||
virtual int set_downstream_state(const ObTxState state) = 0;
|
||||
//in-memory state, set by msg
|
||||
virtual ObTxState get_upstream_state() const = 0;
|
||||
virtual int set_upstream_state(const ObTxState state) = 0;
|
||||
|
||||
// for xa
|
||||
bool is_prepared_sub2pc()
|
||||
{
|
||||
@ -309,9 +320,9 @@ public:
|
||||
// Implementer need implement its own msg according to all ObTwoPhaseCommitMsgType.
|
||||
// While it should not guarantee anything except best effort property and interface
|
||||
// adaption for hande_msg
|
||||
virtual int post_msg(const ObTwoPhaseCommitMsgType& msg_type) = 0;
|
||||
int post_downstream_msg(const ObTwoPhaseCommitMsgType msg_type);
|
||||
virtual int post_msg(const ObTwoPhaseCommitMsgType& msg_type,
|
||||
const uint8_t participant_id) = 0;
|
||||
const int64_t participant_id) = 0;
|
||||
// whether the processing of current two phase commit is sub part of a global transaction
|
||||
// TODO, refine in 4.1
|
||||
virtual bool is_sub2pc() const = 0;
|
||||
@ -325,16 +336,18 @@ public:
|
||||
private:
|
||||
// Inner method for handle_2pc_xxx_request/response for clearity
|
||||
int handle_2pc_prepare_redo_request_impl_();
|
||||
int handle_2pc_prepare_redo_response_impl_(const uint8_t participant_id);
|
||||
int handle_2pc_prepare_redo_response_impl_(const int64_t participant_id);
|
||||
int handle_2pc_prepare_request_impl_();
|
||||
int handle_2pc_pre_commit_request_impl_();
|
||||
int handle_2pc_commit_request_impl_();
|
||||
int handle_2pc_abort_request_impl_();
|
||||
int handle_2pc_clear_request_impl_();
|
||||
int handle_2pc_prepare_response_impl_(const uint8_t participant_id);
|
||||
int handle_2pc_pre_commit_response_impl_(const uint8_t participant);
|
||||
int handle_2pc_ack_response_impl_(const uint8_t participant_id);
|
||||
int handle_2pc_abort_response_impl_(const uint8_t participant_id);
|
||||
int handle_2pc_prepare_response_impl_(const int64_t participant_id);
|
||||
int handle_2pc_pre_commit_response_impl_(const int64_t participant);
|
||||
int handle_2pc_ack_response_impl_(const int64_t participant_id);
|
||||
int handle_2pc_abort_response_impl_(const int64_t participant_id);
|
||||
|
||||
virtual int apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type) = 0;
|
||||
|
||||
// Because the post_msg is best effect, we need retry to post the msg under
|
||||
// exception.
|
||||
@ -342,17 +355,17 @@ private:
|
||||
// NB: We should take both upstream and downstream into consideration.
|
||||
int decide_downstream_msg_type_(bool &need_submit, ObTwoPhaseCommitMsgType &msg_type);
|
||||
int retransmit_downstream_msg_();
|
||||
int retransmit_downstream_msg_(const uint8_t participant);
|
||||
int retransmit_upstream_msg_(const ObTxState state);
|
||||
int retransmit_downstream_msg_(const int64_t participant);
|
||||
|
||||
// Because the submit_log may fail, we need retry to submit the log under
|
||||
// exception.
|
||||
int decide_downstream_log_type_(bool &need_submit, ObTwoPhaseCommitLogType &log_type);
|
||||
int resubmit_downstream_log_();
|
||||
int decide_2pc_log_type_(bool &need_submit, ObTwoPhaseCommitLogType &log_type);
|
||||
int resubmit_2pc_log_();
|
||||
|
||||
// Means we collect all downstream responses
|
||||
bool all_downstream_collected_();
|
||||
int collect_downstream_(const uint8_t participant);
|
||||
int collect_downstream_(const int64_t participant);
|
||||
|
||||
protected:
|
||||
// colloected_ is the bit set for storing responses from participants
|
||||
@ -360,6 +373,10 @@ protected:
|
||||
// NB: We introduce the rule that the bit set is cleaned up each time state
|
||||
// is transferred.
|
||||
common::ObBitSet<> collected_;
|
||||
|
||||
int64_t self_id_;
|
||||
//set by xa or dup table
|
||||
// bool no_need_submit_prepare_log_;
|
||||
};
|
||||
|
||||
bool is_2pc_request_msg(const ObTwoPhaseCommitMsgType msg_type);
|
||||
|
||||
@ -114,7 +114,7 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_redo_request_impl_()
|
||||
TRANS_LOG(WARN, "submit commit info log failed", K(tmp_ret), K(*this));
|
||||
}
|
||||
} else if (!is_root() && !is_leaf()) {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REDO_REQ))) {
|
||||
if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REDO_REQ))) {
|
||||
TRANS_LOG(WARN, "post prepare redo msg failed", KR(tmp_ret), KPC(this));
|
||||
}
|
||||
}
|
||||
@ -122,7 +122,7 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_redo_request_impl_()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_redo_response(const uint8_t participant)
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_redo_response(const int64_t participant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -169,7 +169,7 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_redo_response(const uint8_t p
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_redo_response_impl_(const uint8_t participant)
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_redo_response_impl_(const int64_t participant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -256,9 +256,8 @@ int ObTxCycleTwoPhaseCommitter::prepare_redo()
|
||||
} else {
|
||||
set_upstream_state(ObTxState::REDO_COMPLETE);
|
||||
collected_.reset();
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REDO_REQ))) {
|
||||
if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REDO_REQ))) {
|
||||
TRANS_LOG(WARN, "post prepare request failed", K(tmp_ret), K(*this));
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
|
||||
// TODO, only submit commit info
|
||||
@ -289,33 +288,18 @@ int ObTxCycleTwoPhaseCommitter::continue_execution(const bool is_rollback)
|
||||
TRANS_LOG(INFO, "already in second phase", K(ret), K(*this));
|
||||
} else {
|
||||
if (is_rollback) {
|
||||
set_upstream_state(ObTxState::ABORT);
|
||||
collected_.reset();
|
||||
if (OB_FAIL(do_abort())) {
|
||||
if (OB_FAIL(drive_self_2pc_phase(ObTxState::ABORT))) {
|
||||
TRANS_LOG(ERROR, "do abort failed", K(ret));
|
||||
} else if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_REQ))) {
|
||||
} else if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_REQ))) {
|
||||
TRANS_LOG(WARN, "post abort request failed", K(tmp_ret), KPC(this));
|
||||
}
|
||||
if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_ABORT))) {
|
||||
TRANS_LOG(WARN, "submit abort log failed", K(tmp_ret), KPC(this));
|
||||
}
|
||||
} else {
|
||||
// TODO, switch state first if do preapre can be executed repeatedly
|
||||
if (OB_FAIL(do_prepare(no_need_submit_log))) {
|
||||
if (OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) {
|
||||
TRANS_LOG(ERROR, "do prepare failed", K(ret));
|
||||
} else {
|
||||
// switch coord state to prepare
|
||||
set_upstream_state(ObTxState::PREPARE);
|
||||
collected_.reset();
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ))) {
|
||||
} else if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ))) {
|
||||
TRANS_LOG(WARN, "post prepare request failed", K(tmp_ret), KPC(this));
|
||||
}
|
||||
if (no_need_submit_log) {
|
||||
} else if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_PREPARE))) {
|
||||
/* failure of submitting prepare log is harmless, this action will be retried */
|
||||
TRANS_LOG(WARN, "submit prepare log failed", K(tmp_ret), KPC(this));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -173,6 +173,7 @@ int ObTxCycleTwoPhaseCommitter::leader_revoke()
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_request()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
const ObTxState state = get_downstream_state();
|
||||
|
||||
switch (state) {
|
||||
@ -184,41 +185,36 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_request()
|
||||
break;
|
||||
}
|
||||
case ObTxState::PREPARE: {
|
||||
if (ObTxState::PREPARE == get_upstream_state() && all_downstream_collected_()) {
|
||||
// no need retransmit downstream msg
|
||||
} else {
|
||||
if (OB_FAIL(retransmit_downstream_msg_())) {
|
||||
TRANS_LOG(WARN, "retransmit downstream msg failed", KR(ret));
|
||||
if (OB_TMP_FAIL(retransmit_downstream_msg_())) {
|
||||
TRANS_LOG(WARN, "retransmit downstream msg failed", KR(tmp_ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(retransmit_upstream_msg_(ObTxState::PREPARE))) {
|
||||
TRANS_LOG(WARN, "retransmit upstream msg failed", KR(ret));
|
||||
if (OB_TMP_FAIL(retransmit_upstream_msg_(ObTxState::PREPARE))) {
|
||||
TRANS_LOG(WARN, "retransmit upstream msg failed", KR(tmp_ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObTxState::ABORT: {
|
||||
// Txn may go abort itself, so we need reply the response based on the state
|
||||
// to advance the two phase commit protocol as soon as possible
|
||||
if (OB_FAIL(retransmit_downstream_msg_())) {
|
||||
TRANS_LOG(WARN, "retransmit downstream msg failed", KR(ret));
|
||||
if (OB_TMP_FAIL(retransmit_downstream_msg_())) {
|
||||
TRANS_LOG(WARN, "retransmit downstream msg failed", KR(tmp_ret));
|
||||
}
|
||||
if (OB_FAIL(retransmit_upstream_msg_(ObTxState::ABORT))) {
|
||||
TRANS_LOG(WARN, "retransmit upstream msg failed", KR(ret));
|
||||
if (OB_TMP_FAIL(retransmit_upstream_msg_(ObTxState::ABORT))) {
|
||||
TRANS_LOG(WARN, "retransmit upstream msg failed", KR(tmp_ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObTxState::PRE_COMMIT:
|
||||
case ObTxState::COMMIT: {
|
||||
if (OB_FAIL(retransmit_downstream_msg_())) {
|
||||
TRANS_LOG(WARN, "retransmit downstream msg failed", KR(ret));
|
||||
if (OB_TMP_FAIL(retransmit_downstream_msg_())) {
|
||||
TRANS_LOG(WARN, "retransmit downstream msg failed", KR(tmp_ret));
|
||||
}
|
||||
if (OB_FAIL(retransmit_upstream_msg_(ObTxState::PREPARE))) {
|
||||
TRANS_LOG(WARN, "retransmit upstream msg failed", KR(ret));
|
||||
if (OB_TMP_FAIL(retransmit_upstream_msg_(ObTxState::PREPARE))) {
|
||||
TRANS_LOG(WARN, "retransmit upstream msg failed", KR(tmp_ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObTxState::CLEAR:
|
||||
{
|
||||
case ObTxState::CLEAR: {
|
||||
TRANS_LOG(WARN, "handle orphan request, ignore it", K(ret));
|
||||
break;
|
||||
}
|
||||
@ -239,16 +235,39 @@ int ObTxCycleTwoPhaseCommitter::retransmit_upstream_msg_(const ObTxState state)
|
||||
ObTwoPhaseCommitMsgType msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_UNKNOWN;
|
||||
bool need_respond = false;
|
||||
|
||||
if (is_root()) {
|
||||
// root do not respond
|
||||
} else if (is_leaf()) {
|
||||
// leaf need respond
|
||||
need_respond = true;
|
||||
if (get_downstream_state() > get_upstream_state()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG(WARN, "Invalid downstream_state", K(ret), KPC(this));
|
||||
|
||||
} else {
|
||||
// need respond if all downstreams has responded
|
||||
need_respond = all_downstream_collected_() ;
|
||||
switch (get_2pc_role()) {
|
||||
// root do not respond
|
||||
case Ob2PCRole::ROOT: {
|
||||
need_respond = false;
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::INTERNAL: {
|
||||
// need respond if all downstreams has responded and submit log succesfully
|
||||
need_respond = (all_downstream_collected_() && get_downstream_state() == state)
|
||||
// dowstream_state <= upstream_state
|
||||
// => state < downstream_state && state < upstream_state
|
||||
// => post response for last phase
|
||||
|| (get_downstream_state() > state);
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::LEAF: {
|
||||
// leaf need respond
|
||||
need_respond = get_downstream_state() >= state;
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_TRANS_INVALID_STATE;
|
||||
TRANS_LOG(WARN, "invalid coord state", KR(ret), K(get_upstream_state()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (need_respond == true) {
|
||||
switch (state) {
|
||||
case ObTxState::INIT: {
|
||||
// It may happen when participant failed to submit the log
|
||||
@ -289,6 +308,8 @@ int ObTxCycleTwoPhaseCommitter::retransmit_upstream_msg_(const ObTxState state)
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && need_respond) {
|
||||
if (OB_TMP_FAIL(post_msg(msg_type, OB_C2PC_UPSTREAM_ID))) {
|
||||
@ -299,28 +320,39 @@ int ObTxCycleTwoPhaseCommitter::retransmit_upstream_msg_(const ObTxState state)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_request_impl_()
|
||||
{
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_request_impl_() {
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
bool no_need_submit_log = false;
|
||||
|
||||
if (is_2pc_logging()) {
|
||||
TRANS_LOG(INFO, "committer is under logging", K(ret), K(*this));
|
||||
} else if (OB_FAIL(do_prepare(no_need_submit_log))) {
|
||||
TRANS_LOG(WARN, "do prepare failed", K(ret), K(*this));
|
||||
} else if (FALSE_IT(set_upstream_state(ObTxState::PREPARE))) {
|
||||
} else if (FALSE_IT(collected_.reset())) {
|
||||
} else if (!no_need_submit_log
|
||||
&& OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_PREPARE))) {
|
||||
if (OB_BLOCK_FROZEN == tmp_ret) {
|
||||
// memtable is freezing, can not submit log right now.
|
||||
} else if (OB_FAIL(apply_2pc_msg_(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ))) {
|
||||
TRANS_LOG(WARN, "apply msg failed", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) {
|
||||
TRANS_LOG(ERROR, "do prepare failed", K(ret), K(*this));
|
||||
} else {
|
||||
TRANS_LOG(WARN, "submit prepare log failed", K(tmp_ret), K(*this));
|
||||
switch (get_2pc_role()) {
|
||||
case Ob2PCRole::ROOT: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "The root should not recive prepare request", K(ret), KPC(this));
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::INTERNAL: {
|
||||
|
||||
if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ))) {
|
||||
TRANS_LOG(WARN, "post prepare msg failed", KR(ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::LEAF: {
|
||||
// do nothing
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected 2pc role", K(ret), K(get_2pc_role()), KPC(this));
|
||||
break;
|
||||
}
|
||||
} else if (!is_root() && !is_leaf()) {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ))) {
|
||||
TRANS_LOG(WARN, "post prepare msg failed", KR(tmp_ret), KPC(this));
|
||||
}
|
||||
}
|
||||
|
||||
@ -379,15 +411,31 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_commit_request_impl_()
|
||||
|
||||
if (is_2pc_logging()) {
|
||||
TRANS_LOG(INFO, "committer is under logging", K(ret), K(*this));
|
||||
} else if (OB_FAIL(do_commit())) {
|
||||
TRANS_LOG(WARN, "do commit failed", K(ret), K(*this));
|
||||
} else if (FALSE_IT(set_upstream_state(ObTxState::COMMIT))) {
|
||||
} else if (FALSE_IT(collected_.reset())) {
|
||||
} else if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_COMMIT))) {
|
||||
TRANS_LOG(WARN, "submit commit log failed", K(tmp_ret), K(*this));
|
||||
} else if (!is_root() && !is_leaf()) {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_REQ))) {
|
||||
TRANS_LOG(WARN, "post commit msg failed", KR(tmp_ret), KPC(this));
|
||||
} else if (OB_FAIL(apply_2pc_msg_(ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_REQ))) {
|
||||
TRANS_LOG(WARN, "apply msg failed", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(drive_self_2pc_phase(ObTxState::COMMIT))) {
|
||||
TRANS_LOG(WARN, "enter commit phase failed", K(ret));
|
||||
} else {
|
||||
switch (get_2pc_role()) {
|
||||
case Ob2PCRole::ROOT: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "The root should not recive commit request", K(ret), KPC(this));
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::INTERNAL: {
|
||||
if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_REQ))) {
|
||||
TRANS_LOG(WARN, "post downstream msg failed", K(tmp_ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::LEAF: {
|
||||
// do nothing
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected 2pc role", K(ret), K(get_2pc_role()), KPC(this));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -456,19 +504,34 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_abort_request_impl_()
|
||||
|
||||
if (is_2pc_logging()) {
|
||||
TRANS_LOG(INFO, "committer is under logging", K(ret), K(*this));
|
||||
} else if (OB_FAIL(do_abort())) {
|
||||
TRANS_LOG(WARN, "do commit failed", K(ret), K(*this));
|
||||
} else if (FALSE_IT(set_upstream_state(ObTxState::ABORT))) {
|
||||
} else if (FALSE_IT(collected_.reset())) {
|
||||
} else if (OB_FAIL(apply_2pc_msg_(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_REQ))) {
|
||||
TRANS_LOG(WARN, "apply msg failed", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(drive_self_2pc_phase(ObTxState::ABORT))) {
|
||||
TRANS_LOG(WARN, "enter abort phase failed", K(ret), K(*this));
|
||||
} else {
|
||||
if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_ABORT))) {
|
||||
TRANS_LOG(WARN, "submit abort log failed", K(tmp_ret), K(*this));
|
||||
switch (get_2pc_role()) {
|
||||
case Ob2PCRole::ROOT: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "The root should not recive 2pc abort request", K(ret), KPC(this));
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::INTERNAL: {
|
||||
if (OB_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_REQ))) {
|
||||
TRANS_LOG(WARN, "post abort msg failed", KR(ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::LEAF: {
|
||||
// do nothing
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected 2pc role", K(ret), K(get_2pc_role()), KPC(this));
|
||||
break;
|
||||
}
|
||||
if (!is_leaf() && OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_REQ))) {
|
||||
TRANS_LOG(WARN, "post abort msg failed", KR(tmp_ret), KPC(this));
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -525,9 +588,12 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_pre_commit_request_impl_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_FAIL(try_enter_pre_commit_state())) {
|
||||
if (OB_FAIL(apply_2pc_msg_(ObTwoPhaseCommitMsgType::OB_MSG_TX_PRE_COMMIT_REQ))) {
|
||||
TRANS_LOG(WARN, "apply msg failed", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(try_enter_pre_commit_state())) {
|
||||
TRANS_LOG(WARN, "try_enter_pre_commit_state failed", K(ret), KPC(this));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -572,15 +638,32 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_clear_request_impl_()
|
||||
|
||||
if (is_2pc_logging()) {
|
||||
TRANS_LOG(INFO, "committer is under logging", K(ret), K(*this));
|
||||
} else if (OB_FAIL(do_clear())) {
|
||||
TRANS_LOG(WARN, "do commit failed", K(ret), K(*this));
|
||||
} else if (FALSE_IT(set_upstream_state(ObTxState::CLEAR))) {
|
||||
} else if (FALSE_IT(collected_.reset())) {
|
||||
} else if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_CLEAR))) {
|
||||
TRANS_LOG(WARN, "submit clear log failed", K(tmp_ret), K(*this));
|
||||
} else if (!is_root() && !is_leaf()) {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ))) {
|
||||
TRANS_LOG(WARN, "post clear msg failed", KR(tmp_ret), KPC(this));
|
||||
} else if (OB_FAIL(apply_2pc_msg_(ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ))) {
|
||||
TRANS_LOG(WARN, "apply msg failed", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(drive_self_2pc_phase(ObTxState::CLEAR))) {
|
||||
TRANS_LOG(WARN, "enter clear phase failed", K(ret), K(*this));
|
||||
} else {
|
||||
switch (get_2pc_role()) {
|
||||
case Ob2PCRole::ROOT: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "The root should not recive clear request", K(ret), KPC(this));
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::INTERNAL: {
|
||||
if (OB_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ))) {
|
||||
TRANS_LOG(WARN, "post clear msg failed", KR(ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::LEAF: {
|
||||
// do nothing
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected 2pc role", K(ret), K(get_2pc_role()), KPC(this));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -682,19 +765,36 @@ int ObTxCycleTwoPhaseCommitter::apply_prepare_log()
|
||||
TRANS_LOG(ERROR, "on prepare failed", K(ret), K(*this), K(state));
|
||||
} else if (OB_FAIL(set_downstream_state(ObTxState::PREPARE))) {
|
||||
TRANS_LOG(ERROR, "set 2pc state failed", K(ret), K(*this), K(state));
|
||||
} else {
|
||||
if (!is_root()
|
||||
&& all_downstream_collected_()) {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_RESP,
|
||||
OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post prepare response failed", K(tmp_ret), K(*this));
|
||||
}
|
||||
}
|
||||
|
||||
if (is_root()
|
||||
&& all_downstream_collected_()) {
|
||||
} else if (all_downstream_collected_()) {
|
||||
switch (get_2pc_role()) {
|
||||
case Ob2PCRole::ROOT: {
|
||||
if (OB_FAIL(try_enter_pre_commit_state())) {
|
||||
TRANS_LOG(ERROR, "try_enter_pre_commit_state failed", K(ret), KPC(this));
|
||||
TRANS_LOG(WARN, "try enter pre_commit state failed", K(ret), KPC(this));
|
||||
if (OB_EAGAIN == ret) {
|
||||
ret = OB_SUCCESS;
|
||||
// retry by gts callback or handle_timout
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::INTERNAL: {
|
||||
if (OB_TMP_FAIL(
|
||||
post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_RESP, OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post prepare response failed", K(ret), K(*this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::LEAF: {
|
||||
if (OB_TMP_FAIL(
|
||||
post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_RESP, OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post prepare response failed", K(ret), K(*this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
TRANS_LOG(WARN, "invalid 2pc state");
|
||||
ret = OB_TRANS_INVALID_STATE;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -709,9 +809,7 @@ int ObTxCycleTwoPhaseCommitter::apply_commit_log()
|
||||
const ObTxState state = get_downstream_state();
|
||||
const ObTxState upstream_state = get_upstream_state();
|
||||
|
||||
if (ObTxState::PREPARE != state
|
||||
&& ObTxState::PRE_COMMIT != state
|
||||
&& ObTxState::COMMIT != state) {
|
||||
if (ObTxState::PREPARE != state && ObTxState::PRE_COMMIT != state && ObTxState::COMMIT != state) {
|
||||
// We will never apply commit under abort and clear state
|
||||
ret = OB_TRANS_INVALID_STATE;
|
||||
TRANS_LOG(ERROR, "apply commit with wrong state", K(state), K(*this));
|
||||
@ -723,23 +821,33 @@ int ObTxCycleTwoPhaseCommitter::apply_commit_log()
|
||||
} else if (OB_FAIL(set_downstream_state(ObTxState::COMMIT))) {
|
||||
TRANS_LOG(ERROR, "set 2pc state failed", K(ret), K(*this), K(state));
|
||||
} else if (all_downstream_collected_()) {
|
||||
if (is_root()) {
|
||||
set_upstream_state(ObTxState::CLEAR);
|
||||
collected_.reset();
|
||||
if (OB_FAIL(do_clear())) {
|
||||
TRANS_LOG(ERROR, "do clear failed", KR(ret));
|
||||
} else if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ))) {
|
||||
TRANS_LOG(WARN, "post clear request failed", K(tmp_ret), K(*this));
|
||||
switch (get_2pc_role()) {
|
||||
case Ob2PCRole::ROOT: {
|
||||
if (OB_FAIL(drive_self_2pc_phase(ObTxState::CLEAR))) {
|
||||
TRANS_LOG(WARN, "enter into clear phase failed", K(ret), KPC(this));
|
||||
} else if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ))) {
|
||||
TRANS_LOG(WARN, "post downstream msg failed", K(tmp_ret));
|
||||
}
|
||||
|
||||
// TODO, drive it and do_clear via msg
|
||||
if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_CLEAR))) {
|
||||
TRANS_LOG(WARN, "submit clear log failed", K(tmp_ret), K(*this));
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_RESP,
|
||||
OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post commit response failed", K(tmp_ret), K(*this));
|
||||
case Ob2PCRole::INTERNAL: {
|
||||
if (OB_TMP_FAIL(
|
||||
post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_RESP, OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post commit response failed", K(ret), K(*this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::LEAF: {
|
||||
if (OB_TMP_FAIL(
|
||||
post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_RESP, OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post commit response failed", K(ret), K(*this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected 2pc role", K(ret), K(get_2pc_role()), KPC(this));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -772,27 +880,33 @@ int ObTxCycleTwoPhaseCommitter::apply_abort_log()
|
||||
} else if (OB_FAIL(set_downstream_state(ObTxState::ABORT))) {
|
||||
TRANS_LOG(ERROR, "set 2pc state failed", K(ret), K(*this), K(state));
|
||||
} else if (all_downstream_collected_()) {
|
||||
if (is_root()) {
|
||||
set_upstream_state(ObTxState::CLEAR);
|
||||
collected_.reset();
|
||||
// TODO, drive it and submit_log via msg
|
||||
if (OB_FAIL(do_clear())) {
|
||||
TRANS_LOG(ERROR, "do clear failed", KR(ret));
|
||||
} else {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ))) {
|
||||
switch (get_2pc_role()) {
|
||||
case Ob2PCRole::ROOT: {
|
||||
if (OB_FAIL(drive_self_2pc_phase(ObTxState::CLEAR))) {
|
||||
TRANS_LOG(WARN, "enter into clear phase failed", K(ret), KPC(this));
|
||||
} else if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ))) {
|
||||
TRANS_LOG(WARN, "post clear request failed", K(tmp_ret), K(*this));
|
||||
}
|
||||
|
||||
if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_CLEAR))) {
|
||||
TRANS_LOG(WARN, "submit clear log failed", K(tmp_ret), K(*this));
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::INTERNAL: {
|
||||
if (OB_TMP_FAIL(
|
||||
post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_RESP, OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post abort response failed", K(tmp_ret), K(*this));
|
||||
}
|
||||
} else {
|
||||
// TODO(handora.qc): we can even response abort before successfully
|
||||
// logging
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_RESP,
|
||||
OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post commit response failed", K(tmp_ret), K(*this));
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::LEAF: {
|
||||
if (OB_TMP_FAIL(
|
||||
post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_RESP, OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post abort response failed", K(tmp_ret), K(*this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected 2pc role", K(ret), K(get_2pc_role()), KPC(this));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -953,80 +1067,81 @@ int ObTxCycleTwoPhaseCommitter::recover_from_tx_table()
|
||||
int ObTxCycleTwoPhaseCommitter::try_enter_pre_commit_state()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
bool need_wait = false;
|
||||
|
||||
if (is_2pc_logging()) {
|
||||
TRANS_LOG(INFO, "committer is under executing", KPC(this));
|
||||
} else if (OB_FAIL(do_pre_commit(need_wait))) {
|
||||
TRANS_LOG(WARN, "do pre commit failed", K(ret));
|
||||
} else if (is_root() && need_wait) {
|
||||
TRANS_LOG(DEBUG, "do pre commit need wait gts elapse", KPC(this));
|
||||
} else if (!is_root() && need_wait) {
|
||||
TRANS_LOG(ERROR, "unexpected error, participant need wait commit version elapse", KPC(this));
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
} else if (OB_FAIL(enter_pre_commit_state())) {
|
||||
ret = OB_EAGAIN;
|
||||
TRANS_LOG(INFO, "committer is 2pc logging", KPC(this));
|
||||
} else if (OB_FAIL(drive_self_2pc_phase(ObTxState::PRE_COMMIT))) {
|
||||
if (OB_EAGAIN != ret) {
|
||||
TRANS_LOG(WARN, "drive self 2pc pre_commit phase failed", K(ret), KPC(this));
|
||||
}
|
||||
} else if (OB_FAIL(on_pre_commit())) {
|
||||
TRANS_LOG(WARN, "enter_pre_commit_state failed", K(ret), KPC(this));
|
||||
} else {
|
||||
TRANS_LOG(DEBUG, "enter_pre_commit_state succ", K(ret), KPC(this));
|
||||
}
|
||||
if (is_root() && !need_wait) {
|
||||
// TODO, currently, if a trans only has one participant,
|
||||
// the state can not be drived from pre commit to commit.
|
||||
// Therefore, enter commit state directly.
|
||||
const int64_t SINGLE_COUNT = 1;
|
||||
if (SINGLE_COUNT == get_participants_size()) {
|
||||
set_upstream_state(ObTxState::COMMIT);
|
||||
collected_.reset();
|
||||
// TODO, drive it and submit log via msg
|
||||
if (OB_FAIL(do_commit())) {
|
||||
TRANS_LOG(ERROR, "do commit failed", K(ret), K(*this));
|
||||
} else {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_REQ))) {
|
||||
TRANS_LOG(WARN, "post commit request failed", K(tmp_ret), K(*this));
|
||||
}
|
||||
if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_COMMIT))) {
|
||||
TRANS_LOG(WARN, "submit commit log failed", K(tmp_ret), K(*this));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
TRANS_LOG(TRACE, "try enter pre commit state", K(ret), KPC(this));
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::enter_pre_commit_state()
|
||||
int ObTxCycleTwoPhaseCommitter::on_pre_commit()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
|
||||
set_upstream_state(ObTxState::PRE_COMMIT);
|
||||
collected_.reset();
|
||||
|
||||
if (OB_FAIL(set_downstream_state(ObTxState::PRE_COMMIT))) {
|
||||
TRANS_LOG(ERROR, "set 2pc state failed", K(*this));
|
||||
} else {
|
||||
if (!is_leaf()) {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PRE_COMMIT_REQ))) {
|
||||
TRANS_LOG(WARN, "post pre commit msg failed", KR(tmp_ret), KPC(this));
|
||||
switch (get_2pc_role()) {
|
||||
case Ob2PCRole::ROOT: {
|
||||
const int64_t SINGLE_COUNT = 1;
|
||||
if (SINGLE_COUNT == get_downstream_size()) {
|
||||
TRANS_LOG(INFO, "only one participant, skip pre commit", KPC(this));
|
||||
// TODO, currently, if a trans only has one participant,
|
||||
// the state can not be drived from pre commit to commit.
|
||||
// Therefore, enter commit state directly.
|
||||
if (OB_FAIL(drive_self_2pc_phase(ObTxState::COMMIT))) {
|
||||
TRANS_LOG(WARN, "do commit in memory failed", K(ret), KPC(this));
|
||||
}
|
||||
// not need post downstream msg
|
||||
} else {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PRE_COMMIT_RESP,
|
||||
OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post pre commit response failed", K(tmp_ret), K(*this));
|
||||
if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PRE_COMMIT_REQ))) {
|
||||
TRANS_LOG(WARN, "post pre commit msg failed", KR(ret));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::INTERNAL: {
|
||||
if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PRE_COMMIT_REQ))) {
|
||||
TRANS_LOG(WARN, "post pre commit msg failed", KR(ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::LEAF: {
|
||||
if (OB_TMP_FAIL(
|
||||
post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PRE_COMMIT_RESP, OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post pre commit response failed", K(ret), K(*this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
TRANS_LOG(WARN, "invalid 2pc state", K(*this));
|
||||
ret = OB_TRANS_INVALID_STATE;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::resubmit_downstream_log_()
|
||||
int ObTxCycleTwoPhaseCommitter::resubmit_2pc_log_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
bool need_submit = false;
|
||||
ObTwoPhaseCommitLogType log_type = ObTwoPhaseCommitLogType::OB_LOG_TX_INIT;
|
||||
|
||||
if (OB_FAIL(decide_downstream_log_type_(need_submit, log_type))) {
|
||||
if (OB_FAIL(decide_2pc_log_type_(need_submit, log_type))) {
|
||||
TRANS_LOG(WARN, "decide downstream msg type fail", K(ret), KPC(this));
|
||||
} else if (need_submit && OB_TMP_FAIL(submit_log(log_type))) {
|
||||
TRANS_LOG(WARN, "submit log failed", KR(tmp_ret), K(log_type));
|
||||
@ -1037,7 +1152,7 @@ int ObTxCycleTwoPhaseCommitter::resubmit_downstream_log_()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::decide_downstream_log_type_(bool &need_submit,
|
||||
int ObTxCycleTwoPhaseCommitter::decide_2pc_log_type_(bool &need_submit,
|
||||
ObTwoPhaseCommitLogType &log_type)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1065,6 +1180,7 @@ int ObTxCycleTwoPhaseCommitter::decide_downstream_log_type_(bool &need_submit,
|
||||
break;
|
||||
}
|
||||
case ObTxState::PREPARE: {
|
||||
//TODO dup_table can not submit prepare log before redo sync finished
|
||||
need_submit = true;
|
||||
log_type = ObTwoPhaseCommitLogType::OB_LOG_TX_PREPARE;
|
||||
break;
|
||||
|
||||
@ -31,7 +31,6 @@ int ObTxCycleTwoPhaseCommitter::two_phase_commit()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
const ObTxState state = get_downstream_state();
|
||||
bool no_need_submit_log = false;
|
||||
|
||||
//start 2pc from root
|
||||
@ -39,19 +38,11 @@ int ObTxCycleTwoPhaseCommitter::two_phase_commit()
|
||||
TRANS_LOG(INFO, "already enter two phase commit", K(ret), K(*this));
|
||||
} else if (is_2pc_logging()) {
|
||||
TRANS_LOG(INFO, "committer is under logging", K(ret), K(*this));
|
||||
} else if (OB_FAIL(do_prepare(no_need_submit_log))) {
|
||||
TRANS_LOG(WARN, "do prepare failed", K(ret), K(*this));
|
||||
} else if (OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) {
|
||||
TRANS_LOG(WARN, "enter prepare phase failed", K(ret), K(*this));
|
||||
} else {
|
||||
set_upstream_state(ObTxState::PREPARE);
|
||||
collected_.reset();
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ))) {
|
||||
TRANS_LOG(WARN, "post prepare request failed, will retry later", K(tmp_ret), KPC(this));
|
||||
}
|
||||
|
||||
if (no_need_submit_log) {
|
||||
} else if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_PREPARE))) {
|
||||
/* submit prepare log fail is harmless, the 2pc will retry */
|
||||
TRANS_LOG(WARN, "submit prepare log failed, will retry by prepare message", K(tmp_ret), KPC(this));
|
||||
if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ))) {
|
||||
TRANS_LOG(WARN, "post prepare requests failed", K(tmp_ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -67,27 +58,99 @@ int ObTxCycleTwoPhaseCommitter::two_phase_abort()
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
TRANS_LOG(WARN, "abort when in 2pc", K(ret), K(*this));
|
||||
} else if (is_2pc_logging()) {
|
||||
ret = OB_STATE_NOT_MATCH;
|
||||
ret = OB_EAGAIN;
|
||||
TRANS_LOG(WARN, "abort when logging in 2pc", K(ret), K(*this));
|
||||
} else if (OB_FAIL(do_abort())) {
|
||||
TRANS_LOG(WARN, "do commit failed", K(ret), K(*this));
|
||||
} else if (FALSE_IT(collected_.reset())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "reset colloected failed", K(ret), K(*this));
|
||||
} else if (OB_FAIL(drive_self_2pc_phase(ObTxState::ABORT))) {
|
||||
TRANS_LOG(WARN, "enter abort phase failed", K(ret), K(*this));
|
||||
} else {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_REQ))) {
|
||||
TRANS_LOG(WARN, "post abort request failed", K(tmp_ret), K(*this));
|
||||
|
||||
if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_REQ))) {
|
||||
TRANS_LOG(WARN, "post abort request failed", K(tmp_ret));
|
||||
}
|
||||
|
||||
if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_ABORT))) {
|
||||
TRANS_LOG(WARN, "submit abort log failed", K(tmp_ret), K(*this));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::drive_self_2pc_phase(ObTxState next_phase)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
|
||||
bool no_need_submit_log = false;
|
||||
|
||||
if (is_2pc_logging()) {
|
||||
ret = OB_EAGAIN;
|
||||
TRANS_LOG(WARN, "can not enter next phase when logging", K(ret), KPC(this));
|
||||
// TODO check state
|
||||
} else if (next_phase == get_upstream_state()) {
|
||||
// do nothing about in-memory operation
|
||||
} else {
|
||||
switch (next_phase) {
|
||||
case ObTxState::PREPARE: {
|
||||
if (OB_FAIL(do_prepare(no_need_submit_log))) {
|
||||
TRANS_LOG(WARN, "do prepare in memory failed", K(ret), KPC(this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObTxState::PRE_COMMIT: {
|
||||
bool need_wait = false;
|
||||
if (OB_FAIL(do_pre_commit(need_wait))) {
|
||||
TRANS_LOG(WARN, "do pre commit in memory failed", K(ret), KPC(this));
|
||||
} else if (need_wait && is_root()) {
|
||||
ret = OB_EAGAIN;
|
||||
TRANS_LOG(INFO, "need wait before pre_commit", K(ret), KPC(this));
|
||||
} else if (need_wait && !is_root()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "no need to wait gts on any participant", K(ret), KPC(this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObTxState::COMMIT: {
|
||||
if (OB_FAIL(do_commit())) {
|
||||
TRANS_LOG(WARN, "do commit in memory failed", K(ret), KPC(this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObTxState::CLEAR: {
|
||||
if (OB_FAIL(do_clear())) {
|
||||
TRANS_LOG(WARN, "do clear in memory failed", K(ret), KPC(this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObTxState::ABORT: {
|
||||
if (OB_FAIL(do_abort())) {
|
||||
TRANS_LOG(WARN, "do abort in memory failed", K(ret), KPC(this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected next 2pc phase", K(ret), KPC(this));
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
// do nothing
|
||||
} else {
|
||||
collected_.reset();
|
||||
set_upstream_state(next_phase);
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
// do nothing
|
||||
} else if (!no_need_submit_log && !is_2pc_logging()) {
|
||||
if (OB_TMP_FAIL(resubmit_2pc_log_())) {
|
||||
TRANS_LOG(WARN, "submit log failed", K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_resp(const ObTwoPhaseCommitMsgType msg_type,
|
||||
const uint8_t participant_id)
|
||||
const int64_t participant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -164,8 +227,8 @@ int ObTxCycleTwoPhaseCommitter::handle_timeout()
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
|
||||
if (OB_TMP_FAIL(resubmit_downstream_log_())) {
|
||||
TRANS_LOG(WARN, "resubmit downstream log failed", KR(tmp_ret));
|
||||
if (OB_TMP_FAIL(resubmit_2pc_log_())) {
|
||||
TRANS_LOG(WARN, "resubmit 2pc log failed", KR(tmp_ret));
|
||||
}
|
||||
|
||||
if (!is_leaf() && OB_TMP_FAIL(retransmit_downstream_msg_())) {
|
||||
@ -179,6 +242,22 @@ int ObTxCycleTwoPhaseCommitter::handle_timeout()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::post_downstream_msg(const ObTwoPhaseCommitMsgType msg_type)
|
||||
{
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
|
||||
for (int64_t downstream_id = 0; downstream_id < get_downstream_size(); downstream_id++) {
|
||||
if (downstream_id != get_self_id()) {
|
||||
if (OB_TMP_FAIL(post_msg(msg_type, downstream_id))) {
|
||||
TRANS_LOG(WARN, "post downstream msg failed, will retry later", K(tmp_ret),
|
||||
K(downstream_id), K(msg_type), KPC(this));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return tmp_ret;
|
||||
}
|
||||
|
||||
// retransmit msg to all unresponded downstreams
|
||||
int ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_()
|
||||
{
|
||||
@ -188,11 +267,11 @@ int ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_()
|
||||
bool need_submit = true;
|
||||
|
||||
if (!is_leaf()) {
|
||||
int this_part_id = get_participant_id();
|
||||
int64_t this_part_id = get_self_id();
|
||||
if (OB_FAIL(decide_downstream_msg_type_(need_submit, msg_type))) {
|
||||
TRANS_LOG(WARN, "deecide downstream msg_type fail", K(ret), KPC(this));
|
||||
} else if (need_submit) {
|
||||
for (int64_t i = 0; i < get_participants_size(); ++i) {
|
||||
for (int64_t i = 0; i < get_downstream_size(); ++i) {
|
||||
if (!collected_.has_member(i) && this_part_id != i) {
|
||||
TRANS_LOG(INFO, "unresponded participant", K(i), K(*this));
|
||||
if (OB_TMP_FAIL(post_msg(msg_type, i))) {
|
||||
@ -258,7 +337,7 @@ int ObTxCycleTwoPhaseCommitter::decide_downstream_msg_type_(bool &need_submit,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_(const uint8_t participant)
|
||||
int ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_(const int64_t participant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -274,10 +353,10 @@ int ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_(const uint8_t partici
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_response(const uint8_t participant)
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_response(const int64_t participant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool no_need_submit_log = false;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
|
||||
switch (get_upstream_state()) {
|
||||
case ObTxState::INIT:
|
||||
@ -301,7 +380,7 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_response(const uint8_t partic
|
||||
case ObTxState::ABORT: {
|
||||
// Downstream may lost the request, so we need reply the request based on
|
||||
// the state to advance the two phase commit protocol as soon as possible
|
||||
if (OB_FAIL(retransmit_downstream_msg_(participant))) {
|
||||
if (OB_TMP_FAIL(retransmit_downstream_msg_(participant))) {
|
||||
TRANS_LOG(WARN, "retransmit msg failed", K(ret), K(*this));
|
||||
}
|
||||
break;
|
||||
@ -320,26 +399,42 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_response(const uint8_t partic
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_response_impl_(const uint8_t participant)
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_response_impl_(const int64_t participant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
const ObTxState state = get_downstream_state();
|
||||
const ObTxState downstream_state = get_downstream_state();
|
||||
|
||||
if (OB_FAIL(collect_downstream_(participant))) {
|
||||
if (OB_FAIL(apply_2pc_msg_(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_RESP))) {
|
||||
TRANS_LOG(WARN, "apply 2pc msg failed", K(ret));
|
||||
} else if (OB_FAIL(collect_downstream_(participant))) {
|
||||
TRANS_LOG(ERROR, "add participant to collected list failed", K(participant));
|
||||
} else if (is_2pc_logging()) {
|
||||
// skip if during logging
|
||||
} else if ((ObTxState::PREPARE == state)
|
||||
&& all_downstream_collected_()) {
|
||||
if (is_root()) {
|
||||
} else if ((ObTxState::PREPARE == downstream_state) && all_downstream_collected_()) {
|
||||
switch (get_2pc_role()) {
|
||||
case Ob2PCRole::ROOT: {
|
||||
if (OB_FAIL(try_enter_pre_commit_state())) {
|
||||
TRANS_LOG(WARN, "try_enter_pre_commit_state failed", K(ret), KPC(this));
|
||||
TRANS_LOG(WARN, "try enter pre commit state faild", K(ret));
|
||||
}
|
||||
} else {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_RESP,
|
||||
OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post prepare response failed", KR(tmp_ret), KPC(this));
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::INTERNAL: {
|
||||
if (OB_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_RESP, OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post prepare response to upstream failed", K(ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::LEAF: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "The leaf should not recive prepare response", K(ret), K(participant),
|
||||
KPC(this));
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected 2pc role", K(ret), K(get_2pc_role()), KPC(this));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -347,7 +442,7 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_response_impl_(const uint8_t
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_commit_response(const uint8_t participant)
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_commit_response(const int64_t participant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -355,7 +450,8 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_commit_response(const uint8_t partici
|
||||
case ObTxState::INIT:
|
||||
case ObTxState::REDO_COMPLETE: {
|
||||
ret = OB_TRANS_PROTOCOL_ERROR;
|
||||
TRANS_LOG(ERROR, "handle 2pc commit response find protocol error", K(get_upstream_state()), K(*this));
|
||||
TRANS_LOG(ERROR, "handle 2pc commit response find protocol error", K(get_upstream_state()),
|
||||
K(*this));
|
||||
break;
|
||||
}
|
||||
case ObTxState::PREPARE:
|
||||
@ -398,7 +494,7 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_commit_response(const uint8_t partici
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_abort_response(const uint8_t participant)
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_abort_response(const int64_t participant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -440,38 +536,40 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_abort_response(const uint8_t particip
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_ack_response_impl_(const uint8_t participant)
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_ack_response_impl_(const int64_t participant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
const ObTxState state = get_downstream_state();
|
||||
const ObTxState downstream_state = get_downstream_state();
|
||||
|
||||
if (OB_FAIL(collect_downstream_(participant))) {
|
||||
if (get_upstream_state() == ObTxState::COMMIT
|
||||
&& apply_2pc_msg_(ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_RESP)) {
|
||||
TRANS_LOG(WARN, "apply ack commit response failed", K(ret), KPC(this));
|
||||
} else if (get_upstream_state() == ObTxState::ABORT
|
||||
&& apply_2pc_msg_(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_RESP)) {
|
||||
TRANS_LOG(WARN, "apply ack abort response failed", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(collect_downstream_(participant))) {
|
||||
TRANS_LOG(ERROR, "add participant to collected list failed", K(participant));
|
||||
} else if (is_2pc_logging()) {
|
||||
// skip if during logging
|
||||
} else if ((ObTxState::COMMIT == state
|
||||
|| ObTxState::ABORT == state)
|
||||
} else if ((ObTxState::COMMIT == downstream_state || ObTxState::ABORT == downstream_state)
|
||||
&& all_downstream_collected_()) {
|
||||
if (is_root()) {
|
||||
set_upstream_state(ObTxState::CLEAR);
|
||||
collected_.reset();
|
||||
// TODO, drive it and submit_log via msg
|
||||
if (OB_FAIL(do_clear())) {
|
||||
TRANS_LOG(WARN, "do clear failed", KR(ret));
|
||||
switch (get_2pc_role()) {
|
||||
case Ob2PCRole::ROOT: {
|
||||
if (OB_FAIL(drive_self_2pc_phase(ObTxState::CLEAR))) {
|
||||
TRANS_LOG(WARN, "enter clear phase failed", K(ret));
|
||||
} else {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ))) {
|
||||
TRANS_LOG(WARN, "post clear request failed", K(tmp_ret), KPC(this));
|
||||
}
|
||||
if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_CLEAR))) {
|
||||
TRANS_LOG(WARN, "submit clear log failed", K(tmp_ret), KPC(this));
|
||||
if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ))) {
|
||||
TRANS_LOG(WARN, "post clear request failed", K(ret), KPC(this));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::INTERNAL: {
|
||||
ObTwoPhaseCommitMsgType msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_UNKNOWN;
|
||||
switch (get_upstream_state()) {
|
||||
case ObTxState::PRE_COMMIT:
|
||||
@ -487,19 +585,33 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_ack_response_impl_(const uint8_t part
|
||||
if (OB_TMP_FAIL(post_msg(msg_type, OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post commit response failed", KR(tmp_ret), KPC(this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::LEAF: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "The leaf should not recive ack response", K(ret), K(participant), KPC(this));
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected 2pc role", K(ret), K(get_2pc_role()), KPC(this));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_abort_response_impl_(const uint8_t participant)
|
||||
// first abort response with init, redo_complete or prepare state.
|
||||
// switch to abort state
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_abort_response_impl_(const int64_t participant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
const ObTxState state = get_downstream_state();
|
||||
const ObTxState downstream_state = get_downstream_state();
|
||||
|
||||
if (ObTxState::INIT != state
|
||||
&& ObTxState::PREPARE != state) {
|
||||
if (ObTxState::INIT != downstream_state && ObTxState::REDO_COMPLETE != downstream_state
|
||||
&& ObTxState::PREPARE != downstream_state) {
|
||||
ret = OB_TRANS_INVALID_STATE;
|
||||
TRANS_LOG(WARN, "invalid state when handle abort response", K(participant), KPC(this));
|
||||
} else if (is_2pc_logging()) {
|
||||
@ -513,30 +625,44 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_abort_response_impl_(const uint8_t pa
|
||||
// state machine to abort process.
|
||||
//
|
||||
// Adopted: For safety, we donot allow the above optimization
|
||||
set_upstream_state(ObTxState::ABORT);
|
||||
collected_.reset();
|
||||
if (OB_FAIL(do_abort())) {
|
||||
TRANS_LOG(WARN, "do abort fail", K(ret), KPC(this));
|
||||
|
||||
|
||||
switch (get_2pc_role()) {
|
||||
case Ob2PCRole::ROOT:
|
||||
case Ob2PCRole::INTERNAL: {
|
||||
if (OB_FAIL(apply_2pc_msg_(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_RESP))) {
|
||||
TRANS_LOG(WARN, "apply msg failed", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(drive_self_2pc_phase(ObTxState::ABORT))) {
|
||||
TRANS_LOG(WARN, "enter abort phase failed", K(ret));
|
||||
} else if (OB_FAIL(collect_downstream_(participant))) {
|
||||
TRANS_LOG(ERROR, "add participant to collected list failed", K(participant));
|
||||
} else {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_REQ))) {
|
||||
} else if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_REQ))) {
|
||||
TRANS_LOG(WARN, "post commit request failed", K(tmp_ret), K(*this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::LEAF: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "The leaf should not recive abort response", K(ret), K(participant),
|
||||
KPC(this));
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected 2pc role", K(ret), K(get_2pc_role()), KPC(this));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_ABORT))) {
|
||||
TRANS_LOG(WARN, "submit commit log failed", K(tmp_ret), K(*this));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ret = OB_TRANS_INVALID_STATE;
|
||||
TRANS_LOG(WARN, "handle 2pc abort response find a bug", K(ret), K(*this));
|
||||
TRANS_LOG(WARN, "handle 2pc abort response with invalid state", K(ret), K(*this));
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_pre_commit_response(const uint8_t participant)
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_pre_commit_response(const int64_t participant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
@ -544,7 +670,8 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_pre_commit_response(const uint8_t par
|
||||
case ObTxState::INIT:
|
||||
case ObTxState::REDO_COMPLETE: {
|
||||
ret = OB_TRANS_PROTOCOL_ERROR;
|
||||
TRANS_LOG(ERROR, "handle 2pc commit response find protocol error", K(get_upstream_state()), K(*this));
|
||||
TRANS_LOG(ERROR, "handle 2pc commit response find protocol error", K(get_upstream_state()),
|
||||
K(*this));
|
||||
break;
|
||||
}
|
||||
case ObTxState::PREPARE: {
|
||||
@ -561,7 +688,7 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_pre_commit_response(const uint8_t par
|
||||
}
|
||||
case ObTxState::PRE_COMMIT: {
|
||||
if (OB_FAIL(handle_2pc_pre_commit_response_impl_(participant))) {
|
||||
TRANS_LOG(WARN, "retransmit msg failed", K(ret), K(*this));
|
||||
TRANS_LOG(WARN, "handle pre_commit response failed", K(ret), K(*this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -592,43 +719,56 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_pre_commit_response(const uint8_t par
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_pre_commit_response_impl_(const uint8_t participant)
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_pre_commit_response_impl_(const int64_t participant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
|
||||
if (OB_FAIL(collect_downstream_(participant))) {
|
||||
if (OB_FAIL(apply_2pc_msg_(ObTwoPhaseCommitMsgType::OB_MSG_TX_PRE_COMMIT_RESP))) {
|
||||
TRANS_LOG(WARN, "apply msg failed", K(ret), KPC(this));
|
||||
} else if (OB_FAIL(collect_downstream_(participant))) {
|
||||
TRANS_LOG(ERROR, "add participant to collected list failed", K(participant));
|
||||
} else if (all_downstream_collected_()) {
|
||||
if (is_root()) {
|
||||
set_upstream_state(ObTxState::COMMIT);
|
||||
collected_.reset();
|
||||
// TODO, drive it and submit log via msg
|
||||
if (OB_FAIL(do_commit())) {
|
||||
TRANS_LOG(WARN, "do commit failed", K(ret), K(*this));
|
||||
switch (get_2pc_role()) {
|
||||
case Ob2PCRole::ROOT: {
|
||||
if (OB_FAIL(drive_self_2pc_phase(ObTxState::COMMIT))) {
|
||||
TRANS_LOG(WARN, "enter commit phase failed", K(ret));
|
||||
} else {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_REQ))) {
|
||||
TRANS_LOG(WARN, "post commit request failed", K(tmp_ret), K(*this));
|
||||
}
|
||||
if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_COMMIT))) {
|
||||
TRANS_LOG(WARN, "submit commit log failed", K(tmp_ret), K(*this));
|
||||
}
|
||||
if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_REQ))) {
|
||||
TRANS_LOG(WARN, "post commit request failed", K(ret), K(*this));
|
||||
}
|
||||
|
||||
// TODO, refine in 4.1
|
||||
if (is_sub2pc()) {
|
||||
TRANS_LOG(INFO, "handle pre commit response for sub trans", K(ret));
|
||||
}
|
||||
} else {
|
||||
if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PRE_COMMIT_RESP,
|
||||
OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post prepare response failed", KR(tmp_ret), KPC(this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::INTERNAL: {
|
||||
if (OB_FAIL(
|
||||
post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PRE_COMMIT_RESP, OB_C2PC_UPSTREAM_ID))) {
|
||||
TRANS_LOG(WARN, "post pre_commit response failed", KR(ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::LEAF: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(WARN, "The leaf should not recive pre_commit response", K(ret), K(participant),
|
||||
KPC(this));
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected 2pc role", K(ret), K(get_2pc_role()), KPC(this));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_clear_response(const uint8_t participant)
|
||||
int ObTxCycleTwoPhaseCommitter::handle_2pc_clear_response(const int64_t participant)
|
||||
{
|
||||
UNUSED(participant);
|
||||
return OB_SUCCESS;
|
||||
@ -706,22 +846,42 @@ int ObTxCycleTwoPhaseCommitter::handle_orphan_2pc_clear_response()
|
||||
|
||||
bool ObTxCycleTwoPhaseCommitter::all_downstream_collected_()
|
||||
{
|
||||
if (!is_leaf()) {
|
||||
return collected_.num_members() == get_participants_size() - 1;
|
||||
} else {
|
||||
return true;
|
||||
bool all_collected = false;
|
||||
switch (get_2pc_role()) {
|
||||
case Ob2PCRole::ROOT:
|
||||
case Ob2PCRole::INTERNAL: {
|
||||
all_collected = collected_.num_members() == get_downstream_size() - 1;
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::LEAF: {
|
||||
all_collected = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return all_collected;
|
||||
}
|
||||
|
||||
int ObTxCycleTwoPhaseCommitter::collect_downstream_(const uint8_t participant)
|
||||
int ObTxCycleTwoPhaseCommitter::collect_downstream_(const int64_t participant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (!is_leaf()) {
|
||||
// TODO(handora.qc): remove the logic after donot need send vote to myself
|
||||
if (participant != get_participant_id()) {
|
||||
switch (get_2pc_role()) {
|
||||
case Ob2PCRole::ROOT:
|
||||
case Ob2PCRole::INTERNAL: {
|
||||
if (participant == get_self_id()) {
|
||||
TRANS_LOG(WARN, "recive self 2pc msg", K(participant), KPC(this));
|
||||
} else {
|
||||
ret = collected_.add_member(participant);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case Ob2PCRole::LEAF: {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unexpected 2pc role", K(ret), K(get_2pc_role()), KPC(this));
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
||||
@ -20,6 +20,35 @@ using namespace share;
|
||||
namespace transaction
|
||||
{
|
||||
|
||||
Ob2PCRole ObPartTransCtx::get_2pc_role() const
|
||||
{
|
||||
Ob2PCRole role;
|
||||
|
||||
if (!exec_info_.upstream_.is_valid()) {
|
||||
role = Ob2PCRole::ROOT;
|
||||
} else if (exec_info_.incremental_participants_.empty()) {
|
||||
// not root & downstream is empty
|
||||
// root must not be leaf, because the distributed txn must be composed by
|
||||
// more than one participants.
|
||||
role = Ob2PCRole::LEAF;
|
||||
} else {
|
||||
role = Ob2PCRole::INTERNAL;
|
||||
}
|
||||
|
||||
return role;
|
||||
}
|
||||
|
||||
int64_t ObPartTransCtx::get_self_id()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (self_id_ == -1) {
|
||||
if (OB_FAIL(find_participant_id_(ls_id_, self_id_))) {
|
||||
TRANS_LOG(ERROR, "find participant id failed", K(ret), K(*this));
|
||||
}
|
||||
}
|
||||
return self_id_;
|
||||
}
|
||||
|
||||
int ObPartTransCtx::restart_2pc_trans_timer_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -43,7 +72,6 @@ int ObPartTransCtx::do_prepare(bool &no_need_submit_log)
|
||||
int ret = OB_SUCCESS;
|
||||
no_need_submit_log = false;
|
||||
|
||||
// common operation
|
||||
if (exec_info_.is_dup_tx_ || OB_SUCC(search_unsubmitted_dup_table_redo_())) {
|
||||
no_need_submit_log = true;
|
||||
if (OB_FAIL(dup_table_tx_redo_sync_())) {
|
||||
@ -73,6 +101,7 @@ int ObPartTransCtx::on_prepare()
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
/* Why do we need to execute wait_gts_elapse_commit_version_ successfully before post
|
||||
* the pre-commit message;
|
||||
*
|
||||
@ -183,7 +212,7 @@ int ObPartTransCtx::do_commit()
|
||||
} else if (is_root() && OB_FAIL(coord_prepare_info_arr_.assign(exec_info_.prepare_log_info_arr_))) {
|
||||
TRANS_LOG(WARN, "assign coord_prepare_info_arr_ for root failed", K(ret));
|
||||
} else if (is_root()) {
|
||||
check_and_response_scheduler_(OB_SUCCESS);
|
||||
check_and_response_scheduler_(ObTxState::COMMIT, OB_SUCCESS);
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && OB_FAIL(restart_2pc_trans_timer_())) {
|
||||
@ -192,34 +221,35 @@ int ObPartTransCtx::do_commit()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartTransCtx::check_and_response_scheduler_(int result)
|
||||
int ObPartTransCtx::check_and_response_scheduler_(ObTxState next_phase, int result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ret = E(EventTable::EN_EARLY_RESPONSE_SCHEDULER) OB_SUCCESS;
|
||||
if (!is_sub2pc() && OB_FAIL(ret)) {
|
||||
// when error inject, response scheduler delayed to CLEAR state
|
||||
if (ObTxState::CLEAR == get_upstream_state()) {
|
||||
if (ObTxState::CLEAR == next_phase) {
|
||||
if (REACH_TIME_INTERVAL(1000 * 1000)) {
|
||||
TRANS_LOG(INFO, "response scheduler in clear state", K(ret), K(*this));
|
||||
}
|
||||
ret = OB_SUCCESS;
|
||||
} else {
|
||||
TRANS_LOG(INFO, "response scheduler in 2pc", K(ret), K(result), KPC(this));
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
// general path, won't response scheduler in CLEAR state
|
||||
if (ObTxState::CLEAR == get_upstream_state()) {
|
||||
if (ObTxState::CLEAR == next_phase) {
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
if (is_sub2pc()) {
|
||||
// TODO, according to part trans action
|
||||
if (ObTxState::COMMIT == get_upstream_state()) {
|
||||
if (ObTxState::COMMIT == next_phase) {
|
||||
if (OB_FAIL(reply_to_scheduler_for_sub2pc(SUBCOMMIT_RESP))) {
|
||||
TRANS_LOG(ERROR, "fail to reply sub commit", K(ret), K(*this));
|
||||
}
|
||||
} else if (ObTxState::ABORT == get_upstream_state() || ObTxState::ABORT == get_downstream_state()) {
|
||||
} else if (ObTxState::ABORT == next_phase || ObTxState::ABORT == get_downstream_state()) {
|
||||
if (OB_FAIL(reply_to_scheduler_for_sub2pc(SUBROLLBACK_RESP))) {
|
||||
TRANS_LOG(ERROR, "fail to reply sub rollback", K(ret), K(*this));
|
||||
}
|
||||
@ -230,6 +260,10 @@ int ObPartTransCtx::check_and_response_scheduler_(int result)
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
TRANS_LOG(INFO, "response scheduler in 2pc", K(ret), K(result), KPC(this));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -264,7 +298,7 @@ int ObPartTransCtx::do_abort()
|
||||
// do nothing
|
||||
} else {
|
||||
if (is_root()) {
|
||||
check_and_response_scheduler_(OB_TRANS_KILLED);
|
||||
check_and_response_scheduler_(ObTxState::ABORT, OB_TRANS_KILLED);
|
||||
}
|
||||
}
|
||||
|
||||
@ -276,7 +310,7 @@ int ObPartTransCtx::on_abort()
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (is_sub2pc() && is_root()) {
|
||||
check_and_response_scheduler_(OB_TRANS_KILLED);
|
||||
check_and_response_scheduler_(ObTxState::ABORT, OB_TRANS_KILLED);
|
||||
}
|
||||
if (OB_FAIL(on_dist_end_(false /*commit*/))) {
|
||||
TRANS_LOG(WARN, "transaciton end error", KR(ret), "context", *this);
|
||||
@ -293,7 +327,7 @@ int ObPartTransCtx::do_clear()
|
||||
|
||||
if (is_root()) {
|
||||
// response scheduler after all participant commit log sycned
|
||||
check_and_response_scheduler_(OB_SUCCESS);
|
||||
check_and_response_scheduler_(ObTxState::CLEAR, OB_SUCCESS);
|
||||
}
|
||||
// currently do nothing
|
||||
|
||||
|
||||
@ -308,7 +308,7 @@ int ObPartTransCtx::post_orphan_msg_(const ObTwoPhaseCommitMsgType &msg_type,
|
||||
}
|
||||
|
||||
int ObPartTransCtx::post_msg(const ObTwoPhaseCommitMsgType& msg_type,
|
||||
const uint8_t participant_id)
|
||||
const int64_t participant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObLSID receiver;
|
||||
@ -332,19 +332,6 @@ int ObPartTransCtx::post_msg(const ObTwoPhaseCommitMsgType& msg_type,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartTransCtx::post_msg(const ObTwoPhaseCommitMsgType &msg_type)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < exec_info_.participants_.count(); ++i) {
|
||||
if (exec_info_.participants_[i] != ls_id_ && OB_FAIL(post_msg(msg_type, i))) {
|
||||
TRANS_LOG(WARN, "post msg failed", KR(ret), K(i), K(*this));
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartTransCtx::set_2pc_upstream_(const ObLSID &upstream)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -470,6 +457,119 @@ int ObPartTransCtx::set_2pc_commit_version_(const SCN &commit_version)
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartTransCtx::apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (OB_ISNULL(msg_2pc_cache_)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG(WARN, "empty 2pc msg", K(ret));
|
||||
} else if (switch_msg_type_(msg_2pc_cache_->type_) != msg_type) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG(WARN, "unexpected 2pc msg type", K(ret), K(msg_type), KPC(msg_2pc_cache_));
|
||||
} else {
|
||||
switch (msg_type) {
|
||||
case ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ: {
|
||||
const Ob2pcPrepareReqMsg &msg = *(static_cast<const Ob2pcPrepareReqMsg *>(msg_2pc_cache_));
|
||||
|
||||
part_trans_action_ = ObPartTransAction::COMMIT;
|
||||
if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) {
|
||||
} else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) {
|
||||
TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) {
|
||||
TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_RESP: {
|
||||
const Ob2pcPrepareRespMsg &msg = *(static_cast<const Ob2pcPrepareRespMsg *>(msg_2pc_cache_));
|
||||
|
||||
if (OB_FAIL(update_2pc_prepare_version_(msg.prepare_version_))) {
|
||||
TRANS_LOG(WARN, "update prepare version failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(merge_prepare_log_info_(msg.prepare_info_array_))) {
|
||||
TRANS_LOG(WARN, "merge prepare log info failed", K(ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObTwoPhaseCommitMsgType::OB_MSG_TX_PRE_COMMIT_REQ: {
|
||||
|
||||
const Ob2pcPreCommitReqMsg &msg =
|
||||
*(static_cast<const Ob2pcPreCommitReqMsg *>(msg_2pc_cache_));
|
||||
|
||||
if (OB_FAIL(set_2pc_commit_version_(msg.commit_version_))) {
|
||||
TRANS_LOG(WARN, "set commit version failed", KR(ret), K(msg), KPC(this));
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case ObTwoPhaseCommitMsgType::OB_MSG_TX_PRE_COMMIT_RESP: {
|
||||
|
||||
const Ob2pcPreCommitRespMsg &msg =
|
||||
*(static_cast<const Ob2pcPreCommitRespMsg *>(msg_2pc_cache_));
|
||||
|
||||
if (OB_FAIL(update_2pc_prepare_version_(msg.commit_version_))) {
|
||||
TRANS_LOG(WARN, "update prepare version failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(set_2pc_commit_version_(msg.commit_version_))) {
|
||||
TRANS_LOG(WARN, "set commit version failed", KR(ret), K(msg), KPC(this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_REQ: {
|
||||
|
||||
const Ob2pcCommitReqMsg &msg = *(static_cast<const Ob2pcCommitReqMsg *>(msg_2pc_cache_));
|
||||
|
||||
if (OB_FAIL(set_2pc_commit_version_(msg.commit_version_))) {
|
||||
TRANS_LOG(WARN, "set commit version failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(coord_prepare_info_arr_.assign(msg.prepare_info_array_))) {
|
||||
TRANS_LOG(WARN, "assign prepare_log_info_arr_ failed", K(ret));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObTwoPhaseCommitMsgType::OB_MSG_TX_COMMIT_RESP: {
|
||||
|
||||
const Ob2pcCommitRespMsg &msg = *(static_cast<const Ob2pcCommitRespMsg *>(msg_2pc_cache_));
|
||||
|
||||
if (OB_FAIL(set_2pc_commit_version_(msg.commit_version_))) {
|
||||
TRANS_LOG(WARN, "set commit version failed", KR(ret), K(msg), K(*this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_REQ: {
|
||||
|
||||
const Ob2pcAbortReqMsg &msg = *(static_cast<const Ob2pcAbortReqMsg *>(msg_2pc_cache_));
|
||||
|
||||
if (msg.upstream_.is_valid() && // upstream may be invalid for orphan msg
|
||||
OB_FAIL(set_2pc_upstream_(msg.upstream_))) {
|
||||
TRANS_LOG(WARN, "set upstream failed", KR(ret), K(msg), K(*this));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_RESP: {
|
||||
const Ob2pcAbortRespMsg &msg = *(static_cast<const Ob2pcAbortRespMsg *>(msg_2pc_cache_));
|
||||
|
||||
break;
|
||||
}
|
||||
case ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_REQ: {
|
||||
const Ob2pcClearReqMsg &msg = *(static_cast<const Ob2pcClearReqMsg *>(msg_2pc_cache_));
|
||||
|
||||
break;
|
||||
}
|
||||
case ObTwoPhaseCommitMsgType::OB_MSG_TX_CLEAR_RESP: {
|
||||
const Ob2pcClearRespMsg &msg = *(static_cast<const Ob2pcClearRespMsg *>(msg_2pc_cache_));
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
default: {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "unkown 2pc msg", K(ret), K(msg_type), KPC(msg_2pc_cache_), KPC(this));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartTransCtx::handle_tx_2pc_prepare_req(const Ob2pcPrepareReqMsg &msg)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -477,16 +577,13 @@ int ObPartTransCtx::handle_tx_2pc_prepare_req(const Ob2pcPrepareReqMsg &msg)
|
||||
ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());
|
||||
exec_info_.trans_type_ = TransType::DIST_TRANS;
|
||||
|
||||
if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) {
|
||||
} else if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
|
||||
msg_2pc_cache_ = &msg;
|
||||
if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
|
||||
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) {
|
||||
TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) {
|
||||
TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(handle_2pc_req(msg_type))) {
|
||||
TRANS_LOG(WARN, "handle 2pc request failed", KR(ret), K(msg), K(*this));
|
||||
}
|
||||
msg_2pc_cache_ = nullptr;
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
part_trans_action_ = ObPartTransAction::COMMIT;
|
||||
@ -500,19 +597,17 @@ int ObPartTransCtx::handle_tx_2pc_prepare_resp(const Ob2pcPrepareRespMsg &msg)
|
||||
int ret = OB_SUCCESS;
|
||||
CtxLockGuard guard(lock_);
|
||||
ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());
|
||||
uint64_t participant_id = UINT64_MAX;
|
||||
int64_t participant_id = INT64_MAX;
|
||||
|
||||
msg_2pc_cache_ = &msg;
|
||||
if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
|
||||
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(update_2pc_prepare_version_(msg.prepare_version_))) {
|
||||
TRANS_LOG(WARN, "update prepare version failed", KR(ret), K(msg), K(*this));
|
||||
} else if(OB_FAIL(merge_prepare_log_info_(msg.prepare_info_array_))){
|
||||
TRANS_LOG(WARN, "merge prepare log info failed",K(ret));
|
||||
} else if (OB_FAIL(find_participant_id_(msg.sender_, participant_id))) {
|
||||
TRANS_LOG(ERROR, "find participant failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(handle_2pc_resp(msg_type, participant_id))) {
|
||||
TRANS_LOG(WARN, "handle 2pc response failed", KR(ret), K(msg), K(participant_id), K(*this));
|
||||
}
|
||||
msg_2pc_cache_ = nullptr;
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -558,7 +653,7 @@ int ObPartTransCtx::handle_tx_2pc_prepare_redo_resp(const Ob2pcPrepareRedoRespMs
|
||||
int ret = OB_SUCCESS;
|
||||
CtxLockGuard guard(lock_);
|
||||
ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());
|
||||
uint64_t participant_id = UINT64_MAX;
|
||||
int64_t participant_id = INT64_MAX;
|
||||
|
||||
if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
|
||||
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this));
|
||||
@ -602,7 +697,7 @@ int ObPartTransCtx::handle_tx_2pc_prepare_version_resp(const Ob2pcPrepareVersion
|
||||
int ret = OB_SUCCESS;
|
||||
CtxLockGuard guard(lock_);
|
||||
ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());
|
||||
uint64_t participant_id = UINT64_MAX;
|
||||
int64_t participant_id = INT64_MAX;
|
||||
|
||||
if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
|
||||
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this));
|
||||
@ -625,13 +720,13 @@ int ObPartTransCtx::handle_tx_2pc_pre_commit_req(const Ob2pcPreCommitReqMsg &msg
|
||||
CtxLockGuard guard(lock_);
|
||||
ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());
|
||||
|
||||
msg_2pc_cache_ = &msg;
|
||||
if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
|
||||
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), KPC(this));
|
||||
} else if (OB_FAIL(set_2pc_commit_version_(msg.commit_version_))) {
|
||||
TRANS_LOG(WARN, "set commit version failed", KR(ret), K(msg), KPC(this));
|
||||
} else if (OB_FAIL(handle_2pc_req(msg_type))) {
|
||||
TRANS_LOG(WARN, "handle 2pc request failed", KR(ret), K(msg), KPC(this));
|
||||
}
|
||||
msg_2pc_cache_ = nullptr;
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -641,19 +736,17 @@ int ObPartTransCtx::handle_tx_2pc_pre_commit_resp(const Ob2pcPreCommitRespMsg &m
|
||||
int ret = OB_SUCCESS;
|
||||
CtxLockGuard guard(lock_);
|
||||
ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());
|
||||
uint64_t participant_id = UINT64_MAX;
|
||||
int64_t participant_id = INT64_MAX;
|
||||
|
||||
msg_2pc_cache_ = &msg;
|
||||
if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
|
||||
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), KPC(this));
|
||||
} else if (OB_FAIL(update_2pc_prepare_version_(msg.commit_version_))) {
|
||||
TRANS_LOG(WARN, "update prepare version failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(set_2pc_commit_version_(msg.commit_version_))) {
|
||||
TRANS_LOG(WARN, "set commit version failed", KR(ret), K(msg), KPC(this));
|
||||
} else if (OB_FAIL(find_participant_id_(msg.sender_, participant_id))) {
|
||||
TRANS_LOG(ERROR, "find participant failed", KR(ret), K(msg), KPC(this));
|
||||
} else if (OB_FAIL(handle_2pc_resp(msg_type, participant_id))) {
|
||||
TRANS_LOG(WARN, "handle 2pc response failed", KR(ret), K(msg), K(participant_id), KPC(this));
|
||||
}
|
||||
msg_2pc_cache_ = nullptr;
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -664,15 +757,13 @@ int ObPartTransCtx::handle_tx_2pc_commit_req(const Ob2pcCommitReqMsg &msg)
|
||||
CtxLockGuard guard(lock_);
|
||||
ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());
|
||||
|
||||
msg_2pc_cache_ = &msg;
|
||||
if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
|
||||
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(set_2pc_commit_version_(msg.commit_version_))) {
|
||||
TRANS_LOG(WARN, "set commit version failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(coord_prepare_info_arr_.assign(msg.prepare_info_array_))) {
|
||||
TRANS_LOG(WARN, "assign prepare_log_info_arr_ failed", K(ret));
|
||||
} else if (OB_FAIL(handle_2pc_req(msg_type))) {
|
||||
TRANS_LOG(WARN, "handle 2pc request failed", KR(ret), K(msg), K(*this));
|
||||
}
|
||||
msg_2pc_cache_ = nullptr;
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -682,17 +773,17 @@ int ObPartTransCtx::handle_tx_2pc_commit_resp(const Ob2pcCommitRespMsg &msg)
|
||||
int ret = OB_SUCCESS;
|
||||
CtxLockGuard guard(lock_);
|
||||
ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());
|
||||
uint64_t participant_id = UINT64_MAX;
|
||||
int64_t participant_id = INT64_MAX;
|
||||
|
||||
msg_2pc_cache_ = &msg;
|
||||
if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
|
||||
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(set_2pc_commit_version_(msg.commit_version_))) {
|
||||
TRANS_LOG(WARN, "set commit version failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(find_participant_id_(msg.sender_, participant_id))) {
|
||||
TRANS_LOG(ERROR, "find participant failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(handle_2pc_resp(msg_type, participant_id))) {
|
||||
TRANS_LOG(WARN, "handle 2pc response failed", KR(ret), K(msg), K(participant_id), K(*this));
|
||||
}
|
||||
msg_2pc_cache_ = nullptr;
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -703,14 +794,13 @@ int ObPartTransCtx::handle_tx_2pc_abort_req(const Ob2pcAbortReqMsg &msg)
|
||||
CtxLockGuard guard(lock_);
|
||||
ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());
|
||||
|
||||
msg_2pc_cache_ = &msg;
|
||||
if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
|
||||
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this));
|
||||
} else if (msg.upstream_.is_valid() && // upstream may be invalid for orphan msg
|
||||
OB_FAIL(set_2pc_upstream_(msg.upstream_))) {
|
||||
TRANS_LOG(WARN, "set upstream failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(handle_2pc_req(msg_type))) {
|
||||
TRANS_LOG(WARN, "handle 2pc request failed", KR(ret), K(msg), K(*this));
|
||||
}
|
||||
msg_2pc_cache_ = nullptr;
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -720,8 +810,9 @@ int ObPartTransCtx::handle_tx_2pc_abort_resp(const Ob2pcAbortRespMsg &msg)
|
||||
int ret = OB_SUCCESS;
|
||||
CtxLockGuard guard(lock_);
|
||||
ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());
|
||||
uint64_t participant_id = UINT64_MAX;
|
||||
int64_t participant_id = INT64_MAX;
|
||||
|
||||
msg_2pc_cache_ = &msg;
|
||||
if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
|
||||
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(find_participant_id_(msg.sender_, participant_id))) {
|
||||
@ -729,6 +820,7 @@ int ObPartTransCtx::handle_tx_2pc_abort_resp(const Ob2pcAbortRespMsg &msg)
|
||||
} else if (OB_FAIL(handle_2pc_resp(msg_type, participant_id))) {
|
||||
TRANS_LOG(WARN, "handle 2pc response failed", KR(ret), K(msg), K(participant_id), K(*this));
|
||||
}
|
||||
msg_2pc_cache_ = nullptr;
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -739,11 +831,13 @@ int ObPartTransCtx::handle_tx_2pc_clear_req(const Ob2pcClearReqMsg &msg)
|
||||
CtxLockGuard guard(lock_);
|
||||
ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());
|
||||
|
||||
msg_2pc_cache_ = &msg;
|
||||
if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
|
||||
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(handle_2pc_req(msg_type))) {
|
||||
TRANS_LOG(WARN, "handle 2pc request failed", KR(ret), K(msg), K(*this));
|
||||
}
|
||||
msg_2pc_cache_ = nullptr;
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -753,8 +847,9 @@ int ObPartTransCtx::handle_tx_2pc_clear_resp(const Ob2pcClearRespMsg &msg)
|
||||
int ret = OB_SUCCESS;
|
||||
CtxLockGuard guard(lock_);
|
||||
ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());
|
||||
uint64_t participant_id = UINT64_MAX;
|
||||
int64_t participant_id = INT64_MAX;
|
||||
|
||||
msg_2pc_cache_ = &msg;
|
||||
if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
|
||||
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this));
|
||||
} else if (OB_FAIL(find_participant_id_(msg.sender_, participant_id))) {
|
||||
@ -762,6 +857,7 @@ int ObPartTransCtx::handle_tx_2pc_clear_resp(const Ob2pcClearRespMsg &msg)
|
||||
} else if (OB_FAIL(handle_2pc_resp(msg_type, participant_id))) {
|
||||
TRANS_LOG(WARN, "handle 2pc response failed", KR(ret), K(msg), K(participant_id), K(*this));
|
||||
}
|
||||
msg_2pc_cache_ = nullptr;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -129,22 +129,29 @@ int MockOb2pcCtx::on_clear()
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
|
||||
bool MockOb2pcCtx::is_root() const
|
||||
Ob2PCRole MockOb2pcCtx::get_2pc_role() const
|
||||
{
|
||||
return participants_.size() != 0;
|
||||
|
||||
Ob2PCRole role;
|
||||
|
||||
if (participants_.size()!=0) {
|
||||
role = Ob2PCRole::ROOT;
|
||||
} else if (participants_.size()==0) {
|
||||
// not root & downstream is empty
|
||||
role = Ob2PCRole::LEAF;
|
||||
} else {
|
||||
role = Ob2PCRole::INTERNAL;
|
||||
}
|
||||
|
||||
return role;
|
||||
}
|
||||
|
||||
bool MockOb2pcCtx::is_leaf() const
|
||||
{
|
||||
return participants_.size() == 0;
|
||||
}
|
||||
|
||||
int64_t MockOb2pcCtx::get_participants_size()
|
||||
int64_t MockOb2pcCtx::get_downstream_size() const
|
||||
{
|
||||
return participants_.size();
|
||||
}
|
||||
|
||||
uint64_t MockOb2pcCtx::get_participant_id()
|
||||
int64_t MockOb2pcCtx::get_self_id()
|
||||
{
|
||||
int participant_id = 0;
|
||||
|
||||
@ -163,7 +170,7 @@ int MockOb2pcCtx::submit_log(const ObTwoPhaseCommitLogType& log_type)
|
||||
}
|
||||
|
||||
int MockOb2pcCtx::post_msg(const ObTwoPhaseCommitMsgType& msg_type,
|
||||
const uint8_t participant)
|
||||
const int64_t participant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t to = 0;
|
||||
@ -200,17 +207,11 @@ int MockOb2pcCtx::post_msg(const ObTwoPhaseCommitMsgType& msg_type,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int MockOb2pcCtx::post_msg(const ObTwoPhaseCommitMsgType& msg_type)
|
||||
int MockOb2pcCtx::apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
for (int i = 0; i < participants_.size(); i++) {
|
||||
if (OB_FAIL(post_msg(msg_type, i))) {
|
||||
TRANS_LOG(WARN, "send mailbox failed", K(ret), K(msg_type));
|
||||
}
|
||||
}
|
||||
|
||||
TRANS_LOG(INFO, "post msg to participants success", K(*this), K(msg_type));
|
||||
UNUSED(msg_type);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -112,10 +112,9 @@ protected:
|
||||
// the 2pc state to the dst. Otherwise, the 2pc will invoke a cycle style commit
|
||||
// which the parent node waits for the node's 2pc state response before responsing
|
||||
// its 2pc state to his parent node.
|
||||
virtual int64_t get_participants_size() override;
|
||||
virtual uint64_t get_participant_id() override;
|
||||
virtual bool is_root() const override;
|
||||
virtual bool is_leaf() const override;
|
||||
virtual int64_t get_downstream_size() const override;
|
||||
virtual int64_t get_self_id() override;
|
||||
virtual Ob2PCRole get_2pc_role() const override;
|
||||
virtual ObTxState get_downstream_state() const override;
|
||||
virtual int set_downstream_state(const ObTxState state) override;
|
||||
virtual ObTxState get_upstream_state() const override;
|
||||
@ -132,11 +131,12 @@ protected:
|
||||
virtual int submit_log(const ObTwoPhaseCommitLogType& log_type) override;
|
||||
// The msg poster is best effort, so we need rely on the timeout handler to retry the last
|
||||
// message should be sent
|
||||
virtual int post_msg(const ObTwoPhaseCommitMsgType& msg_type) override;
|
||||
// virtual int post_msg(const ObTwoPhaseCommitMsgType& msg_type) override;
|
||||
virtual int post_msg(const ObTwoPhaseCommitMsgType& msg_type,
|
||||
const uint8_t participant_id) override;
|
||||
const int64_t participant_id) override;
|
||||
private:
|
||||
int64_t find_participant_id(int64_t participant_key);
|
||||
virtual int apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type) override;
|
||||
|
||||
private:
|
||||
common::ObSpinLock latch_;
|
||||
|
||||
Reference in New Issue
Block a user