[4.1][xa] refine prepare version in 2pc to support apply msg

This commit is contained in:
obdev 2022-12-30 07:38:16 +00:00 committed by ob-robot
parent 8aa24cb9e2
commit 5d176c1b0d

View File

@ -470,23 +470,35 @@ int ObPartTransCtx::apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type)
} else {
switch (msg_type) {
case ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ: {
const Ob2pcPrepareReqMsg &msg = *(static_cast<const Ob2pcPrepareReqMsg *>(msg_2pc_cache_));
if (is_sub2pc()) {
// prepare version for xa trans
// these actions has been done in entrance function handle_tx_2pc_prepare_version_req
} else {
// if modify logic here, please check codes in handle_tx_2pc_prepare_redo (version)
const Ob2pcPrepareReqMsg &msg = *(static_cast<const Ob2pcPrepareReqMsg *>(msg_2pc_cache_));
if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) {
} else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) {
TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) {
TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this));
if (FALSE_IT(set_trans_type_(TransType::DIST_TRANS))) {
} else if (OB_FAIL(set_2pc_upstream_(msg.upstream_))) {
TRANS_LOG(WARN, "set coordinator failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(set_app_trace_info_(msg.app_trace_info_))) {
TRANS_LOG(WARN, "set app trace info failed", KR(ret), K(msg), K(*this));
}
}
break;
}
case ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_RESP: {
const Ob2pcPrepareRespMsg &msg = *(static_cast<const Ob2pcPrepareRespMsg *>(msg_2pc_cache_));
if (is_sub2pc()) {
// prepare version for xa trans
// these actions has been done in entrance function handle_tx_2pc_prepare_version_resp
} else {
// if modify logic here, please check codes in handle_tx_2pc_prepare_redo (version)
const Ob2pcPrepareRespMsg &msg = *(static_cast<const Ob2pcPrepareRespMsg *>(msg_2pc_cache_));
if (OB_FAIL(update_2pc_prepare_version_(msg.prepare_version_))) {
TRANS_LOG(WARN, "update prepare version failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(merge_prepare_log_info_(msg.prepare_info_array_))) {
TRANS_LOG(WARN, "merge prepare log info failed", K(ret));
if (OB_FAIL(update_2pc_prepare_version_(msg.prepare_version_))) {
TRANS_LOG(WARN, "update prepare version failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(merge_prepare_log_info_(msg.prepare_info_array_))) {
TRANS_LOG(WARN, "merge prepare log info failed", K(ret));
}
}
break;
}
@ -674,6 +686,7 @@ int ObPartTransCtx::handle_tx_2pc_prepare_version_req(const Ob2pcPrepareVersionR
int ret = OB_SUCCESS;
CtxLockGuard guard(lock_);
ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());
msg_2pc_cache_ = &msg;
if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this));
} else if (OB_FAIL(handle_2pc_req(msg_type))) {
@ -683,6 +696,7 @@ int ObPartTransCtx::handle_tx_2pc_prepare_version_req(const Ob2pcPrepareVersionR
if (OB_SUCC(ret)) {
part_trans_action_ = ObPartTransAction::COMMIT;
}
msg_2pc_cache_ = nullptr;
return ret;
}
@ -697,6 +711,7 @@ int ObPartTransCtx::handle_tx_2pc_prepare_version_resp(const Ob2pcPrepareVersion
CtxLockGuard guard(lock_);
ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());
int64_t participant_id = INT64_MAX;
msg_2pc_cache_ = &msg;
if (OB_FAIL(set_2pc_request_id_(msg.request_id_))) {
TRANS_LOG(WARN, "set request id failed", KR(ret), K(msg), K(*this));
@ -709,6 +724,7 @@ int ObPartTransCtx::handle_tx_2pc_prepare_version_resp(const Ob2pcPrepareVersion
} else if (OB_FAIL(handle_2pc_resp(msg_type, participant_id))) {
TRANS_LOG(WARN, "handle 2pc response failed", KR(ret), K(msg), K(participant_id), K(*this));
}
msg_2pc_cache_ = nullptr;
return ret;
}