From a2269414c1dbddf243d8ae1655cab413889ac060 Mon Sep 17 00:00:00 2001 From: Handora Date: Tue, 30 Apr 2024 18:35:30 +0000 Subject: [PATCH] [BUG] add fullback mechanism to submit log for freezing --- src/storage/ls/ob_freezer.cpp | 30 +++++++++++++++++---- src/storage/ls/ob_freezer.h | 3 ++- src/storage/memtable/mvcc/ob_mvcc_ctx.cpp | 19 ++++++++++--- src/storage/tx/ob_trans_part_ctx.cpp | 33 ++++++++++++++++++++--- 4 files changed, 73 insertions(+), 12 deletions(-) diff --git a/src/storage/ls/ob_freezer.cpp b/src/storage/ls/ob_freezer.cpp index 3f030aa4ba..e7a1246085 100644 --- a/src/storage/ls/ob_freezer.cpp +++ b/src/storage/ls/ob_freezer.cpp @@ -1081,11 +1081,15 @@ int ObFreezer::wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tab { share::ObLSID ls_id = get_ls_id(); const int64_t start = ObTimeUtility::current_time(); + int64_t last_submit_log_time = start; int ret = OB_SUCCESS; bool ready_for_flush = false; do { - if (OB_FAIL(try_wait_memtable_ready_for_flush_with_ls_lock(tablet_memtable, ready_for_flush, start))) { + if (OB_FAIL(try_wait_memtable_ready_for_flush_with_ls_lock(tablet_memtable, + ready_for_flush, + start, + last_submit_log_time))) { TRANS_LOG(WARN, "[Freezer] memtable is not ready_for_flush", K(ret)); } } while (OB_SUCC(ret) && !ready_for_flush); @@ -1095,7 +1099,8 @@ int ObFreezer::wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tab int ObFreezer::try_wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tablet_memtable, bool &ready_for_flush, - const int64_t start) + const int64_t start, + int64_t &last_submit_log_time) { int ret = OB_SUCCESS; int64_t read_lock = LSLOCKALL; @@ -1109,7 +1114,12 @@ int ObFreezer::try_wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable } else if (FALSE_IT(ready_for_flush = tablet_memtable->ready_for_flush())) { } else if (!ready_for_flush) { if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { - if (need_resubmit_log()) { + if (need_resubmit_log() || + // In order to prevent the txn has already passed the try_submit test + // while failing to submit some logs due to an unexpected bug, we need + // retry to submit the log to go around the above case + (ObTimeUtility::current_time() - last_submit_log_time >= 1_min)) { + last_submit_log_time = ObTimeUtility::current_time(); submit_log_for_freeze(true/*tablet freeze*/, false/*try*/); TRANS_LOG(INFO, "[Freezer] resubmit log", K(ret)); } @@ -1278,6 +1288,7 @@ int ObFreezer::batch_tablet_freeze_task(ObTableHandleArray tables_array) int ret = OB_SUCCESS; share::ObLSID ls_id = get_ls_id(); const int64_t start = ObTimeUtility::current_time(); + int64_t last_submit_log_time = start; while (OB_SUCC(ret) && tables_array.count() > 0) { for (int i = 0; OB_SUCC(ret) && i < tables_array.count(); ++i) { @@ -1289,7 +1300,10 @@ int ObFreezer::batch_tablet_freeze_task(ObTableHandleArray tables_array) TRANS_LOG(WARN, "memtable cannot be null", K(ret), K(ls_id)); } else if (OB_FAIL(handle.get_tablet_memtable(tablet_memtable))) { LOG_WARN("fail to get memtable", K(ret)); - } else if (OB_FAIL(try_wait_memtable_ready_for_flush_with_ls_lock(tablet_memtable, ready_for_flush, start))) { + } else if (OB_FAIL(try_wait_memtable_ready_for_flush_with_ls_lock(tablet_memtable, + ready_for_flush, + start, + last_submit_log_time))) { TRANS_LOG(WARN, "[Freezer] fail to wait memtable ready_for_flush", K(ret), K(ls_id)); } else if (!ready_for_flush) { } else if (OB_FAIL(finish_freeze_with_ls_lock(tablet_memtable))) { @@ -1530,11 +1544,17 @@ void ObFreezer::wait_memtable_ready_for_flush(ObITabletMemtable *tablet_memtable { share::ObLSID ls_id = get_ls_id(); const int64_t start = ObTimeUtility::current_time(); + int64_t last_submit_log_time = start; int ret = OB_SUCCESS; while (!tablet_memtable->ready_for_flush()) { if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { - if (need_resubmit_log()) { + if (need_resubmit_log() || + // In order to prevent the txn has already passed the try_submit test + // while failing to submit some logs due to an unexpected bug, we need + // retry to submit the log to go around the above case + (ObTimeUtility::current_time() - last_submit_log_time >= 1_min)) { + last_submit_log_time = ObTimeUtility::current_time(); submit_log_for_freeze(true/*tablet freeze*/, false/*try*/); TRANS_LOG(INFO, "[Freezer] resubmit log for tablet_freeze", K(ls_id)); } diff --git a/src/storage/ls/ob_freezer.h b/src/storage/ls/ob_freezer.h index 7ccfb00113..7de9e84d68 100644 --- a/src/storage/ls/ob_freezer.h +++ b/src/storage/ls/ob_freezer.h @@ -308,7 +308,8 @@ private: int finish_freeze_with_ls_lock(ObITabletMemtable *tablet_memtable); int try_wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tablet_memtable, bool &ready_for_flush, - const int64_t start); + const int64_t start, + int64_t &last_submit_log_time); void submit_checkpoint_task(); private: // flag whether the logsteram is freezing diff --git a/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp b/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp index 21f079848b..7c24813f81 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_ctx.cpp @@ -293,9 +293,22 @@ ObMvccWriteGuard::~ObMvccWriteGuard() int ret = OB_SUCCESS; transaction::ObPartTransCtx *tx_ctx = ctx_->get_trans_ctx(); ctx_->write_done(); - if (write_ret_ && OB_SUCCESS == *write_ret_ - && OB_NOT_NULL(memtable_) - && try_flush_redo_) { + + if (OB_NOT_NULL(memtable_) + // Case1: The memtable is frozen, therefore we must submit the logs + // (forcely), otherwise, the data written concurrently may not be + // scanned by the background freezing worker, leading to missed data + // submissions. + && (memtable_->is_frozen_memtable() + // Case2: The data writes are guaranteed not to rollback and are not in + // the middle of write(such as the main table write of the insert + // ignore), allowing us to trigger immediate logging. (Especially, it + // should be noted that allowing immediate logging at any time could + // lead to the bad case that lots of rollback logs will be generated in + // insert ignore scenarios.) + || (write_ret_ + && OB_SUCCESS == *write_ret_ + && try_flush_redo_))) { bool is_freeze = memtable_->is_frozen_memtable(); ret = tx_ctx->submit_redo_after_write(is_freeze/*force*/, write_seq_no_); if (OB_FAIL(ret)) { diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index c53cb2f29e..c73ce4aa0c 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -1889,7 +1889,7 @@ int ObPartTransCtx::submit_redo_log_for_freeze(const uint32_t freeze_clock) int ObPartTransCtx::submit_redo_after_write(const bool force, const ObTxSEQ &write_seq_no) { int ret = OB_SUCCESS; - TRANS_LOG(TRACE, "", K(force), K(write_seq_no), K_(trans_id), K_(ls_id), + TRANS_LOG(TRACE, "submit_redo_after_write", K(force), K(write_seq_no), K_(trans_id), K_(ls_id), K(mt_ctx_.get_pending_log_size())); ObTimeGuard tg("submit_redo_for_after_write", 100000); if (force || mt_ctx_.pending_log_size_too_large(write_seq_no)) { @@ -1897,7 +1897,15 @@ int ObPartTransCtx::submit_redo_after_write(const bool force, const ObTxSEQ &wri #define LOAD_PARALLEL_LOGGING parallel_logging = exec_info_.serial_final_scn_.atomic_load().is_valid() LOAD_PARALLEL_LOGGING; if (!parallel_logging) { - if (OB_SUCCESS == lock_.try_lock()) { + if (force) { + // For the frozen memtable, we must guarantee + CtxLockGuard guard(lock_); + // double check parallel_logging is on + LOAD_PARALLEL_LOGGING; + if (!parallel_logging) { + ret = serial_submit_redo_after_write_(); + } + } else if (OB_SUCCESS == lock_.try_lock()) { CtxLockGuard guard(lock_, false); // double check parallel_logging is on LOAD_PARALLEL_LOGGING; @@ -1913,7 +1921,26 @@ int ObPartTransCtx::submit_redo_after_write(const bool force, const ObTxSEQ &wri #undef LOAD_PARALLEL_LOGGING tg.click("serial_log"); if (parallel_logging) { - if (OB_SUCC(lock_.try_rdlock_flush_redo())) { + if (force) { + CtxLockGuard guard(lock_, CtxLockGuard::MODE::REDO_FLUSH_R); + if (OB_SUCC(check_can_submit_redo_()) && !is_committing_()) { + // TODO(handora.qc): Currently, parallel_submit might return + // OB_SUCCESS without having fully submitted all necessary logs, + // resulting in an inability to mark need_resubmit on the freezer. + // Therefore, we rely on the fallback mechanism of freezer_task to + // retry submitting logs to meet the requirement of freezing that + // waiting for all logs to be submitted. + // + // However, we definitely need to ensure in the future that OB_SUCCESS + // is only returned only when the logs are fully submitted. + ObTxRedoSubmitter submitter(*this, mt_ctx_); + if (OB_FAIL(submitter.parallel_submit(write_seq_no))) { + if (OB_ITER_END == ret || OB_EAGAIN == ret) { + ret = OB_SUCCESS; + } + } + } + } else if (OB_SUCC(lock_.try_rdlock_flush_redo())) { if (OB_SUCC(check_can_submit_redo_()) && !is_committing_()) { ObTxRedoSubmitter submitter(*this, mt_ctx_); if (OB_FAIL(submitter.parallel_submit(write_seq_no))) {