From c5d6ca2c5d09f9ac5218e7ac0deb5355f3326f87 Mon Sep 17 00:00:00 2001 From: yyy-hust Date: Tue, 16 May 2023 07:11:27 +0000 Subject: [PATCH] [log write throttling] reduce thread count of IOWorker --- mittest/logservice/CMakeLists.txt | 4 +- .../test_ob_simple_log_throttling.cpp | 3 +- .../test_ob_simple_log_throttling_arb.cpp | 2 +- ...ob_simple_log_throttling_member_change.cpp | 9 ++++ mittest/mtlenv/mock_tenant_module_env.h | 27 +++++++++++ src/logservice/palf/log_engine.cpp | 8 +++- src/logservice/palf/log_engine.h | 3 +- src/logservice/palf/log_io_worker.cpp | 7 ++- src/logservice/palf/log_io_worker_wrapper.cpp | 48 +++++++++++-------- src/logservice/palf/log_io_worker_wrapper.h | 42 ++++++++-------- src/logservice/palf/log_reconfirm.cpp | 30 ++++++++++-- src/logservice/palf/log_reconfirm.h | 5 +- src/logservice/palf/log_sliding_window.cpp | 2 +- src/logservice/palf/palf_env_impl.cpp | 25 +++++----- src/logservice/palf/palf_env_impl.h | 3 +- src/logservice/palf/palf_handle_impl.cpp | 10 ++-- src/logservice/palf/palf_handle_impl.h | 4 +- src/logservice/palf/palf_options.h | 5 +- 18 files changed, 157 insertions(+), 80 deletions(-) diff --git a/mittest/logservice/CMakeLists.txt b/mittest/logservice/CMakeLists.txt index e4b8535aa..dea4fb91a 100644 --- a/mittest/logservice/CMakeLists.txt +++ b/mittest/logservice/CMakeLists.txt @@ -31,8 +31,8 @@ ob_unittest_clog(test_ob_simple_arb_server_single_replica test_ob_simple_arb_ser ob_unittest_clog(test_ob_simple_arb_server_mutil_replica test_ob_simple_arb_server_mutil_replica.cpp) ob_unittest_clog(test_ob_simple_log_data_intergrity test_ob_simple_log_data_intergrity.cpp) ob_unittest_clog(test_ob_simple_log_throttling test_ob_simple_log_throttling.cpp) -#ob_unittest_clog(test_ob_simple_log_throttling_member_change test_ob_simple_log_throttling_member_change.cpp) -#ob_unittest_clog(test_ob_simple_log_throttling_arb test_ob_simple_log_throttling_arb.cpp) +ob_unittest_clog(test_ob_simple_log_throttling_member_change test_ob_simple_log_throttling_member_change.cpp) +ob_unittest_clog(test_ob_simple_log_throttling_arb test_ob_simple_log_throttling_arb.cpp) add_subdirectory(archiveservice) diff --git a/mittest/logservice/test_ob_simple_log_throttling.cpp b/mittest/logservice/test_ob_simple_log_throttling.cpp index 7fdad3f4e..503aa3eec 100644 --- a/mittest/logservice/test_ob_simple_log_throttling.cpp +++ b/mittest/logservice/test_ob_simple_log_throttling.cpp @@ -96,7 +96,7 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_sys) int64_t break_ts = common::ObClockGenerator::getClock(); ASSERT_EQ(true, (break_ts - cur_ts) < 1 * 1000 * 1000); - PALF_LOG(INFO, "end test throttling_basic", K(id)); + PALF_LOG(INFO, "end test throttling_sys_log_stream", K(id)); leader.reset(); palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_ = disk_options; @@ -258,6 +258,7 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic) ASSERT_EQ(5, throttle.handled_seq_); io_task_cond_1.cond_.signal(); wait_lsn_until_flushed(max_lsn_1, leader); + usleep(10 * 1000); ASSERT_EQ(7, throttle.submitted_seq_); ASSERT_EQ(6, throttle.handled_seq_); io_task_cond_2.cond_.signal(); diff --git a/mittest/logservice/test_ob_simple_log_throttling_arb.cpp b/mittest/logservice/test_ob_simple_log_throttling_arb.cpp index 91b2c16f1..af7651967 100644 --- a/mittest/logservice/test_ob_simple_log_throttling_arb.cpp +++ b/mittest/logservice/test_ob_simple_log_throttling_arb.cpp @@ -181,7 +181,7 @@ TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_major) ret = leader.palf_handle_impl_->replace_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1), ObMember(get_cluster()[another_f_idx]->get_addr(), 1), CONFIG_CHANGE_TIMEOUT); - //TODO(yaoying.yyy): + //timeout because added member can flush new meta when prev log is throttling ASSERT_TRUE(OB_TIMEOUT == ret || OB_SUCCESS == ret); int64_t new_leader_idx = OB_TIMEOUT == ret ? another_f_idx : follower_D_idx; ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128)); diff --git a/mittest/logservice/test_ob_simple_log_throttling_member_change.cpp b/mittest/logservice/test_ob_simple_log_throttling_member_change.cpp index 76eb58528..eedf62258 100644 --- a/mittest/logservice/test_ob_simple_log_throttling_member_change.cpp +++ b/mittest/logservice/test_ob_simple_log_throttling_member_change.cpp @@ -274,8 +274,17 @@ TEST_F(TestObSimpleLogIOWorkerThrottlingV2, test_throttling_minor_leader) wait_lsn_until_flushed(max_lsn, new_leader); loc_cb.leader_ = get_cluster()[new_leader_idx]->get_addr(); + usleep(5*1000*1000);//wait follower_c log sync + LSN old_max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); + LSN leader_old_max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn(); + new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE; ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 2, id, 512 * KB)); + LSN cur_max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); + LSN leader_cur_max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn(); usleep(500 * 1000); + LSN new_max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); + LSN leader_new_max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn(); + PALF_LOG(INFO, "before remove member",K(leader_old_max_lsn), K(leader_cur_max_lsn), K(leader_new_max_lsn), K(old_max_lsn), K(cur_max_lsn), K(new_max_lsn)); ASSERT_EQ(OB_TIMEOUT, new_leader.palf_handle_impl_->remove_member(ObMember(get_cluster()[follower_C_idx]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT)); ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB)); usleep(500 * 1000); diff --git a/mittest/mtlenv/mock_tenant_module_env.h b/mittest/mtlenv/mock_tenant_module_env.h index 1897b3d51..52e7ed739 100644 --- a/mittest/mtlenv/mock_tenant_module_env.h +++ b/mittest/mtlenv/mock_tenant_module_env.h @@ -74,6 +74,7 @@ #include "storage/concurrency_control/ob_multi_version_garbage_collector.h" #include "storage/tablelock/ob_table_lock_service.h" #include "storage/tx/wrs/ob_tenant_weak_read_service.h" +#include "logservice/palf/log_define.h" namespace oceanbase { @@ -718,6 +719,31 @@ int MockTenantModuleEnv::start_() } else if (OB_FAIL(tenant->acquire_more_worker(TENANT_WORKER_COUNT, succ_num))) { } else if (OB_FAIL(guard_.switch_to(tenant_id))) { // switch mtl context STORAGE_LOG(ERROR, "fail to switch to sys tenant", K(ret)); + } else { + ObLogService *log_service = MTL(logservice::ObLogService*); + if (OB_ISNULL(log_service) || OB_ISNULL(log_service->palf_env_)) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(ERROR, "fail to switch to sys tenant", KP(log_service)); + } else { + palf::LogIOWorkerConfig log_io_worker_config; + log_io_worker_config.io_worker_num_ = 1; + log_io_worker_config.io_queue_capcity_ = 100 * 1024; + log_io_worker_config.batch_width_ = 8; + log_io_worker_config.batch_depth_ = palf::PALF_SLIDING_WINDOW_SIZE; + + palf::LogIOWorker &use_io_worker = log_service->palf_env_->palf_env_impl_.log_io_worker_wrapper_.user_log_io_worker_; + if (OB_FAIL(use_io_worker.init(log_io_worker_config, tenant_id, + log_service->palf_env_->palf_env_impl_.cb_thread_pool_.get_tg_id(), + log_service->palf_env_->palf_env_impl_.log_alloc_mgr_, + &log_service->palf_env_->palf_env_impl_))) { + STORAGE_LOG(ERROR, "fail to init user_io_worker", KP(log_service)); + } else if (OB_FAIL(use_io_worker.start())) { + STORAGE_LOG(ERROR, "fail to init start", KP(log_service)); + } else { + //set this to stop user_io_worker + log_service->palf_env_->palf_env_impl_.log_io_worker_wrapper_.is_user_tenant_ = true; + } + } } if (OB_SUCC(ret)) { @@ -745,6 +771,7 @@ void MockTenantModuleEnv::destroy() // 释放租户上下文 guard_.release(); + multi_tenant_.stop(); multi_tenant_.wait(); multi_tenant_.destroy(); diff --git a/src/logservice/palf/log_engine.cpp b/src/logservice/palf/log_engine.cpp index c485d1148..818ebdd59 100644 --- a/src/logservice/palf/log_engine.cpp +++ b/src/logservice/palf/log_engine.cpp @@ -770,7 +770,8 @@ int LogEngine::get_min_block_info(block_id_t &min_block_id, SCN &min_block_scn) return ret; } -int LogEngine::get_total_used_disk_space(int64_t &total_used_size_byte) const +int LogEngine::get_total_used_disk_space(int64_t &total_used_size_byte, + int64_t &unrecyclable_disk_space) const { int ret = OB_SUCCESS; block_id_t min_block_id = LOG_INVALID_BLOCK_ID; @@ -795,8 +796,11 @@ int LogEngine::get_total_used_disk_space(int64_t &total_used_size_byte) const } else if (OB_FAIL(log_meta_storage_.get_block_id_range(min_block_id, max_block_id))) { PALF_LOG(WARN, "get_block_id_range failed", K(ret), KPC(this)); } else { - meta_storage_used = (max_block_id - min_block_id + 1) * (PALF_META_BLOCK_SIZE + MAX_INFO_BLOCK_SIZE); + meta_storage_used = PALF_META_BLOCK_SIZE + MAX_INFO_BLOCK_SIZE; total_used_size_byte = log_storage_used + meta_storage_used; + + const int64_t unrecyclable_meta_size = meta_storage_used; + unrecyclable_disk_space = log_storage_.get_end_lsn() - get_base_lsn_used_for_block_gc() + unrecyclable_meta_size; PALF_LOG(TRACE, "get_total_used_disk_space", K(meta_storage_used), K(log_storage_used), K(total_used_size_byte)); } return ret; diff --git a/src/logservice/palf/log_engine.h b/src/logservice/palf/log_engine.h index 45fe1fc70..17cd70d96 100644 --- a/src/logservice/palf/log_engine.h +++ b/src/logservice/palf/log_engine.h @@ -419,7 +419,8 @@ public: // ===================== NetService end ======================== LogStorage *get_log_storage() { return &log_storage_; } LogStorage *get_log_meta_storage() { return &log_meta_storage_; } - int get_total_used_disk_space(int64_t &total_used_size_byte) const; + int get_total_used_disk_space(int64_t &total_used_size_byte, + int64_t &unrecyclable_disk_space) const; virtual int64_t get_palf_epoch() const { return palf_epoch_; } TO_STRING_KV(K_(palf_id), K_(is_inited), K_(min_block_max_scn), K_(min_block_id), K_(base_lsn_for_block_gc), K_(log_meta), K_(log_meta_storage), K_(log_storage), K_(palf_epoch), K_(last_purge_throttling_ts), KP(this)); diff --git a/src/logservice/palf/log_io_worker.cpp b/src/logservice/palf/log_io_worker.cpp index 893d3585d..05069231b 100644 --- a/src/logservice/palf/log_io_worker.cpp +++ b/src/logservice/palf/log_io_worker.cpp @@ -164,7 +164,8 @@ int LogWritingThrottle::update_throtting_options_(IPalfEnvImpl *palf_env_impl, b THROTTLING_DURATION_US / (1000 * 1000), K(THROTTLING_CHUNK_SIZE)); } else { PALF_LOG(INFO, "[LOG DISK THROTTLING] success to calc_decay_factor", K(decay_factor_), K(throttling_options_), - K(new_throttling_options), "duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE)); + K(new_throttling_options), "duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE), + KPC(this)); } } @@ -178,8 +179,9 @@ int LogWritingThrottle::update_throtting_options_(IPalfEnvImpl *palf_env_impl, b } throttling_options_ = new_throttling_options; if (need_start_throttling) { + const LogThrottlingStat old_stat = stat_; stat_.start_throttling(); - PALF_LOG(INFO, "[LOG DISK THROTTLING] [START]", KPC(this), + PALF_LOG(INFO, "[LOG DISK THROTTLING] [START]", K(old_stat), KPC(this), "duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE)); } } @@ -311,6 +313,7 @@ int LogIOWorker::submit_io_task(LogIOTask *io_task) } return ret; } + int LogIOWorker::notify_need_writing_throttling(const bool &need_throttling) { int ret = OB_SUCCESS; diff --git a/src/logservice/palf/log_io_worker_wrapper.cpp b/src/logservice/palf/log_io_worker_wrapper.cpp index 9534fbcbc..59cd0a8cd 100644 --- a/src/logservice/palf/log_io_worker_wrapper.cpp +++ b/src/logservice/palf/log_io_worker_wrapper.cpp @@ -34,6 +34,7 @@ LogIOWorkerWrapper::~LogIOWorkerWrapper() void LogIOWorkerWrapper::destroy() { is_inited_ = false; + is_user_tenant_ = false; sys_log_io_worker_.destroy(); user_log_io_worker_.destroy(); } @@ -54,10 +55,10 @@ int LogIOWorkerWrapper::init(const LogIOWorkerConfig &config, allocaotor, palf_env_impl))) { LOG_WARN("failed to init sys_log_io_worker", K(ret)); - } else if (OB_FAIL(user_log_io_worker_.init(config, - tenant_id, - cb_thread_pool_tg_id, - allocaotor, palf_env_impl))) { + } else if (is_user_tenant_ && OB_FAIL(user_log_io_worker_.init(config, + tenant_id, + cb_thread_pool_tg_id, + allocaotor, palf_env_impl))) { sys_log_io_worker_.destroy(); LOG_WARN("failed to init user_log_io_worker"); } else { @@ -70,44 +71,53 @@ LogIOWorker *LogIOWorkerWrapper::get_log_io_worker(const int64_t palf_id) { return is_sys_palf_id(palf_id) ? &sys_log_io_worker_ : &user_log_io_worker_; } + int LogIOWorkerWrapper::start() { int ret = OB_SUCCESS; if (OB_FAIL(sys_log_io_worker_.start())) { LOG_WARN("failed to start sys_log_io_worker"); - } else if (OB_FAIL(user_log_io_worker_.start())) { + } else if (is_user_tenant_ && OB_FAIL(user_log_io_worker_.start())) { LOG_WARN("failed to start user_log_io_worker"); } else { LOG_INFO("success to start LogIOWorkerWrapper", KPC(this)); } return ret; } + void LogIOWorkerWrapper::stop() { - PALF_LOG(INFO, "LogIOWorkerWrapper begin stop", KPC(this)); + LOG_INFO("LogIOWorkerWrapper starts stopping", KPC(this)); sys_log_io_worker_.stop(); - user_log_io_worker_.stop(); - PALF_LOG(INFO, "LogIOWorkerWrapper begin stop", KPC(this)); -} -void LogIOWorkerWrapper::wait() -{ - PALF_LOG(INFO, " LogIOWorkerWrapper begin wait", KPC(this)); - sys_log_io_worker_.wait(); - user_log_io_worker_.wait(); - PALF_LOG(INFO, "LogIOWorkerWrapper begin wait", KPC(this)); + if (is_user_tenant_) { + user_log_io_worker_.stop(); + } + LOG_INFO("LogIOWorkerWrapper has finished stopping", KPC(this)); } -int LogIOWorkerWrapper::notify_need_writing_throttling(const bool &need_throtting) +void LogIOWorkerWrapper::wait() +{ + LOG_INFO("LogIOWorkerWrapper starts waiting", KPC(this)); + sys_log_io_worker_.wait(); + if (is_user_tenant_) { + user_log_io_worker_.wait(); + } + LOG_INFO("LogIOWorkerWrapper has finished waiting", KPC(this)); +} + +int LogIOWorkerWrapper::notify_need_writing_throttling(const bool &need_throttling) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; } else if (!is_user_tenant_) { //need no nothing - } else if (OB_FAIL(user_log_io_worker_.notify_need_writing_throttling(need_throtting))) { - LOG_WARN("failed to notify_need_writing_throttling", K(need_throtting)); + } else if (OB_FAIL(user_log_io_worker_.notify_need_writing_throttling(need_throttling))) { + LOG_WARN("failed to notify_need_writing_throttling", K(need_throttling)); } else { - LOG_WARN("success to notify_need_writing_throttling", K(need_throtting)); + if (need_throttling) { + LOG_INFO("success to notify_need_writing_throttling True"); + } } return ret; } diff --git a/src/logservice/palf/log_io_worker_wrapper.h b/src/logservice/palf/log_io_worker_wrapper.h index 3ad94a8ee..b26b6d222 100644 --- a/src/logservice/palf/log_io_worker_wrapper.h +++ b/src/logservice/palf/log_io_worker_wrapper.h @@ -22,30 +22,30 @@ namespace palf class LogIOWorkerWrapper { public: -LogIOWorkerWrapper(); -~LogIOWorkerWrapper(); + LogIOWorkerWrapper(); + ~LogIOWorkerWrapper(); -int init(const LogIOWorkerConfig &config, - const int64_t tenant_id, - int cb_thread_pool_tg_id, - ObIAllocator *allocaotr, - IPalfEnvImpl *palf_env_impl); -void destroy(); -int start(); -void stop(); -void wait(); -LogIOWorker *get_log_io_worker(const int64_t palf_id); -int notify_need_writing_throttling(const bool &need_throtting); -int64_t get_last_working_time() const; -TO_STRING_KV(K_(is_inited), K_(is_user_tenant), K_(sys_log_io_worker), K_(user_log_io_worker)); + int init(const LogIOWorkerConfig &config, + const int64_t tenant_id, + int cb_thread_pool_tg_id, + ObIAllocator *allocaotr, + IPalfEnvImpl *palf_env_impl); + void destroy(); + int start(); + void stop(); + void wait(); + LogIOWorker *get_log_io_worker(const int64_t palf_id); + int notify_need_writing_throttling(const bool &need_throtting); + int64_t get_last_working_time() const; + TO_STRING_KV(K_(is_inited), K_(is_user_tenant), K_(sys_log_io_worker), K_(user_log_io_worker)); private: -bool is_inited_; -bool is_user_tenant_; -//for log stream NO.1 -LogIOWorker sys_log_io_worker_; -//for log streams except NO.1 -LogIOWorker user_log_io_worker_; + bool is_inited_; + bool is_user_tenant_; + //for log stream NO.1 + LogIOWorker sys_log_io_worker_; + //for log streams except NO.1 + LogIOWorker user_log_io_worker_; }; }//end of namespace palf diff --git a/src/logservice/palf/log_reconfirm.cpp b/src/logservice/palf/log_reconfirm.cpp index e4bfe1912..ea1e36a11 100644 --- a/src/logservice/palf/log_reconfirm.cpp +++ b/src/logservice/palf/log_reconfirm.cpp @@ -58,6 +58,7 @@ LogReconfirm::LogReconfirm() wait_slide_print_time_us_(OB_INVALID_TIMESTAMP), wait_majority_time_us_(OB_INVALID_TIMESTAMP), last_notify_fetch_time_us_(OB_INVALID_TIMESTAMP), + last_purge_throttling_time_us_(OB_INVALID_TIMESTAMP), is_inited_(false) {} @@ -112,6 +113,7 @@ void LogReconfirm::reset_state() last_fetch_log_time_us_ = OB_INVALID_TIMESTAMP; last_record_sw_start_id_ = OB_INVALID_LOG_ID; last_notify_fetch_time_us_ = OB_INVALID_TIMESTAMP; + last_purge_throttling_time_us_ = OB_INVALID_TIMESTAMP; } } @@ -121,6 +123,7 @@ void LogReconfirm::destroy() if (is_inited_) { is_inited_ = false; last_notify_fetch_time_us_ = OB_INVALID_TIMESTAMP; + last_purge_throttling_time_us_ = OB_INVALID_TIMESTAMP; state_ = INITED; new_proposal_id_ = INVALID_PROPOSAL_ID; prepare_log_ack_list_.reset(); @@ -345,16 +348,26 @@ int LogReconfirm::ack_log_with_end_lsn_() return ret; } -bool LogReconfirm::is_fetch_log_finished_() const +bool LogReconfirm::is_fetch_log_finished_() { bool bool_ret = false; - int tmp_ret = OB_SUCCESS; LSN max_flushed_end_lsn; (void) sw_->get_max_flushed_end_lsn(max_flushed_end_lsn); + const LSN max_lsn = sw_->get_max_lsn(); if (majority_max_lsn_ == max_flushed_end_lsn) { bool_ret = true; + } else { + //In a scenario with writhing throttling, ensure that the fetched logs are written to disk as soon + //as possible. + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = purge_throttling_())) { + PALF_LOG_RET(WARN, tmp_ret, "purge throttling failed", K_(palf_id), K_(majority_max_lsn), + K(max_flushed_end_lsn), K(max_lsn)); + } } - PALF_LOG(TRACE, "is_fetch_log_finished_", K_(palf_id), K(bool_ret), K_(majority_max_lsn), K(max_flushed_end_lsn)); + + PALF_LOG(TRACE, "is_fetch_log_finished_", K_(palf_id), K(bool_ret), K_(majority_max_lsn), + K(max_flushed_end_lsn), K(max_lsn)); return bool_ret; } @@ -467,6 +480,8 @@ int LogReconfirm::reconfirm() PALF_LOG(WARN, "submit_prepare_log_ failed", K_(palf_id)); } else { state_ = FETCH_MAX_LOG_LSN; + //reset last_purge_throttling_time_us_to avoid impacting purging throttling during RECONFIRM_FETCH_LOG + last_purge_throttling_time_us_ = OB_INVALID_TIMESTAMP; PALF_EVENT("Reconfirm come into FETCH_MAX_LOG_LSN state", palf_id_, K_(self), K_(majority_max_accept_pid)); } break; @@ -618,11 +633,16 @@ int LogReconfirm::reconfirm() } return ret; } + int LogReconfirm::purge_throttling_() { int ret = OB_SUCCESS; - if (OB_FAIL(log_engine_->submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_RECONFIRM))) { - PALF_LOG(WARN, "submit_purge_throttling_task", K_(palf_id)); + if (palf_reach_time_interval(100 * 1000L, last_purge_throttling_time_us_)) { + if (OB_FAIL(log_engine_->submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_RECONFIRM))) { + PALF_LOG(WARN, "submit_purge_throttling_task", K_(palf_id)); + } else { + PALF_LOG(INFO, "submit_purge_throttling_task during reconfirming", K_(palf_id)); + } } return ret; } diff --git a/src/logservice/palf/log_reconfirm.h b/src/logservice/palf/log_reconfirm.h index 60d112259..8dc135239 100644 --- a/src/logservice/palf/log_reconfirm.h +++ b/src/logservice/palf/log_reconfirm.h @@ -63,12 +63,12 @@ public: TO_STRING_KV(K_(palf_id), K_(self), "state", state_to_string(state_), K_(new_proposal_id), K_(prepare_log_ack_list), \ K_(curr_paxos_follower_list), K_(majority_cnt), K_(majority_max_log_server), \ K_(majority_max_accept_pid), K_(majority_max_lsn), K_(saved_end_lsn), K_(last_submit_prepare_req_time_us), \ - K_(last_fetch_log_time_us), K_(last_record_sw_start_id), K_(last_notify_fetch_time_us), KP(this)); + K_(last_fetch_log_time_us), K_(last_record_sw_start_id), K_(last_notify_fetch_time_us), K_(last_purge_throttling_time_us), KP(this)); private: int init_reconfirm_(); int submit_prepare_log_(); int wait_all_log_flushed_(); - bool is_fetch_log_finished_() const; + bool is_fetch_log_finished_(); bool is_confirm_log_finished_() const; int try_fetch_log_(); int update_follower_end_lsn_(const common::ObAddr &server, const LSN &committed_end_lsn); @@ -187,6 +187,7 @@ private: int64_t wait_slide_print_time_us_; int64_t wait_majority_time_us_; int64_t last_notify_fetch_time_us_; + int64_t last_purge_throttling_time_us_; bool is_inited_; private: DISALLOW_COPY_AND_ASSIGN(LogReconfirm); diff --git a/src/logservice/palf/log_sliding_window.cpp b/src/logservice/palf/log_sliding_window.cpp index 2b1f241b0..8f64f0a08 100644 --- a/src/logservice/palf/log_sliding_window.cpp +++ b/src/logservice/palf/log_sliding_window.cpp @@ -2251,7 +2251,7 @@ bool LogSlidingWindow::is_all_log_flushed_() if (OB_SUCCESS != (tmp_ret = lsn_allocator_.get_curr_end_lsn(curr_end_lsn))) { PALF_LOG_RET(WARN, tmp_ret, "get_curr_end_lsn failed", K(tmp_ret), K_(palf_id), K_(self)); } else if (max_flushed_end_lsn < curr_end_lsn) { - PALF_LOG_RET(WARN, OB_ERR_UNEXPECTED, "there is some log has not been flushed", K_(palf_id), K_(self), K(curr_end_lsn), + PALF_LOG_RET(WARN, OB_EAGAIN, "there is some log has not been flushed", K_(palf_id), K_(self), K(curr_end_lsn), K(max_flushed_end_lsn), K_(max_flushed_lsn)); } else { bool_ret = true; diff --git a/src/logservice/palf/palf_env_impl.cpp b/src/logservice/palf/palf_env_impl.cpp index 47cda7cbf..8ab70957c 100644 --- a/src/logservice/palf/palf_env_impl.cpp +++ b/src/logservice/palf/palf_env_impl.cpp @@ -723,6 +723,7 @@ int PalfEnvImpl::try_recycle_blocks() "warn_percent(%)", disk_opts_for_recycling_blocks.log_disk_utilization_threshold_, "limit_size(MB)", (total_size_to_recycle_blocks*disk_opts_for_recycling_blocks.log_disk_utilization_limit_threshold_)/100/MB, "limit_percent(%)", disk_opts_for_recycling_blocks.log_disk_utilization_limit_threshold_, + "total_unrecyclable_size_byte(MB)", total_unrecyclable_size_byte/MB, "maximum_used_size(MB)", maximum_used_size/MB, "maximum_log_stream", palf_id, "oldest_log_stream", oldest_palf_id, @@ -755,26 +756,24 @@ bool PalfEnvImpl::GetTotalUsedDiskSpace::operator() (const LSKey &ls_key, IPalfH bool_ret = false; } else { constexpr int64_t MB = 1024 * 1024; - const int64_t used_size = palf_handle_impl->get_total_used_disk_space(); - if (used_size >= maximum_used_size_) { - maximum_used_size_ = used_size; - palf_id_ = ls_key.id_; - } - total_used_disk_space_ += palf_handle_impl->get_total_used_disk_space(); - LSN base_lsn; int ret = OB_SUCCESS; - if (OB_FAIL(palf_handle_impl->get_base_lsn(base_lsn))) { - PALF_LOG(WARN, "failed to get_base_lsn", K(ls_key)); + int64_t used_size = 0; + int64_t unrecyclable_size = 0; + if (OB_FAIL(palf_handle_impl->get_total_used_disk_space(used_size, unrecyclable_size))) { + PALF_LOG(WARN, "failed to get_total_used_disk_space", K(ls_key)); ret_code_ = ret; bool_ret = false; } else { - const int64_t unrecyclable_meta_size = (PALF_META_BLOCK_SIZE + MAX_INFO_BLOCK_SIZE); - total_unrecyclable_disk_space_ += (palf_handle_impl->get_end_lsn() - base_lsn + unrecyclable_meta_size); + if (used_size >= maximum_used_size_) { + maximum_used_size_ = used_size; + palf_id_ = ls_key.id_; + } + total_used_disk_space_ += used_size; + total_unrecyclable_disk_space_ += unrecyclable_size; PALF_LOG(TRACE, "get_total_used_disk_space success", K(ls_key), "total_used_disk_space(MB):", total_used_disk_space_/MB, "total_unrecyclable_disk_space(MB):", total_unrecyclable_disk_space_/MB, - "end_lsn", palf_handle_impl->get_end_lsn(), - "base_lsn", base_lsn); + "end_lsn", palf_handle_impl->get_end_lsn()); } } return bool_ret; diff --git a/src/logservice/palf/palf_env_impl.h b/src/logservice/palf/palf_env_impl.h index b809bdb78..3d3e99e22 100644 --- a/src/logservice/palf/palf_env_impl.h +++ b/src/logservice/palf/palf_env_impl.h @@ -132,8 +132,9 @@ public: ObSpinLockGuard guard(disk_opts_lock_); return Status::SHRINKING_STATUS == status_; } + static constexpr int64_t MB = 1024*1024ll; TO_STRING_KV(K_(disk_opts_for_stopping_writing), K_(disk_opts_for_recycling_blocks), K_(status), - K_(cur_unrecyclable_log_disk_size)); + "cur_unrecyclable_log_disk_size(MB)", cur_unrecyclable_log_disk_size_/MB); private: int update_disk_options_not_guarded_by_lock_(const PalfDiskOptions &new_opts); diff --git a/src/logservice/palf/palf_handle_impl.cpp b/src/logservice/palf/palf_handle_impl.cpp index f80bc95f1..57a0857b0 100644 --- a/src/logservice/palf/palf_handle_impl.cpp +++ b/src/logservice/palf/palf_handle_impl.cpp @@ -3355,15 +3355,15 @@ int PalfHandleImpl::ack_config_log(const common::ObAddr &server, return ret; } -int64_t PalfHandleImpl::get_total_used_disk_space() const +int PalfHandleImpl::get_total_used_disk_space(int64_t &total_used_disk_space, int64_t &unrecyclable_disk_space) const { - int64_t total_used_disk_space = 0; int ret = OB_SUCCESS; - if (OB_FAIL(log_engine_.get_total_used_disk_space(total_used_disk_space))) { + total_used_disk_space = 0; + unrecyclable_disk_space = 0; + if (OB_FAIL(log_engine_.get_total_used_disk_space(total_used_disk_space, unrecyclable_disk_space))) { PALF_LOG(WARN, "get_total_used_disk_space failed", K(ret), KPC(this)); - } else { } - return total_used_disk_space; + return ret; } int PalfHandleImpl::advance_reuse_lsn(const LSN &flush_log_end_lsn) diff --git a/src/logservice/palf/palf_handle_impl.h b/src/logservice/palf/palf_handle_impl.h index ccfa6edd9..a917a35b0 100755 --- a/src/logservice/palf/palf_handle_impl.h +++ b/src/logservice/palf/palf_handle_impl.h @@ -502,7 +502,7 @@ public: virtual const share::SCN get_max_scn() const = 0; virtual const share::SCN get_end_scn() const = 0; virtual int get_last_rebuild_lsn(LSN &last_rebuild_lsn) const = 0; - virtual int64_t get_total_used_disk_space() const = 0; + virtual int get_total_used_disk_space(int64_t &total_used_disk_space, int64_t &unrecyclable_disk_space) const = 0; virtual const LSN &get_base_lsn_used_for_block_gc() const = 0; // @desc: get ack_info_array and degraded_list for judging to degrade/upgrade // @params [in] member_ts_array: ack info array of all paxos members @@ -850,7 +850,7 @@ public: return sw_.get_last_slide_scn(); } int get_last_rebuild_lsn(LSN &last_rebuild_lsn) const override final; - int64_t get_total_used_disk_space() const; + int get_total_used_disk_space(int64_t &total_used_disk_space, int64_t &unrecyclable_disk_space) const; // return the smallest recycable lsn const LSN &get_base_lsn_used_for_block_gc() const override final { diff --git a/src/logservice/palf/palf_options.h b/src/logservice/palf/palf_options.h index 48e83246b..27cba9fad 100644 --- a/src/logservice/palf/palf_options.h +++ b/src/logservice/palf/palf_options.h @@ -185,8 +185,9 @@ struct PalfThrottleOptions // size of available log disk when writing throttling triggered inline int64_t get_available_size_after_limit() const; inline bool need_throttling() const; - TO_STRING_KV(K_(total_disk_space), K_(stopping_writing_percentage), - K_(trigger_percentage), K_(unrecyclable_disk_space)); + static constexpr int64_t MB = 1024*1024ll; + TO_STRING_KV("total_disk_space", total_disk_space_ / MB, K_(stopping_writing_percentage), + K_(trigger_percentage), "unrecyclable_disk_space(MB)", unrecyclable_disk_space_ / MB); public: int64_t total_disk_space_; int64_t stopping_writing_percentage_;