[CP] [2pc] drive 2pc with 1 participants

This commit is contained in:
Handora
2023-05-24 05:48:05 +00:00
committed by ob-robot
parent bf002af793
commit 5e797edef3
3 changed files with 147 additions and 8 deletions

View File

@ -351,6 +351,12 @@ private:
virtual int apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type) = 0;
// The next state of current state, it is based on the order of ObTxState.
//
// It will return ObTxState::UNKNOWN if you pass the state which is smaller
// than ObTxState::INIT or larger than ObTxState::CLEAR.
ObTxState decide_next_state_(const ObTxState cur_state);
// Because the post_msg is best effect, we need retry to post the msg under
// exception.
//

View File

@ -242,17 +242,34 @@ int ObTxCycleTwoPhaseCommitter::handle_timeout()
TRANS_LOG(WARN, "retransmit upstream msg failed", KR(tmp_ret));
}
// if a distributed trans has one participant and its 2pc state (up and down) is prepare,
// then try enter into pre commit state.
// NOTE that if a distributed trans has at least two participants,
// the state can be drived by message.
// If a distributed txn has one participant and it cannot drive its state by
// msg or log, then we will try enter into the next state by timeout.
//
// NOTE that if a distributed txn has at least two participants, the state
// can be drived by message.
if (is_root()) {
const int SINGLE_COUNT = 1;
if (SINGLE_COUNT == get_downstream_size()
&& ObTxState::PREPARE == get_downstream_state()
&& ObTxState::PREPARE == get_upstream_state()) {
if (OB_FAIL(try_enter_pre_commit_state())) {
TRANS_LOG(WARN, "try enter into pre commit state failed", K(ret));
&& !is_2pc_logging()) {
if (get_upstream_state() == get_downstream_state()
&& ObTxState::CLEAR > get_downstream_state()
&& ObTxState::PREPARE <= get_downstream_state()) {
ObTxState next_state = decide_next_state_(get_downstream_state());
if (OB_TMP_FAIL(drive_self_2pc_phase(next_state))) {
TRANS_LOG(WARN, "enter next phase failed", K(tmp_ret), K(*this));
}
}
// If a distributed txn has one participant and its state is pre_commit
// and cannot drive its state by msg, then we will try to apply the
// pre_commit.
//
// NOTE 2pc with other state will drive it self with log
if (ObTxState::PRE_COMMIT == get_upstream_state()
&& ObTxState::PREPARE == get_downstream_state()) {
if (OB_TMP_FAIL(on_pre_commit())) {
TRANS_LOG(WARN, "apply pre commit failed", K(tmp_ret), K(*this));
}
}
}
}
@ -302,6 +319,45 @@ int ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_()
return ret;
}
ObTxState ObTxCycleTwoPhaseCommitter::decide_next_state_(const ObTxState cur_state)
{
ObTxState next_state = ObTxState::UNKNOWN;
switch (cur_state)
{
case ObTxState::INIT: {
next_state = ObTxState::REDO_COMPLETE;
break;
}
case ObTxState::REDO_COMPLETE: {
next_state = ObTxState::PREPARE;
break;
}
case ObTxState::PREPARE: {
next_state = ObTxState::PRE_COMMIT;
break;
}
case ObTxState::PRE_COMMIT: {
next_state = ObTxState::COMMIT;
break;
}
case ObTxState::COMMIT: {
next_state = ObTxState::CLEAR;
break;
}
case ObTxState::ABORT: {
next_state = ObTxState::CLEAR;
break;
}
default: {
next_state = ObTxState::UNKNOWN;
break;
}
}
return next_state;
}
int ObTxCycleTwoPhaseCommitter::decide_downstream_msg_type_(bool &need_submit,
ObTwoPhaseCommitMsgType &msg_type)
{

View File

@ -12,6 +12,8 @@
#include <gtest/gtest.h>
#include <vector>
#define private public
#define protected public
#include "ob_mock_2pc_ctx.h"
namespace oceanbase
@ -228,6 +230,81 @@ TEST_F(TestMockOb2pcCtx, test_simple_abort2)
EXPECT_EQ(true, ctx2.check_status_valid(false/*should commit*/));
}
TEST_F(TestMockOb2pcCtx, test_single_participants_prepare)
{
MockOb2pcCtx ctx1;
ctx1.init(&mailbox_mgr_);
auto addr1 = ctx1.get_addr();
MockObParticipants participants;
participants.push_back(addr1);
ctx1.participants_.assign(participants.begin(), participants.end());
// ========== Two Phase Commit prepare Phase ==========
// ctx1 start prepare state
ctx1.downstream_state_ = ObTxState::PREPARE;
ctx1.set_upstream_state(ObTxState::PREPARE);
ctx1.handle_timeout();
EXPECT_EQ(ObTxState::COMMIT, ctx1.get_upstream_state());
EXPECT_EQ(ObTxState::PRE_COMMIT, ctx1.get_downstream_state());
}
TEST_F(TestMockOb2pcCtx, test_single_participants_precommit)
{
MockOb2pcCtx ctx1;
ctx1.init(&mailbox_mgr_);
auto addr1 = ctx1.get_addr();
MockObParticipants participants;
participants.push_back(addr1);
ctx1.participants_.assign(participants.begin(), participants.end());
// ========== Two Phase Commit precommit Phase ==========
ctx1.downstream_state_ = ObTxState::PREPARE;
ctx1.set_upstream_state(ObTxState::PRE_COMMIT);
ctx1.handle_timeout();
EXPECT_EQ(ObTxState::COMMIT, ctx1.get_upstream_state());
EXPECT_EQ(ObTxState::PRE_COMMIT, ctx1.get_downstream_state());
}
TEST_F(TestMockOb2pcCtx, test_single_participants_precommit2)
{
MockOb2pcCtx ctx1;
ctx1.init(&mailbox_mgr_);
auto addr1 = ctx1.get_addr();
MockObParticipants participants;
participants.push_back(addr1);
ctx1.participants_.assign(participants.begin(), participants.end());
// ========== Two Phase Commit precommit Phase ==========
ctx1.downstream_state_ = ObTxState::PRE_COMMIT;
ctx1.set_upstream_state(ObTxState::PRE_COMMIT);
ctx1.handle_timeout();
EXPECT_EQ(ObTxState::COMMIT, ctx1.get_upstream_state());
EXPECT_EQ(ObTxState::PRE_COMMIT, ctx1.get_downstream_state());
ctx1.apply();
EXPECT_EQ(ObTxState::CLEAR, ctx1.get_upstream_state());
EXPECT_EQ(ObTxState::COMMIT, ctx1.get_downstream_state());
}
TEST_F(TestMockOb2pcCtx, test_single_participants_commit)
{
MockOb2pcCtx ctx1;
ctx1.init(&mailbox_mgr_);
auto addr1 = ctx1.get_addr();
MockObParticipants participants;
participants.push_back(addr1);
ctx1.participants_.assign(participants.begin(), participants.end());
// ========== Two Phase Commit precommit Phase ==========
ctx1.downstream_state_ = ObTxState::COMMIT;
ctx1.set_upstream_state(ObTxState::COMMIT);
ctx1.handle_timeout();
EXPECT_EQ(ObTxState::CLEAR, ctx1.get_upstream_state());
EXPECT_EQ(ObTxState::COMMIT, ctx1.get_downstream_state());
ctx1.apply();
EXPECT_EQ(ObTxState::CLEAR, ctx1.get_upstream_state());
EXPECT_EQ(ObTxState::CLEAR, ctx1.get_downstream_state());
}
}
}