diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index d799628e48..f0902c0446 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -3891,6 +3891,12 @@ int ObPartTransCtx::submit_log_impl_(const ObTxLogType log_type) } if (OB_SWITCHING_TO_FOLLOWER_GRACEFULLY == ERRSIM_DELAY_TX_SUBMIT_LOG && 1002 == MTL_ID() && 1001 == ls_id_.id()) { ret = ERRSIM_DELAY_TX_SUBMIT_LOG; + } else if (ERRSIM_DELAY_TX_SUBMIT_LOG < int(-9999)) { + // reject trans_id and tenant id to contorl trans sate + ret = process_errsim_for_standby_read_(int(ERRSIM_DELAY_TX_SUBMIT_LOG), + OB_SWITCHING_TO_FOLLOWER_GRACEFULLY); + TRANS_LOG(INFO, "reject delay tx submit log when submit commit log", + K(ret), K(ERRSIM_DELAY_TX_SUBMIT_LOG)); } if (OB_SUCC(ret) && mt_ctx_.is_prepared()) { ret = submit_commit_log_(); @@ -6803,6 +6809,23 @@ int ObPartTransCtx::errism_dup_table_redo_sync_() return ret; } +int ObPartTransCtx::process_errsim_for_standby_read_(int err_code, int ret_code) +{ + // reject trans_id and tenant id to contorl trans sate + int ret = OB_SUCCESS; + + int64_t rej_code = abs(err_code); + int64_t rej_tenant_id = rej_code % 10000; + int64_t rej_trans_id = (rej_code - rej_tenant_id) / 10000; + if (trans_id_.get_id() == rej_trans_id && MTL_ID() == rej_tenant_id) { + ret = ret_code; + TRANS_LOG(INFO, "reject special code for control trans state", K(ret), K(err_code), + K(rej_code), K(rej_tenant_id), K(rej_trans_id), K(trans_id_)); + } + + return ret; +} + OB_NOINLINE int ObPartTransCtx::errism_submit_prepare_log_() { @@ -6812,6 +6835,10 @@ OB_NOINLINE int ObPartTransCtx::errism_submit_prepare_log_() ret = ERRSIM_DELAY_TX_SUBMIT_LOG; } else if (OB_BLOCK_FROZEN == ERRSIM_DELAY_TX_SUBMIT_LOG && !is_root()) { ret = ERRSIM_DELAY_TX_SUBMIT_LOG; + } else if (ERRSIM_DELAY_TX_SUBMIT_LOG < int(-9999)) { + // reject trans_id and tenant id to contorl trans sate + ret = process_errsim_for_standby_read_(int(ERRSIM_DELAY_TX_SUBMIT_LOG), OB_NOT_MASTER); + TRANS_LOG(INFO, "reject delay tx submit log", K(ret), K(ERRSIM_DELAY_TX_SUBMIT_LOG)); } #ifdef ERRSIM @@ -8109,6 +8136,15 @@ int ObPartTransCtx::tx_keepalive_response_(const int64_t status) if (OB_SWITCHING_TO_FOLLOWER_GRACEFULLY == ERRSIM_DELAY_TX_SUBMIT_LOG) { return ret; + } else if (ERRSIM_DELAY_TX_SUBMIT_LOG < int(-9999)) { + // reject trans_id and tenant id to contorl trans sate + ret = process_errsim_for_standby_read_(int(ERRSIM_DELAY_TX_SUBMIT_LOG), + OB_SWITCHING_TO_FOLLOWER_GRACEFULLY); + if (OB_FAIL(ret)) { + return ret; + TRANS_LOG(INFO, "reject delay tx submit log when response to keepalive", + K(ret), K(ERRSIM_DELAY_TX_SUBMIT_LOG)); + } } if ((OB_TRANS_CTX_NOT_EXIST == status || OB_TRANS_ROLLBACKED == status || diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 77eb094319..d777d803f9 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -885,6 +885,7 @@ private: int build_and_post_ask_state_msg_(const share::SCN &snapshot, const share::ObLSID &ori_ls_id, const ObAddr &ori_addr); int check_ls_state_(const SCN &snapshot, const ObLSID &ls_id, const ObStandbyCheckInfo &check_info); + int process_errsim_for_standby_read_(int err_code, int ret_code); int get_ls_replica_readable_scn_(const ObLSID &ls_id, SCN &snapshot_version); int submit_redo_log_for_freeze_(bool &try_submit, const uint32_t freeze_clock); void print_first_mvcc_callback_(); diff --git a/src/storage/tx/ob_tx_2pc_msg_handler.cpp b/src/storage/tx/ob_tx_2pc_msg_handler.cpp index 2d5d062d4a..32b72d73bf 100644 --- a/src/storage/tx/ob_tx_2pc_msg_handler.cpp +++ b/src/storage/tx/ob_tx_2pc_msg_handler.cpp @@ -703,6 +703,15 @@ int ObPartTransCtx::handle_tx_2pc_prepare_resp(const Ob2pcPrepareRespMsg &msg) if (OB_NOT_INIT == ERRSIM_DELAY_TX_COMMIT) { return ret; + } else if (ERRSIM_DELAY_TX_COMMIT < int(-9999)) { + // reject trans_id and tenant id to contorl trans sate + ret = process_errsim_for_standby_read_(int(ERRSIM_DELAY_TX_COMMIT), + OB_NOT_INIT); + if (OB_FAIL(ret)) { + return ret; + TRANS_LOG(INFO, "reject delay tx commit", + K(ret), K(ERRSIM_DELAY_TX_COMMIT)); + } } ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type()); @@ -850,6 +859,15 @@ int ObPartTransCtx::handle_tx_2pc_pre_commit_req(const Ob2pcPreCommitReqMsg &msg if (OB_NOT_SUPPORTED == ERRSIM_DELAY_TX_COMMIT) { return ret; + } else if (ERRSIM_DELAY_TX_COMMIT < int(-9999)) { + // reject trans_id and tenant id to contorl trans sate + ret = process_errsim_for_standby_read_(int(ERRSIM_DELAY_TX_COMMIT), + OB_NOT_SUPPORTED); + if (OB_FAIL(ret)) { + return ret; + TRANS_LOG(INFO, "reject delay tx commit when pre commit", + K(ret), K(ERRSIM_DELAY_TX_COMMIT)); + } } ObTwoPhaseCommitMsgType msg_type = switch_msg_type_(msg.get_msg_type());