fix busy log_cb not return when log sync success callback and TxCtx is exiting
This commit is contained in:
@ -427,11 +427,6 @@ void ObPartTransCtx::reset_log_cb_list_(common::ObDList<ObTxLogCb> &cb_list)
|
|||||||
ObTxLogCb *cb = NULL;
|
ObTxLogCb *cb = NULL;
|
||||||
while (OB_NOT_NULL(cb = cb_list.remove_first())) {
|
while (OB_NOT_NULL(cb = cb_list.remove_first())) {
|
||||||
const bool is_dynamic = cb->is_dynamic();
|
const bool is_dynamic = cb->is_dynamic();
|
||||||
if (OB_NOT_NULL(cb->get_tx_data())) {
|
|
||||||
ObTxData *tx_data = cb->get_tx_data();
|
|
||||||
ctx_tx_data_.free_tmp_tx_data(tx_data);
|
|
||||||
cb->set_tx_data(nullptr);
|
|
||||||
}
|
|
||||||
if (!is_dynamic) {
|
if (!is_dynamic) {
|
||||||
cb->reset();
|
cb->reset();
|
||||||
} else {
|
} else {
|
||||||
@ -664,8 +659,8 @@ int ObPartTransCtx::handle_timeout(const int64_t delay)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// retry submit abort log
|
// retry submit abort log for local tx abort
|
||||||
if (!is_follower_()
|
if (!is_follower_() && is_local_tx_()
|
||||||
&& get_upstream_state() == ObTxState::ABORT
|
&& get_upstream_state() == ObTxState::ABORT
|
||||||
&& get_upstream_state() != get_downstream_state()) {
|
&& get_upstream_state() != get_downstream_state()) {
|
||||||
if (OB_FAIL(compensate_abort_log_())) {
|
if (OB_FAIL(compensate_abort_log_())) {
|
||||||
@ -2222,11 +2217,12 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb)
|
|||||||
return_log_cb_(log_cb);
|
return_log_cb_(log_cb);
|
||||||
} else if (is_exiting_) {
|
} else if (is_exiting_) {
|
||||||
// the TxCtx maybe has been killed forcedly by background GC thread
|
// the TxCtx maybe has been killed forcedly by background GC thread
|
||||||
// because the log_cb process skipped, here don't free log_cb directly
|
// the log_cb process has been skipped
|
||||||
// the cleanup routine in destroy will handle, see `reset_log_cb_list_`
|
|
||||||
if (sub_state_.is_force_abort()) {
|
if (sub_state_.is_force_abort()) {
|
||||||
TRANS_LOG(WARN, "ctx has been aborted forcedly before log sync successfully", KPC(this));
|
TRANS_LOG(WARN, "ctx has been aborted forcedly before log sync successfully", KPC(this));
|
||||||
print_trace_log_();
|
print_trace_log_();
|
||||||
|
busy_cbs_.remove(log_cb);
|
||||||
|
return_log_cb_(log_cb);
|
||||||
} else {
|
} else {
|
||||||
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "callback was missed when tx ctx exiting", KPC(log_cb), KPC(this));
|
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "callback was missed when tx ctx exiting", KPC(log_cb), KPC(this));
|
||||||
print_trace_log_();
|
print_trace_log_();
|
||||||
@ -2321,7 +2317,7 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb)
|
|||||||
// do nothing
|
// do nothing
|
||||||
} else if (ObTxLogType::TX_MULTI_DATA_SOURCE_LOG == log_type) {
|
} else if (ObTxLogType::TX_MULTI_DATA_SOURCE_LOG == log_type) {
|
||||||
share::SCN notify_redo_scn =
|
share::SCN notify_redo_scn =
|
||||||
log_cb->get_first_part_scn().is_valid() ? log_cb->get_first_part_scn() : log_ts;
|
log_cb->get_first_part_scn().is_valid() ? log_cb->get_first_part_scn() : log_ts;
|
||||||
if (OB_FAIL(log_cb->get_mds_range().move_from_cache_to_arr(mds_cache_,
|
if (OB_FAIL(log_cb->get_mds_range().move_from_cache_to_arr(mds_cache_,
|
||||||
exec_info_.multi_data_source_))) {
|
exec_info_.multi_data_source_))) {
|
||||||
TRANS_LOG(WARN, "move from mds cache to durable arr failed", K(ret));
|
TRANS_LOG(WARN, "move from mds cache to durable arr failed", K(ret));
|
||||||
@ -2490,8 +2486,8 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb)
|
|||||||
if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) {
|
if (OB_FAIL(ctx_tx_data_.set_end_log_ts(log_ts))) {
|
||||||
TRANS_LOG(WARN, "set end log ts failed", K(ret));
|
TRANS_LOG(WARN, "set end log ts failed", K(ret));
|
||||||
} else if (OB_FAIL(mds_cache_.generate_final_notify_array(exec_info_.multi_data_source_,
|
} else if (OB_FAIL(mds_cache_.generate_final_notify_array(exec_info_.multi_data_source_,
|
||||||
true /*need_merge_cache*/,
|
true /*need_merge_cache*/,
|
||||||
true /*allow_log_overflow*/))) {
|
true /*allow_log_overflow*/))) {
|
||||||
TRANS_LOG(WARN, "gen total mds array failed", K(ret));
|
TRANS_LOG(WARN, "gen total mds array failed", K(ret));
|
||||||
} else if (OB_FAIL(notify_data_source_(type, log_ts, false, mds_cache_.get_final_notify_array()))) {
|
} else if (OB_FAIL(notify_data_source_(type, log_ts, false, mds_cache_.get_final_notify_array()))) {
|
||||||
TRANS_LOG(WARN, "notify data source failed", KR(ret), K(*this));
|
TRANS_LOG(WARN, "notify data source failed", KR(ret), K(*this));
|
||||||
@ -2526,11 +2522,11 @@ int ObPartTransCtx::on_success_ops_(ObTxLogCb *log_cb)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
REC_TRANS_TRACE_EXT(tlog_, log_sync_succ_cb,
|
REC_TRANS_TRACE_EXT(tlog_, log_sync_succ_cb,
|
||||||
OB_ID(ret), ret,
|
OB_ID(ret), ret,
|
||||||
OB_ID(log_type), (void*)log_type,
|
OB_ID(log_type), (void*)log_type,
|
||||||
OB_ID(t), log_ts,
|
OB_ID(t), log_ts,
|
||||||
OB_ID(offset), log_lsn,
|
OB_ID(offset), log_lsn,
|
||||||
OB_ID(ref), get_ref());
|
OB_ID(ref), get_ref());
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -364,7 +364,8 @@ int ObPartTransCtx::do_clear()
|
|||||||
|
|
||||||
if (is_root()) {
|
if (is_root()) {
|
||||||
// response scheduler after all participant commit log sycned
|
// response scheduler after all participant commit log sycned
|
||||||
check_and_response_scheduler_(ObTxState::CLEAR, OB_SUCCESS);
|
const int result = ctx_tx_data_.get_state() == ObTxCommitData::COMMIT ? OB_SUCCESS : OB_TRANS_ROLLBACKED;
|
||||||
|
check_and_response_scheduler_(ObTxState::CLEAR, result);
|
||||||
}
|
}
|
||||||
// currently do nothing
|
// currently do nothing
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user