From 7be9ef2e431bbd00c83c4be67fc40ce9aac3ea3a Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 10 May 2023 09:09:54 +0000 Subject: [PATCH] fix bugs in 2pc of xa --- src/storage/tx/ob_trans_part_ctx.cpp | 11 ++++++++--- src/storage/tx/ob_two_phase_committer_xa.cpp | 2 +- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index ec00c016c3..c9cb2f7d10 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -5078,7 +5078,7 @@ int ObPartTransCtx::switch_to_leader(const SCN &start_working_ts) } else { TRANS_LOG(WARN, "txn data incomplete, will be aborted", K(contain_table_lock), KPC(this)); if (has_persisted_log_()) { - if (ObPartTransAction::COMMIT == part_trans_action_ || get_upstream_state() >= ObTxState::PREPARE) { + if (ObPartTransAction::COMMIT == part_trans_action_ || get_upstream_state() >= ObTxState::REDO_COMPLETE) { TRANS_LOG(WARN, "abort self instantly with a tx_commit request", K(contain_table_lock), KPC(this)); @@ -6101,7 +6101,6 @@ int ObPartTransCtx::sub_prepare(const ObLSArray &parts, } else if (OB_UNLIKELY(is_2pc_logging_())) { TRANS_LOG(WARN, "tx is 2pc logging", KPC(this)); } else if (ObTxState::INIT != upstream_state_) { - // TODO, consider prepare state if (is_prepared_sub2pc()) { if (OB_FAIL(post_tx_sub_prepare_resp_(OB_SUCCESS))) { TRANS_LOG(WARN, "fail to post sub prepare response", K(ret), KPC(this)); @@ -6114,6 +6113,12 @@ int ObPartTransCtx::sub_prepare(const ObLSArray &parts, } else if (OB_UNLIKELY(pending_write_)) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "access in progress", K(ret), K_(pending_write), KPC(this)); + } else if (sub_state_.is_force_abort()) { + if (OB_FAIL(compensate_abort_log_())) { + TRANS_LOG(WARN, "compensate abort log failed", K(ret), KPC(this)); + } else { + ret = OB_TRANS_KILLED; + } } else if (OB_FAIL(set_2pc_participants_(parts))) { TRANS_LOG(WARN, "set participants failed", K(ret), KPC(this)); } else if (OB_FAIL(set_2pc_request_id_(request_id))) { @@ -6162,7 +6167,7 @@ int ObPartTransCtx::sub_end_tx(const int64_t &request_id, } else if (xid.empty() || xid != exec_info_.xid_) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "invalid argument", K(ret), K(xid), KPC(this)); - } else if (!is_sub2pc()) { + } else if (!is_sub2pc() && !is_rollback) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "unexpected trans ctx", KR(ret), KPC(this)); } else if (OB_UNLIKELY(is_follower_())) { diff --git a/src/storage/tx/ob_two_phase_committer_xa.cpp b/src/storage/tx/ob_two_phase_committer_xa.cpp index 63a1b898c4..283e5edf32 100644 --- a/src/storage/tx/ob_two_phase_committer_xa.cpp +++ b/src/storage/tx/ob_two_phase_committer_xa.cpp @@ -279,7 +279,7 @@ int ObTxCycleTwoPhaseCommitter::continue_execution(const bool is_rollback) const ObTxState state = get_downstream_state(); bool no_need_submit_log = false; - if (ObTxState::REDO_COMPLETE != get_upstream_state()) { + if (ObTxState::REDO_COMPLETE != get_upstream_state() && !is_rollback) { TRANS_LOG(INFO, "already in second phase", K(ret), K(*this)); } else { if (is_rollback) {