From acbbcc4d45bc7c7f1302b6c56151c3f129ff2ff6 Mon Sep 17 00:00:00 2001 From: chinaxing Date: Tue, 5 Mar 2024 08:45:03 +0000 Subject: [PATCH] fix log sync success callback after txCtx has been killed forcedly --- src/storage/tx/ob_trans_part_ctx.cpp | 54 +++++++++++++++++----------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 5134a1516f..fe517f5a26 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -2155,14 +2155,16 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb) int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; const int64_t cur_ts = ObTimeUtility::current_time(); - const int64_t log_sync_used_time = cur_ts - log_cb->get_submit_ts(); - ObTransStatistic::get_instance().add_clog_sync_time(tenant_id_, log_sync_used_time); - ObTransStatistic::get_instance().add_clog_sync_count(tenant_id_, 1); - bool skip_fast_commit = false; + bool handle_fast_commit = false; bool try_submit_next_log = false; + bool need_return_log_cb = false; { // allow fill redo concurrently with log callback CtxLockGuard guard(lock_, is_committing_() ? CtxLockGuard::MODE::ALL : CtxLockGuard::MODE::CTX); + + const int64_t log_sync_used_time = cur_ts - log_cb->get_submit_ts(); + ObTransStatistic::get_instance().add_clog_sync_time(tenant_id_, log_sync_used_time); + ObTransStatistic::get_instance().add_clog_sync_count(tenant_id_, 1); const int64_t ctx_lock_wait_time = guard.get_lock_acquire_used_time(); if (log_sync_used_time + ctx_lock_wait_time >= ObServerConfig::get_instance().clog_sync_time_warn_threshold) { TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "transaction log sync use too much time", KPC(log_cb), @@ -2179,15 +2181,25 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb) #ifndef NDEBUG TRANS_LOG(INFO, "cb has been callbacked", KPC(log_cb)); #endif + busy_cbs_.remove(log_cb); + return_log_cb_(log_cb); } else if (is_exiting_) { - // maybe because commit log callbacked before redo log, and ctx is already released - // if there is tx data, it would be released when reset_log_cbs_ - TRANS_LOG(ERROR, "this callback was missed when tx ctx exiting", KPC(log_cb)); - ob_abort(); + // the TxCtx maybe has been killed forcedly by background GC thread + // because the log_cb process skipped, here don't free log_cb directly + // the cleanup routine in destroy will handle, see `reset_log_cb_list_` + if (sub_state_.is_force_abort()) { + TRANS_LOG(WARN, "ctx has been aborted forcedly before log sync successfully", KPC(this)); + print_trace_log_(); + } else { + TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "callback was missed when tx ctx exiting", KPC(log_cb), KPC(this)); + print_trace_log_(); + ob_abort(); + } } else { // save the first error code int save_ret = OB_SUCCESS; ObTxLogCb *cur_cb = busy_cbs_.get_first(); + // process all preceding log_cbs for (int64_t i = 0; i < busy_cbs_.get_size(); i++) { if (cur_cb->is_callbacked()) { // do nothing @@ -2220,28 +2232,30 @@ int ObPartTransCtx::on_success(ObTxLogCb *log_cb) // return first error code ret = save_ret; } - } - if (sub_state_.is_state_log_submitted() || log_cb->get_callbacks().count() == 0) { - skip_fast_commit = true; - } - if (need_record_log_()) { - // ignore error - if (OB_SUCCESS != (tmp_ret = submit_record_log_())) { - TRANS_LOG(WARN, "failed to submit record log", K(tmp_ret), K(*this)); + // try submit record log under CtxLock + if (need_record_log_()) { + // ignore error + if (OB_SUCCESS != (tmp_ret = submit_record_log_())) { + TRANS_LOG(WARN, "failed to submit record log", K(tmp_ret), K(*this)); + } } + handle_fast_commit = !(sub_state_.is_state_log_submitted() || log_cb->get_callbacks().count() == 0); + try_submit_next_log = !ObTxLogTypeChecker::is_state_log(log_cb->get_last_log_type()) && is_committing_(); + busy_cbs_.remove(log_cb); + need_return_log_cb = true; } - try_submit_next_log = !ObTxLogTypeChecker::is_state_log(log_cb->get_last_log_type()) && is_committing_(); - busy_cbs_.remove(log_cb); } // let fast commit out of ctx's lock, because it is time consuming in calculating checksum - if (!skip_fast_commit) { + if (handle_fast_commit) { // acquire REDO_FLUSH_READ LOCK, which allow other thread flush redo // but disable other manage operation on ctx // FIXME: acquire CTX's READ lock maybe better CtxLockGuard guard(lock_, CtxLockGuard::MODE::REDO_FLUSH_R); mt_ctx_.remove_callbacks_for_fast_commit(log_cb->get_callbacks()); } - return_log_cb_(log_cb); + if (need_return_log_cb) { + return_log_cb_(log_cb); + } // try submit log if txn is in commit phase if (try_submit_next_log) { // in commiting, acquire CTX lock is enough, because redo flushing must finished