diff --git a/deps/oblib/src/lib/stat/ob_latch_define.h b/deps/oblib/src/lib/stat/ob_latch_define.h index ab94c66e6..6729ed42a 100644 --- a/deps/oblib/src/lib/stat/ob_latch_define.h +++ b/deps/oblib/src/lib/stat/ob_latch_define.h @@ -296,7 +296,7 @@ LATCH_DEF(TENANT_MEM_USAGE_LOCK, 285, "tenant memory usage lock" , LATCH_FIFO, 2 LATCH_DEF(TX_TABLE_LOCK, 286, "tx table lock", LATCH_FIFO, 2000, 0, TX_TABLE_LOCK_WAIT, "tx table lock") LATCH_DEF(MEMTABLE_STAT_LOCK, 287, "metmable stat lock", LATCH_FIFO, 2000, 0, MEMTABLE_STAT_LOCK_WAIT, "memtable stat lock") LATCH_DEF(DEADLOCK_DETECT_LOCK, 288, "deadlock detect lock", LATCH_FIFO, 2000, 0, DEADLOCK_DETECT_LOCK_WAIT, "deadlock detect lock") - +LATCH_DEF(FREEZE_THREAD_POOL_LOCK, 289, "freeze thread pool lock", LATCH_FIFO, 2000, 0, FREEZE_THREAD_POOL_WAIT, "freeze thread pool lock") LATCH_DEF(LATCH_END, 99999, "latch end", LATCH_FIFO, 2000, 0, WAIT_EVENT_END, "latch end") #endif diff --git a/deps/oblib/src/lib/wait_event/ob_wait_event.h b/deps/oblib/src/lib/wait_event/ob_wait_event.h index 63a923bfa..a756095ab 100644 --- a/deps/oblib/src/lib/wait_event/ob_wait_event.h +++ b/deps/oblib/src/lib/wait_event/ob_wait_event.h @@ -345,6 +345,7 @@ WAIT_EVENT_DEF(REPLAY_STATUS_WAIT, 16051, "replay status lock wait", "", "", "", WAIT_EVENT_DEF(REPLAY_STATUS_TASK_WAIT, 16052, "replay status task lock wait", "", "", "", CONCURRENCY, "REPLAY_STATUS_TASK_WAIT", true) WAIT_EVENT_DEF(MAX_APPLY_SCN_WAIT, 16053, "max apply scn lock wait", "", "", "", CONCURRENCY, "MAX_APPLY_SCN_WAIT", true) WAIT_EVENT_DEF(GC_HANDLER_WAIT, 16054, "gc handler lock wait", "", "", "", CONCURRENCY, "GC_HANDLER_WAIT", true) +WAIT_EVENT_DEF(FREEZE_THREAD_POOL_WAIT, 16055, "freeze thread pool wait", "", "", "", CONCURRENCY, "FREEZE_THREAD_POOL_WAIT", true) //replication group WAIT_EVENT_DEF(RG_TRANSFER_LOCK_WAIT, 17000, "transfer lock wait", "src_rg", "dst_rg", "transfer_pkey", CONCURRENCY, "transfer lock wait", false) diff --git a/src/storage/checkpoint/ob_data_checkpoint.cpp b/src/storage/checkpoint/ob_data_checkpoint.cpp index 6613f7705..21ac88335 100644 --- a/src/storage/checkpoint/ob_data_checkpoint.cpp +++ b/src/storage/checkpoint/ob_data_checkpoint.cpp @@ -276,6 +276,17 @@ bool ObDataCheckpoint::is_flushing() const return !ls_freeze_finished_; } +bool ObDataCheckpoint::is_empty() +{ + ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_); + ObSpinLockGuard guard(lock_); + + return new_create_list_.is_empty() && + active_list_.is_empty() && + prepare_list_.is_empty() && + ls_frozen_list_.is_empty(); +} + static inline bool task_reach_time_interval(int64_t i, int64_t &last_time) { bool bret = false; @@ -355,35 +366,51 @@ void ObDataCheckpoint::ls_frozen_to_active_(int64_t &last_time) bool ls_frozen_list_is_empty = false; do { { - // traversal list once - ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_); - ObCheckpointIterator iterator; - ls_frozen_list_.get_iterator(iterator); - while (iterator.has_next()) { - int ret = OB_SUCCESS; - auto ob_freeze_checkpoint = iterator.get_next(); - if (ob_freeze_checkpoint->is_active_checkpoint()) { - ObSpinLockGuard guard(lock_); - // avoid new active ob_freeze_checkpoint block minor merge - // push back to new_create_list and wait next freeze - if(OB_FAIL(transfer_from_ls_frozen_to_new_created_(ob_freeze_checkpoint))) { - STORAGE_LOG(WARN, "ob_freeze_checkpoint move to new_created_list failed", - K(ret), K(*ob_freeze_checkpoint)); - } - } else { - ObSpinLockGuard guard(lock_); - if (OB_FAIL(ob_freeze_checkpoint->check_can_move_to_active(true))) { - STORAGE_LOG(WARN, "check can freeze failed", K(ret), K(*ob_freeze_checkpoint)); + int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA; + int64_t write_lock = 0; + ObLSLockGuard lock_ls(ls_->lock_, read_lock, write_lock); + + if (OB_UNLIKELY(ls_->is_stopped_)) { + ret = OB_NOT_RUNNING; + STORAGE_LOG(WARN, "ls stopped", K(ret), K_(ls_->ls_meta)); + } else if (OB_UNLIKELY(!(ls_->get_log_handler()->is_replay_enabled()))) { + ret = OB_NOT_RUNNING; + STORAGE_LOG(WARN, "log handler not enable replay, should not freeze", K(ret), K_(ls_->ls_meta)); + } else { + // traversal list once + ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_); + ObCheckpointIterator iterator; + ls_frozen_list_.get_iterator(iterator); + while (iterator.has_next()) { + int ret = OB_SUCCESS; + auto ob_freeze_checkpoint = iterator.get_next(); + if (ob_freeze_checkpoint->is_active_checkpoint()) { + ObSpinLockGuard guard(lock_); + // avoid new active ob_freeze_checkpoint block minor merge + // push back to new_create_list and wait next freeze + if(OB_FAIL(transfer_from_ls_frozen_to_new_created_(ob_freeze_checkpoint))) { + STORAGE_LOG(WARN, "ob_freeze_checkpoint move to new_created_list failed", + K(ret), K(*ob_freeze_checkpoint)); + } + } else { + ObSpinLockGuard guard(lock_); + if (OB_FAIL(ob_freeze_checkpoint->check_can_move_to_active(true))) { + STORAGE_LOG(WARN, "check can freeze failed", K(ret), K(*ob_freeze_checkpoint)); + } } } + ls_frozen_list_is_empty = ls_frozen_list_.is_empty(); + } + + if (OB_NOT_RUNNING == ret && is_empty()) { + ls_frozen_list_is_empty = true; } - ls_frozen_list_is_empty = ls_frozen_list_.is_empty(); } if (!ls_frozen_list_is_empty) { ob_usleep(LOOP_TRAVERSAL_INTERVAL_US); if (task_reach_time_interval(3 * 1000 * 1000, last_time)) { - STORAGE_LOG(WARN, "cost too much time in ls_frozen_list_", K(ls_->get_ls_id())); + STORAGE_LOG(WARN, "cost too much time in ls_frozen_list_", K(ret), K(ls_->get_ls_id())); ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_); print_list_(ls_frozen_list_); } @@ -401,28 +428,44 @@ void ObDataCheckpoint::ls_frozen_to_prepare_(int64_t &last_time) bool ls_frozen_list_is_empty = false; do { { - // traversal list once - ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_); - ObCheckpointIterator iterator; - ls_frozen_list_.get_iterator(iterator); - while (iterator.has_next()) { - int tmp_ret = OB_SUCCESS; - auto ob_freeze_checkpoint = iterator.get_next(); - if (ob_freeze_checkpoint->ready_for_flush()) { - if (OB_FAIL(ob_freeze_checkpoint->finish_freeze())) { - STORAGE_LOG(WARN, "finish freeze failed", K(ret)); - } - } else if (ob_freeze_checkpoint->is_active_checkpoint()) { - // avoid active ob_freeze_checkpoint block minor merge - // push back to active_list and wait next freeze - ObSpinLockGuard guard(lock_); - if(OB_SUCCESS != (tmp_ret = (transfer_from_ls_frozen_to_active_(ob_freeze_checkpoint)))) { - STORAGE_LOG(WARN, "active ob_freeze_checkpoint move to active_list failed", - K(tmp_ret), K(*ob_freeze_checkpoint)); + int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA; + int64_t write_lock = 0; + ObLSLockGuard lock_ls(ls_->lock_, read_lock, write_lock); + + if (OB_UNLIKELY(ls_->is_stopped_)) { + ret = OB_NOT_RUNNING; + STORAGE_LOG(WARN, "ls stopped", K(ret), K_(ls_->ls_meta)); + } else if (OB_UNLIKELY(!(ls_->get_log_handler()->is_replay_enabled()))) { + ret = OB_NOT_RUNNING; + STORAGE_LOG(WARN, "log handler not enable replay, should not freeze", K(ret), K_(ls_->ls_meta)); + } else { + // traversal list once + ObSpinLockGuard ls_frozen_list_guard(ls_frozen_list_lock_); + ObCheckpointIterator iterator; + ls_frozen_list_.get_iterator(iterator); + while (iterator.has_next()) { + int tmp_ret = OB_SUCCESS; + auto ob_freeze_checkpoint = iterator.get_next(); + if (ob_freeze_checkpoint->ready_for_flush()) { + if (OB_FAIL(ob_freeze_checkpoint->finish_freeze())) { + STORAGE_LOG(WARN, "finish freeze failed", K(ret)); + } + } else if (ob_freeze_checkpoint->is_active_checkpoint()) { + // avoid active ob_freeze_checkpoint block minor merge + // push back to active_list and wait next freeze + ObSpinLockGuard guard(lock_); + if(OB_SUCCESS != (tmp_ret = (transfer_from_ls_frozen_to_active_(ob_freeze_checkpoint)))) { + STORAGE_LOG(WARN, "active ob_freeze_checkpoint move to active_list failed", + K(tmp_ret), K(*ob_freeze_checkpoint)); + } } } + ls_frozen_list_is_empty = ls_frozen_list_.is_empty(); + } + + if (OB_NOT_RUNNING == ret && is_empty()) { + ls_frozen_list_is_empty = true; } - ls_frozen_list_is_empty = ls_frozen_list_.is_empty(); } if (!ls_frozen_list_is_empty) { diff --git a/src/storage/checkpoint/ob_data_checkpoint.h b/src/storage/checkpoint/ob_data_checkpoint.h index 457ce2f50..ab3ce3b00 100644 --- a/src/storage/checkpoint/ob_data_checkpoint.h +++ b/src/storage/checkpoint/ob_data_checkpoint.h @@ -118,6 +118,8 @@ public: bool has_prepared_flush_checkpoint(); + bool is_empty(); + private: // traversal prepare_list to flush memtable // case1: some memtable flush failed when ls freeze diff --git a/src/storage/ls/ob_freezer.cpp b/src/storage/ls/ob_freezer.cpp index 00a265aa1..1a24aab4a 100644 --- a/src/storage/ls/ob_freezer.cpp +++ b/src/storage/ls/ob_freezer.cpp @@ -22,6 +22,8 @@ #include "storage/tx/ob_trans_service.h" #include "storage/compaction/ob_compaction_diagnose.h" #include "logservice/ob_log_service.h" +#include "storage/ls/ob_ls.h" +#include "storage/tx_storage/ob_tenant_freezer.h" namespace oceanbase { @@ -195,12 +197,7 @@ ObFreezer::ObFreezer() : freeze_flag_(0), freeze_snapshot_version_(), max_decided_scn_(), - ls_wrs_handler_(nullptr), - ls_tx_svr_(nullptr), - ls_tablet_svr_(nullptr), - data_checkpoint_(nullptr), - loghandler_(nullptr), - ls_id_(), + ls_(nullptr), stat_(), empty_memtable_cnt_(0), high_priority_freeze_cnt_(0), @@ -210,21 +207,11 @@ ObFreezer::ObFreezer() is_inited_(false) {} -ObFreezer::ObFreezer(ObLSWRSHandler *ls_loop_worker, - ObLSTxService *ls_tx_svr, - ObLSTabletService *ls_tablet_svr, - checkpoint::ObDataCheckpoint *data_checkpoint, - ObILogHandler *ob_loghandler, - const share::ObLSID &ls_id) +ObFreezer::ObFreezer(ObLS *ls) : freeze_flag_(0), freeze_snapshot_version_(), max_decided_scn_(), - ls_wrs_handler_(ls_loop_worker), - ls_tx_svr_(ls_tx_svr), - ls_tablet_svr_(ls_tablet_svr), - data_checkpoint_(data_checkpoint), - loghandler_(ob_loghandler), - ls_id_(ls_id), + ls_(ls), stat_(), empty_memtable_cnt_(0), high_priority_freeze_cnt_(0), @@ -239,39 +226,12 @@ ObFreezer::~ObFreezer() reset(); } -void ObFreezer::set(ObLSWRSHandler *ls_loop_worker, - ObLSTxService *ls_tx_svr, - ObLSTabletService *ls_tablet_svr, - checkpoint::ObDataCheckpoint *data_checkpoint, - ObILogHandler *ob_loghandler, - const share::ObLSID &ls_id, - uint32_t freeze_flag) -{ - freeze_flag_ = freeze_flag; - freeze_snapshot_version_.reset(); - max_decided_scn_.reset(); - ls_wrs_handler_ = ls_loop_worker; - ls_tx_svr_ = ls_tx_svr; - ls_tablet_svr_ = ls_tablet_svr; - data_checkpoint_ = data_checkpoint; - loghandler_ = ob_loghandler; - ls_id_ = ls_id; - stat_.reset(); - empty_memtable_cnt_ = 0; - need_resubmit_log_ = false; -} - void ObFreezer::reset() { freeze_flag_ = 0; freeze_snapshot_version_.reset(); max_decided_scn_.reset(); - ls_wrs_handler_ = nullptr; - ls_tx_svr_ = nullptr; - data_checkpoint_ = nullptr; - ls_tablet_svr_ = nullptr; - loghandler_ = nullptr; - ls_id_.reset(); + ls_ = nullptr; stat_.reset(); empty_memtable_cnt_ = 0; high_priority_freeze_cnt_ = 0; @@ -281,37 +241,70 @@ void ObFreezer::reset() is_inited_ = false; } -int ObFreezer::init(ObLSWRSHandler *ls_loop_worker, - ObLSTxService *ls_tx_svr, - ObLSTabletService *ls_tablet_svr, - checkpoint::ObDataCheckpoint *data_checkpoint, - ObILogHandler *log_handler, - const share::ObLSID &ls_id) +int ObFreezer::init(ObLS *ls) { int ret = OB_SUCCESS; - if (OB_ISNULL(ls_loop_worker) || - OB_ISNULL(ls_tx_svr) || - OB_ISNULL(ls_tablet_svr) || - OB_ISNULL(data_checkpoint) || - OB_ISNULL(log_handler) || - !ls_id.is_valid()) { + + if (OB_ISNULL(ls)) { ret = OB_INVALID_ARGUMENT; - TRANS_LOG(WARN, "[Freezer] invalid argument", K(ret), K(ls_loop_worker), K(ls_tx_svr), - K(ls_tablet_svr), K(data_checkpoint), K(log_handler), K(ls_id)); + TRANS_LOG(WARN, "[Freezer] invalid argument", K(ret)); } else { - set(ls_loop_worker, ls_tx_svr, ls_tablet_svr, data_checkpoint, log_handler, ls_id); + freeze_flag_ = 0; + freeze_snapshot_version_.reset(); + max_decided_scn_.reset(); + ls_ = ls; + stat_.reset(); + empty_memtable_cnt_ = 0; + high_priority_freeze_cnt_ = 0; + low_priority_freeze_cnt_ = 0; + need_resubmit_log_ = false; + enable_ = true; + is_inited_ = true; } + return ret; } +/* ls info */ +share::ObLSID ObFreezer::get_ls_id() +{ + return OB_ISNULL(ls_) ? INVALID_LS : ls_->get_ls_id(); +} + +checkpoint::ObDataCheckpoint* ObFreezer::get_ls_data_checkpoint() +{ + return OB_ISNULL(ls_) ? nullptr : ls_->get_data_checkpoint(); +} + +ObLSTxService* ObFreezer::get_ls_tx_svr() +{ + return OB_ISNULL(ls_) ? nullptr : ls_->get_tx_svr(); +} + +ObLSTabletService* ObFreezer::get_ls_tablet_svr() +{ + return OB_ISNULL(ls_) ? nullptr : ls_->get_tablet_svr(); +} + +logservice::ObILogHandler* ObFreezer::get_ls_log_handler() +{ + return OB_ISNULL(ls_) ? nullptr : ls_->get_log_handler(); +} + +ObLSWRSHandler* ObFreezer::get_ls_wrs_handler() +{ + return OB_ISNULL(ls_) ? nullptr : ls_->get_ls_wrs_handler(); +} + /* logstream freeze */ int ObFreezer::logstream_freeze() { int ret = OB_SUCCESS; SCN freeze_snapshot_version; SCN max_decided_scn; - FLOG_INFO("[Freezer] logstream_freeze start", K(ret), K_(ls_id)); + share::ObLSID ls_id = get_ls_id(); + FLOG_INFO("[Freezer] logstream_freeze start", K(ret), K(ls_id)); stat_.reset(); stat_.start_time_ = ObTimeUtility::current_time(); stat_.state_ = ObFreezeState::NOT_SET_FREEZE_FLAG; @@ -319,35 +312,27 @@ int ObFreezer::logstream_freeze() ObLSFreezeGuard guard(*this); if (IS_NOT_INIT) { ret = OB_NOT_INIT; - LOG_WARN("[Freezer] not inited", K(ret), K_(ls_id)); + LOG_WARN("[Freezer] not inited", K(ret), K(ls_id)); } else if (OB_UNLIKELY(!enable_)) { - LOG_WARN("freezer is offline, can not freeze now", K(ret), K_(ls_id)); + LOG_WARN("freezer is offline, can not freeze now", K(ret), K(ls_id)); } else if (OB_FAIL(decide_max_decided_scn(max_decided_scn))) { - TRANS_LOG(WARN, "[Freezer] decide max decided log ts failure", K(ret), K_(ls_id)); + TRANS_LOG(WARN, "[Freezer] decide max decided log ts failure", K(ret), K(ls_id)); } else if (OB_FAIL(get_ls_weak_read_scn(freeze_snapshot_version))) { - TRANS_LOG(WARN, "[Freezer] get ls weak read ts failure", K(ret), K_(ls_id)); + TRANS_LOG(WARN, "[Freezer] get ls weak read ts failure", K(ret), K(ls_id)); } else if (ObScnRange::MAX_SCN == freeze_snapshot_version || ObScnRange::MIN_SCN >= freeze_snapshot_version) { ret = OB_NOT_INIT; - LOG_WARN("[Freezer] weak read service not inited", K(ret), K_(ls_id), K(freeze_snapshot_version)); + LOG_WARN("[Freezer] weak read service not inited", K(ret), K(ls_id), K(freeze_snapshot_version)); } else if (OB_FAIL(set_freeze_flag())) { - FLOG_INFO("[Freezer] freeze is running", K(ret), K_(ls_id)); + FLOG_INFO("[Freezer] freeze is running", K(ret), K(ls_id)); } else if (FALSE_IT(max_decided_scn_ = max_decided_scn)) { } else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) { } else if (FALSE_IT(set_need_resubmit_log(false))) { } else if (FALSE_IT(stat_.state_ = ObFreezeState::NOT_SUBMIT_LOG)) { } else if (OB_FAIL(inner_logstream_freeze())) { + TRANS_LOG(WARN, "[Freezer] logstream_freeze failure", K(ret), K(ls_id)); undo_freeze_(); - TRANS_LOG(WARN, "[Freezer] logstream_freeze failure", K(ret), K_(ls_id)); - } else { - stat_.add_diagnose_info("logstream_freeze success"); - unset_freeze_(); - uint32_t freeze_clock = get_freeze_clock(); - FLOG_INFO("[Freezer] logstream_freeze success", K(ret), K_(ls_id), K(freeze_clock)); } - stat_.state_ = ObFreezeState::FINISH; - stat_.end_time_ = ObTimeUtility::current_time(); - stat_.ret_code_ = ret; print_freezer_statistics(); @@ -357,33 +342,75 @@ int ObFreezer::logstream_freeze() int ObFreezer::inner_logstream_freeze() { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); + + if (OB_FAIL(get_ls_data_checkpoint()->ls_freeze(SCN::max_scn()))) { + // move memtables from active_list to frozen_list + TRANS_LOG(WARN, "[Freezer] data_checkpoint freeze failed", K(ret), K(ls_id)); + stat_.add_diagnose_info("data_checkpoint freeze failed"); + } else if (FALSE_IT(submit_log_for_freeze())) { + } else if (OB_FAIL(submit_freeze_task(true/*is_ls_freeze*/))) { + TRANS_LOG(WARN, "failed to submit ls_freeze task", K(ret), K(ls_id)); + stat_.add_diagnose_info("fail to submit ls_freeze_task"); + } else { + TRANS_LOG(INFO, "[Freezer] succeed to start ls_freeze_task", K(ret), K(ls_id)); + } + + return ret; +} + +void ObFreezer::ls_freeze_task() +{ + int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); const int64_t start = ObTimeUtility::current_time(); uint32_t freeze_clock = get_freeze_clock(); - TRANS_LOG(INFO, "[Freezer] freeze_clock", K(ret), K_(ls_id), K(freeze_clock)); + TRANS_LOG(INFO, "[Freezer] freeze_clock", K(ls_id), K(freeze_clock)); - if (OB_FAIL(data_checkpoint_->ls_freeze(SCN::max_scn()))) { - // move memtables from active_list to frozen_list - TRANS_LOG(WARN, "[Freezer] data_checkpoint freeze failed", K(ret), K_(ls_id)); - stat_.add_diagnose_info("data_checkpoint freeze failed"); - } else { - submit_log_for_freeze(); - // wait till all memtables are moved from frozen_list to prepare_list - // this means that all memtables can be dumped - while (!data_checkpoint_->ls_freeze_finished()) { - const int64_t cost_time = ObTimeUtility::current_time() - start; - if (cost_time > 5 * 1000 * 1000) { - if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { - if (need_resubmit_log()) { + // wait till all memtables are moved from frozen_list to prepare_list + // this means that all memtables can be dumped + while (!get_ls_data_checkpoint()->ls_freeze_finished()) { + const int64_t cost_time = ObTimeUtility::current_time() - start; + if (cost_time > 5 * 1000 * 1000) { + if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { + if (need_resubmit_log()) { + int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA; + int64_t write_lock = 0; + ObLSLockGuard lock_ls(ls_->lock_, read_lock, write_lock); + if (OB_FAIL(check_ls_state())) { + } else { submit_log_for_freeze(); - TRANS_LOG(INFO, "[Freezer] resubmit log for ls_freeze", K_(ls_id), K(cost_time)); + TRANS_LOG(INFO, "[Freezer] resubmit log for ls_freeze", K(ls_id), K(cost_time)); } - TRANS_LOG(WARN, "[Freezer] finish ls_freeze costs too much time", - K_(ls_id), K(cost_time)); - stat_.add_diagnose_info("finish ls_freeze costs too much time"); } + TRANS_LOG(WARN, "[Freezer] finish ls_freeze costs too much time", + K(ls_id), K(cost_time)); + stat_.add_diagnose_info("finish ls_freeze costs too much time"); } - ob_usleep(100); } + ob_usleep(100); + } + stat_.add_diagnose_info("logstream_freeze success"); + FLOG_INFO("[Freezer] logstream_freeze success", K(ls_id), K(freeze_clock)); + + stat_.state_ = ObFreezeState::FINISH; + stat_.end_time_ = ObTimeUtility::current_time(); + stat_.ret_code_ = ret; + + unset_freeze_(); +} + +// must be used under the protection of ls_lock +int ObFreezer::check_ls_state() +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(ls_->is_stopped_)) { + ret = OB_NOT_RUNNING; + STORAGE_LOG(WARN, "ls stopped", K(ret), K_(ls_->ls_meta)); + } else if (OB_UNLIKELY(!(ls_->get_log_handler()->is_replay_enabled()))) { + ret = OB_NOT_RUNNING; + STORAGE_LOG(WARN, "log handler not enable replay, should not freeze", K(ret), K_(ls_->ls_meta)); } return ret; @@ -393,12 +420,13 @@ int ObFreezer::inner_logstream_freeze() int ObFreezer::tablet_freeze(const ObTabletID &tablet_id) { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); ObTabletHandle handle; ObTablet *tablet = nullptr; ObTabletMemtableMgr *memtable_mgr = nullptr; memtable::ObIMemtable *imemtable = nullptr; SCN freeze_snapshot_version; - FLOG_INFO("[Freezer] tablet_freeze start", K(ret), K_(ls_id), K(tablet_id)); + FLOG_INFO("[Freezer] tablet_freeze start", K(ret), K(ls_id), K(tablet_id)); stat_.reset(); stat_.start_time_ = ObTimeUtility::current_time(); stat_.state_ = ObFreezeState::NOT_SET_FREEZE_FLAG; @@ -407,75 +435,74 @@ int ObFreezer::tablet_freeze(const ObTabletID &tablet_id) ObTabletFreezeGuard guard(*this, true /* try guard */); if (IS_NOT_INIT) { ret = OB_NOT_INIT; - TRANS_LOG(WARN, "[Freezer] not inited", K(ret), K_(ls_id), K(tablet_id)); + TRANS_LOG(WARN, "[Freezer] not inited", K(ret), K(ls_id), K(tablet_id)); } else if (OB_UNLIKELY(!enable_)) { - LOG_WARN("freezer is offline, can not freeze now", K(ret), K_(ls_id)); + LOG_WARN("freezer is offline, can not freeze now", K(ret), K(ls_id)); } else if (OB_FAIL(guard.try_set_tablet_freeze_begin())) { // no need freeze now, a ls freeze is running or will be running ret = OB_SUCCESS; - FLOG_INFO("[Freezer] ls freeze is running, no need freeze again", K(ret), K_(ls_id), K(tablet_id)); + FLOG_INFO("[Freezer] ls freeze is running, no need freeze again", K(ret), K(ls_id), K(tablet_id)); } else if (OB_FAIL(set_freeze_flag_without_inc_freeze_clock())) { ret = OB_SUCCESS; - FLOG_INFO("[Freezer] freeze is running", K(ret), K_(ls_id), K(tablet_id)); + FLOG_INFO("[Freezer] freeze is running", K(ret), K(ls_id), K(tablet_id)); } else if (FALSE_IT(stat_.state_ = ObFreezeState::NOT_SUBMIT_LOG)) { } else { // succeed to set freeze flag if (OB_FAIL(get_ls_weak_read_scn(freeze_snapshot_version))) { - TRANS_LOG(WARN, "[Freezer] get ls weak read scn failure", K(ret), K_(ls_id)); + TRANS_LOG(WARN, "[Freezer] get ls weak read scn failure", K(ret), K(ls_id)); } else if (ObScnRange::MAX_SCN == freeze_snapshot_version || ObScnRange::MIN_SCN >= freeze_snapshot_version) { ret = OB_NOT_INIT; - LOG_WARN("[Freezer] weak read service not inited", K(ret), K_(ls_id)); + LOG_WARN("[Freezer] weak read service not inited", K(ret), K(ls_id)); } else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) { } else if (FALSE_IT(set_need_resubmit_log(false))) { - } else if (OB_FAIL(ls_tablet_svr_->get_tablet(tablet_id, - handle, - ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { - TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K_(ls_id), K(tablet_id)); + } else if (OB_FAIL(get_ls_tablet_svr()->get_tablet(tablet_id, + handle, + ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { + TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K(ls_id), K(tablet_id)); stat_.add_diagnose_info("fail to get tablet"); } else if (FALSE_IT(tablet = handle.get_obj())) { } else if (OB_ISNULL(memtable_mgr = static_cast(tablet->get_memtable_mgr()))) { - TRANS_LOG(WARN, "[Freezer] tablet_memtable_mgr is null", K(ret), K_(ls_id), K(tablet_id)); + TRANS_LOG(WARN, "[Freezer] tablet_memtable_mgr is null", K(ret), K(ls_id), K(tablet_id)); } else if (OB_FAIL(memtable_mgr->set_is_tablet_freeze_for_active_memtable(imemtable))) { if (ret == OB_ENTRY_NOT_EXIST) { ret = OB_SUCCESS; TRANS_LOG(INFO, "[Freezer] no need to freeze since there is no active memtable", K(ret), - K_(ls_id), K(tablet_id)); + K(ls_id), K(tablet_id)); stat_.add_diagnose_info("no need to freeze since there is no active memtable"); } else { - TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K_(ls_id), K(tablet_id)); + TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id)); stat_.add_diagnose_info("fail to set is_tablet_freeze"); } - } else if (OB_FAIL(handle_memtable_for_tablet_freeze(imemtable))) { - TRANS_LOG(WARN, "[Freezer] fail to handle memtable", K(ret), K_(ls_id), K(tablet_id)); + } else if (FALSE_IT(submit_log_for_freeze())) { + } else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, imemtable))) { + TRANS_LOG(WARN, "[Freezer] fail to submit tablet_freeze_task", K(ret), K(ls_id), K(tablet_id)); + stat_.add_diagnose_info("fail to submit tablet_freeze_task"); } else { - stat_.add_diagnose_info("tablet_freeze success"); + TRANS_LOG(INFO, "[Freezer] succeed to start tablet_freeze_task", K(ret), K(ls_id), K(tablet_id)); + } + if (OB_FAIL(ret) || OB_ISNULL(imemtable)) { + stat_.state_ = ObFreezeState::FINISH; + stat_.end_time_ = ObTimeUtility::current_time(); + stat_.ret_code_ = ret; + print_freezer_statistics(); + unset_freeze_(); } - unset_freeze_(); } - if (OB_SUCC(ret)) { - FLOG_INFO("[Freezer] tablet_freeze success", K(ret), K_(ls_id), K(tablet_id)); - } - - stat_.state_ = ObFreezeState::FINISH; - stat_.end_time_ = ObTimeUtility::current_time(); - stat_.ret_code_ = ret; - - print_freezer_statistics(); - return ret; } int ObFreezer::force_tablet_freeze(const ObTabletID &tablet_id) { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); ObTabletHandle handle; ObTablet *tablet = nullptr; ObTabletMemtableMgr *memtable_mgr = nullptr; memtable::ObIMemtable *imemtable = nullptr; SCN freeze_snapshot_version; - FLOG_INFO("[Freezer] force_tablet_freeze start", K(ret), K_(ls_id), K(tablet_id)); + FLOG_INFO("[Freezer] force_tablet_freeze start", K(ret), K(ls_id), K(tablet_id)); stat_.reset(); stat_.start_time_ = ObTimeUtility::current_time(); stat_.state_ = ObFreezeState::NOT_SET_FREEZE_FLAG; @@ -485,57 +512,128 @@ int ObFreezer::force_tablet_freeze(const ObTabletID &tablet_id) ObTabletFreezeGuard guard(*this); if (IS_NOT_INIT) { ret = OB_NOT_INIT; - TRANS_LOG(WARN, "[Freezer] not inited", K(ret), K_(ls_id), K(tablet_id)); + TRANS_LOG(WARN, "[Freezer] not inited", K(ret), K(ls_id), K(tablet_id)); } else if (OB_UNLIKELY(!enable_)) { - LOG_WARN("freezer is offline, can not freeze now", K(ret), K_(ls_id)); + LOG_WARN("freezer is offline, can not freeze now", K(ret), K(ls_id)); } else if (OB_FAIL(loop_set_freeze_flag())) { - TRANS_LOG(WARN, "[Freezer] failed to set freeze_flag", K(ret), K_(ls_id), K(tablet_id)); + TRANS_LOG(WARN, "[Freezer] failed to set freeze_flag", K(ret), K(ls_id), K(tablet_id)); } else if (FALSE_IT(stat_.state_ = ObFreezeState::NOT_SUBMIT_LOG)) { } else { + // succeed to set freeze flag if (OB_FAIL(get_ls_weak_read_scn(freeze_snapshot_version))) { - TRANS_LOG(WARN, "[Freezer] get ls weak read ts failure", K(ret), K_(ls_id)); + TRANS_LOG(WARN, "[Freezer] get ls weak read ts failure", K(ret), K(ls_id)); } else if (ObScnRange::MAX_SCN == freeze_snapshot_version || ObScnRange::MIN_SCN >= freeze_snapshot_version) { ret = OB_NOT_INIT; - LOG_WARN("[Freezer] weak read service not inited", K(ret), K_(ls_id)); + LOG_WARN("[Freezer] weak read service not inited", K(ret), K(ls_id)); } else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) { } else if (FALSE_IT(set_need_resubmit_log(false))) { - } else if (OB_FAIL(ls_tablet_svr_->get_tablet(tablet_id, - handle, ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { - TRANS_LOG(WARN, "[Freezer] fail to get tablet for freeze", K(ret), K_(ls_id), K(tablet_id)); + } else if (OB_FAIL(get_ls_tablet_svr()->get_tablet(tablet_id, + handle, + ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { + TRANS_LOG(WARN, "[Freezer] fail to get tablet for freeze", K(ret), K(ls_id), K(tablet_id)); stat_.add_diagnose_info("fail to get tablet"); } else if (FALSE_IT(tablet = handle.get_obj())) { } else if (OB_FAIL(create_memtable_if_no_active_memtable(tablet))) { if (OB_NO_NEED_UPDATE == ret) { ret = OB_SUCCESS; } else { - LOG_WARN("[Freezer] fail to create an active memtable for force_tablet_freeze", K(ret), K_(ls_id), K(tablet_id)); + LOG_WARN("[Freezer] fail to create an active memtable for force_tablet_freeze", K(ret), K(ls_id), K(tablet_id)); stat_.add_diagnose_info("fail to create an active memtable for force_tablet_freeze"); } } else if (OB_ISNULL(memtable_mgr = static_cast(tablet->get_memtable_mgr()))) { - TRANS_LOG(WARN, "[Freezer] tablet_memtable_mgr is null", K(ret), K_(ls_id), K(tablet_id)); + TRANS_LOG(WARN, "[Freezer] tablet_memtable_mgr is null", K(ret), K(ls_id), K(tablet_id)); } else if (OB_FAIL(memtable_mgr->set_is_tablet_freeze_for_active_memtable(imemtable, true))) { - TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K_(ls_id), K(tablet_id)); + TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id)); stat_.add_diagnose_info("fail to set is_tablet_freeze"); - } else if (OB_FAIL(handle_memtable_for_tablet_freeze(imemtable))) { - TRANS_LOG(WARN, "[Freezer] fail to handle memtable", K(ret), K_(ls_id), K(tablet_id)); + } else if (FALSE_IT(submit_log_for_freeze())) { + } else if (OB_FAIL(submit_freeze_task(false/*is_ls_freeze*/, imemtable))) { + TRANS_LOG(WARN, "[Freezer] fail to submit freeze_task", K(ret), K(ls_id), K(tablet_id)); + stat_.add_diagnose_info("fail to submit freeze_task"); } else { - stat_.add_diagnose_info("force_tablet_freeze success"); + TRANS_LOG(INFO, "[Freezer] succeed to start force_tablet_freeze_task", K(ret), K(ls_id), K(tablet_id)); } + if (OB_FAIL(ret) || OB_ISNULL(imemtable)) { + stat_.state_ = ObFreezeState::FINISH; + stat_.end_time_ = ObTimeUtility::current_time(); + stat_.ret_code_ = ret; + print_freezer_statistics(); + unset_freeze_(); + } + } + + return ret; +} + +int ObFreezer::tablet_freeze_task(memtable::ObIMemtable *imemtable) +{ + int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); + memtable::ObMemtable *memtable = nullptr; + + if (OB_ISNULL(imemtable)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "memtable cannot be null", K(ret), K(ls_id)); + } else { + // succeed to set freeze_flag + if (FALSE_IT(memtable = static_cast(imemtable))) { + } else if (OB_FAIL(wait_memtable_ready_for_flush_with_ls_lock(memtable))) { + TRANS_LOG(WARN, "[Freezer] fail to wait memtable ready_for_flush", K(ret), K(ls_id)); + } else { + int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA; + int64_t write_lock = 0; + ObLSLockGuard lock_ls(ls_->lock_, read_lock, write_lock); + if (OB_FAIL(check_ls_state())) { + } else if (OB_FAIL(memtable->finish_freeze())) { + TRANS_LOG(ERROR, "[Freezer] memtable cannot be flushed", + K(ret), K(ls_id), KPC(memtable)); + stat_.add_diagnose_info("memtable cannot be flushed"); + } else { + stat_.add_diagnose_info("tablet_freeze success"); + FLOG_INFO("[Freezer] tablet_freeze_task success", K(ret), K(ls_id), KPC(memtable)); + } + } + stat_.state_ = ObFreezeState::FINISH; + stat_.end_time_ = ObTimeUtility::current_time(); + stat_.ret_code_ = ret; + print_freezer_statistics(); unset_freeze_(); } - if (OB_SUCC(ret)) { - FLOG_INFO("[Freezer] force_tablet_freeze success", K(ret), K_(ls_id), K(tablet_id)); - } else { - FLOG_INFO("[Freezer] force_tablet_freeze failed", K(ret), K_(ls_id), K(tablet_id)); - } + return ret; +} - stat_.state_ = ObFreezeState::FINISH; - stat_.end_time_ = ObTimeUtility::current_time(); - stat_.ret_code_ = ret; +int ObFreezer::wait_memtable_ready_for_flush_with_ls_lock(memtable::ObMemtable *memtable) +{ + share::ObLSID ls_id = get_ls_id(); + const int64_t start = ObTimeUtility::current_time(); + int ret = OB_SUCCESS; + bool ready_for_flush = false; - print_freezer_statistics(); + do { + int64_t read_lock = LSLOCKALL - LSLOCKLOGMETA; + int64_t write_lock = 0; + ObLSLockGuard lock_ls(ls_->lock_, read_lock, write_lock); + + if (OB_FAIL(check_ls_state())) { + } else if (!memtable->ready_for_flush()) { + const int64_t cost_time = ObTimeUtility::current_time() - start; + if (cost_time > 5 * 1000 * 1000) { + if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { + if (need_resubmit_log()) { + submit_log_for_freeze(); + TRANS_LOG(INFO, "[Freezer] resubmit log for tablet_freeze", K(ls_id), K(cost_time)); + } + TRANS_LOG(WARN, "[Freezer] ready_for_flush costs too much time", + K(ls_id), K(cost_time), KPC(memtable)); + stat_.add_diagnose_info("ready_for_flush costs too much time"); + } + } + ob_usleep(100); + } else { + ready_for_flush = true; + } + } while (OB_SUCC(ret) && !ready_for_flush); return ret; } @@ -543,12 +641,13 @@ int ObFreezer::force_tablet_freeze(const ObTabletID &tablet_id) int ObFreezer::tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id, memtable::ObIMemtable *&imemtable) { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); ObTabletHandle handle; ObTablet *tablet = nullptr; ObTabletMemtableMgr *memtable_mgr = nullptr; imemtable = nullptr; SCN freeze_snapshot_version; - FLOG_INFO("[Freezer] tablet_freeze_for_replace_tablet_meta start", K(ret), K_(ls_id), K(tablet_id)); + FLOG_INFO("[Freezer] tablet_freeze_for_replace_tablet_meta start", K(ret), K(ls_id), K(tablet_id)); stat_.reset(); stat_.start_time_ = ObTimeUtility::current_time(); stat_.state_ = ObFreezeState::NOT_SET_FREEZE_FLAG; @@ -557,41 +656,41 @@ int ObFreezer::tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id ObTabletFreezeGuard guard(*this, true /* try guard */); if (IS_NOT_INIT) { ret = OB_NOT_INIT; - TRANS_LOG(WARN, "[Freezer] not inited", K(ret), K_(ls_id), K(tablet_id)); + TRANS_LOG(WARN, "[Freezer] not inited", K(ret), K(ls_id), K(tablet_id)); } else if (OB_FAIL(guard.try_set_tablet_freeze_begin())) { // no need freeze now, a ls freeze is running or will be running ret = OB_SUCCESS; - FLOG_INFO("[Freezer] ls freeze is running, no need freeze again", K(ret), K_(ls_id), K(tablet_id)); + FLOG_INFO("[Freezer] ls freeze is running, no need freeze again", K(ret), K(ls_id), K(tablet_id)); } else if (OB_FAIL(set_freeze_flag_without_inc_freeze_clock())) { ret = OB_SUCCESS; - FLOG_INFO("[Freezer] freeze is running", K(ret), K_(ls_id), K(tablet_id)); + FLOG_INFO("[Freezer] freeze is running", K(ret), K(ls_id), K(tablet_id)); } else if (FALSE_IT(stat_.state_ = ObFreezeState::NOT_SUBMIT_LOG)) { } else { // succeed to set freeze flag if (OB_FAIL(get_ls_weak_read_scn(freeze_snapshot_version))) { - TRANS_LOG(WARN, "[Freezer] get ls weak read ts failure", K(ret), K_(ls_id)); + TRANS_LOG(WARN, "[Freezer] get ls weak read ts failure", K(ret), K(ls_id)); } else if (freeze_snapshot_version.is_max() || !freeze_snapshot_version.is_valid_and_not_min()) { ret = OB_NOT_INIT; - LOG_WARN("[Freezer] weak read service not inited", K(ret), K_(ls_id)); + LOG_WARN("[Freezer] weak read service not inited", K(ret), K(ls_id)); } else if (FALSE_IT(freeze_snapshot_version_ = freeze_snapshot_version)) { } else if (FALSE_IT(set_need_resubmit_log(false))) { - } else if (OB_FAIL(ls_tablet_svr_->get_tablet(tablet_id, + } else if (OB_FAIL(get_ls_tablet_svr()->get_tablet(tablet_id, handle, ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { - TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K_(ls_id), K(tablet_id)); + TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K(ls_id), K(tablet_id)); stat_.add_diagnose_info("fail to get tablet"); } else if (FALSE_IT(tablet = handle.get_obj())) { } else if (OB_ISNULL(memtable_mgr = static_cast(tablet->get_memtable_mgr()))) { - TRANS_LOG(WARN, "[Freezer] tablet_memtable_mgr is null", K(ret), K_(ls_id), K(tablet_id)); + TRANS_LOG(WARN, "[Freezer] tablet_memtable_mgr is null", K(ret), K(ls_id), K(tablet_id)); } else if (OB_FAIL(memtable_mgr->set_is_tablet_freeze_for_active_memtable(imemtable))) { if (ret == OB_ENTRY_NOT_EXIST) { ret = OB_SUCCESS; TRANS_LOG(INFO, "[Freezer] no need to freeze since there is no active memtable", K(ret), - K_(ls_id), K(tablet_id)); + K(ls_id), K(tablet_id)); stat_.add_diagnose_info("no need to freeze since there is no active memtable"); } else { - TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K_(ls_id), K(tablet_id)); + TRANS_LOG(WARN, "[Freezer] fail to set is_tablet_freeze", K(ret), K(ls_id), K(tablet_id)); stat_.add_diagnose_info("fail to set is_tablet_freeze"); } } @@ -611,16 +710,17 @@ int ObFreezer::tablet_freeze_for_replace_tablet_meta(const ObTabletID &tablet_id int ObFreezer::handle_frozen_memtable_for_replace_tablet_meta(const ObTabletID &tablet_id, memtable::ObIMemtable *imemtable) { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); if (OB_ISNULL(imemtable)) { ret = OB_SUCCESS; - FLOG_INFO("[Freezer] no need to tablet_freeze_for_replace_tablet_meta", K(ret), K_(ls_id), K(tablet_id)); + FLOG_INFO("[Freezer] no need to tablet_freeze_for_replace_tablet_meta", K(ret), K(ls_id), K(tablet_id)); } else { if (OB_FAIL(handle_memtable_for_tablet_freeze(imemtable))) { - TRANS_LOG(WARN, "[Freezer] fail to handle memtable", K(ret), K_(ls_id), K(tablet_id)); + TRANS_LOG(WARN, "[Freezer] fail to handle memtable", K(ret), K(ls_id), K(tablet_id)); } else { stat_.add_diagnose_info("tablet_freeze_for_replace_tablet_meta success"); - FLOG_INFO("[Freezer] tablet_freeze_for_replace_tablet_meta success", K(ret), K_(ls_id), K(tablet_id)); + FLOG_INFO("[Freezer] tablet_freeze_for_replace_tablet_meta success", K(ret), K(ls_id), K(tablet_id)); } stat_.state_ = ObFreezeState::FINISH; stat_.end_time_ = ObTimeUtility::current_time(); @@ -636,6 +736,7 @@ int ObFreezer::handle_frozen_memtable_for_replace_tablet_meta(const ObTabletID & int ObFreezer::handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtable) { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); if (OB_ISNULL(imemtable)) { ret = OB_ERR_UNEXPECTED; @@ -645,11 +746,11 @@ int ObFreezer::handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtabl wait_memtable_ready_for_flush(memtable); if (OB_FAIL(memtable->finish_freeze())) { TRANS_LOG(ERROR, "[Freezer] memtable cannot be flushed", - K(ret), K_(ls_id), K(*memtable)); + K(ret), K(ls_id), K(*memtable)); stat_.add_diagnose_info("memtable cannot be flushed"); } else { TRANS_LOG(INFO, "[Freezer] memtable is ready to be flushed", - K(ret), K_(ls_id), K(*memtable)); + K(ret), K(ls_id), K(*memtable)); } } @@ -659,6 +760,7 @@ int ObFreezer::handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtabl int ObFreezer::submit_log_for_freeze() { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); const int64_t start = ObTimeUtility::current_time(); ObTabletID tablet_id(INT64_MAX); // used for diagnose @@ -666,14 +768,14 @@ int ObFreezer::submit_log_for_freeze() ret = OB_SUCCESS; transaction::ObTransID fail_tx_id; - if (OB_FAIL(ls_tx_svr_->traverse_trans_to_submit_redo_log(fail_tx_id))) { + if (OB_FAIL(get_ls_tx_svr()->traverse_trans_to_submit_redo_log(fail_tx_id))) { const int64_t cost_time = ObTimeUtility::current_time() - start; if (cost_time > 1000 * 1000) { if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { TRANS_LOG(WARN, "[Freezer] failed to traverse trans ctx to submit redo log", K(ret), - K_(ls_id), K(cost_time), K(fail_tx_id)); + K(ls_id), K(cost_time), K(fail_tx_id)); ADD_SUSPECT_INFO(MINI_MERGE, - ls_id_, tablet_id, + ls_id, tablet_id, "traverse_trans_to_submit_redo_log failed", K(ret), K(fail_tx_id)); @@ -685,10 +787,10 @@ int ObFreezer::submit_log_for_freeze() ob_usleep(100 * 1000); } } while (OB_FAIL(ret)); - DEL_SUSPECT_INFO(MINI_MERGE, ls_id_, tablet_id); + DEL_SUSPECT_INFO(MINI_MERGE, ls_id, tablet_id); if (OB_SUCC(ret)) { - if (OB_FAIL(ls_tx_svr_->traverse_trans_to_submit_next_log())) { + if (OB_FAIL(get_ls_tx_svr()->traverse_trans_to_submit_next_log())) { TRANS_LOG(WARN, "traverse trans ctx to submit next log failed", K(ret)); } } @@ -698,8 +800,38 @@ int ObFreezer::submit_log_for_freeze() return ret; } +int ObFreezer::submit_freeze_task(bool is_ls_freeze, memtable::ObIMemtable *imemtable) +{ + int ret = OB_SUCCESS; + ObTenantFreezer *tenant_freezer = nullptr; + share::ObLSID ls_id = get_ls_id(); + const int64_t start = ObTimeUtility::current_time(); + + if (OB_ISNULL(tenant_freezer = MTL(storage::ObTenantFreezer*))) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "ObTenantFreezer is null", K(ret), K(ls_id)); + } else { + ObSpinLockGuard freeze_thread_pool(tenant_freezer->freeze_thread_pool_lock_); + do { + ret = is_ls_freeze ? tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this]() { ls_freeze_task(); }) + : tenant_freezer->freeze_thread_pool_.commit_task_ignore_ret([this, imemtable]() { tablet_freeze_task(imemtable); }); + if (OB_FAIL(ret)) { + const int64_t cost_time = ObTimeUtility::current_time() - start; + if (cost_time > 100 * 1000) { + if (TC_REACH_TIME_INTERVAL(100 * 1000)) { + TRANS_LOG(WARN, "[Freezer] failed to start freeze_task", K(ret), K(ls_id), K(is_ls_freeze)); + } + } + } + } while (OB_FAIL(ret)); + } + + return ret; +} + void ObFreezer::wait_memtable_ready_for_flush(memtable::ObMemtable *memtable) { + share::ObLSID ls_id = get_ls_id(); const int64_t start = ObTimeUtility::current_time(); int ret = OB_SUCCESS; @@ -709,10 +841,10 @@ void ObFreezer::wait_memtable_ready_for_flush(memtable::ObMemtable *memtable) if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { if (need_resubmit_log()) { submit_log_for_freeze(); - TRANS_LOG(INFO, "[Freezer] resubmit log for tablet_freeze", K_(ls_id), K(cost_time)); + TRANS_LOG(INFO, "[Freezer] resubmit log for tablet_freeze", K(ls_id), K(cost_time)); } TRANS_LOG(WARN, "[Freezer] ready_for_flush costs too much time", - K_(ls_id), K(cost_time), KPC(memtable)); + K(ls_id), K(cost_time), KPC(memtable)); stat_.add_diagnose_info("ready_for_flush costs too much time"); memtable->print_ready_for_flush(); } @@ -724,6 +856,7 @@ void ObFreezer::wait_memtable_ready_for_flush(memtable::ObMemtable *memtable) int ObFreezer::create_memtable_if_no_active_memtable(ObTablet *tablet) { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); ObTabletMemtableMgr *memtable_mgr = nullptr; memtable::ObMemtable *last_frozen_memtable = nullptr; const common::ObTabletID &tablet_id = tablet->get_tablet_meta().tablet_id_; @@ -733,34 +866,34 @@ int ObFreezer::create_memtable_if_no_active_memtable(ObTablet *tablet) if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; - LOG_WARN("[Freezer] not inited", K(ret), K_(is_inited), K_(ls_id), K(tablet_id)); + LOG_WARN("[Freezer] not inited", K(ret), K_(is_inited), K(ls_id), K(tablet_id)); } else if (OB_FAIL(get_max_consequent_callbacked_scn(max_callbacked_scn))) { - LOG_WARN("[Freezer] fail to get max_consequent_callbacked_scn", K(ret), K_(ls_id), K(tablet_id)); + LOG_WARN("[Freezer] fail to get max_consequent_callbacked_scn", K(ret), K(ls_id), K(tablet_id)); } else if (max_callbacked_scn < clog_checkpoint_scn) { ret = OB_NO_NEED_UPDATE; LOG_WARN("[Freezer] cannot create memtable because max_callbacked_scn < clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id)); } else if (OB_ISNULL(memtable_mgr = static_cast(tablet->get_memtable_mgr()))) { - LOG_WARN("[Freezer] memtable mgr should not be null", K(ret), K_(ls_id), K(tablet_id)); + LOG_WARN("[Freezer] memtable mgr should not be null", K(ret), K(ls_id), K(tablet_id)); } else if (memtable_mgr->has_active_memtable()) { - LOG_INFO("[Freezer] no need to create an active memtable", K(ret), K_(ls_id), K(tablet_id)); + LOG_INFO("[Freezer] no need to create an active memtable", K(ret), K(ls_id), K(tablet_id)); } else { // create a new memtable since there is no active memtable // get schema_version if (OB_NOT_NULL(last_frozen_memtable = memtable_mgr->get_last_frozen_memtable())) { schema_version = last_frozen_memtable->get_max_schema_version(); } else if (OB_FAIL(tablet->get_schema_version_from_storage_schema(schema_version))) { - LOG_WARN("[Freezer] failed to get schema version", K(ret), K_(ls_id), K(tablet_id)); + LOG_WARN("[Freezer] failed to get schema version", K(ret), K(ls_id), K(tablet_id)); } else { //do nothing } // create new memtable if (OB_SUCC(ret)) { - if (OB_FAIL(ls_tablet_svr_->create_memtable(tablet_id, schema_version))) { + if (OB_FAIL(get_ls_tablet_svr()->create_memtable(tablet_id, schema_version))) { if (OB_MINOR_FREEZE_NOT_ALLOW != ret) { - LOG_WARN("[Freezer] failed to create memtable", K(ret), K_(ls_id), K(tablet_id), + LOG_WARN("[Freezer] failed to create memtable", K(ret), K(ls_id), K(tablet_id), K(schema_version)); } } else { - LOG_INFO("[Freezer] succeed to create new active memtable", K(ret), K_(ls_id), + LOG_INFO("[Freezer] succeed to create new active memtable", K(ret), K(ls_id), K(tablet_id), K(schema_version)); } } @@ -773,6 +906,7 @@ int ObFreezer::create_memtable_if_no_active_memtable(ObTablet *tablet) int ObFreezer::loop_set_freeze_flag() { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); const int64_t start = ObTimeUtility::current_time(); do { @@ -780,7 +914,7 @@ int ObFreezer::loop_set_freeze_flag() if (OB_FAIL(set_freeze_flag_without_inc_freeze_clock())) { const int64_t cost_time = ObTimeUtility::current_time() - start; if (cost_time > 3 * 1000 * 1000) { - TRANS_LOG(WARN, "[Freezer] wait the running freeze too long time", K_(ls_id), + TRANS_LOG(WARN, "[Freezer] wait the running freeze too long time", K(ls_id), K(cost_time)); break; } @@ -794,6 +928,7 @@ int ObFreezer::loop_set_freeze_flag() int ObFreezer::set_freeze_flag_without_inc_freeze_clock() { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); uint32_t old_v = 0; uint32_t new_v = 0; @@ -801,7 +936,7 @@ int ObFreezer::set_freeze_flag_without_inc_freeze_clock() old_v = ATOMIC_LOAD(&freeze_flag_); if (is_freeze(old_v)) { ret = OB_ENTRY_EXIST; - TRANS_LOG(WARN, "[Freezer] freeze is running!", K(ret), K_(ls_id)); + TRANS_LOG(WARN, "[Freezer] freeze is running!", K(ret), K(ls_id)); break; } new_v = old_v | (1 << 31); @@ -813,6 +948,7 @@ int ObFreezer::set_freeze_flag_without_inc_freeze_clock() int ObFreezer::set_freeze_flag() { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); uint32_t old_v = 0; uint32_t new_v = 0; @@ -823,7 +959,7 @@ int ObFreezer::set_freeze_flag() old_v = ATOMIC_LOAD(&freeze_flag_); if (is_freeze(old_v)) { ret = OB_ENTRY_EXIST; - TRANS_LOG(WARN, "[Freezer] freeze is running!", K(ret), K_(ls_id)); + TRANS_LOG(WARN, "[Freezer] freeze is running!", K(ret), K(ls_id)); break; } new_v = (old_v + 1) | (1 << 31); @@ -835,6 +971,7 @@ int ObFreezer::set_freeze_flag() int ObFreezer::inc_freeze_clock() { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); uint32_t old_v = 0; uint32_t new_v = 0; @@ -845,7 +982,7 @@ int ObFreezer::inc_freeze_clock() if (!is_freeze(old_v)) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "[Freezer] cannot inc freeze clock without setting freeze flag!", - K(ret), K_(ls_id)); + K(ret), K(ls_id)); break; } new_v = old_v + 1; @@ -911,41 +1048,44 @@ bool ObFreezer::is_freeze(uint32_t freeze_flag) const int ObFreezer::decide_max_decided_scn(SCN &max_decided_scn) { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; - LOG_WARN("[Freezer] not inited", K(ret), K_(ls_id)); - } else if (OB_FAIL(loghandler_->get_max_decided_scn(max_decided_scn))) { + LOG_WARN("[Freezer] not inited", K(ret), K(ls_id)); + } else if (OB_FAIL(get_ls_log_handler()->get_max_decided_scn(max_decided_scn))) { if (OB_STATE_NOT_MATCH == ret) { max_decided_scn.reset(); ret = OB_SUCCESS; } else { - TRANS_LOG(WARN, "[Freezer] fail to get max_decided_scn", K(ret), K_(ls_id), + TRANS_LOG(WARN, "[Freezer] fail to get max_decided_scn", K(ret), K(ls_id), K(max_decided_scn)); } } if (OB_SUCC(ret)) { - TRANS_LOG(TRACE, "[Freezer] decide max decided log ts", K(ret), K_(ls_id), K(max_decided_scn)); + TRANS_LOG(TRACE, "[Freezer] decide max decided log ts", K(ret), K(ls_id), K(max_decided_scn)); } + return ret; } int ObFreezer::get_max_consequent_callbacked_scn(SCN &max_consequent_callbacked_scn) { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; - LOG_WARN("[Freezer] not inited", K(ret), K_(ls_id)); - } else if (OB_FAIL(loghandler_->get_max_decided_scn(max_consequent_callbacked_scn))) { + LOG_WARN("[Freezer] not inited", K(ret), K(ls_id)); + } else if (OB_FAIL(get_ls_log_handler()->get_max_decided_scn(max_consequent_callbacked_scn))) { if (OB_STATE_NOT_MATCH == ret) { max_consequent_callbacked_scn.set_min(); ret = OB_SUCCESS; } else { - TRANS_LOG(WARN, "[Freezer] fail to get min_unreplay_scn", K(ret), K_(ls_id), K(max_consequent_callbacked_scn)); + TRANS_LOG(WARN, "[Freezer] fail to get min_unreplay_scn", K(ret), K(ls_id), K(max_consequent_callbacked_scn)); } } else { - TRANS_LOG(TRACE, "[Freezer] get_max_decided_scn", K(ret), K_(ls_id), K(max_consequent_callbacked_scn)); + TRANS_LOG(TRACE, "[Freezer] get_max_decided_scn", K(ret), K(ls_id), K(max_consequent_callbacked_scn)); } return ret; } @@ -953,14 +1093,15 @@ int ObFreezer::get_max_consequent_callbacked_scn(SCN &max_consequent_callbacked_ int ObFreezer::get_ls_weak_read_scn(SCN &weak_read_scn) { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); weak_read_scn.reset(); - if (OB_ISNULL(ls_wrs_handler_)) { - ret = OB_ENTRY_NOT_EXIST; - TRANS_LOG(WARN, "[Freezer] service should not be null", K(ret), K_(ls_id)); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + TRANS_LOG(WARN, "[Freezer] not inited", K(ret)); } else { - weak_read_scn = ls_wrs_handler_->get_ls_weak_read_ts(); - TRANS_LOG(TRACE, "[Freezer] get_ls_weak_read_scn", K(ret), K_(ls_id), K(weak_read_scn)); + weak_read_scn = get_ls_wrs_handler()->get_ls_weak_read_ts(); + TRANS_LOG(TRACE, "[Freezer] get_ls_weak_read_scn", K(ret), K(ls_id), K(weak_read_scn)); } return ret; @@ -970,18 +1111,19 @@ int ObFreezer::get_newest_clog_checkpoint_scn(const ObTabletID &tablet_id, SCN &clog_checkpoint_scn) { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); ObTabletHandle handle; clog_checkpoint_scn.reset(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; TRANS_LOG(WARN, "[Freezer] not inited", K(ret)); - } else if (OB_FAIL(ls_tablet_svr_->get_tablet(tablet_id, + } else if (OB_FAIL(get_ls_tablet_svr()->get_tablet(tablet_id, handle, ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { - TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K_(ls_id), K(tablet_id)); + TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K(ls_id), K(tablet_id)); } else { clog_checkpoint_scn = handle.get_obj()->get_tablet_meta().clog_checkpoint_scn_; - TRANS_LOG(TRACE, "[Freezer] get_newest_clog_checkpoint_scn", K(ret), K_(ls_id), K(tablet_id), + TRANS_LOG(TRACE, "[Freezer] get_newest_clog_checkpoint_scn", K(ret), K(ls_id), K(tablet_id), K(clog_checkpoint_scn)); } @@ -992,19 +1134,20 @@ int ObFreezer::get_newest_snapshot_version(const ObTabletID &tablet_id, SCN &snapshot_version) { int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); ObTabletHandle handle; snapshot_version.reset(); if (IS_NOT_INIT) { ret = OB_NOT_INIT; TRANS_LOG(WARN, "[Freezer] not inited", K(ret)); - } else if (OB_FAIL(ls_tablet_svr_->get_tablet(tablet_id, + } else if (OB_FAIL(get_ls_tablet_svr()->get_tablet(tablet_id, handle, ObTabletCommon::NO_CHECK_GET_TABLET_TIMEOUT_US))) { - TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K_(ls_id), K(tablet_id)); + TRANS_LOG(WARN, "[Freezer] fail to get tablet", K(ret), K(ls_id), K(tablet_id)); } else if (OB_FAIL(snapshot_version.convert_for_tx(handle.get_obj()->get_snapshot_version()))) { - TRANS_LOG(WARN, "[Freezer] fail to convert from ts", K(ret), K_(ls_id), K(tablet_id)); + TRANS_LOG(WARN, "[Freezer] fail to convert from ts", K(ret), K(ls_id), K(tablet_id)); } else { - TRANS_LOG(TRACE, "[Freezer] get_snapshot_version", K(ret), K_(ls_id), K(tablet_id), K(snapshot_version)); + TRANS_LOG(TRACE, "[Freezer] get_snapshot_version", K(ret), K(ls_id), K(tablet_id), K(snapshot_version)); } return ret; @@ -1029,7 +1172,7 @@ void ObFreezer::print_freezer_statistics() { // print every 10s if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) { - TRANS_LOG(INFO, "[Freezer] empty table statistics: ", K_(ls_id), K(get_empty_memtable_cnt())); + TRANS_LOG(INFO, "[Freezer] empty table statistics: ", K(ls_id), K(get_empty_memtable_cnt())); clear_empty_memtable_cnt(); } } diff --git a/src/storage/ls/ob_freezer.h b/src/storage/ls/ob_freezer.h index fe10db9f3..8cc257ed3 100644 --- a/src/storage/ls/ob_freezer.h +++ b/src/storage/ls/ob_freezer.h @@ -22,6 +22,7 @@ #include "storage/checkpoint/ob_freeze_checkpoint.h" #include "logservice/ob_log_handler.h" #include "lib/container/ob_array_serialization.h" +#include "share/ob_occam_thread_pool.h" namespace oceanbase { @@ -183,27 +184,10 @@ public: public: ObFreezer(); - ObFreezer(ObLSWRSHandler *ls_loop_worker, - ObLSTxService *ls_tx_svr, - ObLSTabletService *ls_tablet_svr, - checkpoint::ObDataCheckpoint *data_checkpoint, - logservice::ObILogHandler *ob_loghandler, - const share::ObLSID &ls_id); + ObFreezer(ObLS *ls); ~ObFreezer(); - int init(ObLSWRSHandler *ls_loop_worker, - ObLSTxService *ls_tx_svr, - ObLSTabletService *ls_tablet_svr, - checkpoint::ObDataCheckpoint *data_checkpoint, - logservice::ObILogHandler *ob_loghandler, - const share::ObLSID &ls_id); - void set(ObLSWRSHandler *ls_loop_worker, - ObLSTxService *ls_tx_svr, - ObLSTabletService *ls_tablet_svr, - checkpoint::ObDataCheckpoint *data_checkpoint, - logservice::ObILogHandler *ob_loghandler, - const share::ObLSID &ls_id, - uint32_t freeze_flag = 0); + int init(ObLS *ls); void reset(); void offline() { enable_ = false; } void online() { enable_ = true; } @@ -222,10 +206,12 @@ public: uint32_t get_freeze_clock() { return ATOMIC_LOAD(&freeze_flag_) & (~(1 << 31)); } /* ls info */ - share::ObLSID &get_ls_id() { return ls_id_; } - checkpoint::ObDataCheckpoint *get_data_checkpoint() { return data_checkpoint_; } - ObLSTxService *get_ls_tx_svr() { return ls_tx_svr_; } - ObLSTabletService *get_ls_tablet_svr() { return ls_tablet_svr_; } + share::ObLSID get_ls_id(); + checkpoint::ObDataCheckpoint *get_ls_data_checkpoint(); + ObLSTxService *get_ls_tx_svr(); + ObLSTabletService *get_ls_tablet_svr(); + logservice::ObILogHandler *get_ls_log_handler(); + ObLSWRSHandler *get_ls_wrs_handler(); /* freeze_snapshot_version */ share::SCN get_freeze_snapshot_version() { return freeze_snapshot_version_; } @@ -285,15 +271,19 @@ private: /* inner subfunctions for freeze process */ int inner_logstream_freeze(); int submit_log_for_freeze(); + void ls_freeze_task(); + int tablet_freeze_task(memtable::ObIMemtable *imemtable); + int submit_freeze_task(bool is_ls_freeze, memtable::ObIMemtable *imemtable = nullptr); void wait_memtable_ready_for_flush(memtable::ObMemtable *memtable); + int wait_memtable_ready_for_flush_with_ls_lock(memtable::ObMemtable *memtable); int handle_memtable_for_tablet_freeze(memtable::ObIMemtable *imemtable); int create_memtable_if_no_active_memtable(ObTablet *tablet); - int try_set_tablet_freeze_begin_(); void set_tablet_freeze_begin_(); void set_tablet_freeze_end_(); void set_ls_freeze_begin_(); void set_ls_freeze_end_(); + int check_ls_state(); // must be used under the protection of ls_lock private: // flag whether the logsteram is freezing // the first bit: 1, freeze; 0, not freeze @@ -307,14 +297,8 @@ private: // log ts before which will be smaller than the log ts in the latter memtables share::SCN max_decided_scn_; - ObLSWRSHandler *ls_wrs_handler_; - ObLSTxService *ls_tx_svr_; - ObLSTabletService *ls_tablet_svr_; - checkpoint::ObDataCheckpoint *data_checkpoint_; - logservice::ObILogHandler *loghandler_; - share::ObLSID ls_id_; + ObLS *ls_; ObFreezerStat stat_; - int64_t empty_memtable_cnt_; // make sure ls freeze has higher priority than tablet freeze diff --git a/src/storage/ls/ob_ls.cpp b/src/storage/ls/ob_ls.cpp index dca1d7cde..95c5d0134 100644 --- a/src/storage/ls/ob_ls.cpp +++ b/src/storage/ls/ob_ls.cpp @@ -59,7 +59,7 @@ const uint64_t ObLS::INNER_TABLET_ID_LIST[TOTAL_INNER_TABLET_NUM] = { ObLS::ObLS() : ls_tx_svr_(this), replay_handler_(this), - ls_freezer_(&ls_wrs_handler_, &ls_tx_svr_, &ls_tablet_svr_, &data_checkpoint_, &log_handler_, ls_meta_.ls_id_), + ls_freezer_(this), ls_sync_tablet_seq_handler_(), ls_ddl_log_handler_(), is_inited_(false), @@ -113,7 +113,7 @@ int ObLS::init(const share::ObLSID &ls_id, LOG_WARN("failed to init ls meta", K(ret), K(tenant_id), K(ls_id), K(replica_type)); } else { rs_reporter_ = reporter; - ls_freezer_.init(&ls_wrs_handler_, &ls_tx_svr_, &ls_tablet_svr_, &data_checkpoint_, &log_handler_, ls_meta_.ls_id_); + ls_freezer_.init(this); transaction::ObTxPalfParam tx_palf_param(get_log_handler()); // tx_table_.init() should after ls_table_svr.init() diff --git a/src/storage/ls/ob_ls.h b/src/storage/ls/ob_ls.h index 8a05ff488..3fb1f424c 100644 --- a/src/storage/ls/ob_ls.h +++ b/src/storage/ls/ob_ls.h @@ -118,6 +118,8 @@ class ObLS : public common::ObLink { public: friend ObLSLockGuard; + friend class ObFreezer; + friend class checkpoint::ObDataCheckpoint; public: static constexpr int64_t TOTAL_INNER_TABLET_NUM = 3; static const uint64_t INNER_TABLET_ID_LIST[TOTAL_INNER_TABLET_NUM]; diff --git a/src/storage/tablet/ob_tablet_memtable_mgr.cpp b/src/storage/tablet/ob_tablet_memtable_mgr.cpp index 4bf74492a..c876ba7ef 100644 --- a/src/storage/tablet/ob_tablet_memtable_mgr.cpp +++ b/src/storage/tablet/ob_tablet_memtable_mgr.cpp @@ -241,7 +241,7 @@ int ObTabletMemtableMgr::create_memtable(const SCN clog_checkpoint_scn, if (OB_FAIL(add_memtable_(memtable_handle))) { LOG_WARN("failed to add memtable", K(ret), K(ls_id), K(tablet_id_), K(memtable_handle)); } else if (FALSE_IT(time_guard.click("add memtable"))) { - } else if (OB_FAIL(memtable->add_to_data_checkpoint(freezer_->get_data_checkpoint()))) { + } else if (OB_FAIL(memtable->add_to_data_checkpoint(freezer_->get_ls_data_checkpoint()))) { LOG_WARN("add to data_checkpoint failed", K(ret), K(ls_id), KPC(memtable)); clean_tail_memtable_(); } else if (FALSE_IT(time_guard.click("add to data_checkpoint"))) { diff --git a/src/storage/tx_storage/ob_tenant_freezer.cpp b/src/storage/tx_storage/ob_tenant_freezer.cpp index d248b2f20..7f57a5e79 100644 --- a/src/storage/tx_storage/ob_tenant_freezer.cpp +++ b/src/storage/tx_storage/ob_tenant_freezer.cpp @@ -41,6 +41,8 @@ ObTenantFreezer::ObTenantFreezer() rs_mgr_(nullptr), config_(nullptr), allocator_mgr_(nullptr), + freeze_thread_pool_(), + freeze_thread_pool_lock_(common::ObLatchIds::FREEZE_THREAD_POOL_LOCK), exist_ls_freezing_(false), last_update_ts_(0) {} @@ -87,6 +89,8 @@ int ObTenantFreezer::init() K(GCONF.self_addr_)); } else if (OB_FAIL(freeze_trigger_pool_.init_and_start(FREEZE_TRIGGER_THREAD_NUM))) { LOG_WARN("[TenantFreezer] fail to initialize freeze trigger pool", KR(ret)); + } else if (OB_FAIL(freeze_thread_pool_.init_and_start(FREEZE_THREAD_NUM))) { + LOG_WARN("[TenantFreezer] fail to initialize freeze thread pool", KR(ret)); } else if (OB_FAIL(freeze_trigger_timer_.init_and_start(freeze_trigger_pool_, TIME_WHEEL_PRECISION, "FrzTrigger"))) { diff --git a/src/storage/tx_storage/ob_tenant_freezer.h b/src/storage/tx_storage/ob_tenant_freezer.h index f4a919428..5ea30a7d8 100644 --- a/src/storage/tx_storage/ob_tenant_freezer.h +++ b/src/storage/tx_storage/ob_tenant_freezer.h @@ -37,9 +37,11 @@ class ObTenantTxDataFreezeGuard; class ObTenantFreezer { friend ObTenantTxDataFreezeGuard; +friend class ObFreezer; const static int64_t TIME_WHEEL_PRECISION = 100_ms; const static int64_t SLOW_FREEZE_INTERVAL = 30_s; const static int FREEZE_TRIGGER_THREAD_NUM= 1; + const static int FREEZE_THREAD_NUM= 5; const static int64_t FREEZE_TRIGGER_INTERVAL = 2_s; const static int64_t UPDATE_INTERVAL = 100_ms; // replay use 1G/s @@ -172,6 +174,8 @@ private: common::ObOccamThreadPool freeze_trigger_pool_; common::ObOccamTimer freeze_trigger_timer_; common::ObOccamTimerTaskRAIIHandle timer_handle_; + common::ObOccamThreadPool freeze_thread_pool_; + ObSpinLock freeze_thread_pool_lock_; bool exist_ls_freezing_; int64_t last_update_ts_; }; diff --git a/unittest/storage/memtable/test_memtable_basic.cpp b/unittest/storage/memtable/test_memtable_basic.cpp index 62d3b87a4..0eddf065a 100644 --- a/unittest/storage/memtable/test_memtable_basic.cpp +++ b/unittest/storage/memtable/test_memtable_basic.cpp @@ -29,6 +29,7 @@ #include "storage/tx/ob_trans_define_v4.h" #include "storage/memtable/mvcc/ob_mvcc_row.h" #include "share/scn.h" +#include "storage/ls/ob_ls.h" namespace oceanbase { @@ -98,7 +99,7 @@ int ObMvccRow::check_double_insert_(const share::SCN , class TestMemtable : public testing::Test { public: - TestMemtable() : tenant_base_(1001),tablet_id_(1000),rowkey_cnt_(1) {} + TestMemtable() : tenant_base_(1001),tablet_id_(1000),rowkey_cnt_(1) { freezer_.init(&ls_); } void SetUp() override { share::ObTenantEnv::set_tenant(&tenant_base_); // mock columns @@ -177,6 +178,7 @@ public: read_info_.reset(); } public: + ObLS ls_; share::ObTenantBase tenant_base_; storage::ObFreezer freezer_; storage::ObTabletMemtableMgr memtable_mgr_; diff --git a/unittest/storage/test_compaction_policy.cpp b/unittest/storage/test_compaction_policy.cpp index e3f7e28b6..fb721bb4e 100644 --- a/unittest/storage/test_compaction_policy.cpp +++ b/unittest/storage/test_compaction_policy.cpp @@ -302,7 +302,7 @@ int TestCompactionPolicy::mock_memtable( LOG_WARN("failed to init memtable", K(ret)); } else if (OB_FAIL(mt_mgr->add_memtable_(table_handle))) { LOG_WARN("failed to add memtable to mgr", K(ret)); - } else if (OB_FAIL(memtable->add_to_data_checkpoint(mt_mgr->freezer_->get_data_checkpoint()))) { + } else if (OB_FAIL(memtable->add_to_data_checkpoint(mt_mgr->freezer_->get_ls_data_checkpoint()))) { LOG_WARN("add to data_checkpoint failed", K(ret), KPC(memtable)); mt_mgr->clean_tail_memtable_(); } else if (OB_MAX_SCN_TS_NS != end_border) { // frozen memtable diff --git a/unittest/storage/tx_table/test_tx_ctx_table.cpp b/unittest/storage/tx_table/test_tx_ctx_table.cpp index fd3c373e7..bb0742951 100644 --- a/unittest/storage/tx_table/test_tx_ctx_table.cpp +++ b/unittest/storage/tx_table/test_tx_ctx_table.cpp @@ -65,12 +65,7 @@ public: tablet_id_(LS_TX_DATA_TABLET), ls_id_(1), tenant_id_(1001), - freezer_((ObLSWRSHandler *)(0x1), - (ObLSTxService *)(0x1), - (ObLSTabletService *)(0x1), - (checkpoint::ObDataCheckpoint *)(0x1), - (logservice::ObILogHandler *)(0x1), - ls_id_), + freezer_(&ls_), t3m_(common::OB_SERVER_TENANT_ID), mt_mgr_(nullptr), ctx_mt_mgr_(nullptr), @@ -95,7 +90,7 @@ protected: virtual void SetUp() override { ObTxPalfParam palf_param((logservice::ObLogHandler *)(0x01)); - freezer_.init(&ls_loop_worker_, &ls_tx_service_, &ls_tablet_service_, &ls_data_checkpoint_, &log_handler_, ls_id_); + freezer_.init(&ls_); EXPECT_EQ(OB_SUCCESS, t3m_.init()); EXPECT_EQ(OB_SUCCESS, ls_tx_ctx_mgr_.init(tenant_id_, /*tenant_id*/