diff --git a/mittest/simple_server/test_ob_minor_freeze.cpp b/mittest/simple_server/test_ob_minor_freeze.cpp index 2ac3f873d..74daeb1c0 100644 --- a/mittest/simple_server/test_ob_minor_freeze.cpp +++ b/mittest/simple_server/test_ob_minor_freeze.cpp @@ -89,14 +89,14 @@ namespace checkpoint } // namespace storage namespace transaction { -int ObPartTransCtx::submit_redo_log_for_freeze() +int ObPartTransCtx::submit_redo_log_for_freeze(const uint32_t freeze_clock) { int ret = OB_SUCCESS; int64_t sleep_time = rand() % SLEEP_TIME; ob_usleep(sleep_time); CtxLockGuard guard(lock_); bool submitted = false; - ret = submit_redo_log_for_freeze_(submitted); + ret = submit_redo_log_for_freeze_(submitted, freeze_clock); if (sleep_time > 50 && sleep_time < 90) { ret = OB_TX_NOLOGCB; } else if (sleep_time >= 90) { diff --git a/src/storage/ls/ob_freezer.cpp b/src/storage/ls/ob_freezer.cpp index 31416b192..866708443 100644 --- a/src/storage/ls/ob_freezer.cpp +++ b/src/storage/ls/ob_freezer.cpp @@ -219,13 +219,13 @@ int ObFreezerStat::get_diagnose_info(ObStringHolder &diagnose_info) return ret; } -void ObFreezerStat::set_freeze_clock(const int64_t freeze_clock) +void ObFreezerStat::set_freeze_clock(const uint32_t freeze_clock) { ObSpinLockGuard guard(lock_); freeze_clock_ = freeze_clock; } -int64_t ObFreezerStat::get_freeze_clock() +uint32_t ObFreezerStat::get_freeze_clock() { ObSpinLockGuard guard(lock_); return freeze_clock_; @@ -335,7 +335,7 @@ int ObFreezerStat::deep_copy_to(ObFreezerStat &other) return ret; } -int ObFreezerStat::begin_set_freeze_stat(const int64_t freeze_clock, +int ObFreezerStat::begin_set_freeze_stat(const uint32_t freeze_clock, const int64_t start_time, const int state, const share::SCN &freeze_snapshot_version, @@ -521,11 +521,11 @@ int ObFreezer::logstream_freeze(ObFuture *result) return ret; } -void ObFreezer::try_submit_log_for_freeze_() +void ObFreezer::try_submit_log_for_freeze_(const bool is_tablet_freeze) { int ret = OB_SUCCESS; - if 
(OB_FAIL(submit_log_for_freeze(true/*try*/))) { + if (OB_FAIL(submit_log_for_freeze(is_tablet_freeze, true/*try*/))) { TRANS_LOG(WARN, "fail to try submit log for freeze", K(ret)); set_need_resubmit_log(true); } @@ -538,7 +538,7 @@ int ObFreezer::inner_logstream_freeze(ObFuture *result) ObTableHandleV2 handle; if (FALSE_IT(submit_checkpoint_task())) { - } else if (FALSE_IT(try_submit_log_for_freeze_())) { + } else if (FALSE_IT(try_submit_log_for_freeze_(false/*tablet freeze*/))) { } else if (OB_FAIL(submit_freeze_task(true/*is_ls_freeze*/, result, handle))) { TRANS_LOG(ERROR, "failed to submit ls_freeze task", K(ret), K(ls_id)); stat_.add_diagnose_info("fail to submit ls_freeze_task"); @@ -587,7 +587,7 @@ int ObFreezer::ls_freeze_task() ObLSLockGuard lock_ls(ls_, ls_->lock_, read_lock, write_lock); if (OB_FAIL(check_ls_state())) { } else { - submit_log_for_freeze(false/*try*/); + submit_log_for_freeze(false/*tablet freeze*/, false/*try*/); TRANS_LOG(INFO, "[Freezer] resubmit log for ls_freeze", K(ls_id)); } } @@ -706,7 +706,7 @@ int ObFreezer::freeze_normal_tablet_(const ObTabletID &tablet_id, ObFuture TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id)); stat_.add_diagnose_info("fail to set is_tablet_freeze"); } - } else if (FALSE_IT(try_submit_log_for_freeze_())) { + } else if (FALSE_IT(try_submit_log_for_freeze_(true/*tablet freeze*/))) { } else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, result, frozen_memtable_handle))) { TRANS_LOG(WARN, "[Freezer] fail to submit tablet_freeze_task", K(ret), K(ls_id), K(tablet_id)); stat_.add_diagnose_info("fail to submit tablet_freeze_task"); @@ -846,7 +846,7 @@ int ObFreezer::tablet_freeze_with_rewrite_meta(const ObTabletID &tablet_id) TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id)); stat_.add_diagnose_info("fail to set is_tablet_freeze"); } - } else if (FALSE_IT(try_submit_log_for_freeze_())) { + } else if 
(FALSE_IT(try_submit_log_for_freeze_(true/*tablet freeze*/))) { } else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, nullptr, frozen_memtable_handle))) { TRANS_LOG(WARN, "[Freezer] fail to submit freeze_task", K(ret), K(ls_id), K(tablet_id)); stat_.add_diagnose_info("fail to submit freeze_task"); @@ -932,7 +932,7 @@ int ObFreezer::try_wait_memtable_ready_for_flush_with_ls_lock(memtable::ObMemtab } else if (!ready_for_flush) { if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { if (need_resubmit_log()) { - submit_log_for_freeze(false/*try*/); + submit_log_for_freeze(true/*tablet freeze*/, false/*try*/); TRANS_LOG(INFO, "[Freezer] resubmit log", K(ret)); } const int64_t cost_time = ObTimeUtility::current_time() - start; @@ -1149,7 +1149,7 @@ int ObFreezer::batch_tablet_freeze_(const ObIArray &tablet_ids, ObFu need_freeze = false; TRANS_LOG(INFO, "[Freezer] no need to freeze batch tablets", K(ret), K(tablet_ids)); stat_.add_diagnose_info("no need to freeze batch tablets"); - } else if (FALSE_IT(try_submit_log_for_freeze_())) { + } else if (FALSE_IT(try_submit_log_for_freeze_(true/*tablet freeze*/))) { } else if (OB_FAIL(submit_batch_tablet_freeze_task(memtable_handles, result))) { TRANS_LOG(WARN, "[Freezer] fail to submit batch_tablet_freeze task", K(ret)); } else { @@ -1265,7 +1265,7 @@ int ObFreezer::handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtabl ret = OB_ERR_UNEXPECTED; } else { memtable::ObMemtable *memtable = static_cast(imemtable); - try_submit_log_for_freeze_(); + try_submit_log_for_freeze_(true/*tablet freeze*/); wait_memtable_ready_for_flush(memtable); if (OB_FAIL(memtable->finish_freeze())) { TRANS_LOG(ERROR, "[Freezer] memtable cannot be flushed", @@ -1294,7 +1294,7 @@ namespace { }; } -int ObFreezer::submit_log_for_freeze(bool is_try) +int ObFreezer::submit_log_for_freeze(const bool is_tablet_freeze, const bool is_try) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; @@ -1309,8 +1309,9 @@ int ObFreezer::submit_log_for_freeze(bool 
is_try) do { ret = OB_SUCCESS; transaction::ObTransID fail_tx_id; - - if (OB_FAIL(get_ls_tx_svr()->traverse_trans_to_submit_redo_log(fail_tx_id))) { + // because tablet freeze will not inc freeze clock, fake the freeze clock + const uint32_t freeze_clock = is_tablet_freeze ? get_freeze_clock() + 1 : get_freeze_clock(); + if (OB_FAIL(get_ls_tx_svr()->traverse_trans_to_submit_redo_log(fail_tx_id, freeze_clock))) { const int64_t cost_time = ObTimeUtility::current_time() - start; if (cost_time > 1000 * 1000) { if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { @@ -1424,7 +1425,7 @@ void ObFreezer::wait_memtable_ready_for_flush(memtable::ObMemtable *memtable) while (!memtable->ready_for_flush()) { if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { if (need_resubmit_log()) { - submit_log_for_freeze(false/*try*/); + submit_log_for_freeze(true/*tablet freeze*/, false/*try*/); TRANS_LOG(INFO, "[Freezer] resubmit log for tablet_freeze", K(ls_id)); } const int64_t cost_time = ObTimeUtility::current_time() - start; diff --git a/src/storage/ls/ob_freezer.h b/src/storage/ls/ob_freezer.h index 9badea0ac..ca7fcfb08 100644 --- a/src/storage/ls/ob_freezer.h +++ b/src/storage/ls/ob_freezer.h @@ -166,8 +166,8 @@ public: bool need_rewrite_meta(); void set_state(int state); int get_state(); - void set_freeze_clock(const int64_t freeze_clock); - int64_t get_freeze_clock(); + void set_freeze_clock(const uint32_t freeze_clock); + uint32_t get_freeze_clock(); void set_start_time(int64_t start_time); int64_t get_start_time(); void set_end_time(int64_t end_time); @@ -177,7 +177,7 @@ public: void set_freeze_snapshot_version(const share::SCN &freeze_snapshot_version); share::SCN get_freeze_snapshot_version(); int deep_copy_to(ObFreezerStat &other); - int begin_set_freeze_stat(const int64_t freeze_clock, + int begin_set_freeze_stat(const uint32_t freeze_clock, const int64_t start_time, const int state, const share::SCN &freeze_snapshot_version, @@ -191,7 +191,7 @@ private: ObTabletID tablet_id_; bool 
need_rewrite_meta_; int state_; - int64_t freeze_clock_; + uint32_t freeze_clock_; int64_t start_time_; int64_t end_time_; int ret_code_; @@ -298,8 +298,8 @@ private: /* inner subfunctions for freeze process */ int inner_logstream_freeze(ObFuture *result); - int submit_log_for_freeze(bool is_try); - void try_submit_log_for_freeze_(); + int submit_log_for_freeze(const bool is_tablet_freeze, const bool is_try); + void try_submit_log_for_freeze_(const bool is_tablet_freeze); int ls_freeze_task(); int tablet_freeze_task(ObTableHandleV2 handle); int submit_freeze_task(const bool is_ls_freeze, ObFuture *result, ObTableHandleV2 &handle); diff --git a/src/storage/ls/ob_ls_tx_service.cpp b/src/storage/ls/ob_ls_tx_service.cpp index fa6a12e6e..21f084bf3 100644 --- a/src/storage/ls/ob_ls_tx_service.cpp +++ b/src/storage/ls/ob_ls_tx_service.cpp @@ -429,9 +429,9 @@ int ObLSTxService::replay(const void *buffer, return ret; } -int ObLSTxService::traverse_trans_to_submit_redo_log(ObTransID &fail_tx_id) +int ObLSTxService::traverse_trans_to_submit_redo_log(ObTransID &fail_tx_id, const uint32_t freeze_clock) { - return mgr_->traverse_tx_to_submit_redo_log(fail_tx_id); + return mgr_->traverse_tx_to_submit_redo_log(fail_tx_id, freeze_clock); } int ObLSTxService::traverse_trans_to_submit_next_log() { return mgr_->traverse_tx_to_submit_next_log(); } diff --git a/src/storage/ls/ob_ls_tx_service.h b/src/storage/ls/ob_ls_tx_service.h index b48061c38..b6e9edc5a 100644 --- a/src/storage/ls/ob_ls_tx_service.h +++ b/src/storage/ls/ob_ls_tx_service.h @@ -111,7 +111,8 @@ public: const transaction::ObTxSEQ &spec_seq_no = transaction::ObTxSEQ::INVL()) const; int revert_store_ctx(storage::ObStoreCtx &store_ctx) const; // Freeze process needs to traverse trans ctx to submit redo log - int traverse_trans_to_submit_redo_log(transaction::ObTransID &fail_tx_id); + int traverse_trans_to_submit_redo_log(transaction::ObTransID &fail_tx_id, + const uint32_t freeze_clock = UINT32_MAX); // submit next log when 
all trx in frozen memtable have submitted log int traverse_trans_to_submit_next_log(); // check schduler status for gc diff --git a/src/storage/memtable/mvcc/ob_mvcc.h b/src/storage/memtable/mvcc/ob_mvcc.h index 06a9a8ff3..1c35781f1 100644 --- a/src/storage/memtable/mvcc/ob_mvcc.h +++ b/src/storage/memtable/mvcc/ob_mvcc.h @@ -59,6 +59,7 @@ public: virtual bool on_memtable(const ObIMemtable * const memtable) { UNUSED(memtable); return false; } virtual ObIMemtable* get_memtable() const { return nullptr; } + virtual uint32_t get_freeze_clock() const { return 0; } virtual transaction::ObTxSEQ get_seq_no() const { return transaction::ObTxSEQ::INVL(); } virtual int del() { return remove(); } virtual bool is_need_free() const { return true; } @@ -71,6 +72,7 @@ public: void after_append_cb(const bool is_replay); bool need_submit_log() const { return need_submit_log_; } virtual bool is_logging_blocked() const { return false; } + virtual bool on_frozen_memtable(ObIMemtable *&last_frozen_mt) const { return true; } int log_submitted_cb(const share::SCN scn, ObIMemtable *&last_mt); int log_sync_fail_cb(const share::SCN scn); // interface should be implement by subclasses diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp index 1d2c21661..68d87eab2 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.cpp @@ -1512,6 +1512,16 @@ bool ObMvccRowCallback::is_logging_blocked() const return is_blocked; } +uint32_t ObMvccRowCallback::get_freeze_clock() const +{ + if (OB_ISNULL(memtable_)) { + TRANS_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "mvcc row memtable is NULL", KPC(this)); + return 0; + } else { + return memtable_->get_freeze_clock(); + } +} + int ObMvccRowCallback::clean() { unlink_trans_node(); diff --git a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h index 65fd1f27c..c3e1ac25b 100644 --- 
a/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h +++ b/src/storage/memtable/mvcc/ob_mvcc_trans_ctx.h @@ -486,6 +486,7 @@ public: const ObMemtableKey *get_key() { return &key_; } int get_memtable_key(uint64_t &table_id, common::ObStoreRowkey &rowkey) const; bool is_logging_blocked() const override; + uint32_t get_freeze_clock() const override; transaction::ObTxSEQ get_seq_no() const { return seq_no_; } int get_trans_id(transaction::ObTransID &trans_id) const; int get_cluster_version(uint64_t &cluster_version) const override; diff --git a/src/storage/memtable/mvcc/ob_tx_callback_list.h b/src/storage/memtable/mvcc/ob_tx_callback_list.h index 204f94646..2365666b3 100644 --- a/src/storage/memtable/mvcc/ob_tx_callback_list.h +++ b/src/storage/memtable/mvcc/ob_tx_callback_list.h @@ -194,6 +194,7 @@ public: { return ATOMIC_LOAD(&data_size_) - ATOMIC_LOAD(&logged_data_size_) > limit; } + // *NOTICE* this _only_ account MvccRowCallback on memtable bool has_pending_log() const { return ATOMIC_LOAD(&data_size_) - ATOMIC_LOAD(&logged_data_size_) > 0; } diff --git a/src/storage/memtable/ob_memtable_interface.h b/src/storage/memtable/ob_memtable_interface.h index 909e9a8fe..814be4529 100644 --- a/src/storage/memtable/ob_memtable_interface.h +++ b/src/storage/memtable/ob_memtable_interface.h @@ -148,7 +148,6 @@ public: storage::ObTableAccessContext &context, const blocksstable::ObDatumRowkey &rowkey, blocksstable::ObDatumRow &row) = 0; - virtual int64_t get_frozen_trans_version() { return 0; } virtual int major_freeze(const common::ObVersion &version) { UNUSED(version); return common::OB_SUCCESS; } diff --git a/src/storage/memtable/ob_redo_log_generator.cpp b/src/storage/memtable/ob_redo_log_generator.cpp index 62d57d6fb..1caf1f246 100644 --- a/src/storage/memtable/ob_redo_log_generator.cpp +++ b/src/storage/memtable/ob_redo_log_generator.cpp @@ -105,6 +105,9 @@ public: } else if (iter->is_logging_blocked()) { ret = OB_BLOCK_FROZEN; ctx_.last_log_blocked_memtable_ = 
static_cast(iter->get_memtable()); + } else if (OB_UNLIKELY(iter->get_freeze_clock() >= ctx_.freeze_clock_)) { + // when flush redo for frozen memtable, if memtable is active, should stop + ret = OB_BLOCK_FROZEN; } else { bool fake_fill = false; if (MutatorType::MUTATOR_ROW == iter->get_mutator_type()) { diff --git a/src/storage/memtable/ob_redo_log_generator.h b/src/storage/memtable/ob_redo_log_generator.h index 4259b54ef..555b6d793 100644 --- a/src/storage/memtable/ob_redo_log_generator.h +++ b/src/storage/memtable/ob_redo_log_generator.h @@ -82,6 +82,7 @@ struct ObTxFillRedoCtx write_seq_no_(), skip_lock_node_(false), all_list_(false), + freeze_clock_(UINT32_MAX), list_log_epoch_arr_(), cur_epoch_(0), next_epoch_(0), @@ -105,6 +106,7 @@ struct ObTxFillRedoCtx transaction::ObTxSEQ write_seq_no_; // to select callback list in parallel logging bool skip_lock_node_; // whether skip fill lock node bool all_list_; // whether to fill all callback-list + uint32_t freeze_clock_; // memtables before and equals it will be flushed ObSEArray list_log_epoch_arr_; // record each list's next log epoch int64_t cur_epoch_; // current filling epoch int64_t next_epoch_; // next epoch of list, used to update list_log_epoch_arr_ @@ -125,10 +127,24 @@ struct ObTxFillRedoCtx public: bool is_empty() const { return fill_count_ == 0; } bool not_empty() const { return fill_count_ > 0; } - TO_STRING_KV(K_(tx_id), K_(write_seq_no), K_(all_list), K_(cur_epoch), K_(next_epoch), - K_(epoch_from), K_(epoch_to), K_(fill_count), K_(fill_time), - KPC_(callback_scope), K_(skip_lock_node), K_(is_all_filled), K_(list_idx), - K_(list_log_epoch_arr), KP_(last_log_blocked_memtable), K_(buf_len), K_(buf_pos)); + TO_STRING_KV(K_(tx_id), + K_(write_seq_no), + K_(all_list), + K_(freeze_clock), + K_(cur_epoch), + K_(next_epoch), + K_(epoch_from), + K_(epoch_to), + K_(fill_count), + K_(fill_time), + KPC_(callback_scope), + K_(skip_lock_node), + K_(is_all_filled), + K_(list_idx), + K_(list_log_epoch_arr), + 
KP_(last_log_blocked_memtable), + K_(buf_len), + K_(buf_pos)); }; class ObCallbackListLogGuard diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp index d81d892ea..bf891e606 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp @@ -1335,11 +1335,11 @@ int ObLSTxCtxMgr::del_tx_ctx(ObTransCtx *ctx) return ret; } -int ObLSTxCtxMgr::traverse_tx_to_submit_redo_log(ObTransID &fail_tx_id) +int ObLSTxCtxMgr::traverse_tx_to_submit_redo_log(ObTransID &fail_tx_id, const uint32_t freeze_clock) { int ret = OB_SUCCESS; RLockGuard guard(rwlock_); - ObTxSubmitLogFunctor fn(ObTxSubmitLogFunctor::SUBMIT_REDO_LOG); + ObTxSubmitLogFunctor fn(ObTxSubmitLogFunctor::SUBMIT_REDO_LOG, freeze_clock); if (!is_follower_() && OB_FAIL(ls_tx_ctx_map_.for_each(fn))) { if (OB_SUCCESS != fn.get_result()) { // get real ret code diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.h b/src/storage/tx/ob_trans_ctx_mgr_v4.h index cc4e5c997..a6a7729ff 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.h +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.h @@ -269,7 +269,8 @@ public: int del_tx_ctx(ObTransCtx *ctx); // Freeze process needs to traverse TxCtx to submit log - int traverse_tx_to_submit_redo_log(ObTransID &fail_tx_id); + // @param[in] freeze_clock, the freeze clock after which will not be traversed + int traverse_tx_to_submit_redo_log(ObTransID &fail_tx_id, const uint32_t freeze_clock = UINT32_MAX); int traverse_tx_to_submit_next_log(); // Get the min prepare version of transaction module of current observer for slave read diff --git a/src/storage/tx/ob_trans_functor.h b/src/storage/tx/ob_trans_functor.h index fba244226..7d135885d 100644 --- a/src/storage/tx/ob_trans_functor.h +++ b/src/storage/tx/ob_trans_functor.h @@ -1292,8 +1292,8 @@ private: class ObTxSubmitLogFunctor { public: - explicit ObTxSubmitLogFunctor(const int action) - : action_(action), result_(common::OB_SUCCESS), fail_tx_id_() + explicit
ObTxSubmitLogFunctor(const int action, const uint32_t freeze_clock = UINT32_MAX) + : action_(action), freeze_clock_(freeze_clock), result_(common::OB_SUCCESS), fail_tx_id_() { SET_EXPIRED_LIMIT(100 * 1000 /*100ms*/, 3 * 1000 * 1000 /*3s*/); } @@ -1312,7 +1312,7 @@ public: ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "invalid argument", K(ret), K(tx_id), "ctx", OB_P(tx_ctx)); } else if (ObTxSubmitLogFunctor::SUBMIT_REDO_LOG == action_) { - if (OB_FAIL(tx_ctx->submit_redo_log_for_freeze())) { + if (OB_FAIL(tx_ctx->submit_redo_log_for_freeze(freeze_clock_))) { TRANS_LOG(WARN, "failed to submit redo log", K(ret), K(tx_id)); } } else if (ObTxSubmitLogFunctor::SUBMIT_NEXT_LOG == action_) { @@ -1337,6 +1337,7 @@ public: private: int action_; + uint32_t freeze_clock_; int result_; ObTransID fail_tx_id_; }; diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index 9fc5b8df5..86cb539b6 100644 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -1825,10 +1825,9 @@ int ObPartTransCtx::remove_callback_for_uncommited_txn( } // the semantic of submit redo for freeze is -// should flush all redos which can be flushed -// otherwise, need return some error to caller -// to indicate need retry -int ObPartTransCtx::submit_redo_log_for_freeze() +// should flush all redos below specified freeze_clock (inclusive) +// otherwise, need return some error to caller to indicate need retry +int ObPartTransCtx::submit_redo_log_for_freeze(const uint32_t freeze_clock) { int ret = OB_SUCCESS; TRANS_LOG(TRACE, "", K_(trans_id), K_(ls_id)); @@ -1838,7 +1837,7 @@ int ObPartTransCtx::submit_redo_log_for_freeze() if (need_submit) { CtxLockGuard guard(lock_); tg.click(); - ret = submit_redo_log_for_freeze_(submitted); + ret = submit_redo_log_for_freeze_(submitted, freeze_clock); tg.click(); if (submitted) { REC_TRANS_TRACE_EXT2(tlog_, submit_log_for_freeze, OB_Y(ret), @@ -1905,7 +1904,7 @@ int
ObPartTransCtx::serial_submit_redo_after_write_() int64_t before_submit_pending_size = mt_ctx_.get_pending_log_size(); bool should_switch = should_switch_to_parallel_logging_(); ObTxRedoSubmitter submitter(*this, mt_ctx_); - ret = submitter.submit(false /*flush all log*/, should_switch, false /*display blocked info*/); + ret = submitter.serial_submit(should_switch); if (should_switch && submitter.get_submitted_cnt() > 0) { const share::SCN serial_final_scn = submitter.get_submitted_scn(); switch_to_parallel_logging_(serial_final_scn); @@ -1966,13 +1965,13 @@ int ObPartTransCtx::prepare_for_submit_redo(ObTxLogCb *&log_cb, return ret; } -int ObPartTransCtx::submit_redo_log_for_freeze_(bool &submitted) +int ObPartTransCtx::submit_redo_log_for_freeze_(bool &submitted, const uint32_t freeze_clock) { int ret = OB_SUCCESS; ATOMIC_STORE(&is_submitting_redo_log_for_freeze_, true); if (OB_SUCC(check_can_submit_redo_())) { ObTxRedoSubmitter submitter(*this, mt_ctx_); - if (OB_FAIL(submitter.submit(true/*flush all*/, false/*final serial*/, true /*display blocked info*/))) { + if (OB_FAIL(submitter.submit_for_freeze(freeze_clock, true /*display blocked info*/))) { if (OB_BLOCK_FROZEN != ret) { TRANS_LOG(WARN, "fail to submit redo log for freeze", K(ret)); // for some error, txn will be aborted immediately @@ -2859,7 +2858,7 @@ int ObPartTransCtx::submit_redo_if_parallel_logging_() int ret = OB_SUCCESS; if (OB_UNLIKELY(is_parallel_logging())) { ObTxRedoSubmitter submitter(*this, mt_ctx_); - if (OB_FAIL(submitter.submit(true/*flush all*/, false/*final serial*/, true /*display blocked info*/))) { + if (OB_FAIL(submitter.submit_all(true /*display blocked info*/))) { TRANS_LOG(WARN, "submit redo log fail", K(ret)); } } @@ -9237,8 +9236,9 @@ int ObPartTransCtx::move_tx_op(const ObTransferMoveTxParam &move_tx_param, if (exec_info_.state_ == ObTxState::INIT) { // promise redo log before move log bool submitted = false; - if (OB_FAIL(submit_redo_log_for_freeze_(submitted))) { - 
TRANS_LOG(WARN, "submit log failed", KR(ret), KPC(this)); + ObTxRedoSubmitter submitter(*this, mt_ctx_); + if (OB_FAIL(submitter.serial_submit(false))) { + TRANS_LOG(WARN, "submit redo failed", K(ret), KPC(this)); } else { sub_state_.set_transfer_blocking(); } diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 46d824fbb..4ca39fd3c 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -361,7 +361,7 @@ public: int try_submit_next_log(); // for instant logging and freezing int submit_redo_after_write(const bool force, const ObTxSEQ &write_seq_no); - int submit_redo_log_for_freeze(); + int submit_redo_log_for_freeze(const uint32_t freeze_clock); int return_redo_log_cb(ObTxLogCb *log_cb); int push_replaying_log_ts(const share::SCN log_ts_ns, const int64_t log_entry_no); int push_replayed_log_ts(const share::SCN log_ts_ns, @@ -872,7 +872,7 @@ private: const share::ObLSID &ori_ls_id, const ObAddr &ori_addr); int check_ls_state_(const SCN &snapshot, const ObLSID &ls_id, const ObStandbyCheckInfo &check_info); int get_ls_replica_readable_scn_(const ObLSID &ls_id, SCN &snapshot_version); - int submit_redo_log_for_freeze_(bool &try_submit); + int submit_redo_log_for_freeze_(bool &try_submit, const uint32_t freeze_clock); void print_first_mvcc_callback_(); public: int prepare_for_submit_redo(ObTxLogCb *&log_cb, diff --git a/src/storage/tx/ob_tx_redo_submitter.h b/src/storage/tx/ob_tx_redo_submitter.h index 2b9e22b73..c73227c9f 100644 --- a/src/storage/tx/ob_tx_redo_submitter.h +++ b/src/storage/tx/ob_tx_redo_submitter.h @@ -35,22 +35,33 @@ public: helper_(NULL), from_all_list_(false), flush_all_(false), - write_seq_no_(), serial_final_(false), submit_if_not_full_(true), + flush_freeze_clock_(UINT32_MAX), + write_seq_no_(), submit_cb_list_idx_(-1), submit_out_cnt_(0), submitted_scn_() {} ~ObTxRedoSubmitter(); - // submit, will traversal all callback-list - int submit(const bool flush_all, const bool 
is_final, const bool display_blocked_info = false); + int submit_for_freeze(const uint32_t freeze_clock = UINT32_MAX, const bool display_blocked_info = true) { + return submit_(true, freeze_clock, false, display_blocked_info); + } + int serial_submit(const bool is_final) { + return submit_(false, UINT32_MAX, is_final, false); + } + int submit_all(const bool display_blocked_info) { + return submit_(true, UINT32_MAX, false, display_blocked_info); + } // fill log_block, and if full submit out and continue to fill int fill(ObTxLogBlock &log_block, memtable::ObRedoLogSubmitHelper &helper, const bool display_blocked_info = true); // parallel submit, only traversal writter's callback-list int parallel_submit(const ObTxSEQ &write_seq); int get_submitted_cnt() const { return submit_out_cnt_; } share::SCN get_submitted_scn() const { return submitted_scn_; } +private: + // general submit entry, will traverse all callback-list + int submit_(const bool flush_all, const uint32_t freeze_clock, const bool is_final, const bool display_blocked_info); private: // common submit redo pipeline // prepare -> fill -> submit_out -> after_submit @@ -68,9 +79,16 @@ private: int submit_log_block_out_(const int64_t replay_hint, bool &submitted); int after_submit_redo_out_(); public: - TO_STRING_KV(K_(tx_id), K_(ls_id), K_(from_all_list), K_(flush_all), - K_(write_seq_no), K_(serial_final), - K_(submit_if_not_full), K_(submit_out_cnt), K_(submit_cb_list_idx)); + TO_STRING_KV(K_(tx_id), + K_(ls_id), + K_(from_all_list), + K_(flush_all), + K_(flush_freeze_clock), + K_(write_seq_no), + K_(serial_final), + K_(submit_if_not_full), + K_(submit_out_cnt), + K_(submit_cb_list_idx)); private: ObPartTransCtx &tx_ctx_; memtable::ObMemtableCtx &mt_ctx_; @@ -81,15 +99,17 @@ private: memtable::ObRedoLogSubmitHelper *helper_; // for writer thread submit, only submit single list // for freeze or switch leader or commit, submit from all list - bool from_all_list_; + bool from_all_list_ : 1; // whether flush all
logs before can return - bool flush_all_; + bool flush_all_ : 1; + // whether is submitting the final serial log + bool serial_final_ : 1; + // whether submit out if log_block is not fully filled + bool submit_if_not_full_ : 1; + // flush memtables before and equal to the specified freeze_clock + int64_t flush_freeze_clock_; // writer seq_no, used to select logging callback-list ObTxSEQ write_seq_no_; - // whether is submitting the final serial log - bool serial_final_; - // wheter submit out if log_block is not full filled - bool submit_if_not_full_; // submit from which list, use by wirte thread logging int submit_cb_list_idx_; // the count of logs this submitter submitted out @@ -163,11 +183,15 @@ int ObTxRedoSubmitter::parallel_submit(const ObTxSEQ &write_seq_no) // the caller has hold TransCtx's FlushRedo write Lock // which ensure no writer thread is logging // and also hold TransCtx's CtxLock, which is safe to operate in the flush pipline -int ObTxRedoSubmitter::submit(const bool flush_all, const bool is_final, const bool display_blocked_info) +int ObTxRedoSubmitter::submit_(const bool flush_all, + const uint32_t freeze_clock, + const bool is_final, + const bool display_blocked_info) { int ret = OB_SUCCESS; from_all_list_ = true; flush_all_ = flush_all; + flush_freeze_clock_ = freeze_clock; serial_final_ = is_final; ObTxLogBlock log_block; log_block_ = &log_block; @@ -205,7 +229,6 @@ int ObTxRedoSubmitter::_submit_redo_pipeline_(const bool display_blocked_info) int ret = OB_SUCCESS; memtable::ObTxFillRedoCtx ctx; ctx.tx_id_ = tx_id_; - const bool parallel_logging = tx_ctx_.is_parallel_logging(); ctx.write_seq_no_ = write_seq_no_; const bool is_parallel_logging = tx_ctx_.is_parallel_logging(); bool stop = false; @@ -277,7 +300,7 @@ int ObTxRedoSubmitter::_submit_redo_pipeline_(const bool display_blocked_info) bool submitted = false; int submit_ret = OB_SUCCESS; if (ctx.fill_count_ > 0 && !skip_submit) { - const int64_t replay_hint = ctx.tx_id_.get_id() +
(parallel_logging ? ctx.list_idx_ : 0); + const int64_t replay_hint = ctx.tx_id_.get_id() + (is_parallel_logging ? ctx.list_idx_ : 0); submit_ret = submit_log_block_out_(replay_hint, submitted); if (OB_SUCCESS == submit_ret) { submit_ret = after_submit_redo_out_(); @@ -371,6 +394,7 @@ int ObTxRedoSubmitter::fill_log_block_(memtable::ObTxFillRedoCtx &ctx) ctx.helper_ = helper_; ctx.skip_lock_node_ = false; ctx.all_list_ = from_all_list_; + ctx.freeze_clock_ = flush_freeze_clock_; ctx.fill_count_ = 0; ctx.list_idx_ = submit_cb_list_idx_; int64_t start_ts = ObTimeUtility::fast_current_time(); diff --git a/unittest/storage/tx/test_redo_submitter.cpp b/unittest/storage/tx/test_redo_submitter.cpp index dc20fad0e..ff31e48ae 100644 --- a/unittest/storage/tx/test_redo_submitter.cpp +++ b/unittest/storage/tx/test_redo_submitter.cpp @@ -50,6 +50,7 @@ struct MockDelegate { const bool has_hold_ctx_lock, share::SCN &submitted) = 0; virtual int fill_redo_log(memtable::ObTxFillRedoCtx &ctx) = 0; + }; struct MockImpl : MockDelegate { int a_; @@ -110,10 +111,14 @@ int succ_submit_redo_log_out(ObTxLogBlock & b, const bool has_hold_ctx_lock, share::SCN &submitted_scn) { - log_cb = NULL; submitted_scn.convert_for_tx(123123123); + if (log_cb) { + ((ObPartTransCtx*)(log_cb->ctx_))->return_log_cb_(log_cb); + log_cb = NULL; + } return OB_SUCCESS; } + bool ObPartTransCtx::is_parallel_logging() const { return mock_ptr->is_parallel_logging(); @@ -170,7 +175,7 @@ TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_BLOCK_FROZEN) .Times(1) .WillOnce(Invoke(succ_submit_redo_log_out)); ObTxRedoSubmitter submitter(tx_ctx, mt_ctx); - EXPECT_EQ(OB_BLOCK_FROZEN, submitter.submit(false/*flush_all*/, false)); + EXPECT_EQ(OB_BLOCK_FROZEN, submitter.serial_submit(false)); } } @@ -193,7 +198,7 @@ TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_BUF_NOT_ENOUGH) .Times(1) .WillOnce(Invoke(succ_submit_redo_log_out)); ObTxRedoSubmitter submitter(tx_ctx, mt_ctx); - EXPECT_EQ(OB_SUCCESS, 
submitter.submit(false/*flush_all*/, false)); + EXPECT_EQ(OB_SUCCESS, submitter.serial_submit(false)); } } @@ -216,7 +221,7 @@ TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_ALL_FILLED) .Times(1) .WillOnce(Invoke(succ_submit_redo_log_out)); ObTxRedoSubmitter submitter(tx_ctx, mt_ctx); - EXPECT_EQ(OB_SUCCESS, submitter.submit(false/*flush_all*/, false)); + EXPECT_EQ(OB_SUCCESS, submitter.serial_submit(false)); } } @@ -239,7 +244,7 @@ TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_UNEXPECTED_ERROR) .Times(1) .WillOnce(Invoke(succ_submit_redo_log_out)); ObTxRedoSubmitter submitter(tx_ctx, mt_ctx); - EXPECT_EQ(OB_ALLOCATE_MEMORY_FAILED, submitter.submit(false/*flush_all*/, false)); + EXPECT_EQ(OB_ALLOCATE_MEMORY_FAILED, submitter.serial_submit(false)); } } @@ -262,7 +267,7 @@ TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_serial_final) .Times(1) .WillOnce(Invoke(succ_submit_redo_log_out)); ObTxRedoSubmitter submitter(tx_ctx, mt_ctx); - EXPECT_EQ(OB_SUCCESS, submitter.submit(false/*flush_all*/, true/*serial final*/)); + EXPECT_EQ(OB_SUCCESS, submitter.serial_submit(true/*serial final*/)); } } @@ -380,35 +385,19 @@ TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_BLOCK_FROZEN) EXPECT_CALL(mdo_, submit_redo_log_out(_,_,_,_,_,_)) .Times(1) .WillOnce(Invoke(succ_submit_redo_log_out)); - ObTxRedoSubmitter submitter(tx_ctx, mt_ctx); - EXPECT_EQ(OB_BLOCK_FROZEN, submitter.submit(false/*flush_all*/, false)); - } -} - -TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_BLOCKED_BY_OTHERS) -{ - mdo_.a_ = 3; - EXPECT_CALL(mdo_, is_parallel_logging()) - .Times(AtLeast(1)) - .WillRepeatedly(Return(true)); - { - InSequence s1; EXPECT_CALL(mdo_, fill_redo_log(_)) .Times(1) .WillOnce(Invoke([](ObTxFillRedoCtx &ctx) { - ctx.fill_count_ = 100; - ctx.buf_pos_ = 200; - return OB_ITER_END; + ctx.fill_count_ = 0; + ctx.buf_pos_ = 0; + return OB_BLOCK_FROZEN; })); - EXPECT_CALL(mdo_, submit_redo_log_out(_,_,_,_,_,_)) - .Times(1) - 
.WillOnce(Invoke(succ_submit_redo_log_out)); ObTxRedoSubmitter submitter(tx_ctx, mt_ctx); - EXPECT_EQ(OB_ITER_END, submitter.submit(false/*flush_all*/, false)); + EXPECT_EQ(OB_BLOCK_FROZEN, submitter.submit_for_freeze()); } } -TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_CURRENT_FILLED_BUT_OTHERS_REMAINS) +TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_FROZEN_BLOCKED_BY_OTHERS) { mdo_.a_ = 3; EXPECT_CALL(mdo_, is_parallel_logging()) @@ -416,6 +405,7 @@ TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_CURRENT_FILLED_BUT .WillRepeatedly(Return(true)); { InSequence s1; + // first list flushed the flushable part EXPECT_CALL(mdo_, fill_redo_log(_)) .Times(1) .WillOnce(Invoke([](ObTxFillRedoCtx &ctx) { @@ -423,11 +413,32 @@ TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_CURRENT_FILLED_BUT ctx.buf_pos_ = 200; return OB_EAGAIN; })); + // submit it out to log service layer EXPECT_CALL(mdo_, submit_redo_log_out(_,_,_,_,_,_)) .Times(1) .WillOnce(Invoke(succ_submit_redo_log_out)); + // retry from other lists, the list with small epoch flushed hit a block frozen + EXPECT_CALL(mdo_, fill_redo_log(_)) + .Times(1) + .WillOnce(Invoke([](ObTxFillRedoCtx &ctx) { + ctx.fill_count_ = 100; + ctx.buf_pos_ = 200; + return OB_BLOCK_FROZEN; + })); + // submit it out to log service layer + EXPECT_CALL(mdo_, submit_redo_log_out(_,_,_,_,_,_)) + .Times(1) + .WillOnce(Invoke(succ_submit_redo_log_out)); + // retry from other list, can not submit others due to the list with min epoch is block frozen + EXPECT_CALL(mdo_, fill_redo_log(_)) + .Times(1) + .WillOnce(Invoke([](ObTxFillRedoCtx &ctx) { + ctx.fill_count_ = 0; + ctx.buf_pos_ = 0; + return OB_BLOCK_FROZEN; + })); ObTxRedoSubmitter submitter(tx_ctx, mt_ctx); - EXPECT_EQ(OB_EAGAIN, submitter.submit(false/*flush_all*/, false)); + EXPECT_EQ(OB_BLOCK_FROZEN, submitter.submit_for_freeze()); } } @@ -439,6 +450,7 @@ TEST_F(ObTestRedoSubmitter, 
submit_by_freeze_parallel_logging_BUF_NOT_ENOUGH) .WillRepeatedly(Return(true)); { InSequence s1; + // the list with small epoch is large EXPECT_CALL(mdo_, fill_redo_log(_)) .Times(1) .WillOnce(Invoke([](ObTxFillRedoCtx &ctx) { @@ -449,8 +461,38 @@ TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_BUF_NOT_ENOUGH) EXPECT_CALL(mdo_, submit_redo_log_out(_,_,_,_,_,_)) .Times(1) .WillOnce(Invoke(succ_submit_redo_log_out)); + // the list with small epoch is all flushed + EXPECT_CALL(mdo_, fill_redo_log(_)) + .Times(1) + .WillOnce(Invoke([](ObTxFillRedoCtx &ctx) { + ctx.fill_count_ = 100; + ctx.buf_pos_ = 200; + return OB_EAGAIN; + })); + EXPECT_CALL(mdo_, submit_redo_log_out(_,_,_,_,_,_)) + .Times(1) + .WillOnce(Invoke(succ_submit_redo_log_out)); + // the second list with small epoch is all flushed + EXPECT_CALL(mdo_, fill_redo_log(_)) + .Times(1) + .WillOnce(Invoke([](ObTxFillRedoCtx &ctx) { + ctx.fill_count_ = 100; + ctx.buf_pos_ = 200; + return OB_EAGAIN; + })); + EXPECT_CALL(mdo_, submit_redo_log_out(_,_,_,_,_,_)) + .Times(1) + .WillOnce(Invoke(succ_submit_redo_log_out)); + // all list filled, nothing to fill + EXPECT_CALL(mdo_, fill_redo_log(_)) + .Times(1) + .WillOnce(Invoke([](ObTxFillRedoCtx &ctx) { + ctx.fill_count_ = 0; + ctx.buf_pos_ = 0; + return OB_SUCCESS; + })); ObTxRedoSubmitter submitter(tx_ctx, mt_ctx); - EXPECT_EQ(OB_SUCCESS, submitter.submit(false/*flush_all*/, false)); + EXPECT_EQ(OB_SUCCESS, submitter.submit_for_freeze()); } } @@ -473,7 +515,7 @@ TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_ALL_FILLED) .Times(1) .WillOnce(Invoke(succ_submit_redo_log_out)); ObTxRedoSubmitter submitter(tx_ctx, mt_ctx); - EXPECT_EQ(OB_SUCCESS, submitter.submit(false/*flush_all*/, false)); + EXPECT_EQ(OB_SUCCESS, submitter.submit_for_freeze()); } } @@ -609,7 +651,7 @@ TEST_F(ObTestRedoSubmitter, submit_ROW_SIZE_TOO_LARGE) })); { ObTxRedoSubmitter submitter(tx_ctx, mt_ctx); - EXPECT_EQ(OB_ERR_TOO_BIG_ROWSIZE, submitter.submit(true, false, 
true)); + EXPECT_EQ(OB_ERR_TOO_BIG_ROWSIZE, submitter.submit_all(true)); EXPECT_EQ(submitter.get_submitted_cnt(), 0); } {