[XA] simple adaption for xa

This commit is contained in:
Handora 2023-03-07 03:46:07 +00:00 committed by ob-robot
parent 20dbea5ce3
commit e4a3438573
3 changed files with 33 additions and 25 deletions

View File

@ -36,9 +36,6 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_redo_request()
case ObTxState::INIT: {
if (OB_FAIL(handle_2pc_prepare_redo_request_impl_())) {
TRANS_LOG(WARN, "handle 2pc prepare request failed", K(ret), K(*this));
} else {
collected_.reset();
set_upstream_state(ObTxState::REDO_COMPLETE);
}
break;
}
@ -107,12 +104,10 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_redo_request_impl_()
TRANS_LOG(WARN, "unexpected operation", K(ret), K(*this));
} else if (is_2pc_logging()) {
TRANS_LOG(INFO, "committer is under logging", K(ret), K(*this));
} else if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_COMMIT_INFO))) {
if (OB_BLOCK_FROZEN == tmp_ret) {
// memtable is freezing, can not submit log right now.
} else {
TRANS_LOG(WARN, "submit commit info log failed", K(tmp_ret), K(*this));
}
} else if (OB_FAIL(apply_2pc_msg_(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REDO_REQ))) {
TRANS_LOG(WARN, "apply msg failed", K(ret), KPC(this));
} else if (OB_FAIL(drive_self_2pc_phase(ObTxState::REDO_COMPLETE))) {
TRANS_LOG(WARN, "drive 2pc phase", K(ret), K(*this));
} else if (is_internal()) {
if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REDO_REQ))) {
TRANS_LOG(WARN, "post prepare redo msg failed", KR(tmp_ret), KPC(this));

View File

@ -87,6 +87,9 @@ int ObTxCycleTwoPhaseCommitter::drive_self_2pc_phase(ObTxState next_phase)
// do nothing about in-memory operation
} else {
switch (next_phase) {
case ObTxState::REDO_COMPLETE: {
break;
}
case ObTxState::PREPARE: {
if (OB_FAIL(do_prepare(no_need_submit_log))) {
TRANS_LOG(WARN, "do prepare in memory failed", K(ret), KPC(this));

View File

@ -481,12 +481,24 @@ int ObPartTransCtx::apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type)
TRANS_LOG(WARN, "unexpected 2pc msg type", K(ret), K(msg_type), KPC(msg_2pc_cache_));
} else {
switch (msg_type) {
case ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REDO_REQ: {
const Ob2pcPrepareRedoReqMsg &msg = *(static_cast<const Ob2pcPrepareRedoReqMsg *>(msg_2pc_cache_));
if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) {
} else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) {
TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) {
TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this));
} else {
exec_info_.xid_ = msg.xid_;
exec_info_.is_sub2pc_ = true;
}
break;
}
case ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ: {
if (is_sub2pc()) {
// prepare version for xa trans
// these actions has been done in entrance function handle_tx_2pc_prepare_version_req
} else {
// if modify logic here, please check codes in handle_tx_2pc_prepare_redo (version)
const Ob2pcPrepareReqMsg &msg = *(static_cast<const Ob2pcPrepareReqMsg *>(msg_2pc_cache_));
if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) {
@ -663,23 +675,21 @@ int ObPartTransCtx::handle_tx_2pc_prepare_redo_req(const Ob2pcPrepareRedoReqMsg
int ret = OB_SUCCESS;
CtxLockGuard guard(lock_);
ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());
exec_info_.trans_type_ = TransType::DIST_TRANS;
exec_info_.xid_ = msg.xid_;
exec_info_.is_sub2pc_ = true;
if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) {
} else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) {
TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) {
TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(handle_2pc_req(msg_type))) {
TRANS_LOG(WARN, "handle 2pc request failed", KR(ret), K(msg), K(*this));
}
if (!is_2pc_logging()) {
exec_info_.xid_ = msg.xid_;
exec_info_.is_sub2pc_ = true;
msg_2pc_cache_ = &msg;
if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(handle_2pc_req(msg_type))) {
TRANS_LOG(WARN, "handle 2pc request failed", KR(ret), K(msg), K(*this));
}
if (OB_SUCC(ret)) {
part_trans_action_ = ObPartTransAction::COMMIT;
if (OB_SUCC(ret)) {
part_trans_action_ = ObPartTransAction::COMMIT;
}
msg_2pc_cache_ = nullptr;
}
TRANS_LOG(INFO, "handle prepare redo request", KR(ret), K(msg));