[master] do not abort tx-ctx when switch to leader fail
This commit is contained in:
		@ -4587,9 +4587,7 @@ int ObPartTransCtx::switch_to_leader(const SCN &start_working_ts)
 | 
			
		||||
      } else {
 | 
			
		||||
        TRANS_LOG(WARN, "txn data incomplete, will be aborted", K(contain_table_lock), KPC(this));
 | 
			
		||||
        if (has_persisted_log_()) {
 | 
			
		||||
          if (OB_FAIL(do_local_tx_end_(TxEndAction::ABORT_TX))) {
 | 
			
		||||
            TRANS_LOG(WARN, "abort tx failed", KR(ret), K(*this));
 | 
			
		||||
          }
 | 
			
		||||
          sub_state_.set_force_abort();
 | 
			
		||||
        } else {
 | 
			
		||||
          TRANS_LOG(ERROR, "unexpected trx which has not persisted log", K(*this));
 | 
			
		||||
        }
 | 
			
		||||
@ -4762,7 +4760,9 @@ int ObPartTransCtx::switch_to_follower_gracefully(ObIArray<ObTxCommitCallback> &
 | 
			
		||||
  } else if (is_exiting_) {
 | 
			
		||||
    // do nothing
 | 
			
		||||
  } else if (sub_state_.is_force_abort()) {
 | 
			
		||||
    // is aborting, skip
 | 
			
		||||
    if (exec_info_.state_ == ObTxState::INIT && OB_FAIL(mt_ctx_.commit_to_replay())) {
 | 
			
		||||
      TRANS_LOG(WARN, "commit to replay error", KR(ret), "context", *this);
 | 
			
		||||
    }
 | 
			
		||||
  } else if (is_follower_()) {
 | 
			
		||||
    TRANS_LOG(INFO, "current tx already follower", K(*this));
 | 
			
		||||
  } else if (OB_FAIL(state_helper.switch_state(TxCtxOps::SWITCH_GRACEFUL))) {
 | 
			
		||||
@ -5567,7 +5567,10 @@ int ObPartTransCtx::sub_prepare(const ObLSArray &parts,
 | 
			
		||||
    exec_info_.trans_type_ = TransType::DIST_TRANS;
 | 
			
		||||
    exec_info_.xid_ = xid;
 | 
			
		||||
    // (void)set_sub2pc_coord_state(Ob2PCPrepareState::REDO_PREPARING);
 | 
			
		||||
    if (OB_FAIL(prepare_redo())) {
 | 
			
		||||
    if (sub_state_.is_force_abort()) {
 | 
			
		||||
      TRANS_LOG(WARN, "tx was marked force abort");
 | 
			
		||||
      ret = OB_TRANS_KILLED;
 | 
			
		||||
    } else if (OB_FAIL(prepare_redo())) {
 | 
			
		||||
      TRANS_LOG(WARN, "fail to execute sub prepare", K(ret), KPC(this));
 | 
			
		||||
    } else {
 | 
			
		||||
      part_trans_action_ = ObPartTransAction::COMMIT;
 | 
			
		||||
@ -5799,7 +5802,7 @@ int ObPartTransCtx::rollback_to_savepoint(const int64_t op_sn,
 | 
			
		||||
    ret = OB_TRANS_SQL_SEQUENCE_ILLEGAL;
 | 
			
		||||
  } else if (op_sn > last_op_sn_ && last_scn_ <= to_scn) {
 | 
			
		||||
    last_op_sn_ = op_sn;
 | 
			
		||||
    TRANS_LOG(INFO, "rollback succeed trivially", K(op_sn), K(to_scn), K_(last_scn));
 | 
			
		||||
    TRANS_LOG(INFO, "rollback succeed trivially", K(op_sn), K(to_scn), K_(last_scn), KP(this), K_(ls_id));
 | 
			
		||||
  } else if (op_sn > last_op_sn_ && pending_write_ > 0) {
 | 
			
		||||
    ret = OB_NEED_RETRY;
 | 
			
		||||
    TRANS_LOG(WARN, "has pending write, rollback blocked",
 | 
			
		||||
@ -6034,7 +6037,13 @@ int ObPartTransCtx::do_local_tx_end_(TxEndAction tx_end_action)
 | 
			
		||||
 | 
			
		||||
    switch (tx_end_action) {
 | 
			
		||||
    case TxEndAction::COMMIT_TX: {
 | 
			
		||||
      ret = do_local_commit_tx_();
 | 
			
		||||
      if (sub_state_.is_force_abort()) {
 | 
			
		||||
        if (OB_SUCC(do_local_abort_tx_())) {
 | 
			
		||||
          ret = OB_TRANS_KILLED;
 | 
			
		||||
        }
 | 
			
		||||
      } else {
 | 
			
		||||
        ret = do_local_commit_tx_();
 | 
			
		||||
      }
 | 
			
		||||
      // part_trans_action_ will be set as commit in ObPartTransCtx::commit function
 | 
			
		||||
      break;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -619,6 +619,7 @@ protected:
 | 
			
		||||
 | 
			
		||||
  // Caller need ensuere the participants array has already been set and the
 | 
			
		||||
  // size of the participants array is larger or equal than one.
 | 
			
		||||
  virtual int do_prepare_redo();
 | 
			
		||||
  virtual int do_prepare(bool &no_need_submit_log) override;
 | 
			
		||||
  virtual int on_prepare() override;
 | 
			
		||||
  virtual int do_pre_commit(bool &need_wait) override;
 | 
			
		||||
 | 
			
		||||
@ -197,6 +197,7 @@ public:
 | 
			
		||||
  //
 | 
			
		||||
  // NB: The implementation need guarantee the method is failure atomic, So the
 | 
			
		||||
  // method should never report an error.
 | 
			
		||||
  virtual int do_prepare_redo() = 0;
 | 
			
		||||
  virtual int do_prepare(bool &no_need_submit_log) = 0;
 | 
			
		||||
  virtual int do_pre_commit(bool& need_wait) = 0;
 | 
			
		||||
  virtual int do_commit() = 0;
 | 
			
		||||
 | 
			
		||||
@ -107,6 +107,17 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_redo_request_impl_()
 | 
			
		||||
    TRANS_LOG(WARN, "unexpected operation", K(ret), K(*this));
 | 
			
		||||
  } else if (is_2pc_logging()) {
 | 
			
		||||
    TRANS_LOG(INFO, "committer is under logging", K(ret), K(*this));
 | 
			
		||||
  } else if (OB_FAIL(do_prepare_redo())) {
 | 
			
		||||
    TRANS_LOG(WARN, "do prepare redo fail", K(ret));
 | 
			
		||||
    if (OB_FAIL(drive_self_2pc_phase(ObTxState::ABORT))) {
 | 
			
		||||
      TRANS_LOG(WARN, "drive self abort fail", KR(tmp_ret), KPC(this));
 | 
			
		||||
    }
 | 
			
		||||
    if (is_internal() && OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_REQ))) {
 | 
			
		||||
      TRANS_LOG(WARN, "post downstream abort msg failed", KR(tmp_ret), KPC(this));
 | 
			
		||||
    }
 | 
			
		||||
    if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_RESP, OB_C2PC_UPSTREAM_ID))) {
 | 
			
		||||
      TRANS_LOG(WARN, "post upstream abort resp msg failed", KR(tmp_ret), KPC(this));
 | 
			
		||||
    }
 | 
			
		||||
  } else if (OB_TMP_FAIL(submit_log(ObTwoPhaseCommitLogType::OB_LOG_TX_COMMIT_INFO))) {
 | 
			
		||||
    if (OB_BLOCK_FROZEN == tmp_ret) {
 | 
			
		||||
      // memtable is freezing, can not submit log right now.
 | 
			
		||||
 | 
			
		||||
@ -337,7 +337,16 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_request_impl_() {
 | 
			
		||||
    TRANS_LOG(WARN, "apply msg failed", K(ret), KPC(this));
 | 
			
		||||
  } else if (OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) {
 | 
			
		||||
    TRANS_LOG(WARN, "do prepare failed", K(ret), K(*this));
 | 
			
		||||
  } else {
 | 
			
		||||
    if (OB_TRANS_NEED_ROLLBACK == ret) {
 | 
			
		||||
      if (OB_FAIL(drive_self_2pc_phase(ObTxState::ABORT))) {
 | 
			
		||||
        TRANS_LOG(WARN, "drive abort failed", K(ret), K(*this));
 | 
			
		||||
      } else if (OB_TMP_FAIL(post_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_RESP, OB_C2PC_UPSTREAM_ID))) {
 | 
			
		||||
        TRANS_LOG(WARN, "post abort resp msg failed", K(tmp_ret), K(*this));
 | 
			
		||||
      }
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  if (OB_SUCC(ret)) {
 | 
			
		||||
    switch (get_2pc_role()) {
 | 
			
		||||
    case Ob2PCRole::ROOT: {
 | 
			
		||||
      ret = OB_ERR_UNEXPECTED;
 | 
			
		||||
@ -345,9 +354,8 @@ int ObTxCycleTwoPhaseCommitter::handle_2pc_prepare_request_impl_() {
 | 
			
		||||
      break;
 | 
			
		||||
    }
 | 
			
		||||
    case Ob2PCRole::INTERNAL: {
 | 
			
		||||
 | 
			
		||||
      if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ))) {
 | 
			
		||||
        TRANS_LOG(WARN, "post prepare msg failed", KR(ret));
 | 
			
		||||
      if (OB_TMP_FAIL(retransmit_downstream_msg_())) {
 | 
			
		||||
        TRANS_LOG(WARN, "post downstream msg failed", KR(ret));
 | 
			
		||||
      }
 | 
			
		||||
      break;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -40,6 +40,14 @@ int ObTxCycleTwoPhaseCommitter::two_phase_commit()
 | 
			
		||||
    TRANS_LOG(INFO, "committer is under logging", K(ret), K(*this));
 | 
			
		||||
  } else if (OB_FAIL(drive_self_2pc_phase(ObTxState::PREPARE))) {
 | 
			
		||||
    TRANS_LOG(WARN, "enter prepare phase failed", K(ret), K(*this));
 | 
			
		||||
    if (OB_TRANS_NEED_ROLLBACK == ret) {
 | 
			
		||||
      if (OB_FAIL(drive_self_2pc_phase(ObTxState::ABORT))) {
 | 
			
		||||
        TRANS_LOG(WARN, "enter abort phase failed", K(ret), K(*this));
 | 
			
		||||
      } else if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_ABORT_REQ))) {
 | 
			
		||||
        TRANS_LOG(WARN, "post prepare requests failed", K(tmp_ret));
 | 
			
		||||
      }
 | 
			
		||||
      ret = OB_TRANS_KILLED;
 | 
			
		||||
    }
 | 
			
		||||
  } else {
 | 
			
		||||
    if (OB_TMP_FAIL(post_downstream_msg(ObTwoPhaseCommitMsgType::OB_MSG_TX_PREPARE_REQ))) {
 | 
			
		||||
      TRANS_LOG(WARN, "post prepare requests failed", K(tmp_ret));
 | 
			
		||||
 | 
			
		||||
@ -65,6 +65,16 @@ int ObPartTransCtx::restart_2pc_trans_timer_()
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int ObPartTransCtx::do_prepare_redo()
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  if (sub_state_.is_force_abort()) {
 | 
			
		||||
    ret = OB_TRANS_NEED_ROLLBACK;
 | 
			
		||||
    TRANS_LOG(WARN, "tx was marked force abort", K(ret));
 | 
			
		||||
  }
 | 
			
		||||
  return ret;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
 * If no_need_submit_log is true, it will not submit prepare log after do_prepare.
 | 
			
		||||
 * XA and dup_table will submit commit info log in do_prepare and drive to submit prepare log after the conditions are met.
 | 
			
		||||
@ -74,8 +84,11 @@ int ObPartTransCtx::do_prepare(bool &no_need_submit_log)
 | 
			
		||||
{
 | 
			
		||||
  int ret = OB_SUCCESS;
 | 
			
		||||
  no_need_submit_log = false;
 | 
			
		||||
 | 
			
		||||
  if (exec_info_.is_dup_tx_ || OB_SUCC(search_unsubmitted_dup_table_redo_())) {
 | 
			
		||||
  if (sub_state_.is_force_abort()) {
 | 
			
		||||
    // txn has marked force_abort, prepare should fail
 | 
			
		||||
    ret = OB_TRANS_NEED_ROLLBACK;
 | 
			
		||||
    TRANS_LOG(WARN, "tx was marked force_abort", K(ret));
 | 
			
		||||
  } else if (exec_info_.is_dup_tx_ || OB_SUCC(search_unsubmitted_dup_table_redo_())) {
 | 
			
		||||
    no_need_submit_log = true;
 | 
			
		||||
    if (OB_FAIL(dup_table_tx_redo_sync_())) {
 | 
			
		||||
      TRANS_LOG(WARN, "dup table tx  redo sync failed", K(ret));
 | 
			
		||||
 | 
			
		||||
@ -1411,7 +1411,7 @@ inline int ObTransService::rollback_savepoint_slowpath_(ObTxDesc &tx,
 | 
			
		||||
  ARRAY_FOREACH_NORET(parts, i) {
 | 
			
		||||
    if (parts[i].epoch_ <= 0) {
 | 
			
		||||
      int64_t len = tx.get_serialize_size() + sizeof(ObTxDesc);
 | 
			
		||||
      char *buf = (char*)ob_malloc(len);
 | 
			
		||||
      char *buf = (char*)ob_malloc(len, "TxRollbackSP");
 | 
			
		||||
      int64_t pos = sizeof(ObTxDesc);
 | 
			
		||||
      if (OB_FAIL(tx.serialize(buf, len, pos))) {
 | 
			
		||||
        TRANS_LOG(WARN, "serialize tx fail", KR(ret), K(tx));
 | 
			
		||||
 | 
			
		||||
@ -91,7 +91,7 @@ OB_DEF_DESERIALIZE(ObTxRollbackSPMsg)
 | 
			
		||||
    bool has_tx_ptr = false;
 | 
			
		||||
    OB_UNIS_DECODE(has_tx_ptr);
 | 
			
		||||
    if (has_tx_ptr) {
 | 
			
		||||
      void *buffer = ob_malloc(sizeof(ObTxDesc));
 | 
			
		||||
      void *buffer = ob_malloc(sizeof(ObTxDesc), "TxRollbackSP");
 | 
			
		||||
      if (OB_ISNULL(buffer)) {
 | 
			
		||||
        ret = OB_ALLOCATE_MEMORY_FAILED;
 | 
			
		||||
      } else {
 | 
			
		||||
 | 
			
		||||
@ -74,7 +74,10 @@ int MockOb2pcCtx::commit(const MockObParticipants& participants)
 | 
			
		||||
  participants_.assign(participants.begin(), participants.end());
 | 
			
		||||
  return two_phase_commit();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
int MockOb2pcCtx::do_prepare_redo()
 | 
			
		||||
{
 | 
			
		||||
  return OB_SUCCESS;
 | 
			
		||||
}
 | 
			
		||||
int MockOb2pcCtx::do_prepare(bool &no_need_submit_log)
 | 
			
		||||
{
 | 
			
		||||
  no_need_submit_log = false;
 | 
			
		||||
 | 
			
		||||
@ -94,6 +94,7 @@ protected:
 | 
			
		||||
  // decide final transaction state. In Oceanbase's optimized, do_pre_commit is used to
 | 
			
		||||
  // optimize single machine read latency and do/on_clear is used to maintain the state
 | 
			
		||||
  // to recovery
 | 
			
		||||
  virtual int do_prepare_redo() override;
 | 
			
		||||
  virtual int do_prepare(bool &no_need_submit_log) override;
 | 
			
		||||
  virtual int on_prepare() override;
 | 
			
		||||
  virtual int do_pre_commit(bool& need_wait) override;
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user