fix log sync success callback after txCtx has been killed forcedly

This commit is contained in:
chinaxing
2024-03-05 08:45:03 +00:00
committed by ob-robot
parent 799b36e4f5
commit acbbcc4d45

View File

@ -2155,14 +2155,16 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb)
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS;
const int64_t cur_ts = ObTimeUtility::current_time(); const int64_t cur_ts = ObTimeUtility::current_time();
const int64_t log_sync_used_time = cur_ts - log_cb->get_submit_ts(); bool handle_fast_commit = false;
ObTransStatistic::get_instance().add_clog_sync_time(tenant_id_, log_sync_used_time);
ObTransStatistic::get_instance().add_clog_sync_count(tenant_id_, 1);
bool skip_fast_commit = false;
bool try_submit_next_log = false; bool try_submit_next_log = false;
bool need_return_log_cb = false;
{ {
// allow fill redo concurrently with log callback // allow fill redo concurrently with log callback
CtxLockGuard guard(lock_, is_committing_() ? CtxLockGuard::MODE::ALL : CtxLockGuard::MODE::CTX); CtxLockGuard guard(lock_, is_committing_() ? CtxLockGuard::MODE::ALL : CtxLockGuard::MODE::CTX);
const int64_t log_sync_used_time = cur_ts - log_cb->get_submit_ts();
ObTransStatistic::get_instance().add_clog_sync_time(tenant_id_, log_sync_used_time);
ObTransStatistic::get_instance().add_clog_sync_count(tenant_id_, 1);
const int64_t ctx_lock_wait_time = guard.get_lock_acquire_used_time(); const int64_t ctx_lock_wait_time = guard.get_lock_acquire_used_time();
if (log_sync_used_time + ctx_lock_wait_time >= ObServerConfig::get_instance().clog_sync_time_warn_threshold) { if (log_sync_used_time + ctx_lock_wait_time >= ObServerConfig::get_instance().clog_sync_time_warn_threshold) {
TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "transaction log sync use too much time", KPC(log_cb), TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "transaction log sync use too much time", KPC(log_cb),
@ -2179,15 +2181,25 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb)
#ifndef NDEBUG #ifndef NDEBUG
TRANS_LOG(INFO, "cb has been callbacked", KPC(log_cb)); TRANS_LOG(INFO, "cb has been callbacked", KPC(log_cb));
#endif #endif
busy_cbs_.remove(log_cb);
return_log_cb_(log_cb);
} else if (is_exiting_) { } else if (is_exiting_) {
// maybe because commit log callbacked before redo log, and ctx is already released // the TxCtx maybe has been killed forcedly by background GC thread
// if there is tx data, it would be released when reset_log_cbs_ // because the log_cb process skipped, here don't free log_cb directly
TRANS_LOG(ERROR, "this callback was missed when tx ctx exiting", KPC(log_cb)); // the cleanup routine in destroy will handle, see `reset_log_cb_list_`
ob_abort(); if (sub_state_.is_force_abort()) {
TRANS_LOG(WARN, "ctx has been aborted forcedly before log sync successfully", KPC(this));
print_trace_log_();
} else {
TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "callback was missed when tx ctx exiting", KPC(log_cb), KPC(this));
print_trace_log_();
ob_abort();
}
} else { } else {
// save the first error code // save the first error code
int save_ret = OB_SUCCESS; int save_ret = OB_SUCCESS;
ObTxLogCb *cur_cb = busy_cbs_.get_first(); ObTxLogCb *cur_cb = busy_cbs_.get_first();
// process all preceding log_cbs
for (int64_t i = 0; i < busy_cbs_.get_size(); i++) { for (int64_t i = 0; i < busy_cbs_.get_size(); i++) {
if (cur_cb->is_callbacked()) { if (cur_cb->is_callbacked()) {
// do nothing // do nothing
@ -2220,28 +2232,30 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb)
// return first error code // return first error code
ret = save_ret; ret = save_ret;
} }
} // try submit record log under CtxLock
if (sub_state_.is_state_log_submitted() || log_cb->get_callbacks().count() == 0) { if (need_record_log_()) {
skip_fast_commit = true; // ignore error
} if (OB_SUCCESS != (tmp_ret = submit_record_log_())) {
if (need_record_log_()) { TRANS_LOG(WARN, "failed to submit record log", K(tmp_ret), K(*this));
// ignore error }
if (OB_SUCCESS != (tmp_ret = submit_record_log_())) {
TRANS_LOG(WARN, "failed to submit record log", K(tmp_ret), K(*this));
} }
handle_fast_commit = !(sub_state_.is_state_log_submitted() || log_cb->get_callbacks().count() == 0);
try_submit_next_log = !ObTxLogTypeChecker::is_state_log(log_cb->get_last_log_type()) && is_committing_();
busy_cbs_.remove(log_cb);
need_return_log_cb = true;
} }
try_submit_next_log = !ObTxLogTypeChecker::is_state_log(log_cb->get_last_log_type()) && is_committing_();
busy_cbs_.remove(log_cb);
} }
// let fast commit out of ctx's lock, because it is time consuming in calculating checksum // let fast commit out of ctx's lock, because it is time consuming in calculating checksum
if (!skip_fast_commit) { if (handle_fast_commit) {
// acquire REDO_FLUSH_READ LOCK, which allow other thread flush redo // acquire REDO_FLUSH_READ LOCK, which allow other thread flush redo
// but disable other manage operation on ctx // but disable other manage operation on ctx
// FIXME: acquire CTX's READ lock maybe better // FIXME: acquire CTX's READ lock maybe better
CtxLockGuard guard(lock_, CtxLockGuard::MODE::REDO_FLUSH_R); CtxLockGuard guard(lock_, CtxLockGuard::MODE::REDO_FLUSH_R);
mt_ctx_.remove_callbacks_for_fast_commit(log_cb->get_callbacks()); mt_ctx_.remove_callbacks_for_fast_commit(log_cb->get_callbacks());
} }
return_log_cb_(log_cb); if (need_return_log_cb) {
return_log_cb_(log_cb);
}
// try submit log if txn is in commit phase // try submit log if txn is in commit phase
if (try_submit_next_log) { if (try_submit_next_log) {
// in commiting, acquire CTX lock is enough, because redo flushing must finished // in commiting, acquire CTX lock is enough, because redo flushing must finished