[2pc] drive 2pc with 1 participants
This commit is contained in:
		@ -351,6 +351,12 @@ private:
 | 
			
		||||
 | 
			
		||||
  virtual int apply_2pc_msg_(const ObTwoPhaseCommitMsgType msg_type) = 0;
 | 
			
		||||
 | 
			
		||||
  // The next state of current state, it is based on the order of ObTxState.
 | 
			
		||||
  //
 | 
			
		||||
  // It will return ObTxState::UNKNOWN if you pass the state which is smaller
 | 
			
		||||
  // than ObTxState::INIT or larger than ObTxState::CLEAR.
 | 
			
		||||
  ObTxState decide_next_state_(const ObTxState cur_state);
 | 
			
		||||
 | 
			
		||||
  // Because the post_msg is best effect, we need retry to post the msg under
 | 
			
		||||
  // exception.
 | 
			
		||||
  //
 | 
			
		||||
 | 
			
		||||
@ -242,17 +242,34 @@ int ObTxCycleTwoPhaseCommitter::handle_timeout()
 | 
			
		||||
    TRANS_LOG(WARN, "retransmit upstream msg failed", KR(tmp_ret));
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  // if a distributed trans has one participant and its 2pc state (up and down) is prepare,
 | 
			
		||||
  // then try enter into pre commit state.
 | 
			
		||||
  // NOTE that if a distributed trans has at least two participants,
 | 
			
		||||
  // the state can be drived by message.
 | 
			
		||||
  // If a distributed txn has one participant and it cannot drive its state by
 | 
			
		||||
  // msg or log, then we will try enter into the next state by timeout.
 | 
			
		||||
  //
 | 
			
		||||
  // NOTE that if a distributed txn has at least two participants, the state
 | 
			
		||||
  // can be drived by message.
 | 
			
		||||
  if (is_root()) {
 | 
			
		||||
    const int SINGLE_COUNT = 1;
 | 
			
		||||
    if (SINGLE_COUNT == get_downstream_size()
 | 
			
		||||
        && ObTxState::PREPARE == get_downstream_state()
 | 
			
		||||
        && ObTxState::PREPARE == get_upstream_state()) {
 | 
			
		||||
      if (OB_FAIL(try_enter_pre_commit_state())) {
 | 
			
		||||
        TRANS_LOG(WARN, "try enter into pre commit state failed", K(ret));
 | 
			
		||||
        && !is_2pc_logging()) {
 | 
			
		||||
      if (get_upstream_state() == get_downstream_state()
 | 
			
		||||
          && ObTxState::CLEAR > get_downstream_state()
 | 
			
		||||
          && ObTxState::PREPARE <= get_downstream_state()) {
 | 
			
		||||
        ObTxState next_state = decide_next_state_(get_downstream_state());
 | 
			
		||||
        if (OB_TMP_FAIL(drive_self_2pc_phase(next_state))) {
 | 
			
		||||
          TRANS_LOG(WARN, "enter next phase failed", K(tmp_ret), K(*this));
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
      // If a distributed txn has one participant and its state is pre_commit
 | 
			
		||||
      // and cannot drive its state by msg, then we will try to apply the
 | 
			
		||||
      // pre_commit.
 | 
			
		||||
      //
 | 
			
		||||
      // NOTE 2pc with other state will drive it self with log
 | 
			
		||||
      if (ObTxState::PRE_COMMIT == get_upstream_state()
 | 
			
		||||
          && ObTxState::PREPARE == get_downstream_state()) {
 | 
			
		||||
        if (OB_TMP_FAIL(on_pre_commit())) {
 | 
			
		||||
          TRANS_LOG(WARN, "apply pre commit failed", K(tmp_ret), K(*this));
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
@ -302,6 +319,45 @@ int ObTxCycleTwoPhaseCommitter::retransmit_downstream_msg_()
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
ObTxState ObTxCycleTwoPhaseCommitter::decide_next_state_(const ObTxState cur_state)
 | 
			
		||||
{
 | 
			
		||||
  ObTxState next_state = ObTxState::UNKNOWN;
 | 
			
		||||
 | 
			
		||||
  switch (cur_state)
 | 
			
		||||
  {
 | 
			
		||||
  case ObTxState::INIT: {
 | 
			
		||||
    next_state = ObTxState::REDO_COMPLETE;
 | 
			
		||||
    break;
 | 
			
		||||
  }
 | 
			
		||||
  case ObTxState::REDO_COMPLETE: {
 | 
			
		||||
    next_state = ObTxState::PREPARE;
 | 
			
		||||
    break;
 | 
			
		||||
  }
 | 
			
		||||
  case ObTxState::PREPARE: {
 | 
			
		||||
    next_state = ObTxState::PRE_COMMIT;
 | 
			
		||||
    break;
 | 
			
		||||
  }
 | 
			
		||||
  case ObTxState::PRE_COMMIT: {
 | 
			
		||||
    next_state = ObTxState::COMMIT;
 | 
			
		||||
    break;
 | 
			
		||||
  }
 | 
			
		||||
  case ObTxState::COMMIT: {
 | 
			
		||||
    next_state = ObTxState::CLEAR;
 | 
			
		||||
    break;
 | 
			
		||||
  }
 | 
			
		||||
  case ObTxState::ABORT: {
 | 
			
		||||
    next_state = ObTxState::CLEAR;
 | 
			
		||||
    break;
 | 
			
		||||
  }
 | 
			
		||||
  default: {
 | 
			
		||||
    next_state = ObTxState::UNKNOWN;
 | 
			
		||||
    break;
 | 
			
		||||
  }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  return next_state;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObTxCycleTwoPhaseCommitter::decide_downstream_msg_type_(bool &need_submit,
 | 
			
		||||
                                                            ObTwoPhaseCommitMsgType &msg_type)
 | 
			
		||||
{
 | 
			
		||||
 | 
			
		||||
@ -12,6 +12,8 @@
 | 
			
		||||
 | 
			
		||||
#include <gtest/gtest.h>
 | 
			
		||||
#include <vector>
 | 
			
		||||
#define private public
 | 
			
		||||
#define protected public
 | 
			
		||||
#include "ob_mock_2pc_ctx.h"
 | 
			
		||||
 | 
			
		||||
namespace oceanbase
 | 
			
		||||
@ -228,6 +230,81 @@ TEST_F(TestMockOb2pcCtx, test_simple_abort2)
 | 
			
		||||
  EXPECT_EQ(true, ctx2.check_status_valid(false/*should commit*/));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(TestMockOb2pcCtx, test_single_participants_prepare)
 | 
			
		||||
{
 | 
			
		||||
  MockOb2pcCtx ctx1;
 | 
			
		||||
  ctx1.init(&mailbox_mgr_);
 | 
			
		||||
  auto addr1 = ctx1.get_addr();
 | 
			
		||||
  MockObParticipants participants;
 | 
			
		||||
  participants.push_back(addr1);
 | 
			
		||||
  ctx1.participants_.assign(participants.begin(), participants.end());
 | 
			
		||||
 | 
			
		||||
  // ========== Two Phase Commit prepare Phase ==========
 | 
			
		||||
  // ctx1 start prepare state
 | 
			
		||||
  ctx1.downstream_state_ = ObTxState::PREPARE;
 | 
			
		||||
  ctx1.set_upstream_state(ObTxState::PREPARE);
 | 
			
		||||
  ctx1.handle_timeout();
 | 
			
		||||
  EXPECT_EQ(ObTxState::COMMIT, ctx1.get_upstream_state());
 | 
			
		||||
  EXPECT_EQ(ObTxState::PRE_COMMIT, ctx1.get_downstream_state());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(TestMockOb2pcCtx, test_single_participants_precommit)
 | 
			
		||||
{
 | 
			
		||||
  MockOb2pcCtx ctx1;
 | 
			
		||||
  ctx1.init(&mailbox_mgr_);
 | 
			
		||||
  auto addr1 = ctx1.get_addr();
 | 
			
		||||
  MockObParticipants participants;
 | 
			
		||||
  participants.push_back(addr1);
 | 
			
		||||
  ctx1.participants_.assign(participants.begin(), participants.end());
 | 
			
		||||
 | 
			
		||||
  // ========== Two Phase Commit precommit Phase ==========
 | 
			
		||||
  ctx1.downstream_state_ = ObTxState::PREPARE;
 | 
			
		||||
  ctx1.set_upstream_state(ObTxState::PRE_COMMIT);
 | 
			
		||||
  ctx1.handle_timeout();
 | 
			
		||||
  EXPECT_EQ(ObTxState::COMMIT, ctx1.get_upstream_state());
 | 
			
		||||
  EXPECT_EQ(ObTxState::PRE_COMMIT, ctx1.get_downstream_state());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(TestMockOb2pcCtx, test_single_participants_precommit2)
 | 
			
		||||
{
 | 
			
		||||
  MockOb2pcCtx ctx1;
 | 
			
		||||
  ctx1.init(&mailbox_mgr_);
 | 
			
		||||
  auto addr1 = ctx1.get_addr();
 | 
			
		||||
  MockObParticipants participants;
 | 
			
		||||
  participants.push_back(addr1);
 | 
			
		||||
  ctx1.participants_.assign(participants.begin(), participants.end());
 | 
			
		||||
 | 
			
		||||
  // ========== Two Phase Commit precommit Phase ==========
 | 
			
		||||
  ctx1.downstream_state_ = ObTxState::PRE_COMMIT;
 | 
			
		||||
  ctx1.set_upstream_state(ObTxState::PRE_COMMIT);
 | 
			
		||||
  ctx1.handle_timeout();
 | 
			
		||||
  EXPECT_EQ(ObTxState::COMMIT, ctx1.get_upstream_state());
 | 
			
		||||
  EXPECT_EQ(ObTxState::PRE_COMMIT, ctx1.get_downstream_state());
 | 
			
		||||
  ctx1.apply();
 | 
			
		||||
  EXPECT_EQ(ObTxState::CLEAR, ctx1.get_upstream_state());
 | 
			
		||||
  EXPECT_EQ(ObTxState::COMMIT, ctx1.get_downstream_state());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(TestMockOb2pcCtx, test_single_participants_commit)
 | 
			
		||||
{
 | 
			
		||||
  MockOb2pcCtx ctx1;
 | 
			
		||||
  ctx1.init(&mailbox_mgr_);
 | 
			
		||||
  auto addr1 = ctx1.get_addr();
 | 
			
		||||
  MockObParticipants participants;
 | 
			
		||||
  participants.push_back(addr1);
 | 
			
		||||
  ctx1.participants_.assign(participants.begin(), participants.end());
 | 
			
		||||
 | 
			
		||||
  // ========== Two Phase Commit precommit Phase ==========
 | 
			
		||||
  ctx1.downstream_state_ = ObTxState::COMMIT;
 | 
			
		||||
  ctx1.set_upstream_state(ObTxState::COMMIT);
 | 
			
		||||
  ctx1.handle_timeout();
 | 
			
		||||
  EXPECT_EQ(ObTxState::CLEAR, ctx1.get_upstream_state());
 | 
			
		||||
  EXPECT_EQ(ObTxState::COMMIT, ctx1.get_downstream_state());
 | 
			
		||||
  ctx1.apply();
 | 
			
		||||
  EXPECT_EQ(ObTxState::CLEAR, ctx1.get_upstream_state());
 | 
			
		||||
  EXPECT_EQ(ObTxState::CLEAR, ctx1.get_downstream_state());
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user