diff --git a/mittest/simple_server/freeze/test_ob_minor_freeze.cpp b/mittest/simple_server/freeze/test_ob_minor_freeze.cpp index 5b3480687..9e7eac6b5 100644 --- a/mittest/simple_server/freeze/test_ob_minor_freeze.cpp +++ b/mittest/simple_server/freeze/test_ob_minor_freeze.cpp @@ -77,9 +77,6 @@ namespace storage bool ObFreezer::is_freeze(uint32_t freeze_flag) const { ob_usleep(rand() % SLEEP_TIME); - if (freeze_flag == UINT32_MAX) { - freeze_flag = (ATOMIC_LOAD(&freeze_flag_)); - } return 1 == (freeze_flag >> 31); } namespace checkpoint diff --git a/src/share/allocator/ob_shared_memory_allocator_mgr.h b/src/share/allocator/ob_shared_memory_allocator_mgr.h index 554ad3b76..9cf88bc22 100644 --- a/src/share/allocator/ob_shared_memory_allocator_mgr.h +++ b/src/share/allocator/ob_shared_memory_allocator_mgr.h @@ -112,7 +112,7 @@ public: while (throttle_tool.still_throttling(share_ti_guard, module_ti_guard) && (left_interval > 0)) { int64_t expected_wait_time = 0; - if (for_replay && MTL(ObTenantFreezer *)->exist_ls_freezing()) { + if (for_replay && MTL(ObTenantFreezer *)->exist_ls_throttle_is_skipping()) { // skip throttle if ls freeze exists break; } else if ((expected_wait_time = diff --git a/src/share/throttle/ob_share_resource_throttle_tool.h b/src/share/throttle/ob_share_resource_throttle_tool.h index 6d704bc81..259a680cb 100644 --- a/src/share/throttle/ob_share_resource_throttle_tool.h +++ b/src/share/throttle/ob_share_resource_throttle_tool.h @@ -83,6 +83,9 @@ public: template void alloc_resource(const int64_t resource_size, const int64_t abs_expire_time, bool &is_throttled); + template + bool has_triggered_throttle(); + template bool is_throttling(ObThrottleInfoGuard &share_ti_guard, ObThrottleInfoGuard &module_ti_guard); diff --git a/src/share/throttle/ob_share_resource_throttle_tool.ipp b/src/share/throttle/ob_share_resource_throttle_tool.ipp index 4852d45c3..b24eca934 100644 --- a/src/share/throttle/ob_share_resource_throttle_tool.ipp +++ b/src/share/throttle/ob_share_resource_throttle_tool.ipp @@ -122,6 +122,24 @@ void ObShareResourceThrottleTool::alloc_resource(const i } } +template +template +bool ObShareResourceThrottleTool::has_triggered_throttle() +{ + ACQUIRE_THROTTLE_UNIT(FakeAllocator, share_throttle_unit); + ACQUIRE_UNIT_ALLOCATOR(ALLOCATOR, module_throttle_unit, allocator); + + int64_t module_hold = allocator->hold(); + SumModuleHoldResourceFunctor sum_hold_func; + (void)module_throttle_tuple_.for_each(sum_hold_func); + + bool share_throttled = share_throttle_unit.has_triggered_throttle(sum_hold_func.sum_); + bool module_throttled = module_throttle_unit.has_triggered_throttle(allocator->hold()); + + bool has_triggered_throttle = (share_throttled | module_throttled); + return has_triggered_throttle; +} + template template bool ObShareResourceThrottleTool::is_throttling(ObThrottleInfoGuard &share_ti_guard, diff --git a/src/share/throttle/ob_throttle_unit.h b/src/share/throttle/ob_throttle_unit.h index 042192dd3..d3befe8d4 100644 --- a/src/share/throttle/ob_throttle_unit.h +++ b/src/share/throttle/ob_throttle_unit.h @@ -97,6 +97,13 @@ public: const int64_t abs_expire_time, bool &is_throttled); + /** + * @brief Check if this throttle unit has triggerd throttle but do not alloc any resource + * ATTENTION : This function is different from is_throttling(). is_throttling() only checks if current + * thread is throttling, but this function checks if this tenant is throttling + */ + bool has_triggered_throttle(const int64_t holding_resource); + /** * @brief Check if this throttle unit is throttling status. * diff --git a/src/share/throttle/ob_throttle_unit.ipp b/src/share/throttle/ob_throttle_unit.ipp index 1fd824c6b..2cc40d038 100644 --- a/src/share/throttle/ob_throttle_unit.ipp +++ b/src/share/throttle/ob_throttle_unit.ipp @@ -101,6 +101,27 @@ int ObThrottleUnit::alloc_resource(const int64_t holding_size, return ret; } +template +bool ObThrottleUnit::has_triggered_throttle(const int64_t holding_size) +{ + int ret = OB_SUCCESS; + bool triggered_throttle = false; + int64_t trigger_percentage = throttle_trigger_percentage_; + + if (OB_LIKELY(trigger_percentage < 100)) { + int64_t throttle_trigger = resource_limit_ * trigger_percentage / 100; + if (OB_UNLIKELY(holding_size < 0 || trigger_percentage <= 0)) { + triggered_throttle = true; + SHARE_LOG(ERROR, "invalid arguments", K(holding_size), K(resource_limit_), K(trigger_percentage)); + } else if (holding_size > throttle_trigger) { + triggered_throttle = true; + } else { + triggered_throttle = false; + } + } + return triggered_throttle; +} + template bool ObThrottleUnit::is_throttling(ObThrottleInfoGuard &ti_guard) { diff --git a/src/storage/ls/ob_freezer.cpp b/src/storage/ls/ob_freezer.cpp index 125a374ae..ded2ff852 100644 --- a/src/storage/ls/ob_freezer.cpp +++ b/src/storage/ls/ob_freezer.cpp @@ -14,6 +14,7 @@ #include "common/ob_tablet_id.h" #include "logservice/ob_log_service.h" #include "share/ob_force_print_log.h" +#include "share/allocator/ob_shared_memory_allocator_mgr.h" #include "storage/ls/ob_ls.h" #include "storage/ls/ob_freezer.h" #include "storage/ls/ob_ls_tx_service.h" @@ -389,6 +390,8 @@ ObFreezer::ObFreezer() enable_(false), is_inited_(false), is_async_tablet_freeze_task_running_(false), + throttle_is_skipping_(false), + tenant_replay_is_pending_(false), async_freeze_tablets_() {} ObFreezer::ObFreezer(ObLS *ls) @@ -406,6 +409,8 @@ ObFreezer::ObFreezer(ObLS *ls) enable_(false), is_inited_(false), is_async_tablet_freeze_task_running_(false), + throttle_is_skipping_(false), + tenant_replay_is_pending_(false), async_freeze_tablets_() {} ObFreezer::~ObFreezer() @@ -429,6 +434,8 @@ void ObFreezer::reset() async_freeze_tablets_.reuse(); is_inited_ = false; is_async_tablet_freeze_task_running_ = false; + throttle_is_skipping_ = false; + tenant_replay_is_pending_ = false; } int ObFreezer::init(ObLS *ls) @@ -452,6 +459,8 @@ int ObFreezer::init(ObLS *ls) pend_replay_cnt_ = 0; need_resubmit_log_ = false; is_async_tablet_freeze_task_running_ = false; + throttle_is_skipping_ = false; + tenant_replay_is_pending_ = false; is_inited_ = true; } @@ -569,50 +578,82 @@ void ObFreezer::submit_checkpoint_task() int ObFreezer::wait_ls_freeze_finish() { int ret = OB_SUCCESS; - share::ObLSID ls_id = get_ls_id(); - const int64_t start = ObClockGenerator::getClock(); - int64_t last_submit_log_time = start; + const share::ObLSID ls_id = get_ls_id(); + const int64_t start_time = ObClockGenerator::getClock(); uint32_t freeze_clock = get_freeze_clock(); - PendTenantReplayGuard pend_replay_guard; - TRANS_LOG(INFO, "[Freezer] freeze_clock", K(ls_id), K(freeze_clock)); + { + PendTenantReplayHelper pend_replay_helper(*this, ls_); + (void)pend_replay_helper.set_skip_throttle_flag(); - // wait till all memtables are moved from frozen_list to prepare_list - // this means that all memtables can be dumped - while (!get_ls_data_checkpoint()->ls_freeze_finished()) { - if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { - if (need_resubmit_log() || - // In order to prevent the txn has already passed the try_submit test - // while failing to submit some logs due to an unexpected bug, we need - // retry to submit the log to go around the above case - (ObTimeUtility::current_time() - last_submit_log_time >= 1_min)) { - last_submit_log_time = ObTimeUtility::current_time(); + TRANS_LOG(INFO, + "[Freezer] wait freeze : Logstream ", + K(ls_id), + K(freeze_clock), + K(throttle_is_skipping_), + K(tenant_replay_is_pending_)); - (void)submit_log_for_freeze(false/*tablet freeze*/, false/*try*/); - TRANS_LOG(INFO, "[Freezer] resubmit log for ls_freeze", K(ls_id)); - } - - const int64_t cost_time = ObClockGenerator::getClock() - start; - - if (cost_time > 5 * 1000 * 1000) { - TRANS_LOG_RET(WARN, OB_ERR_TOO_MUCH_TIME, "[Freezer] finish ls_freeze costs too much time", - K(ls_id), K(cost_time)); - stat_.add_diagnose_info("finish ls_freeze costs too much time"); + // wait till all memtables are moved from frozen_list to prepare_list + // this means that all memtables can be dumped + int64_t time_counter = 0; + while (!get_ls_data_checkpoint()->ls_freeze_finished()) { + if (TC_REACH_TIME_INTERVAL(1LL * 1000LL * 1000LL /* 1 second */)) { + ++time_counter; + + // check pend condition every second + (void)pend_replay_helper.check_pend_condition_once(); + + // check resubmit log condition and report some debug info every 5 seconds + if (time_counter >= 5 && time_counter % 5 == 0) { + (void)resubmit_log_if_needed_(start_time, false /* is_tablet_freeze */, false /* is_try */); + } } + ob_usleep(100); } - ob_usleep(100); + stat_.add_diagnose_info("logstream_freeze success"); + stat_.end_set_freeze_stat(ObFreezeState::FINISH, ObClockGenerator::getClock(), ret); + unset_freeze_(); + const int64_t wait_freeze_finish_spend_time = ObClockGenerator::getClock() - start_time; + FLOG_INFO("[Freezer] logstream_freeze success", K(ls_id), K(freeze_clock), K(wait_freeze_finish_spend_time)); } - stat_.add_diagnose_info("logstream_freeze success"); - stat_.end_set_freeze_stat(ObFreezeState::FINISH, ObClockGenerator::getClock(), ret); - unset_freeze_(); - const int64_t wait_freeze_finish_spend_time = ObClockGenerator::getClock() - start; - FLOG_INFO("[Freezer] logstream_freeze success", K(ls_id), K(freeze_clock), K(wait_freeze_finish_spend_time)); + // freeze tx data out of PendTenantReplayGuard (void)try_freeze_tx_data_(); - return ret; } +void ObFreezer::resubmit_log_if_needed_(const int64_t start_time, + const bool is_tablet_freeze, + const bool is_try, + const ObITabletMemtable *memtable) +{ + const share::ObLSID ls_id = get_ls_id(); + int64_t last_submit_log_time = start_time; + if (need_resubmit_log() || + // In order to prevent the txn has already passed the try_submit test + // while failing to submit some logs due to an unexpected bug, we need + // retry to submit the log to go around the above case + (ObClockGenerator::getClock() - last_submit_log_time >= 1_min)) { + last_submit_log_time = ObClockGenerator::getClock(); + + (void)submit_log_for_freeze(is_tablet_freeze, is_try); + } + + const int64_t cost_time = ObClockGenerator::getClock() - start_time; + + TRANS_LOG_RET(WARN, + OB_ERR_TOO_MUCH_TIME, + "[Freezer] wait freeze finish costs too much time", + K(ls_id), + K(is_tablet_freeze), + K(is_try), + K(cost_time), + K(throttle_is_skipping_), + K(tenant_replay_is_pending_), + KPC(memtable)); + stat_.add_diagnose_info("wait freeze finish costs too much time"); +} + // Define a functor to avoid using lambda struct AsyncFreezeFunctor { const int64_t trace_id_; @@ -1178,7 +1219,7 @@ int ObFreezer::wait_data_memtable_freeze_finish_(ObITabletMemtable *tablet_memta int ret = OB_SUCCESS; share::ObLSID ls_id = get_ls_id(); ObMemtable *memtable = static_cast(tablet_memtable); - if (OB_FAIL(wait_memtable_ready_for_flush_with_ls_lock(memtable))) { + if (OB_FAIL(wait_memtable_ready_for_flush_(memtable))) { TRANS_LOG(WARN, "[Freezer] fail to wait memtable ready_for_flush", K(ret), K(ls_id)); } else { int64_t read_lock = LSLOCKALL; @@ -1219,64 +1260,47 @@ int ObFreezer::wait_direct_load_memtable_freeze_finish_(ObITabletMemtable *table return ret; } -int ObFreezer::wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tablet_memtable) +int ObFreezer::wait_memtable_ready_for_flush_(ObITabletMemtable *tablet_memtable) { - share::ObLSID ls_id = get_ls_id(); - const int64_t start = ObClockGenerator::getClock(); - int64_t last_submit_log_time = start; int ret = OB_SUCCESS; + share::ObLSID ls_id = get_ls_id(); + const int64_t start_time = ObClockGenerator::getClock(); bool ready_for_flush = false; bool is_force_released = false; - do { - if (OB_FAIL(try_wait_memtable_ready_for_flush_with_ls_lock(tablet_memtable, - ready_for_flush, - is_force_released, - start, - last_submit_log_time))) { - TRANS_LOG(WARN, "[Freezer] memtable is not ready_for_flush", K(ret)); - } - } while (OB_SUCC(ret) && !ready_for_flush && !is_force_released); + { + PendTenantReplayHelper pend_replay_helper(*this, ls_); + (void)pend_replay_helper.set_skip_throttle_flag(); - return ret; -} + TRANS_LOG(INFO, + "[Freezer] wait freeze : Tablet", + K(ls_id), + KP(tablet_memtable), + K(throttle_is_skipping_), + K(tenant_replay_is_pending_)); -int ObFreezer::try_wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tablet_memtable, - bool &ready_for_flush, - bool &is_force_released, - const int64_t start, - int64_t &last_submit_log_time) -{ - int ret = OB_SUCCESS; - int64_t read_lock = LSLOCKALL; - int64_t write_lock = 0; + int64_t time_counter = 0; + do { + if (OB_ISNULL(tablet_memtable)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "[Freezer] memtable cannot be null", K(ret)); + } else if (FALSE_IT(ready_for_flush = tablet_memtable->ready_for_flush())) { + } else if (FALSE_IT(is_force_released = tablet_memtable->is_force_released())) { + } else if (!ready_for_flush && !is_force_released) { + if (TC_REACH_TIME_INTERVAL(1LL * 1000LL * 1000LL /* 1 second */)) { + ++time_counter; - if (OB_ISNULL(tablet_memtable)) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(WARN, "[Freezer] memtable cannot be null", K(ret)); - } else if (FALSE_IT(ready_for_flush = tablet_memtable->ready_for_flush())) { - } else if (FALSE_IT(is_force_released = tablet_memtable->is_force_released())) { - } else if (!ready_for_flush && !is_force_released) { - if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { - if (need_resubmit_log() || - // In order to prevent the txn has already passed the try_submit test - // while failing to submit some logs due to an unexpected bug, we need - // retry to submit the log to go around the above case - (ObTimeUtility::current_time() - last_submit_log_time >= 1_min)) { - last_submit_log_time = ObTimeUtility::current_time(); - (void)submit_log_for_freeze(true/*tablet freeze*/, false/*try*/); - TRANS_LOG(INFO, "[Freezer] resubmit log", K(ret)); + // check pend condition every second + (void)pend_replay_helper.check_pend_condition_once(); + + // check resubmit log condition and report some debug info every 5 seconds + if (time_counter >= 5 && time_counter % 5 == 0) { + (void)resubmit_log_if_needed_(start_time, true /* is_tablet_freeze */, false /* is_try */, tablet_memtable); + } + } + ob_usleep(100); } - const int64_t cost_time = ObClockGenerator::getClock() - start; - - if (cost_time > 5 * 1000 * 1000) { - TRANS_LOG(WARN, "[Freezer] ready_for_flush costs too much time", - K(cost_time), KPC(tablet_memtable)); - stat_.add_diagnose_info("ready_for_flush costs too much time"); - } - } - - ob_usleep(100); + } while (OB_SUCC(ret) && !ready_for_flush && !is_force_released); } return ret; @@ -1455,9 +1479,6 @@ void ObFreezer::unset_freeze_() /* public function about freeze_flag */ bool ObFreezer::is_freeze(uint32_t freeze_flag) const { - if (freeze_flag == UINT32_MAX) { - freeze_flag = (ATOMIC_LOAD(&freeze_flag_)); - } return 1 == (freeze_flag >> 31); } @@ -1626,11 +1647,6 @@ void ObFreezer::set_tablet_freeze_begin_() } } -void ObFreezer::set_tablet_freeze_end_() -{ - ATOMIC_DEC(&low_priority_freeze_cnt_); -} - void ObFreezer::set_ls_freeze_begin_() { const int64_t SLEEP_INTERVAL = 100 * 1000; // 100 ms @@ -1646,11 +1662,6 @@ void ObFreezer::set_ls_freeze_begin_() } } -void ObFreezer::set_ls_freeze_end_() -{ - ATOMIC_DEC(&high_priority_freeze_cnt_); -} - int ObFreezer::pend_ls_replay() { int ret = OB_SUCCESS; @@ -1823,45 +1834,136 @@ int ObFreezer::ObTabletFreezeGuard::try_set_tablet_freeze_begin() return ret; } -ObFreezer::PendTenantReplayGuard::PendTenantReplayGuard() + +void ObFreezer::PendTenantReplayHelper::set_skip_throttle_flag() { int ret = OB_SUCCESS; - common::ObSharedGuard iter; - ObLSService *ls_srv = MTL(ObLSService *); - if (OB_FAIL(ls_srv->get_ls_iter(iter, ObLSGetMod::STORAGE_MOD))) { - STORAGE_LOG(WARN, "[ObFreezer] fail to get ls iterator", KR(ret)); + if (OB_ISNULL(current_freeze_ls_)) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(ERROR, "invalid ls pointer", KP(current_freeze_ls_)); + } else if (current_ls_is_leader_()){ + // leader do not need skip throttle } else { - ObLS *ls = nullptr; - ls_handle_array_.reuse(); - while (OB_SUCC(iter->get_next(ls))) { - int tmp_ret = OB_SUCCESS; - ObLSHandle ls_handle; - if (OB_TMP_FAIL(ls_srv->get_ls(ls->get_ls_id(), ls_handle, ObLSGetMod::STORAGE_MOD))) { - STORAGE_LOG(WARN, "[ObFreezer] get ls handle failed", KR(tmp_ret), KP(ls)); - } else if (OB_TMP_FAIL(ls_handle_array_.push_back(ls_handle))) { - STORAGE_LOG(WARN, "[ObFreezer] push back ls handle failed", KR(tmp_ret), KP(ls)); - } else if (OB_TMP_FAIL(ls->get_freezer()->pend_ls_replay())) { - STORAGE_LOG(WARN, "[ObFreezer] pend replay failed", KR(tmp_ret), KPC(ls)); - (void)ls_handle_array_.pop_back(); + // pend replay before set_throttle_is_skipping to avoid skipping too much replay + if (remain_memory_is_exhausting_()) { + (void)pend_tenant_replay_(); + } + host_.set_throttle_is_skipping(); + } +} + +bool ObFreezer::PendTenantReplayHelper::current_ls_is_leader_() +{ + int ret = OB_SUCCESS; + // set default value as leader because leader do not skip throttle + bool is_leader = true; + ObRole role; + int64_t proposal_id = 0; + if (OB_FAIL(current_freeze_ls_->get_log_handler()->get_role(role, proposal_id))) { + LOG_WARN("get ls role failed", KR(ret), K(current_freeze_ls_->get_ls_id())); + } else if (common::is_strong_leader(role)) { + is_leader = true; + } else { + is_leader = false; + } + return is_leader; +} + +void ObFreezer::PendTenantReplayHelper::check_pend_condition_once() +{ + // only check pend condition when throttle is skipping + if (host_.throttle_is_skipping()) { + if (host_.tenant_replay_is_pending()) { + if (!remain_memory_is_exhausting_()) { + (void)restore_tenant_replay_(); + } else { + // keep pending replay + } + } else { + // tenant replay is not pending + if (remain_memory_is_exhausting_()) { + (void)pend_tenant_replay_(); + } else { + // keep not pending replay } } } } -ObFreezer::PendTenantReplayGuard::~PendTenantReplayGuard() +bool ObFreezer::PendTenantReplayHelper::remain_memory_is_exhausting_() { + TxShareThrottleTool &throttle_tool = (MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool()); + const bool has_triggered_throttle = throttle_tool.has_triggered_throttle(); + const bool remain_memory_is_exhausting = + has_triggered_throttle || MTL(ObTenantFreezer *)->memstore_remain_memory_is_exhausting(); + STORAGE_LOG(INFO, "finish check remain memory", K(has_triggered_throttle), K(remain_memory_is_exhausting)); + + return remain_memory_is_exhausting; +} + +void ObFreezer::PendTenantReplayHelper::pend_tenant_replay_() { int ret = OB_SUCCESS; - for (int64_t i = 0; i < ls_handle_array_.count(); i++) { - int tmp_ret = OB_SUCCESS; + ObLSService *ls_srv = MTL(ObLSService *); + common::ObSharedGuard iter; + if (OB_FAIL(ls_srv->get_ls_iter(iter, ObLSGetMod::STORAGE_MOD))) { + STORAGE_LOG(WARN, "[ObFreezer] fail to get ls iterator", KR(ret)); + } else { ObLS *ls = nullptr; - if (OB_ISNULL(ls = ls_handle_array_.at(i).get_ls())) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(ERROR, "[ObFreezer] invalid ls handle", KR(ret), KPC(ls)); - } else if (OB_TMP_FAIL(ls->get_freezer()->restore_ls_replay())) { - STORAGE_LOG(WARN, "[ObFreezer] restore replay failed", KR(ret), KPC(ls)); + ls_handle_array_.reuse(); + int64_t iterate_ls_count = 0; + int64_t pend_ls_replay_count = 0; + + // Here we need to perform a pend_ls_replay on all ls, because if a switch_leader occurs for any ls during the + // freezing process, the absence of pend_ls_replay might result in an unthrottled memory usage + while (OB_SUCC(iter->get_next(ls))) { + ObLSHandle ls_handle; + iterate_ls_count++; + if (OB_FAIL(ls_srv->get_ls(ls->get_ls_id(), ls_handle, ObLSGetMod::STORAGE_MOD))) { + STORAGE_LOG(WARN, "[ObFreezer] get ls handle failed", KR(ret), KP(ls)); + } else if (OB_FAIL(ls_handle_array_.push_back(ls_handle))) { + STORAGE_LOG(WARN, "[ObFreezer] push back ls handle failed", KR(ret), KP(ls)); + } else if (OB_FAIL(ls->get_freezer()->pend_ls_replay())) { + STORAGE_LOG(WARN, "[ObFreezer] pend replay failed", KR(ret), KPC(ls)); + (void)ls_handle_array_.pop_back(); + } else { + pend_ls_replay_count++; + } + } + + // only skip throttle when pend all ls replay successfully, or reset this guard to restore replay + if (iterate_ls_count == pend_ls_replay_count) { + (void)host_.set_tenant_replay_is_pending(); + STORAGE_LOG(INFO, + "pend replay finish", + K(pend_ls_replay_count), + K(host_.throttle_is_skipping()), + K(host_.tenant_replay_is_pending())); + } else { + (void)restore_tenant_replay_(); } } } +void ObFreezer::PendTenantReplayHelper::restore_tenant_replay_() +{ + for (int64_t i = 0; i < ls_handle_array_.count(); i++) { + int ret = OB_SUCCESS; + ObLS *ls = nullptr; + if (OB_ISNULL(ls = ls_handle_array_.at(i).get_ls())) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(ERROR, "[ObFreezer] invalid ls handle", KR(ret), KPC(ls)); + } else if (OB_FAIL(ls->get_freezer()->restore_ls_replay())) { + STORAGE_LOG(ERROR, "[ObFreezer] restore replay failed", KR(ret), KPC(ls)); + } + } + host_.unset_tenant_replay_is_pending(); + STORAGE_LOG(INFO, + "restore tenant replay", + K(ls_handle_array_.count()), + K(host_.tenant_replay_is_pending()), + K(host_.throttle_is_skipping())); + ls_handle_array_.reuse(); +} + } // namespace storage } // namespace oceanbase diff --git a/src/storage/ls/ob_freezer.h b/src/storage/ls/ob_freezer.h index 5e33e26fb..2b902441a 100644 --- a/src/storage/ls/ob_freezer.h +++ b/src/storage/ls/ob_freezer.h @@ -222,7 +222,8 @@ public: /********************** freeze **********************/ /* freeze_flag */ - bool is_freeze(uint32_t is_freeze=UINT32_MAX) const; + bool is_freeze(uint32_t freeze_flag) const; + bool is_ls_freeze_running() const { return 0 < ATOMIC_LOAD(&high_priority_freeze_cnt_); } uint32_t get_freeze_flag() const { return ATOMIC_LOAD(&freeze_flag_); }; uint32_t get_freeze_clock() { return ATOMIC_LOAD(&freeze_flag_) & (~(1 << 31)); } @@ -247,6 +248,13 @@ public: void print_freezer_statistics(); /* others */ + bool need_resubmit_log() { return ATOMIC_LOAD(&need_resubmit_log_); } + void set_throttle_is_skipping() { throttle_is_skipping_ = true; } + void unset_throttle_is_skipping() { throttle_is_skipping_ = false; } + bool throttle_is_skipping() { return throttle_is_skipping_; } + void set_tenant_replay_is_pending() { tenant_replay_is_pending_ = true; } + void unset_tenant_replay_is_pending() { tenant_replay_is_pending_ = false; } + bool tenant_replay_is_pending() const { return tenant_replay_is_pending_; } // get consequent callbacked log_ts right boundary virtual int get_max_consequent_callbacked_scn(share::SCN &max_consequent_callbacked_scn); // to set snapshot version when memtables meet ready_for_flush @@ -258,7 +266,6 @@ public: int get_newest_snapshot_version(const ObTabletID &tablet_id, share::SCN &snapshot_version); ObFreezerStat& get_stat() { return stat_; } - bool need_resubmit_log() { return ATOMIC_LOAD(&need_resubmit_log_); } void set_need_resubmit_log(bool flag) { return ATOMIC_STORE(&need_resubmit_log_, flag); } int pend_ls_replay(); int restore_ls_replay(); @@ -282,11 +289,26 @@ private: bool need_release_; ObFreezer &parent_; }; - class PendTenantReplayGuard { - public: - PendTenantReplayGuard(); - ~PendTenantReplayGuard(); + class PendTenantReplayHelper { public: + PendTenantReplayHelper(ObFreezer &host, ObLS *current_freeze_ls) + : host_(host), current_freeze_ls_(current_freeze_ls) {} + ~PendTenantReplayHelper() { reset_pend_status_(); } + void set_skip_throttle_flag(); + void check_pend_condition_once(); + private: + bool current_ls_is_leader_(); + bool remain_memory_is_exhausting_(); + void pend_tenant_replay_(); + void restore_tenant_replay_(); + void reset_pend_status_() + { + (void)host_.unset_throttle_is_skipping(); + (void)restore_tenant_replay_(); + } + private: + ObFreezer &host_; + ObLS *current_freeze_ls_; ObSEArray ls_handle_array_; }; @@ -299,10 +321,13 @@ private: void try_freeze_tx_data_(); /* inner subfunctions for freeze process */ - int inner_logstream_freeze(); int submit_log_for_freeze(const bool is_tablet_freeze, const bool is_try); void submit_log_if_needed_(ObIArray &frozen_memtable_handles); void try_submit_log_for_freeze_(const bool is_tablet_freeze); + void resubmit_log_if_needed_(const int64_t start_time, + const bool is_tablet_freeze, + const bool is_try, + const ObITabletMemtable *freeze_memtable = nullptr /* used for tablet freeze */); int wait_data_memtable_freeze_finish_(ObITabletMemtable *tablet_memtable); int wait_direct_load_memtable_freeze_finish_(ObITabletMemtable *tablet_memtable); int set_tablet_freeze_flag_(const int64_t trace_id, @@ -329,12 +354,12 @@ private: ObTabletID &record_tablet_id, bool &try_guard); int submit_wait_freeze_finish_task_(const bool is_ls_freeze, ObFuture *result, ObTableHandleV2 &handle); - int wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tablet_memtable); + int wait_memtable_ready_for_flush_(ObITabletMemtable *tablet_memtable); int try_set_tablet_freeze_begin_(); void set_tablet_freeze_begin_(); - void set_tablet_freeze_end_(); + void set_tablet_freeze_end_() { ATOMIC_DEC(&low_priority_freeze_cnt_); } void set_ls_freeze_begin_(); - void set_ls_freeze_end_(); + void set_ls_freeze_end_() { ATOMIC_DEC(&high_priority_freeze_cnt_); } int check_ls_state(); // must be used under the protection of ls_lock int tablet_freeze_(const int64_t trace_id, const ObIArray &tablet_ids, @@ -343,11 +368,6 @@ private: ObIArray &frozen_memtable_handles, ObIArray &freeze_failed_tablets); int inner_wait_memtable_freeze_finish_(ObTableHandleV2 &memtable_handle); - int try_wait_memtable_ready_for_flush_with_ls_lock(ObITabletMemtable *tablet_memtable, - bool &ready_for_flush, - bool &is_force_released, - const int64_t start, - int64_t &last_submit_log_time); void submit_checkpoint_task(); private: // flag whether the logsteram is freezing @@ -375,6 +395,8 @@ private: bool enable_; // whether we can do freeze now bool is_inited_; bool is_async_tablet_freeze_task_running_; + bool throttle_is_skipping_; + bool tenant_replay_is_pending_; common::hash::ObHashSet async_freeze_tablets_; }; diff --git a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp index 15b3a9a46..344e02217 100644 --- a/src/storage/tx/ob_trans_ctx_mgr_v4.cpp +++ b/src/storage/tx/ob_trans_ctx_mgr_v4.cpp @@ -1329,12 +1329,16 @@ int ObLSTxCtxMgr::traverse_tx_to_submit_redo_log(ObTransID &fail_tx_id, const ui int ret = OB_SUCCESS; RLockGuard guard(rwlock_); ObTxSubmitLogFunctor fn(ObTxSubmitLogFunctor::SUBMIT_REDO_LOG, freeze_clock); - if (!is_follower_() && OB_FAIL(ls_tx_ctx_map_.for_each(fn))) { + if (is_follower_()) { + // quit submit log because this is a follower + } else if (OB_FAIL(ls_tx_ctx_map_.for_each(fn))) { if (OB_SUCCESS != fn.get_result()) { // get real ret code ret = fn.get_result(); } TRANS_LOG(WARN, "failed to submit log", K(ret)); + } else { + TRANS_LOG(INFO, "traverse tx to submit redo log finish", K(ret), K(freeze_clock)); } fail_tx_id = fn.get_fail_tx_id(); diff --git a/src/storage/tx_storage/ob_tenant_freezer.cpp b/src/storage/tx_storage/ob_tenant_freezer.cpp index ec5972600..415f4ae63 100644 --- a/src/storage/tx_storage/ob_tenant_freezer.cpp +++ b/src/storage/tx_storage/ob_tenant_freezer.cpp @@ -46,10 +46,10 @@ ObTenantFreezer::ObTenantFreezer() rs_mgr_(nullptr), freeze_thread_pool_(), freeze_thread_pool_lock_(common::ObLatchIds::FREEZE_THREAD_POOL_LOCK), - exist_ls_freezing_(false), - last_update_ts_(0), freezer_stat_(), - freezer_history_() + freezer_history_(), + throttle_is_skipping_cache_(), + memstore_remain_memory_is_exhausting_cache_() { freezer_stat_.reset(); } @@ -63,13 +63,14 @@ void ObTenantFreezer::destroy() { freeze_trigger_timer_.destroy(); is_freezing_tx_data_ = false; - exist_ls_freezing_ = false; self_.reset(); svr_rpc_proxy_ = nullptr; common_rpc_proxy_ = nullptr; rs_mgr_ = nullptr; freezer_stat_.reset(); freezer_history_.reset(); + throttle_is_skipping_cache_.reset(); + memstore_remain_memory_is_exhausting_cache_.reset(); is_inited_ = false; } @@ -160,12 +161,46 @@ void ObTenantFreezer::wait() bool ObTenantFreezer::exist_ls_freezing() { - int64_t cur_ts = ObTimeUtility::fast_current_time(); - int64_t old_ts = last_update_ts_; + int ret = OB_SUCCESS; + bool exist_ls_freezing = false; + common::ObSharedGuard iter; + ObLSService *ls_srv = MTL(ObLSService *); + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("[TenantFreezer] tenant freezer not inited", KR(ret)); + } else if (OB_FAIL(ls_srv->get_ls_iter(iter, ObLSGetMod::TXSTORAGE_MOD))) { + LOG_WARN("[TenantFreezer] fail to get log stream iterator", KR(ret)); + } else { + ObLS *ls = nullptr; + while (OB_SUCC(iter->get_next(ls))) { + if (ls->get_freezer()->is_ls_freeze_running()) { + exist_ls_freezing = true; + break; + } + } + + if (ret == OB_ITER_END) { + ret = OB_SUCCESS; + } + + if (OB_FAIL(ret)) { + LOG_WARN("[TenantFreezer] iter ls failed", K(ret)); + } + } + + return exist_ls_freezing; +} + +bool ObTenantFreezer::exist_ls_throttle_is_skipping() +{ + int ret = OB_SUCCESS; + int64_t cur_ts = ObClockGenerator::getClock(); + int64_t last_update_ts = throttle_is_skipping_cache_.update_ts_; + + if ((cur_ts - last_update_ts > UPDATE_INTERVAL) && + ATOMIC_BCAS(&throttle_is_skipping_cache_.update_ts_, last_update_ts, cur_ts)) { + bool exist_ls_throttle_is_skipping = false; - if ((cur_ts - last_update_ts_ > UPDATE_INTERVAL) && - old_ts == ATOMIC_CAS(&last_update_ts_, old_ts, cur_ts)) { - int ret = OB_SUCCESS; common::ObSharedGuard iter; ObLSService *ls_srv = MTL(ObLSService *); if (IS_NOT_INIT) { @@ -175,31 +210,71 @@ bool ObTenantFreezer::exist_ls_freezing() LOG_WARN("[TenantFreezer] fail to get log stream iterator", KR(ret)); } else { ObLS *ls = nullptr; - int ls_cnt = 0; - int exist_ls_freezing = false; - for (; OB_SUCC(iter->get_next(ls)); ++ls_cnt) { - int tmp_ret = OB_SUCCESS; - ObRole role; - int64_t proposal_id = 0; - if (OB_TMP_FAIL(ls->get_log_handler()->get_role(role, proposal_id))) { - LOG_WARN("get ls role failed", KR(tmp_ret), K(ls->get_ls_id())); - } else if (common::is_strong_leader(role)) { - // skip check leader logstream - } else if (ls->get_freezer()->is_freeze()) { - exist_ls_freezing = true; + while (OB_SUCC(iter->get_next(ls))) { + if (ls->get_freezer()->throttle_is_skipping()) { + exist_ls_throttle_is_skipping = true; + break; } } - exist_ls_freezing_ = exist_ls_freezing; - if (ret == OB_ITER_END) { ret = OB_SUCCESS; - } else { - LOG_WARN("[TenantFreezer] iter ls failed", K(ret)); + } + + if (OB_FAIL(ret)) { + LOG_WARN("[TenantFreezer] iter ls failed", K(ret)); } } + + // assign need_skip_throttle here because if some error happened, the value can be reset to false + throttle_is_skipping_cache_.value_ = exist_ls_throttle_is_skipping; } - return exist_ls_freezing_; + return throttle_is_skipping_cache_.value_; +} + +bool ObTenantFreezer::memstore_remain_memory_is_exhausting() +{ + int ret = OB_SUCCESS; + int64_t cur_ts = ObClockGenerator::getClock(); + int64_t last_update_ts = memstore_remain_memory_is_exhausting_cache_.update_ts_; + + if ((cur_ts - last_update_ts > UPDATE_INTERVAL) && + ATOMIC_BCAS(&memstore_remain_memory_is_exhausting_cache_.update_ts_, last_update_ts, cur_ts)) { + bool remain_mem_exhausting = false; + if (false == tenant_info_.is_loaded_) { + LOG_INFO("[TenantFreezer] This tenant not exist", KR(ret)); + } else { + const int64_t MEMORY_IS_EXHAUSTING_PERCENTAGE = 10; + + // tenant memory condition + const int64_t tenant_memory_limit = get_tenant_memory_limit(MTL_ID()); + const int64_t tenant_memory_remain = get_tenant_memory_remain(MTL_ID()); + const bool tenant_memory_exhausting = + tenant_memory_remain < (tenant_memory_limit * MEMORY_IS_EXHAUSTING_PERCENTAGE / 100); + + // memstore memory condition + const int64_t memstore_limit = tenant_info_.get_memstore_limit(); + const int64_t memstore_remain = (memstore_limit - get_tenant_memory_hold(MTL_ID(), ObCtxIds::MEMSTORE_CTX_ID)); + const bool memstore_memory_exhausting = memstore_remain < (memstore_limit * MEMORY_IS_EXHAUSTING_PERCENTAGE / 100); + + remain_mem_exhausting = tenant_memory_exhausting || memstore_memory_exhausting; + + if (remain_mem_exhausting && REACH_TIME_INTERVAL(1LL * 1000LL * 1000LL /* 1 second */)) { + STORAGE_LOG(INFO, + "[TenantFreezer] memstore remain memory is exhausting", + K(tenant_memory_limit), + K(tenant_memory_remain), + K(tenant_memory_exhausting), + K(memstore_limit), + K(memstore_remain), + K(memstore_memory_exhausting)); + } + } + + memstore_remain_memory_is_exhausting_cache_.value_ = remain_mem_exhausting; + } + + return memstore_remain_memory_is_exhausting_cache_.value_; } int ObTenantFreezer::ls_freeze_(ObLS *ls, const bool is_sync, const int64_t abs_timeout_ts) diff --git a/src/storage/tx_storage/ob_tenant_freezer.h b/src/storage/tx_storage/ob_tenant_freezer.h index 05b3d0ca2..b7e26f91f 100644 --- a/src/storage/tx_storage/ob_tenant_freezer.h +++ b/src/storage/tx_storage/ob_tenant_freezer.h @@ -102,6 +102,16 @@ class ObTenantFreezer { friend ObTenantTxDataFreezeGuard; friend class ObFreezer; +struct PeriodicalUpdateValueCache { + PeriodicalUpdateValueCache() : value_(false), update_ts_(0) {} + void reset() + { + value_ = false; + update_ts_ = 0; + } + bool value_; + int64_t update_ts_; +}; public: const static int64_t TIME_WHEEL_PRECISION = 100_ms; @@ -218,6 +228,8 @@ public: } static int64_t get_freeze_trigger_interval() { return FREEZE_TRIGGER_INTERVAL; } bool exist_ls_freezing(); + bool exist_ls_throttle_is_skipping(); + bool memstore_remain_memory_is_exhausting(); // freezer stat collector and generator void add_merge_event(const compaction::ObMergeType type, const int64_t cost) @@ -305,13 +317,13 @@ private: common::ObOccamTimerTaskRAIIHandle timer_handle_; common::ObOccamThreadPool freeze_thread_pool_; ObSpinLock freeze_thread_pool_lock_; - bool exist_ls_freezing_; - int64_t last_update_ts_; // diagnose only, we capture the freeze stats every 30 minutes ObTenantFreezerStat freezer_stat_; // diagnose only, we capture the freeze history in one monthes ObTenantFreezerStatHistory freezer_history_; + PeriodicalUpdateValueCache throttle_is_skipping_cache_; + PeriodicalUpdateValueCache memstore_remain_memory_is_exhausting_cache_; }; class ObTenantTxDataFreezeGuard diff --git a/src/storage/tx_storage/ob_tenant_freezer_common.cpp b/src/storage/tx_storage/ob_tenant_freezer_common.cpp index 0ca3a4673..736d8765a 100644 --- a/src/storage/tx_storage/ob_tenant_freezer_common.cpp +++ b/src/storage/tx_storage/ob_tenant_freezer_common.cpp @@ -37,9 +37,7 @@ OB_SERIALIZE_MEMBER(ObTenantFreezeArg, try_frozen_scn_); ObTenantFreezeCtx::ObTenantFreezeCtx() - : mem_lower_limit_(0), - mem_upper_limit_(0), - mem_memstore_limit_(0), + : mem_memstore_limit_(0), memstore_freeze_trigger_(0), max_mem_memstore_can_get_now_(0), active_memstore_used_(0), @@ -52,8 +50,6 @@ ObTenantFreezeCtx::ObTenantFreezeCtx() void ObTenantFreezeCtx::reset() { - mem_lower_limit_ = 0; - mem_upper_limit_ = 0; mem_memstore_limit_ = 0; memstore_freeze_trigger_ = 0; max_mem_memstore_can_get_now_ = 0; @@ -181,8 +177,6 @@ bool ObTenantInfo::is_memstore_limit_changed(const int64_t curr_memstore_limit_p void ObTenantInfo::get_freeze_ctx(ObTenantFreezeCtx &ctx) const { SpinRLockGuard guard(lock_); - ctx.mem_lower_limit_ = mem_lower_limit_; - ctx.mem_upper_limit_ = mem_upper_limit_; ctx.mem_memstore_limit_ = mem_memstore_limit_; } diff --git a/src/storage/tx_storage/ob_tenant_freezer_common.h b/src/storage/tx_storage/ob_tenant_freezer_common.h index e113698e6..97c2fc757 100644 --- a/src/storage/tx_storage/ob_tenant_freezer_common.h +++ b/src/storage/tx_storage/ob_tenant_freezer_common.h @@ -61,8 +61,6 @@ public: void reset(); public: // snapshot of tenant_info - int64_t mem_lower_limit_; - int64_t mem_upper_limit_; int64_t mem_memstore_limit_; // running data