From d5668ff4d43ec9c67d7fb0fb62ceb2faf1f6ac7c Mon Sep 17 00:00:00 2001 From: chinaxing Date: Tue, 14 May 2024 06:13:46 +0000 Subject: [PATCH] [refine] writer thread force logging after finished submit log for freeze --- src/storage/memtable/mvcc/ob_mvcc_ctx.cpp | 2 +- .../memtable/mvcc/ob_mvcc_trans_ctx.cpp | 42 ++++++----- src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h | 6 +- src/storage/tx/ob_trans_part_ctx.cpp | 70 +++++++------------ src/storage/tx/ob_trans_part_ctx.h | 2 +- src/storage/tx/ob_tx_api.cpp | 2 +- src/storage/tx/ob_tx_redo_submitter.h | 15 ++-- unittest/storage/tx/test_redo_submitter.cpp | 2 +- 8 files changed, 71 insertions(+), 70 deletions(-) diff --git a/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp b/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp index 7c24813f81..ff35ce2bd0 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp @@ -315,7 +315,7 @@ ObMvccWriteGuard::~ObMvccWriteGuard() if (REACH_TIME_INTERVAL(100 * 1000)) { TRANS_LOG(WARN, "failed to submit log if neccesary", K(ret), K(is_freeze), KPC(tx_ctx)); } - if (is_freeze) { + if (is_freeze && OB_BLOCK_FROZEN != ret) { memtable_->get_freezer()->set_need_resubmit_log(true); } } diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp index 8b135ee7df..e14975d50c 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp @@ -792,7 +792,11 @@ int ObTransCallbackMgr::prep_and_fill_from_list_(ObTxFillRedoCtx &ctx, return ret; } -bool ObTransCallbackMgr::check_list_has_min_epoch_(const int my_idx, const int64_t my_epoch, int64_t &min_epoch, int &min_idx) +bool ObTransCallbackMgr::check_list_has_min_epoch_(const int my_idx, + const int64_t my_epoch, + const bool require_min, + int64_t &min_epoch, + int &min_idx) { bool ret = true; int list_cnt = get_logging_list_count(); @@ -802,10 +806,16 @@ bool ObTransCallbackMgr::check_list_has_min_epoch_(const int my_idx, const int64 int64_t epoch_i = list->get_log_epoch(); if (epoch_i < my_epoch) { ret = false; - min_epoch = epoch_i; - min_idx = i; - TRANS_LOG(DEBUG, "hit", K(epoch_i), K(i)); - break; + if (require_min) { + if (min_epoch == 0 || epoch_i < min_epoch) { + min_epoch = epoch_i; + min_idx = i; + } + } else { + min_epoch = epoch_i; + min_idx = i; + break; + } } } } @@ -819,11 +829,11 @@ bool ObTransCallbackMgr::check_list_has_min_epoch_(const int my_idx, const int64 // - OB_BLOCK_FROZEN: next to logging callback's memtable was logging blocked int ObTransCallbackMgr::get_log_guard(const transaction::ObTxSEQ &write_seq, ObCallbackListLogGuard &lock_guard, - int &list_idx) + int &ret_list_idx) { int ret = OB_SUCCESS; RDLockGuard guard(rwlock_); - list_idx = (write_seq.get_branch() % MAX_CALLBACK_LIST_COUNT); + int list_idx = (write_seq.get_branch() % MAX_CALLBACK_LIST_COUNT); ObTxCallbackList *list = get_callback_list_(list_idx, true); if (OB_ISNULL(list)) { ret = OB_ENTRY_NOT_EXIST; @@ -831,7 +841,7 @@ int ObTransCallbackMgr::get_log_guard(const transaction::ObTxSEQ &write_seq, int64_t my_epoch = list->get_log_epoch(); int64_t min_epoch = 0; int min_epoch_idx =-1; - bool flush_min_epoch_list = false; + bool pending_too_large = false; common::ObByteLock *log_lock = NULL; if (my_epoch == INT64_MAX) { ret = OB_ENTRY_NOT_EXIST; @@ -839,24 +849,23 @@ int ObTransCallbackMgr::get_log_guard(const transaction::ObTxSEQ &write_seq, ret = OB_BLOCK_FROZEN; } else if (OB_ISNULL(log_lock = list->try_lock_log())) { ret = OB_NEED_RETRY; - } else if (!check_list_has_min_epoch_(list_idx, my_epoch, min_epoch, min_epoch_idx)) { + // if current list pending size too large, try to submit the min_epoch list + } else if (FALSE_IT(pending_too_large = list->pending_log_too_large(GCONF._private_buffer_size * 10))) { + } else if (!check_list_has_min_epoch_(list_idx, my_epoch, pending_too_large, min_epoch, min_epoch_idx)) { ret = OB_EAGAIN; ObIMemtable *to_log_memtable = list->get_log_cursor()->get_memtable(); if (TC_REACH_TIME_INTERVAL(1_s)) { TRANS_LOG(WARN, "has smaller epoch unlogged", KPC(this), K(list_idx), K(write_seq), K(my_epoch), K(min_epoch), K(min_epoch_idx), KP(to_log_memtable)); } - // if current list pending size too large, try to submit the min_epoch list - if (OB_UNLIKELY(list->pending_log_too_large(GCONF._private_buffer_size * 10))) { - flush_min_epoch_list = true; - } } else { + ret_list_idx = list_idx; lock_guard.set(log_lock); } if (OB_FAIL(ret) && log_lock) { log_lock->unlock(); } - if (OB_UNLIKELY(flush_min_epoch_list)) { + if (OB_EAGAIN == ret && OB_UNLIKELY(pending_too_large)) { ObTxCallbackList *min_epoch_list = get_callback_list_(min_epoch_idx, false); if (OB_ISNULL(log_lock = min_epoch_list->try_lock_log())) { // lock conflict, acquired by others @@ -864,9 +873,8 @@ int ObTransCallbackMgr::get_log_guard(const transaction::ObTxSEQ &write_seq, if (REACH_TIME_INTERVAL(1_s)) { TRANS_LOG(INFO, "decide to flush callback list with min_epoch", KPC(this), K(min_epoch), K(min_epoch_idx)); } - list_idx = min_epoch_idx; + ret_list_idx = min_epoch_idx; lock_guard.set(log_lock); - ret = OB_SUCCESS; } } } @@ -936,7 +944,7 @@ int ObTransCallbackMgr::fill_from_one_list(ObTxFillRedoCtx &ctx, // - OB_SUCCESS: all callbacks from all callback-list filled // - OB_EAGAIN: due to parallel logging, must return to flush this list and retry others // - OB_BLOCK_FROZEN: stopped due to can not logging waiting memtable frozen -// - OB_ITER_END: stopped due to has smaller write_epoch whos log isn't submitted +// - OB_ITER_END: stopped due to has smaller write_epoch whose log hasn't submitted // - OB_BUF_NOT_ENOUGH: stopped due to buffer can not hold current node // return policy: // - if parallel_logging, return if need switch to next list and has diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h index 8a5589ca9b..7e4caced09 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h @@ -365,7 +365,11 @@ private: } int fill_from_one_list(ObTxFillRedoCtx &ctx, const int list_idx, ObITxFillRedoFunctor &func); int fill_from_all_list(ObTxFillRedoCtx &ctx, ObITxFillRedoFunctor &func); - bool check_list_has_min_epoch_(const int my_idx, const int64_t my_epoch, int64_t &min_epoch, int &min_idx); + bool check_list_has_min_epoch_(const int my_idx, + const int64_t my_epoch, + const bool require_min, + int64_t &min_epoch, + int &min_idx); void calc_list_fill_log_epoch_(const int list_idx, int64_t &epoch_from, int64_t &epoch_to); void calc_next_to_fill_log_info_(const ObIArray &arr, int &index, diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 9c96d2e9bb..4480da9375 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -1905,69 +1905,50 @@ int ObPartTransCtx::submit_redo_after_write(const bool force, const ObTxSEQ &wri #define LOAD_PARALLEL_LOGGING parallel_logging = exec_info_.serial_final_scn_.atomic_load().is_valid() LOAD_PARALLEL_LOGGING; if (!parallel_logging) { - if (force) { - // For the frozen memtable, we must guarantee - CtxLockGuard guard(lock_); + int submitted_cnt = 0; + if (force || OB_SUCCESS == lock_.try_lock()) { + CtxLockGuard guard(lock_, force /* need lock */); // double check parallel_logging is on LOAD_PARALLEL_LOGGING; if (!parallel_logging) { - ret = serial_submit_redo_after_write_(); - } - } else if (OB_SUCCESS == lock_.try_lock()) { - CtxLockGuard guard(lock_, false); - // double check parallel_logging is on - LOAD_PARALLEL_LOGGING; - if (!parallel_logging) { - ret = serial_submit_redo_after_write_(); + ret = serial_submit_redo_after_write_(submitted_cnt); } } - if (OB_BUF_NOT_ENOUGH == ret) { - // bufer full, try fill after switch to parallel logging + if (submitted_cnt > 0 && OB_EAGAIN == ret) { + // has remains, try fill after switch to parallel logging LOAD_PARALLEL_LOGGING; } } #undef LOAD_PARALLEL_LOGGING tg.click("serial_log"); - if (parallel_logging) { - if (force) { - CtxLockGuard guard(lock_, CtxLockGuard::MODE::REDO_FLUSH_R); - if (OB_SUCC(check_can_submit_redo_()) && !is_committing_()) { - // TODO(handora.qc): Currently, parallel_submit might return - // OB_SUCCESS without having fully submitted all necessary logs, - // resulting in an inability to mark need_resubmit on the freezer. - // Therefore, we rely on the fallback mechanism of freezer_task to - // retry submitting logs to meet the requirement of freezing that - // waiting for all logs to be submitted. - // - // However, we definitely need to ensure in the future that OB_SUCCESS - // is only returned only when the logs are fully submitted. + if (parallel_logging && OB_SUCC(lock_.try_rdlock_flush_redo())) { + if (OB_SUCC(check_can_submit_redo_())) { + if (is_committing_()) { + ret = force ? OB_TRANS_HAS_DECIDED : OB_SUCCESS; + } else { ObTxRedoSubmitter submitter(*this, mt_ctx_); if (OB_FAIL(submitter.parallel_submit(write_seq_no))) { - if (OB_ITER_END == ret || OB_EAGAIN == ret) { + if (!force && (OB_ITER_END == ret // blocked by others, current remains + || OB_NEED_RETRY == ret // acquire lock failed + )) { ret = OB_SUCCESS; } } } - } else if (OB_SUCC(lock_.try_rdlock_flush_redo())) { - if (OB_SUCC(check_can_submit_redo_()) && !is_committing_()) { - ObTxRedoSubmitter submitter(*this, mt_ctx_); - if (OB_FAIL(submitter.parallel_submit(write_seq_no))) { - if (OB_ITER_END == ret || OB_EAGAIN == ret) { - ret = OB_SUCCESS; - } - } - } - lock_.unlock_flush_redo(); } + lock_.unlock_flush_redo(); + } + if (!force && (OB_TRANS_HAS_DECIDED == ret // do committing + || OB_BLOCK_FROZEN == ret // memtable logging blocked + || OB_EAGAIN == ret // partial submitted or submit to log-service fail + )) { + ret = OB_SUCCESS; } - } - if (OB_TRANS_HAS_DECIDED == ret || OB_BLOCK_FROZEN == ret) { - ret = OB_SUCCESS; } return ret; } -int ObPartTransCtx::serial_submit_redo_after_write_() +int ObPartTransCtx::serial_submit_redo_after_write_(int &submitted_cnt) { int ret = OB_SUCCESS; if (OB_SUCC(check_can_submit_redo_())) { @@ -1975,7 +1956,8 @@ int ObPartTransCtx::serial_submit_redo_after_write_() bool should_switch = should_switch_to_parallel_logging_(); ObTxRedoSubmitter submitter(*this, mt_ctx_); ret = submitter.serial_submit(should_switch); - if (should_switch && submitter.get_submitted_cnt() > 0) { + submitted_cnt = submitter.get_submitted_cnt(); + if (should_switch && submitted_cnt > 0) { const share::SCN serial_final_scn = submitter.get_submitted_scn(); int tmp_ret = switch_to_parallel_logging_(serial_final_scn, exec_info_.max_submitted_seq_no_); TRANS_LOG(INFO, "**leader switch to parallel logging**", K(tmp_ret), @@ -9776,7 +9758,9 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param, // promise redo log before move log bool submitted = false; ObTxRedoSubmitter submitter(*this, mt_ctx_); - if (OB_FAIL(submitter.serial_submit(false))) { + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(submitter.serial_submit(false)) && submitter.get_submitted_cnt() <= 0) { + ret = tmp_ret; TRANS_LOG(WARN, "submit redo failed", K(ret), KPC(this)); } else { sub_state_.set_transfer_blocking(); diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 3fb885aee8..307d62e5a7 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -561,7 +561,7 @@ private: void recovery_parallel_logging_(); int check_can_submit_redo_(); void force_no_need_replay_checksum_(const bool parallel_replay, const share::SCN &log_ts); - int serial_submit_redo_after_write_(); + int serial_submit_redo_after_write_(int &submitted_cnt); int submit_big_segment_log_(); int prepare_big_segment_submit_(ObTxLogCb *segment_cb, const share::SCN &base_scn, diff --git a/src/storage/tx/ob_tx_api.cpp b/src/storage/tx/ob_tx_api.cpp index bce0f205fe..3ccf6569f9 100644 --- a/src/storage/tx/ob_tx_api.cpp +++ b/src/storage/tx/ob_tx_api.cpp @@ -1658,7 +1658,7 @@ int ObTransService::ls_rollback_to_savepoint_(const ObTransID &tx_id, if (verify_epoch > 0 && ctx->epoch_ != verify_epoch) { ret = OB_TRANS_CTX_NOT_EXIST; TRANS_LOG(WARN, "current ctx illegal, born epoch not match", K(ret), K(ls), K(tx_id), - K(verify_epoch), KPC(ctx)); + K(verify_epoch), K(ctx_born_epoch), KPC(ctx)); } else if(OB_FAIL(ls_sync_rollback_savepoint__(ctx, savepoint, op_sn, diff --git a/src/storage/tx/ob_tx_redo_submitter.h b/src/storage/tx/ob_tx_redo_submitter.h index 4cc5f1a610..2b7d4ed42c 100644 --- a/src/storage/tx/ob_tx_redo_submitter.h +++ b/src/storage/tx/ob_tx_redo_submitter.h @@ -136,6 +136,7 @@ ObTxRedoSubmitter::~ObTxRedoSubmitter() int ObTxRedoSubmitter::parallel_submit(const ObTxSEQ &write_seq_no) { int ret = OB_SUCCESS; + int save_ret = OB_SUCCESS; from_all_list_ = false; flush_all_ = false; write_seq_no_ = write_seq_no; @@ -146,20 +147,23 @@ int ObTxRedoSubmitter::parallel_submit(const ObTxSEQ &write_seq_no) submit_if_not_full_ = true; bool do_submit = false; memtable::ObCallbackListLogGuard log_lock_guard; + submit_cb_list_idx_ = -1; if (OB_FAIL(mt_ctx_.get_log_guard(write_seq_no, log_lock_guard, submit_cb_list_idx_))) { if (OB_NEED_RETRY == ret) { - // give up, lock conflict - ret = OB_SUCCESS; + // lock conflict } else if (OB_BLOCK_FROZEN == ret) { // memtable is logging blocked } else if (OB_EAGAIN == ret) { // others need flush firstly - // TODO: try flush others out and retry if (TC_REACH_TIME_INTERVAL(5_s)) { TRANS_LOG(WARN, "blocked by other list has smaller wirte epoch unlogged", K(write_seq_no), K(tx_ctx_.get_trans_id())); } - ret = OB_SUCCESS; + if (submit_cb_list_idx_ >= 0) { + // try to submit blocking list + save_ret = OB_EAGAIN; + do_submit = true; + } } else if (OB_ENTRY_NOT_EXIST == ret) { // no callback to log ret = OB_SUCCESS; @@ -171,6 +175,7 @@ int ObTxRedoSubmitter::parallel_submit(const ObTxSEQ &write_seq_no) } if (do_submit) { ret = _submit_redo_pipeline_(false); + ret = OB_SUCCESS == ret ? save_ret : ret; } return ret; } @@ -263,7 +268,7 @@ int ObTxRedoSubmitter::_submit_redo_pipeline_(const bool display_blocked_info) #endif break; } else { - fill_ret = OB_SUCCESS; + fill_ret = OB_EAGAIN; } } else if (OB_BLOCK_FROZEN == fill_ret) { if (is_parallel_logging) { diff --git a/unittest/storage/tx/test_redo_submitter.cpp b/unittest/storage/tx/test_redo_submitter.cpp index ff31e48ae5..ad3e5e353d 100644 --- a/unittest/storage/tx/test_redo_submitter.cpp +++ b/unittest/storage/tx/test_redo_submitter.cpp @@ -198,7 +198,7 @@ TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_BUF_NOT_ENOUGH) .Times(1) .WillOnce(Invoke(succ_submit_redo_log_out)); ObTxRedoSubmitter submitter(tx_ctx, mt_ctx); - EXPECT_EQ(OB_SUCCESS, submitter.serial_submit(false)); + EXPECT_EQ(OB_EAGAIN, submitter.serial_submit(false)); } }