From 768e502783b3c1acb980c7d9c58d1dc3c046cb24 Mon Sep 17 00:00:00 2001 From: KyrielightWei Date: Tue, 18 Jun 2024 00:17:25 +0000 Subject: [PATCH] post 2pc requests after advancing the upstream_state successfully --- src/storage/tx/ob_trans_part_ctx.cpp | 21 ++++++++++++++++--- src/storage/tx/ob_trans_part_ctx.h | 2 ++ src/storage/tx/ob_two_phase_committer.h | 5 ++++- .../tx/ob_two_phase_upstream_committer.cpp | 3 +++ unittest/storage/tx/ob_mock_2pc_ctx.cpp | 5 +++++ unittest/storage/tx/ob_mock_2pc_ctx.h | 2 ++ 6 files changed, 34 insertions(+), 4 deletions(-) diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 4f5abe6a2d..87e0a9da23 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -1311,9 +1311,13 @@ int ObPartTransCtx::get_gts_callback(const MonotonicTs srr, } else { TRANS_LOG(WARN, "need not drive 2pc phase when 2pc blocking", K(ret), KPC(this)); } - } else if (get_upstream_state() <= ObTxState::PREPARE - && OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) { - TRANS_LOG(WARN, "drive into prepare phase failed in gts callback", K(ret), KPC(this)); + } else if (get_upstream_state() <= ObTxState::PREPARE) { + int tmp_ret = OB_SUCCESS; + if (OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) { + TRANS_LOG(WARN, "drive into prepare phase failed in gts callback", K(ret), KPC(this)); + } else if (OB_TMP_FAIL(ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_())) { + TRANS_LOG(WARN, "post prepare request failed", K(ret), KPC(this)); + } } } @@ -4413,6 +4417,7 @@ ERRSIM_POINT_DEF(ERRSIM_DELAY_TX_SUBMIT_LOG); int ObPartTransCtx::submit_log_impl_(const ObTxLogType log_type) { int ret = OB_SUCCESS; + int tmp_ret = OB_SUCCESS; MTL_SWITCH(tenant_id_) { if (big_segment_info_.segment_buf_.is_active()) { @@ -4438,6 +4443,8 @@ int ObPartTransCtx::submit_log_impl_(const ObTxLogType log_type) if (get_upstream_state() < ObTxState::PREPARE) { if (OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) { TRANS_LOG(WARN, "drive 2pc prepare phase failed", K(ret), K(*this)); + } else if (OB_TMP_FAIL(ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_())) { + TRANS_LOG(WARN, "retry to post preapre request failed", K(tmp_ret), KPC(this)); } } else if (get_upstream_state() > ObTxState::PREPARE || get_downstream_state() >= ObTxState::PREPARE) { @@ -7869,6 +7876,8 @@ int ObPartTransCtx::dup_table_tx_redo_sync_(const bool need_retry_by_task) bool no_need_submit_log = false; if (OB_TMP_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) { TRANS_LOG(WARN, "do prepare failed after redo sync", K(tmp_ret), KPC(this)); + } else if (OB_TMP_FAIL(ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_())) { + TRANS_LOG(WARN, "retry to post preapre request failed", K(tmp_ret), KPC(this)); } } else if (OB_FAIL(ls_tx_ctx_mgr_->get_ls_log_adapter()->check_redo_sync_completed( trans_id_, exec_info_.max_applied_log_ts_, redo_sync_finish, @@ -7890,7 +7899,10 @@ int ObPartTransCtx::dup_table_tx_redo_sync_(const bool need_retry_by_task) if (OB_TMP_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) { TRANS_LOG(WARN, "do prepare failed after redo sync", K(tmp_ret), KPC(this)); + } else if (OB_TMP_FAIL(ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_())) { + TRANS_LOG(WARN, "retry to post preapre request failed", K(tmp_ret), KPC(this)); } + } } else { if (need_retry_by_task) { @@ -7902,6 +7914,9 @@ int ObPartTransCtx::dup_table_tx_redo_sync_(const bool need_retry_by_task) if (OB_TMP_FAIL(restart_2pc_trans_timer_())) { TRANS_LOG(WARN, "set 2pc trans timer for dup table failed", K(ret), K(tmp_ret), KPC(this)); } + if (OB_TMP_FAIL(ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_())) { + TRANS_LOG(WARN, "retry to post preapre request failed", K(tmp_ret), KPC(this)); + } } } diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index dc37b6b0d5..0cf8d5280b 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -961,6 +961,8 @@ protected: // for xa virtual bool is_sub2pc() const override { return exec_info_.is_sub2pc_; } + virtual bool is_dup_tx() const override + { return exec_info_.is_dup_tx_; } // =========================== TREE COMMITTER START =========================== public: diff --git a/src/storage/tx/ob_two_phase_committer.h b/src/storage/tx/ob_two_phase_committer.h index d23fa7a1e5..521dd149c5 100644 --- a/src/storage/tx/ob_two_phase_committer.h +++ b/src/storage/tx/ob_two_phase_committer.h @@ -333,6 +333,9 @@ public: virtual bool is_sub2pc() const = 0; // only persist redo and commit info int prepare_redo(); + + virtual bool is_dup_tx() const = 0; + // continue execution of two phase commit int continue_execution(const bool is_rollback); @@ -380,7 +383,6 @@ private: // // NB: We should take both upstream and downstream into consideration. int decide_downstream_msg_type_(bool &need_submit, ObTwoPhaseCommitMsgType &msg_type); - int retransmit_downstream_msg_(); int retransmit_upstream_msg_(const ObTxState state); int retransmit_downstream_msg_(const int64_t participant); @@ -393,6 +395,7 @@ private: protected: // Means we collect all downstream responses bool all_downstream_collected_(); + int retransmit_downstream_msg_(); protected: // colloected_ is the bit set for storing responses from participants // diff --git a/src/storage/tx/ob_two_phase_upstream_committer.cpp b/src/storage/tx/ob_two_phase_upstream_committer.cpp index 783310f847..34919b6564 100644 --- a/src/storage/tx/ob_two_phase_upstream_committer.cpp +++ b/src/storage/tx/ob_two_phase_upstream_committer.cpp @@ -380,6 +380,9 @@ int ObTxCycleTwoPhaseCommitter::decide_downstream_msg_type_(bool &need_submit, if (is_sub2pc()) { need_submit = true; msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REDO_REQ; + } else if(is_dup_tx()) { + need_submit = true; + msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ; } else { need_submit = false; if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) { diff --git a/unittest/storage/tx/ob_mock_2pc_ctx.cpp b/unittest/storage/tx/ob_mock_2pc_ctx.cpp index 1c98d7ff12..9e0799bbdc 100644 --- a/unittest/storage/tx/ob_mock_2pc_ctx.cpp +++ b/unittest/storage/tx/ob_mock_2pc_ctx.cpp @@ -437,6 +437,11 @@ bool MockOb2pcCtx::is_sub2pc() const return false; } +bool MockOb2pcCtx::is_dup_tx() const +{ + return false; +} + int MockOb2pcCtx::merge_intermediate_participants() { int ret = OB_SUCCESS; diff --git a/unittest/storage/tx/ob_mock_2pc_ctx.h b/unittest/storage/tx/ob_mock_2pc_ctx.h index 991d848c8e..4396948cdf 100644 --- a/unittest/storage/tx/ob_mock_2pc_ctx.h +++ b/unittest/storage/tx/ob_mock_2pc_ctx.h @@ -129,6 +129,8 @@ protected: // for xa virtual bool is_sub2pc() const override; + virtual bool is_dup_tx() const override; + virtual int merge_intermediate_participants() override; void add_intermediate_participants(const int64_t ls_id);