post 2pc requests after advancing the upstream_state successfully

This commit is contained in:
KyrielightWei
2024-06-18 00:17:25 +00:00
committed by ob-robot
parent 8df6ceb0ab
commit 768e502783
6 changed files with 34 additions and 4 deletions

View File

@ -1311,9 +1311,13 @@ int ObPartTransCtx::get_gts_callback(const MonotonicTs srr,
} else {
TRANS_LOG(WARN, "need not drive 2pc phase when 2pc blocking", K(ret), KPC(this));
}
} else if (get_upstream_state() <= ObTxState::PREPARE
&& OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) {
} else if (get_upstream_state() <= ObTxState::PREPARE) {
int tmp_ret = OB_SUCCESS;
if (OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) {
TRANS_LOG(WARN, "drive into prepare phase failed in gts callback", K(ret), KPC(this));
} else if (OB_TMP_FAIL(ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_())) {
TRANS_LOG(WARN, "post prepare request failed", K(ret), KPC(this));
}
}
}
@ -4413,6 +4417,7 @@ ERRSIM_POINT_DEF(ERRSIM_DELAY_TX_SUBMIT_LOG);
int ObPartTransCtx::submit_log_impl_(const ObTxLogType log_type)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
MTL_SWITCH(tenant_id_)
{
if (big_segment_info_.segment_buf_.is_active()) {
@ -4438,6 +4443,8 @@ int ObPartTransCtx::submit_log_impl_(const ObTxLogType log_type)
if (get_upstream_state() < ObTxState::PREPARE) {
if (OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) {
TRANS_LOG(WARN, "drive 2pc prepare phase failed", K(ret), K(*this));
} else if (OB_TMP_FAIL(ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_())) {
TRANS_LOG(WARN, "retry to post preapre request failed", K(tmp_ret), KPC(this));
}
} else if (get_upstream_state() > ObTxState::PREPARE ||
get_downstream_state() >= ObTxState::PREPARE) {
@ -7869,6 +7876,8 @@ int ObPartTransCtx::dup_table_tx_redo_sync_(const bool need_retry_by_task)
bool no_need_submit_log = false;
if (OB_TMP_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) {
TRANS_LOG(WARN, "do prepare failed after redo sync", K(tmp_ret), KPC(this));
} else if (OB_TMP_FAIL(ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_())) {
TRANS_LOG(WARN, "retry to post preapre request failed", K(tmp_ret), KPC(this));
}
} else if (OB_FAIL(ls_tx_ctx_mgr_->get_ls_log_adapter()->check_redo_sync_completed(
trans_id_, exec_info_.max_applied_log_ts_, redo_sync_finish,
@ -7890,7 +7899,10 @@ int ObPartTransCtx::dup_table_tx_redo_sync_(const bool need_retry_by_task)
if (OB_TMP_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) {
TRANS_LOG(WARN, "do prepare failed after redo sync", K(tmp_ret), KPC(this));
} else if (OB_TMP_FAIL(ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_())) {
TRANS_LOG(WARN, "retry to post preapre request failed", K(tmp_ret), KPC(this));
}
}
} else {
if (need_retry_by_task) {
@ -7902,6 +7914,9 @@ int ObPartTransCtx::dup_table_tx_redo_sync_(const bool need_retry_by_task)
if (OB_TMP_FAIL(restart_2pc_trans_timer_())) {
TRANS_LOG(WARN, "set 2pc trans timer for dup table failed", K(ret), K(tmp_ret), KPC(this));
}
if (OB_TMP_FAIL(ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_())) {
TRANS_LOG(WARN, "retry to post preapre request failed", K(tmp_ret), KPC(this));
}
}
}

View File

@ -961,6 +961,8 @@ protected:
// for xa
virtual bool is_sub2pc() const override
{ return exec_info_.is_sub2pc_; }
virtual bool is_dup_tx() const override
{ return exec_info_.is_dup_tx_; }
// =========================== TREE COMMITTER START ===========================
public:

View File

@ -333,6 +333,9 @@ public:
virtual bool is_sub2pc() const = 0;
// only persist redo and commit info
int prepare_redo();
virtual bool is_dup_tx() const = 0;
// continue execution of two phase commit
int continue_execution(const bool is_rollback);
@ -380,7 +383,6 @@ private:
//
// NB: We should take both upstream and downstream into consideration.
int decide_downstream_msg_type_(bool &need_submit, ObTwoPhaseCommitMsgType &msg_type);
int retransmit_downstream_msg_();
int retransmit_upstream_msg_(const ObTxState state);
int retransmit_downstream_msg_(const int64_t participant);
@ -393,6 +395,7 @@ private:
protected:
// Means we collect all downstream responses
bool all_downstream_collected_();
int retransmit_downstream_msg_();
protected:
// colloected_ is the bit set for storing responses from participants
//

View File

@ -380,6 +380,9 @@ int ObTxCycleTwoPhaseCommitter::decide_downstream_msg_type_(bool &need_submit,
if (is_sub2pc()) {
need_submit = true;
msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REDO_REQ;
} else if(is_dup_tx()) {
need_submit = true;
msg_type = ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ;
} else {
need_submit = false;
if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) {

View File

@ -437,6 +437,11 @@ bool MockOb2pcCtx::is_sub2pc() const
return false;
}
bool MockOb2pcCtx::is_dup_tx() const
{
return false;
}
int MockOb2pcCtx::merge_intermediate_participants()
{
int ret = OB_SUCCESS;

View File

@ -129,6 +129,8 @@ protected:
// for xa
virtual bool is_sub2pc() const override;
virtual bool is_dup_tx() const override;
virtual int merge_intermediate_participants() override;
void add_intermediate_participants(const int64_t ls_id);