fix delay abort bug
This commit is contained in:
@ -1658,10 +1658,11 @@ int ObPartTransCtx::compensate_abort_log_()
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (is_force_abort_logging_()) {
|
if (is_force_abort_logging_()) {
|
||||||
// do nothing
|
// do nothing
|
||||||
|
} else if(OB_FALSE_IT(sub_state_.set_force_abort())) {
|
||||||
|
|
||||||
} else if (OB_FAIL(submit_log_impl_(ObTxLogType::TX_ABORT_LOG))) {
|
} else if (OB_FAIL(submit_log_impl_(ObTxLogType::TX_ABORT_LOG))) {
|
||||||
TRANS_LOG(WARN, "submit abort log failed", KR(ret), K(*this));
|
TRANS_LOG(WARN, "submit abort log failed", KR(ret), K(*this));
|
||||||
} else {
|
} else {
|
||||||
sub_state_.set_force_abort();
|
|
||||||
}
|
}
|
||||||
TRANS_LOG(INFO, "compensate abort log", K(ret), KPC(this));
|
TRANS_LOG(INFO, "compensate abort log", K(ret), KPC(this));
|
||||||
return ret;
|
return ret;
|
||||||
@ -2556,7 +2557,11 @@ int ObPartTransCtx::submit_redo_commit_info_log_()
|
|||||||
ObTxLogBlockHeader
|
ObTxLogBlockHeader
|
||||||
log_block_header(cluster_id_, exec_info_.next_log_entry_no_, trans_id_, exec_info_.scheduler_);
|
log_block_header(cluster_id_, exec_info_.next_log_entry_no_, trans_id_, exec_info_.scheduler_);
|
||||||
|
|
||||||
if (sub_state_.is_info_log_submitted()) {
|
if (need_force_abort_() || is_force_abort_logging_()
|
||||||
|
|| get_downstream_state() == ObTxState::ABORT) {
|
||||||
|
ret = OB_TRANS_KILLED;
|
||||||
|
TRANS_LOG(WARN, "tx has been aborting, can not submit prepare log", K(ret));
|
||||||
|
} else if (sub_state_.is_info_log_submitted()) {
|
||||||
// state log already submitted, do nothing
|
// state log already submitted, do nothing
|
||||||
} else if (OB_FAIL(log_block.init(replay_hint, log_block_header))) {
|
} else if (OB_FAIL(log_block.init(replay_hint, log_block_header))) {
|
||||||
TRANS_LOG(WARN, "init log block failed", KR(ret), K(*this));
|
TRANS_LOG(WARN, "init log block failed", KR(ret), K(*this));
|
||||||
@ -2789,6 +2794,13 @@ int ObPartTransCtx::submit_prepare_log_()
|
|||||||
ObTxLogBlockHeader
|
ObTxLogBlockHeader
|
||||||
log_block_header(cluster_id_, exec_info_.next_log_entry_no_, trans_id_, exec_info_.scheduler_);
|
log_block_header(cluster_id_, exec_info_.next_log_entry_no_, trans_id_, exec_info_.scheduler_);
|
||||||
|
|
||||||
|
if (need_force_abort_() || is_force_abort_logging_()
|
||||||
|
|| get_downstream_state() == ObTxState::ABORT) {
|
||||||
|
ret = OB_TRANS_KILLED;
|
||||||
|
TRANS_LOG(WARN, "tx has been aborting, can not submit prepare log", K(ret));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
if (OB_FAIL(log_block.init(replay_hint, log_block_header))) {
|
if (OB_FAIL(log_block.init(replay_hint, log_block_header))) {
|
||||||
TRANS_LOG(WARN, "init log block failed", KR(ret), K(*this));
|
TRANS_LOG(WARN, "init log block failed", KR(ret), K(*this));
|
||||||
} else if (!sub_state_.is_info_log_submitted()) {
|
} else if (!sub_state_.is_info_log_submitted()) {
|
||||||
@ -2801,7 +2813,7 @@ int ObPartTransCtx::submit_prepare_log_()
|
|||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
ObTxLogCb *log_cb = NULL;
|
ObTxLogCb *log_cb = NULL;
|
||||||
@ -2912,7 +2924,11 @@ int ObPartTransCtx::submit_commit_log_()
|
|||||||
log_block_header(cluster_id_, exec_info_.next_log_entry_no_, trans_id_, exec_info_.scheduler_);
|
log_block_header(cluster_id_, exec_info_.next_log_entry_no_, trans_id_, exec_info_.scheduler_);
|
||||||
const bool local_tx = is_local_tx_();
|
const bool local_tx = is_local_tx_();
|
||||||
|
|
||||||
if (OB_FAIL(gen_final_mds_array_(multi_source_data))) {
|
if (need_force_abort_() || is_force_abort_logging_()
|
||||||
|
|| get_downstream_state() == ObTxState::ABORT) {
|
||||||
|
ret = OB_TRANS_KILLED;
|
||||||
|
TRANS_LOG(WARN, "tx has been aborting, can not submit prepare log", K(ret));
|
||||||
|
} else if (OB_FAIL(gen_final_mds_array_(multi_source_data))) {
|
||||||
TRANS_LOG(WARN, "gen total multi source data failed", KR(ret), K(*this));
|
TRANS_LOG(WARN, "gen total multi source data failed", KR(ret), K(*this));
|
||||||
} else {
|
} else {
|
||||||
bool use_local_block_buf = local_tx
|
bool use_local_block_buf = local_tx
|
||||||
@ -4962,7 +4978,14 @@ int ObPartTransCtx::switch_to_leader(const SCN &start_working_ts)
|
|||||||
} else {
|
} else {
|
||||||
TRANS_LOG(WARN, "txn data incomplete, will be aborted", K(contain_table_lock), KPC(this));
|
TRANS_LOG(WARN, "txn data incomplete, will be aborted", K(contain_table_lock), KPC(this));
|
||||||
if (has_persisted_log_()) {
|
if (has_persisted_log_()) {
|
||||||
if (OB_FAIL(do_local_tx_end_(TxEndAction::DELAY_ABORT_TX))) {
|
if (ObPartTransAction::COMMIT == part_trans_action_ || get_upstream_state() >= ObTxState::PREPARE) {
|
||||||
|
|
||||||
|
TRANS_LOG(WARN, "abort self instantly with a tx_commit request", K(contain_table_lock),
|
||||||
|
KPC(this));
|
||||||
|
if (OB_FAIL(do_local_tx_end_(TxEndAction::ABORT_TX))) {
|
||||||
|
TRANS_LOG(WARN, "abort tx failed", KR(ret), KPC(this));
|
||||||
|
}
|
||||||
|
} else if (OB_FAIL(do_local_tx_end_(TxEndAction::DELAY_ABORT_TX))) {
|
||||||
TRANS_LOG(WARN, "abort tx failed", KR(ret), K(*this));
|
TRANS_LOG(WARN, "abort tx failed", KR(ret), K(*this));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Reference in New Issue
Block a user