diff --git a/src/storage/tx/ob_tx_2pc_msg_handler.cpp b/src/storage/tx/ob_tx_2pc_msg_handler.cpp index 4fb480023..1663ae72e 100644 --- a/src/storage/tx/ob_tx_2pc_msg_handler.cpp +++ b/src/storage/tx/ob_tx_2pc_msg_handler.cpp @@ -470,23 +470,35 @@ int ObPartTransCtx::apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type) } else { switch (msg_type) { case ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ: { - const Ob2pcPrepareReqMsg &msg = *(static_cast(msg_2pc_cache_)); + if (is_sub2pc()) { + // prepare version for xa trans + // these actions has been done in entrance function handle_tx_2pc_prepare_version_req + } else { + // if modify logic here, please check codes in handle_tx_2pc_prepare_redo (version) + const Ob2pcPrepareReqMsg &msg = *(static_cast(msg_2pc_cache_)); - if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) { - } else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) { - TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this)); - } else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) { - TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this)); + if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) { + } else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) { + TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this)); + } else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) { + TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this)); + } } break; } case ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_RESP: { - const Ob2pcPrepareRespMsg &msg = *(static_cast(msg_2pc_cache_)); + if (is_sub2pc()) { + // prepare version for xa trans + // these actions has been done in entrance function handle_tx_2pc_prepare_version_resp + } else { + // if modify logic here, please check codes in handle_tx_2pc_prepare_redo (version) + const Ob2pcPrepareRespMsg &msg = *(static_cast(msg_2pc_cache_)); - if (OB_FAIL(update_2pc_prepare_version_(msg.prepare_version_))) { - TRANS_LOG(WARN, "update prepare version failed", KR(ret), K(msg), K(*this)); - } else if (OB_FAIL(merge_prepare_log_info_(msg.prepare_info_array_))) { - TRANS_LOG(WARN, "merge prepare log info failed", K(ret)); + if (OB_FAIL(update_2pc_prepare_version_(msg.prepare_version_))) { + TRANS_LOG(WARN, "update prepare version failed", KR(ret), K(msg), K(*this)); + } else if (OB_FAIL(merge_prepare_log_info_(msg.prepare_info_array_))) { + TRANS_LOG(WARN, "merge prepare log info failed", K(ret)); + } } break; } @@ -674,6 +686,7 @@ int ObPartTransCtx::handle_tx_2pc_prepare_version_req(const Ob2pcPrepareVersionR int ret = OB_SUCCESS; CtxLockGuard guard(lock_); ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type()); + msg_2pc_cache_ = &msg; if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) { TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this)); } else if (OB_FAIL(handle_2pc_req(msg_type))) { @@ -683,6 +696,7 @@ int ObPartTransCtx::handle_tx_2pc_prepare_version_req(const Ob2pcPrepareVersionR if (OB_SUCC(ret)) { part_trans_action_ = ObPartTransAction::COMMIT; } + msg_2pc_cache_ = nullptr; return ret; } @@ -697,6 +711,7 @@ int ObPartTransCtx::handle_tx_2pc_prepare_version_resp(const Ob2pcPrepareVersion CtxLockGuard guard(lock_); ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type()); int64_t participant_id = INT64_MAX; + msg_2pc_cache_ = &msg; if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) { TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this)); @@ -709,6 +724,7 @@ int ObPartTransCtx::handle_tx_2pc_prepare_version_resp(const Ob2pcPrepareVersion } else if (OB_FAIL(handle_2pc_resp(msg_type, participant_id))) { TRANS_LOG(WARN, "handle 2pc response failed", KR(ret), K(msg), K(participant_id), K(*this)); } + msg_2pc_cache_ = nullptr; return ret; }