diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index ad726d7251..cff1da91aa 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -1658,10 +1658,11 @@ int ObPartTransCtx::compensate_abort_log_() int ret = OB_SUCCESS; if (is_force_abort_logging_()) { // do nothing + } else if(OB_FALSE_IT(sub_state_.set_force_abort())) { + } else if (OB_FAIL(submit_log_impl_(ObTxLogType::TX_ABORT_LOG))) { TRANS_LOG(WARN, "submit abort log failed", KR(ret), K(*this)); } else { - sub_state_.set_force_abort(); } TRANS_LOG(INFO, "compensate abort log", K(ret), KPC(this)); return ret; @@ -2556,7 +2557,11 @@ int ObPartTransCtx::submit_redo_commit_info_log_() ObTxLogBlockHeader log_block_header(cluster_id_, exec_info_.next_log_entry_no_, trans_id_, exec_info_.scheduler_); - if (sub_state_.is_info_log_submitted()) { + if (need_force_abort_() || is_force_abort_logging_() + || get_downstream_state() == ObTxState::ABORT) { + ret = OB_TRANS_KILLED; + TRANS_LOG(WARN, "tx has been aborting, can not submit prepare log", K(ret)); + } else if (sub_state_.is_info_log_submitted()) { // state log already submitted, do nothing } else if (OB_FAIL(log_block.init(replay_hint, log_block_header))) { TRANS_LOG(WARN, "init log block failed", KR(ret), K(*this)); @@ -2789,19 +2794,26 @@ int ObPartTransCtx::submit_prepare_log_() ObTxLogBlockHeader log_block_header(cluster_id_, exec_info_.next_log_entry_no_, trans_id_, exec_info_.scheduler_); - if (OB_FAIL(log_block.init(replay_hint, log_block_header))) { - TRANS_LOG(WARN, "init log block failed", KR(ret), K(*this)); - } else if (!sub_state_.is_info_log_submitted()) { - prev_lsn.reset(); - if (OB_FAIL(submit_multi_data_source_(log_block))) { - TRANS_LOG(WARN, "submit multi source data failed", KR(ret), K(*this)); - } else if (OB_FAIL(submit_redo_commit_info_log_(log_block, has_redo, helper))) { - TRANS_LOG(WARN, "submit redo commit state log failed", KR(ret), K(*this)); - } else { - // do nothing - } + if (need_force_abort_() || is_force_abort_logging_() + || get_downstream_state() == ObTxState::ABORT) { + ret = OB_TRANS_KILLED; + TRANS_LOG(WARN, "tx has been aborting, can not submit prepare log", K(ret)); } + if (OB_SUCC(ret)) { + if (OB_FAIL(log_block.init(replay_hint, log_block_header))) { + TRANS_LOG(WARN, "init log block failed", KR(ret), K(*this)); + } else if (!sub_state_.is_info_log_submitted()) { + prev_lsn.reset(); + if (OB_FAIL(submit_multi_data_source_(log_block))) { + TRANS_LOG(WARN, "submit multi source data failed", KR(ret), K(*this)); + } else if (OB_FAIL(submit_redo_commit_info_log_(log_block, has_redo, helper))) { + TRANS_LOG(WARN, "submit redo commit state log failed", KR(ret), K(*this)); + } else { + // do nothing + } + } + } if (OB_SUCC(ret)) { ObTxLogCb *log_cb = NULL; @@ -2912,7 +2924,11 @@ int ObPartTransCtx::submit_commit_log_() log_block_header(cluster_id_, exec_info_.next_log_entry_no_, trans_id_, exec_info_.scheduler_); const bool local_tx = is_local_tx_(); - if (OB_FAIL(gen_final_mds_array_(multi_source_data))) { + if (need_force_abort_() || is_force_abort_logging_() + || get_downstream_state() == ObTxState::ABORT) { + ret = OB_TRANS_KILLED; + TRANS_LOG(WARN, "tx has been aborting, can not submit prepare log", K(ret)); + } else if (OB_FAIL(gen_final_mds_array_(multi_source_data))) { TRANS_LOG(WARN, "gen total multi source data failed", KR(ret), K(*this)); } else { bool use_local_block_buf = local_tx @@ -4962,7 +4978,14 @@ int ObPartTransCtx::switch_to_leader(const SCN &start_working_ts) } else { TRANS_LOG(WARN, "txn data incomplete, will be aborted", K(contain_table_lock), KPC(this)); if (has_persisted_log_()) { - if (OB_FAIL(do_local_tx_end_(TxEndAction::DELAY_ABORT_TX))) { + if (ObPartTransAction::COMMIT == part_trans_action_ || get_upstream_state() >= ObTxState::PREPARE) { + + TRANS_LOG(WARN, "abort self instantly with a tx_commit request", K(contain_table_lock), + KPC(this)); + if (OB_FAIL(do_local_tx_end_(TxEndAction::ABORT_TX))) { + TRANS_LOG(WARN, "abort tx failed", KR(ret), KPC(this)); + } + } else if (OB_FAIL(do_local_tx_end_(TxEndAction::DELAY_ABORT_TX))) { TRANS_LOG(WARN, "abort tx failed", KR(ret), K(*this)); } } else {