diff --git a/src/storage/tx/ob_two_phase_committer.h b/src/storage/tx/ob_two_phase_committer.h index 1286506351..d6aa57374c 100644 --- a/src/storage/tx/ob_two_phase_committer.h +++ b/src/storage/tx/ob_two_phase_committer.h @@ -351,6 +351,12 @@ private: virtual int apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type) = 0; + // The next state of current state, it is based on the order of ObTxState. + // + // It will return ObTxState::UNKNOWN if you pass the state which is smaller + // than ObTxState::INIT or larger than ObTxState::CLEAR. + ObTxState decide_next_state_(const ObTxState cur_state); + // Because the post_msg is best effect, we need retry to post the msg under // exception. // diff --git a/src/storage/tx/ob_two_phase_upstream_committer.cpp b/src/storage/tx/ob_two_phase_upstream_committer.cpp index 7ff060f6a6..f6fcf0f650 100644 --- a/src/storage/tx/ob_two_phase_upstream_committer.cpp +++ b/src/storage/tx/ob_two_phase_upstream_committer.cpp @@ -242,17 +242,34 @@ int ObTxCycleTwoPhaseCommitter::handle_timeout() TRANS_LOG(WARN, "retransmit upstream msg failed", KR(tmp_ret)); } - // if a distributed trans has one participant and its 2pc state (up and down) is prepare, - // then try enter into pre commit state. - // NOTE that if a distributed trans has at least two participants, - // the state can be drived by message. + // If a distributed txn has one participant and it cannot drive its state by + // msg or log, then we will try enter into the next state by timeout. + // + // NOTE that if a distributed txn has at least two participants, the state + // can be drived by message. if (is_root()) { const int SINGLE_COUNT = 1; if (SINGLE_COUNT == get_downstream_size() - && ObTxState::PREPARE == get_downstream_state() - && ObTxState::PREPARE == get_upstream_state()) { - if (OB_FAIL(try_enter_pre_commit_state())) { - TRANS_LOG(WARN, "try enter into pre commit state failed", K(ret)); + && !is_2pc_logging()) { + if (get_upstream_state() == get_downstream_state() + && ObTxState::CLEAR > get_downstream_state() + && ObTxState::PREPARE <= get_downstream_state()) { + ObTxState next_state = decide_next_state_(get_downstream_state()); + if (OB_TMP_FAIL(drive_self_2pc_phase(next_state))) { + TRANS_LOG(WARN, "enter next phase failed", K(tmp_ret), K(*this)); + } + } + + // If a distributed txn has one participant and its state is pre_commit + // and cannot drive its state by msg, then we will try to apply the + // pre_commit. + // + // NOTE 2pc with other state will drive it self with log + if (ObTxState::PRE_COMMIT == get_upstream_state() + && ObTxState::PREPARE == get_downstream_state()) { + if (OB_TMP_FAIL(on_pre_commit())) { + TRANS_LOG(WARN, "apply pre commit failed", K(tmp_ret), K(*this)); + } } } } @@ -302,6 +319,45 @@ int ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_() return ret; } +ObTxState ObTxCycleTwoPhaseCommitter::decide_next_state_(const ObTxState cur_state) +{ + ObTxState next_state = ObTxState::UNKNOWN; + + switch (cur_state) + { + case ObTxState::INIT: { + next_state = ObTxState::REDO_COMPLETE; + break; + } + case ObTxState::REDO_COMPLETE: { + next_state = ObTxState::PREPARE; + break; + } + case ObTxState::PREPARE: { + next_state = ObTxState::PRE_COMMIT; + break; + } + case ObTxState::PRE_COMMIT: { + next_state = ObTxState::COMMIT; + break; + } + case ObTxState::COMMIT: { + next_state = ObTxState::CLEAR; + break; + } + case ObTxState::ABORT: { + next_state = ObTxState::CLEAR; + break; + } + default: { + next_state = ObTxState::UNKNOWN; + break; + } + } + + return next_state; +} + int ObTxCycleTwoPhaseCommitter::decide_downstream_msg_type_(bool &need_submit, ObTwoPhaseCommitMsgType &msg_type) { diff --git a/unittest/storage/tx/test_simple_tx_commit.cpp b/unittest/storage/tx/test_simple_tx_commit.cpp index 7b98133901..2a8e5f5b65 100644 --- a/unittest/storage/tx/test_simple_tx_commit.cpp +++ b/unittest/storage/tx/test_simple_tx_commit.cpp @@ -12,6 +12,8 @@ #include #include +#define private public +#define protected public #include "ob_mock_2pc_ctx.h" namespace oceanbase @@ -228,6 +230,81 @@ TEST_F(TestMockOb2pcCtx, test_simple_abort2) EXPECT_EQ(true, ctx2.check_status_valid(false/*should commit*/)); } +TEST_F(TestMockOb2pcCtx, test_single_participants_prepare) +{ + MockOb2pcCtx ctx1; + ctx1.init(&mailbox_mgr_); + auto addr1 = ctx1.get_addr(); + MockObParticipants participants; + participants.push_back(addr1); + ctx1.participants_.assign(participants.begin(), participants.end()); + + // ========== Two Phase Commit prepare Phase ========== + // ctx1 start prepare state + ctx1.downstream_state_ = ObTxState::PREPARE; + ctx1.set_upstream_state(ObTxState::PREPARE); + ctx1.handle_timeout(); + EXPECT_EQ(ObTxState::COMMIT, ctx1.get_upstream_state()); + EXPECT_EQ(ObTxState::PRE_COMMIT, ctx1.get_downstream_state()); +} + +TEST_F(TestMockOb2pcCtx, test_single_participants_precommit) +{ + MockOb2pcCtx ctx1; + ctx1.init(&mailbox_mgr_); + auto addr1 = ctx1.get_addr(); + MockObParticipants participants; + participants.push_back(addr1); + ctx1.participants_.assign(participants.begin(), participants.end()); + + // ========== Two Phase Commit precommit Phase ========== + ctx1.downstream_state_ = ObTxState::PREPARE; + ctx1.set_upstream_state(ObTxState::PRE_COMMIT); + ctx1.handle_timeout(); + EXPECT_EQ(ObTxState::COMMIT, ctx1.get_upstream_state()); + EXPECT_EQ(ObTxState::PRE_COMMIT, ctx1.get_downstream_state()); +} + +TEST_F(TestMockOb2pcCtx, test_single_participants_precommit2) +{ + MockOb2pcCtx ctx1; + ctx1.init(&mailbox_mgr_); + auto addr1 = ctx1.get_addr(); + MockObParticipants participants; + participants.push_back(addr1); + ctx1.participants_.assign(participants.begin(), participants.end()); + + // ========== Two Phase Commit precommit Phase ========== + ctx1.downstream_state_ = ObTxState::PRE_COMMIT; + ctx1.set_upstream_state(ObTxState::PRE_COMMIT); + ctx1.handle_timeout(); + EXPECT_EQ(ObTxState::COMMIT, ctx1.get_upstream_state()); + EXPECT_EQ(ObTxState::PRE_COMMIT, ctx1.get_downstream_state()); + ctx1.apply(); + EXPECT_EQ(ObTxState::CLEAR, ctx1.get_upstream_state()); + EXPECT_EQ(ObTxState::COMMIT, ctx1.get_downstream_state()); +} + +TEST_F(TestMockOb2pcCtx, test_single_participants_commit) +{ + MockOb2pcCtx ctx1; + ctx1.init(&mailbox_mgr_); + auto addr1 = ctx1.get_addr(); + MockObParticipants participants; + participants.push_back(addr1); + ctx1.participants_.assign(participants.begin(), participants.end()); + + // ========== Two Phase Commit precommit Phase ========== + ctx1.downstream_state_ = ObTxState::COMMIT; + ctx1.set_upstream_state(ObTxState::COMMIT); + ctx1.handle_timeout(); + EXPECT_EQ(ObTxState::CLEAR, ctx1.get_upstream_state()); + EXPECT_EQ(ObTxState::COMMIT, ctx1.get_downstream_state()); + ctx1.apply(); + EXPECT_EQ(ObTxState::CLEAR, ctx1.get_upstream_state()); + EXPECT_EQ(ObTxState::CLEAR, ctx1.get_downstream_state()); +} + } }