diff --git a/src/storage/tx/ob_two_phase_committer_xa.cpp b/src/storage/tx/ob_two_phase_committer_xa.cpp index 8d5a9a43b..63a1b898c 100644 --- a/src/storage/tx/ob_two_phase_committer_xa.cpp +++ b/src/storage/tx/ob_two_phase_committer_xa.cpp @@ -36,9 +36,6 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_redo_request() case ObTxState::INIT: { if (OB_FAIL(handle_2pc_prepare_redo_request_impl_())) { TRANS_LOG(WARN, "handle 2pc prepare request failed", K(ret), K(*this)); - } else { - collected_.reset(); - set_upstream_state(ObTxState::REDO_COMPLETE); } break; } @@ -107,12 +104,10 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_redo_request_impl_() TRANS_LOG(WARN, "unexpected operation", K(ret), K(*this)); } else if (is_2pc_logging()) { TRANS_LOG(INFO, "committer is under logging", K(ret), K(*this)); - } else if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_COMMIT_INFO))) { - if (OB_BLOCK_FROZEN == tmp_ret) { - // memtable is freezing, can not submit log right now. - } else { - TRANS_LOG(WARN, "submit commit info log failed", K(tmp_ret), K(*this)); - } + } else if (OB_FAIL(apply_2pc_msg_(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REDO_REQ))) { + TRANS_LOG(WARN, "apply msg failed", K(ret), KPC(this)); + } else if (OB_FAIL(drive_self_2pc_phase(ObTxState::REDO_COMPLETE))) { + TRANS_LOG(WARN, "drive 2pc phase", K(ret), K(*this)); } else if (is_internal()) { if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REDO_REQ))) { TRANS_LOG(WARN, "post prepare redo msg failed", KR(tmp_ret), KPC(this)); diff --git a/src/storage/tx/ob_two_phase_upstream_committer.cpp b/src/storage/tx/ob_two_phase_upstream_committer.cpp index 04df0d77a..6e717b2a1 100644 --- a/src/storage/tx/ob_two_phase_upstream_committer.cpp +++ b/src/storage/tx/ob_two_phase_upstream_committer.cpp @@ -87,6 +87,9 @@ int ObTxCycleTwoPhaseCommitter::drive_self_2pc_phase(ObTxState next_phase) // do nothing about in-memory operation } else { switch (next_phase) { + case ObTxState::REDO_COMPLETE: { + break; + } case ObTxState::PREPARE: { if (OB_FAIL(do_prepare(no_need_submit_log))) { TRANS_LOG(WARN, "do prepare in memory failed", K(ret), KPC(this)); diff --git a/src/storage/tx/ob_tx_2pc_msg_handler.cpp b/src/storage/tx/ob_tx_2pc_msg_handler.cpp index 6d6a66c04..bd7a59a61 100644 --- a/src/storage/tx/ob_tx_2pc_msg_handler.cpp +++ b/src/storage/tx/ob_tx_2pc_msg_handler.cpp @@ -481,12 +481,24 @@ int ObPartTransCtx::apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type) TRANS_LOG(WARN, "unexpected 2pc msg type", K(ret), K(msg_type), KPC(msg_2pc_cache_)); } else { switch (msg_type) { + case ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REDO_REQ: { + const Ob2pcPrepareRedoReqMsg &msg = *(static_cast(msg_2pc_cache_)); + if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) { + } else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) { + TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this)); + } else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) { + TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this)); + } else { + exec_info_.xid_ = msg.xid_; + exec_info_.is_sub2pc_ = true; + } + break; + } case ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ: { if (is_sub2pc()) { // prepare version for xa trans // these actions has been done in entrance function handle_tx_2pc_prepare_version_req } else { - // if modify logic here, please check codes in handle_tx_2pc_prepare_redo (version) const Ob2pcPrepareReqMsg &msg = *(static_cast(msg_2pc_cache_)); if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) { @@ -663,23 +675,21 @@ int ObPartTransCtx::handle_tx_2pc_prepare_redo_req(const Ob2pcPrepareRedoReqMsg int ret = OB_SUCCESS; CtxLockGuard guard(lock_); ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type()); - exec_info_.trans_type_ = TransType::DIST_TRANS; - exec_info_.xid_ = msg.xid_; - exec_info_.is_sub2pc_ = true; - if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) { - } else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) { - TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this)); - } else if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) { - TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this)); - } else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) { - TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this)); - } else if (OB_FAIL(handle_2pc_req(msg_type))) { - TRANS_LOG(WARN, "handle 2pc request failed", KR(ret), K(msg), K(*this)); - } + if (!is_2pc_logging()) { + exec_info_.xid_ = msg.xid_; + exec_info_.is_sub2pc_ = true; + msg_2pc_cache_ = &msg; + if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) { + TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this)); + } else if (OB_FAIL(handle_2pc_req(msg_type))) { + TRANS_LOG(WARN, "handle 2pc request failed", KR(ret), K(msg), K(*this)); + } - if (OB_SUCC(ret)) { - part_trans_action_ = ObPartTransAction::COMMIT; + if (OB_SUCC(ret)) { + part_trans_action_ = ObPartTransAction::COMMIT; + } + msg_2pc_cache_ = nullptr; } TRANS_LOG(INFO, "handle prepare redo request", KR(ret), K(msg));