[log write throttling] reduce thread count of IOWorker

parent 888f3d8387
commit c5d6ca2c5d
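
The gist of the change, as a minimal illustrative sketch (stand-in types and simplified names, not the actual PALF classes; the real gating is in the LogIOWorkerWrapper hunks below): the wrapper still owns two LogIOWorker instances, but it only initializes and starts the user-facing one when is_user_tenant_ is set, so non-user tenants no longer spawn those extra IO worker threads.

    // Illustrative sketch only: how gating the second worker on is_user_tenant_
    // reduces the IO worker thread count for non-user tenants.
    struct WorkerSketch {
      int start() { return 0; }   // 0 stands in for OB_SUCCESS
      void stop() {}
      void wait() {}
    };

    struct IOWorkerWrapperSketch {
      bool is_user_tenant_ = false;       // assumed to be set when the wrapper is initialized
      WorkerSketch sys_log_io_worker_;    // always started (serves log stream NO.1)
      WorkerSketch user_log_io_worker_;   // started only for user tenants

      int start()
      {
        int ret = sys_log_io_worker_.start();
        if (0 == ret && is_user_tenant_) {
          ret = user_log_io_worker_.start();   // skipped for non-user tenants
        }
        return ret;
      }

      void stop()
      {
        sys_log_io_worker_.stop();
        if (is_user_tenant_) {
          user_log_io_worker_.stop();
        }
      }

      void wait()
      {
        sys_log_io_worker_.wait();
        if (is_user_tenant_) {
          user_log_io_worker_.wait();
        }
      }
    };

init(), stop(), wait() and notify_need_writing_throttling() are gated the same way in the diff below.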
@@ -31,8 +31,8 @@ ob_unittest_clog(test_ob_simple_arb_server_single_replica test_ob_simple_arb_ser
ob_unittest_clog(test_ob_simple_arb_server_mutil_replica test_ob_simple_arb_server_mutil_replica.cpp)
ob_unittest_clog(test_ob_simple_log_data_intergrity test_ob_simple_log_data_intergrity.cpp)
ob_unittest_clog(test_ob_simple_log_throttling test_ob_simple_log_throttling.cpp)
#ob_unittest_clog(test_ob_simple_log_throttling_member_change test_ob_simple_log_throttling_member_change.cpp)
#ob_unittest_clog(test_ob_simple_log_throttling_arb test_ob_simple_log_throttling_arb.cpp)
ob_unittest_clog(test_ob_simple_log_throttling_member_change test_ob_simple_log_throttling_member_change.cpp)
ob_unittest_clog(test_ob_simple_log_throttling_arb test_ob_simple_log_throttling_arb.cpp)

add_subdirectory(archiveservice)
@@ -96,7 +96,7 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_sys)
int64_t break_ts = common::ObClockGenerator::getClock();
ASSERT_EQ(true, (break_ts - cur_ts) < 1 * 1000 * 1000);

PALF_LOG(INFO, "end test throttling_basic", K(id));
PALF_LOG(INFO, "end test throttling_sys_log_stream", K(id));
leader.reset();

palf_env->palf_env_impl_.disk_options_wrapper_.disk_opts_for_recycling_blocks_ = disk_options;
@@ -258,6 +258,7 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic)
ASSERT_EQ(5, throttle.handled_seq_);
io_task_cond_1.cond_.signal();
wait_lsn_until_flushed(max_lsn_1, leader);
usleep(10 * 1000);
ASSERT_EQ(7, throttle.submitted_seq_);
ASSERT_EQ(6, throttle.handled_seq_);
io_task_cond_2.cond_.signal();
@@ -181,7 +181,7 @@ TEST_F(TestObSimpleLogThrottleArb, test_2f1a_throttling_major)
ret = leader.palf_handle_impl_->replace_member(ObMember(get_cluster()[follower_D_idx]->get_addr(), 1),
ObMember(get_cluster()[another_f_idx]->get_addr(), 1),
CONFIG_CHANGE_TIMEOUT);
//TODO(yaoying.yyy):
//timeout because added member can flush new meta when prev log is throttling
ASSERT_TRUE(OB_TIMEOUT == ret || OB_SUCCESS == ret);
int64_t new_leader_idx = OB_TIMEOUT == ret ? another_f_idx : follower_D_idx;
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 5, id, 128));
@@ -274,8 +274,17 @@ TEST_F(TestObSimpleLogIOWorkerThrottlingV2, test_throttling_minor_leader)
wait_lsn_until_flushed(max_lsn, new_leader);
loc_cb.leader_ = get_cluster()[new_leader_idx]->get_addr();

usleep(5*1000*1000);//wait follower_c log sync
LSN old_max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
LSN leader_old_max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
new_leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 2, id, 512 * KB));
LSN cur_max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
LSN leader_cur_max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
usleep(500 * 1000);
LSN new_max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
LSN leader_new_max_lsn = new_leader.palf_handle_impl_->sw_.get_max_lsn();
PALF_LOG(INFO, "before remove member",K(leader_old_max_lsn), K(leader_cur_max_lsn), K(leader_new_max_lsn), K(old_max_lsn), K(cur_max_lsn), K(new_max_lsn));
ASSERT_EQ(OB_TIMEOUT, new_leader.palf_handle_impl_->remove_member(ObMember(get_cluster()[follower_C_idx]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT));
ASSERT_EQ(OB_SUCCESS, submit_log(new_leader, 1, id, 1 * KB));
usleep(500 * 1000);
@@ -74,6 +74,7 @@
#include "storage/concurrency_control/ob_multi_version_garbage_collector.h"
#include "storage/tablelock/ob_table_lock_service.h"
#include "storage/tx/wrs/ob_tenant_weak_read_service.h"
#include "logservice/palf/log_define.h"

namespace oceanbase
{
@@ -718,6 +719,31 @@ int MockTenantModuleEnv::start_()
} else if (OB_FAIL(tenant->acquire_more_worker(TENANT_WORKER_COUNT, succ_num))) {
} else if (OB_FAIL(guard_.switch_to(tenant_id))) { // switch mtl context
STORAGE_LOG(ERROR, "fail to switch to sys tenant", K(ret));
} else {
ObLogService *log_service = MTL(logservice::ObLogService*);
if (OB_ISNULL(log_service) || OB_ISNULL(log_service->palf_env_)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "fail to switch to sys tenant", KP(log_service));
} else {
palf::LogIOWorkerConfig log_io_worker_config;
log_io_worker_config.io_worker_num_ = 1;
log_io_worker_config.io_queue_capcity_ = 100 * 1024;
log_io_worker_config.batch_width_ = 8;
log_io_worker_config.batch_depth_ = palf::PALF_SLIDING_WINDOW_SIZE;

palf::LogIOWorker &use_io_worker = log_service->palf_env_->palf_env_impl_.log_io_worker_wrapper_.user_log_io_worker_;
if (OB_FAIL(use_io_worker.init(log_io_worker_config, tenant_id,
log_service->palf_env_->palf_env_impl_.cb_thread_pool_.get_tg_id(),
log_service->palf_env_->palf_env_impl_.log_alloc_mgr_,
&log_service->palf_env_->palf_env_impl_))) {
STORAGE_LOG(ERROR, "fail to init user_io_worker", KP(log_service));
} else if (OB_FAIL(use_io_worker.start())) {
STORAGE_LOG(ERROR, "fail to init start", KP(log_service));
} else {
//set this to stop user_io_worker
log_service->palf_env_->palf_env_impl_.log_io_worker_wrapper_.is_user_tenant_ = true;
}
}
}

if (OB_SUCC(ret)) {
@@ -745,6 +771,7 @@ void MockTenantModuleEnv::destroy()
// release the tenant context
guard_.release();

multi_tenant_.stop();
multi_tenant_.wait();
multi_tenant_.destroy();
@@ -770,7 +770,8 @@ int LogEngine::get_min_block_info(block_id_t &min_block_id, SCN &min_block_scn)
return ret;
}

int LogEngine::get_total_used_disk_space(int64_t &total_used_size_byte) const
int LogEngine::get_total_used_disk_space(int64_t &total_used_size_byte,
int64_t &unrecyclable_disk_space) const
{
int ret = OB_SUCCESS;
block_id_t min_block_id = LOG_INVALID_BLOCK_ID;

@@ -795,8 +796,11 @@ int LogEngine::get_total_used_disk_space(int64_t &total_used_size_byte) const
} else if (OB_FAIL(log_meta_storage_.get_block_id_range(min_block_id, max_block_id))) {
PALF_LOG(WARN, "get_block_id_range failed", K(ret), KPC(this));
} else {
meta_storage_used = (max_block_id - min_block_id + 1) * (PALF_META_BLOCK_SIZE + MAX_INFO_BLOCK_SIZE);
meta_storage_used = PALF_META_BLOCK_SIZE + MAX_INFO_BLOCK_SIZE;
total_used_size_byte = log_storage_used + meta_storage_used;

const int64_t unrecyclable_meta_size = meta_storage_used;
unrecyclable_disk_space = log_storage_.get_end_lsn() - get_base_lsn_used_for_block_gc() + unrecyclable_meta_size;
PALF_LOG(TRACE, "get_total_used_disk_space", K(meta_storage_used), K(log_storage_used), K(total_used_size_byte));
}
return ret;
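
For reference, a small self-contained sketch of the accounting introduced above (illustrative names, not LogEngine's API): total usage is log storage plus meta storage, and the unrecyclable portion is everything from the GC base LSN to the current end LSN plus the meta blocks, which are never recycled.

    #include <cstdint>

    // Sketch of the accounting done in LogEngine::get_total_used_disk_space():
    // all inputs are byte offsets/sizes; the names here are stand-ins.
    struct DiskUsage {
      std::int64_t total_used_size_byte;
      std::int64_t unrecyclable_disk_space;
    };

    DiskUsage calc_disk_usage(std::int64_t log_storage_used,   // bytes occupied by log blocks
                              std::int64_t meta_storage_used,  // e.g. PALF_META_BLOCK_SIZE + MAX_INFO_BLOCK_SIZE
                              std::int64_t end_lsn,            // current end of the log storage
                              std::int64_t base_lsn_for_gc)    // smallest LSN still needed (GC base)
    {
      DiskUsage usage;
      usage.total_used_size_byte = log_storage_used + meta_storage_used;
      // Logs below base_lsn_for_gc can be recycled; everything above cannot,
      // and the meta blocks are always counted as unrecyclable.
      usage.unrecyclable_disk_space = (end_lsn - base_lsn_for_gc) + meta_storage_used;
      return usage;
    }
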
@@ -419,7 +419,8 @@ public:
// ===================== NetService end ========================
LogStorage *get_log_storage() { return &log_storage_; }
LogStorage *get_log_meta_storage() { return &log_meta_storage_; }
int get_total_used_disk_space(int64_t &total_used_size_byte) const;
int get_total_used_disk_space(int64_t &total_used_size_byte,
int64_t &unrecyclable_disk_space) const;
virtual int64_t get_palf_epoch() const { return palf_epoch_; }
TO_STRING_KV(K_(palf_id), K_(is_inited), K_(min_block_max_scn), K_(min_block_id), K_(base_lsn_for_block_gc),
K_(log_meta), K_(log_meta_storage), K_(log_storage), K_(palf_epoch), K_(last_purge_throttling_ts), KP(this));
@@ -164,7 +164,8 @@ int LogWritingThrottle::update_throtting_options_(IPalfEnvImpl *palf_env_impl, b
THROTTLING_DURATION_US / (1000 * 1000), K(THROTTLING_CHUNK_SIZE));
} else {
PALF_LOG(INFO, "[LOG DISK THROTTLING] success to calc_decay_factor", K(decay_factor_), K(throttling_options_),
K(new_throttling_options), "duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE));
K(new_throttling_options), "duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE),
KPC(this));
}
}

@@ -178,8 +179,9 @@ int LogWritingThrottle::update_throtting_options_(IPalfEnvImpl *palf_env_impl, b
}
throttling_options_ = new_throttling_options;
if (need_start_throttling) {
const LogThrottlingStat old_stat = stat_;
stat_.start_throttling();
PALF_LOG(INFO, "[LOG DISK THROTTLING] [START]", KPC(this),
PALF_LOG(INFO, "[LOG DISK THROTTLING] [START]", K(old_stat), KPC(this),
"duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE));
}
}
@@ -311,6 +313,7 @@ int LogIOWorker::submit_io_task(LogIOTask *io_task)
}
return ret;
}

int LogIOWorker::notify_need_writing_throttling(const bool &need_throttling)
{
int ret = OB_SUCCESS;
@@ -34,6 +34,7 @@ LogIOWorkerWrapper::~LogIOWorkerWrapper()
void LogIOWorkerWrapper::destroy()
{
is_inited_ = false;
is_user_tenant_ = false;
sys_log_io_worker_.destroy();
user_log_io_worker_.destroy();
}
@@ -54,10 +55,10 @@ int LogIOWorkerWrapper::init(const LogIOWorkerConfig &config,
allocaotor,
palf_env_impl))) {
LOG_WARN("failed to init sys_log_io_worker", K(ret));
} else if (OB_FAIL(user_log_io_worker_.init(config,
tenant_id,
cb_thread_pool_tg_id,
allocaotor, palf_env_impl))) {
} else if (is_user_tenant_ && OB_FAIL(user_log_io_worker_.init(config,
tenant_id,
cb_thread_pool_tg_id,
allocaotor, palf_env_impl))) {
sys_log_io_worker_.destroy();
LOG_WARN("failed to init user_log_io_worker");
} else {
@@ -70,44 +71,53 @@ LogIOWorker *LogIOWorkerWrapper::get_log_io_worker(const int64_t palf_id)
{
return is_sys_palf_id(palf_id) ? &sys_log_io_worker_ : &user_log_io_worker_;
}

int LogIOWorkerWrapper::start()
{
int ret = OB_SUCCESS;
if (OB_FAIL(sys_log_io_worker_.start())) {
LOG_WARN("failed to start sys_log_io_worker");
} else if (OB_FAIL(user_log_io_worker_.start())) {
} else if (is_user_tenant_ && OB_FAIL(user_log_io_worker_.start())) {
LOG_WARN("failed to start user_log_io_worker");
} else {
LOG_INFO("success to start LogIOWorkerWrapper", KPC(this));
}
return ret;
}

void LogIOWorkerWrapper::stop()
{
PALF_LOG(INFO, "LogIOWorkerWrapper begin stop", KPC(this));
LOG_INFO("LogIOWorkerWrapper starts stopping", KPC(this));
sys_log_io_worker_.stop();
user_log_io_worker_.stop();
PALF_LOG(INFO, "LogIOWorkerWrapper begin stop", KPC(this));
}
void LogIOWorkerWrapper::wait()
{
PALF_LOG(INFO, " LogIOWorkerWrapper begin wait", KPC(this));
sys_log_io_worker_.wait();
user_log_io_worker_.wait();
PALF_LOG(INFO, "LogIOWorkerWrapper begin wait", KPC(this));
if (is_user_tenant_) {
user_log_io_worker_.stop();
}
LOG_INFO("LogIOWorkerWrapper has finished stopping", KPC(this));
}

int LogIOWorkerWrapper::notify_need_writing_throttling(const bool &need_throtting)
void LogIOWorkerWrapper::wait()
{
LOG_INFO("LogIOWorkerWrapper starts waiting", KPC(this));
sys_log_io_worker_.wait();
if (is_user_tenant_) {
user_log_io_worker_.wait();
}
LOG_INFO("LogIOWorkerWrapper has finished waiting", KPC(this));
}

int LogIOWorkerWrapper::notify_need_writing_throttling(const bool &need_throttling)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (!is_user_tenant_) {
//need no nothing
} else if (OB_FAIL(user_log_io_worker_.notify_need_writing_throttling(need_throtting))) {
LOG_WARN("failed to notify_need_writing_throttling", K(need_throtting));
} else if (OB_FAIL(user_log_io_worker_.notify_need_writing_throttling(need_throttling))) {
LOG_WARN("failed to notify_need_writing_throttling", K(need_throttling));
} else {
LOG_WARN("success to notify_need_writing_throttling", K(need_throtting));
if (need_throttling) {
LOG_INFO("success to notify_need_writing_throttling True");
}
}
return ret;
}
@@ -22,30 +22,30 @@ namespace palf
class LogIOWorkerWrapper
{
public:
LogIOWorkerWrapper();
~LogIOWorkerWrapper();
LogIOWorkerWrapper();
~LogIOWorkerWrapper();

int init(const LogIOWorkerConfig &config,
const int64_t tenant_id,
int cb_thread_pool_tg_id,
ObIAllocator *allocaotr,
IPalfEnvImpl *palf_env_impl);
void destroy();
int start();
void stop();
void wait();
LogIOWorker *get_log_io_worker(const int64_t palf_id);
int notify_need_writing_throttling(const bool &need_throtting);
int64_t get_last_working_time() const;
TO_STRING_KV(K_(is_inited), K_(is_user_tenant), K_(sys_log_io_worker), K_(user_log_io_worker));
int init(const LogIOWorkerConfig &config,
const int64_t tenant_id,
int cb_thread_pool_tg_id,
ObIAllocator *allocaotr,
IPalfEnvImpl *palf_env_impl);
void destroy();
int start();
void stop();
void wait();
LogIOWorker *get_log_io_worker(const int64_t palf_id);
int notify_need_writing_throttling(const bool &need_throtting);
int64_t get_last_working_time() const;
TO_STRING_KV(K_(is_inited), K_(is_user_tenant), K_(sys_log_io_worker), K_(user_log_io_worker));

private:
bool is_inited_;
bool is_user_tenant_;
//for log stream NO.1
LogIOWorker sys_log_io_worker_;
//for log streams except NO.1
LogIOWorker user_log_io_worker_;
bool is_inited_;
bool is_user_tenant_;
//for log stream NO.1
LogIOWorker sys_log_io_worker_;
//for log streams except NO.1
LogIOWorker user_log_io_worker_;
};

}//end of namespace palf
@@ -58,6 +58,7 @@ LogReconfirm::LogReconfirm()
wait_slide_print_time_us_(OB_INVALID_TIMESTAMP),
wait_majority_time_us_(OB_INVALID_TIMESTAMP),
last_notify_fetch_time_us_(OB_INVALID_TIMESTAMP),
last_purge_throttling_time_us_(OB_INVALID_TIMESTAMP),
is_inited_(false)
{}

@@ -112,6 +113,7 @@ void LogReconfirm::reset_state()
last_fetch_log_time_us_ = OB_INVALID_TIMESTAMP;
last_record_sw_start_id_ = OB_INVALID_LOG_ID;
last_notify_fetch_time_us_ = OB_INVALID_TIMESTAMP;
last_purge_throttling_time_us_ = OB_INVALID_TIMESTAMP;
}
}
@@ -121,6 +123,7 @@ void LogReconfirm::destroy()
if (is_inited_) {
is_inited_ = false;
last_notify_fetch_time_us_ = OB_INVALID_TIMESTAMP;
last_purge_throttling_time_us_ = OB_INVALID_TIMESTAMP;
state_ = INITED;
new_proposal_id_ = INVALID_PROPOSAL_ID;
prepare_log_ack_list_.reset();
@@ -345,16 +348,26 @@ int LogReconfirm::ack_log_with_end_lsn_()
return ret;
}

bool LogReconfirm::is_fetch_log_finished_() const
bool LogReconfirm::is_fetch_log_finished_()
{
bool bool_ret = false;
int tmp_ret = OB_SUCCESS;
LSN max_flushed_end_lsn;
(void) sw_->get_max_flushed_end_lsn(max_flushed_end_lsn);
const LSN max_lsn = sw_->get_max_lsn();
if (majority_max_lsn_ == max_flushed_end_lsn) {
bool_ret = true;
} else {
//In a scenario with writhing throttling, ensure that the fetched logs are written to disk as soon
//as possible.
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = purge_throttling_())) {
PALF_LOG_RET(WARN, tmp_ret, "purge throttling failed", K_(palf_id), K_(majority_max_lsn),
K(max_flushed_end_lsn), K(max_lsn));
}
}
PALF_LOG(TRACE, "is_fetch_log_finished_", K_(palf_id), K(bool_ret), K_(majority_max_lsn), K(max_flushed_end_lsn));

PALF_LOG(TRACE, "is_fetch_log_finished_", K_(palf_id), K(bool_ret), K_(majority_max_lsn),
K(max_flushed_end_lsn), K(max_lsn));
return bool_ret;
}
@@ -467,6 +480,8 @@ int LogReconfirm::reconfirm()
PALF_LOG(WARN, "submit_prepare_log_ failed", K_(palf_id));
} else {
state_ = FETCH_MAX_LOG_LSN;
//reset last_purge_throttling_time_us_to avoid impacting purging throttling during RECONFIRM_FETCH_LOG
last_purge_throttling_time_us_ = OB_INVALID_TIMESTAMP;
PALF_EVENT("Reconfirm come into FETCH_MAX_LOG_LSN state", palf_id_, K_(self), K_(majority_max_accept_pid));
}
break;
@@ -618,11 +633,16 @@ int LogReconfirm::reconfirm()
}
return ret;
}

int LogReconfirm::purge_throttling_()
{
int ret = OB_SUCCESS;
if (OB_FAIL(log_engine_->submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_RECONFIRM))) {
PALF_LOG(WARN, "submit_purge_throttling_task", K_(palf_id));
if (palf_reach_time_interval(100 * 1000L, last_purge_throttling_time_us_)) {
if (OB_FAIL(log_engine_->submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_RECONFIRM))) {
PALF_LOG(WARN, "submit_purge_throttling_task", K_(palf_id));
} else {
PALF_LOG(INFO, "submit_purge_throttling_task during reconfirming", K_(palf_id));
}
}
return ret;
}
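
The purge submission above is now gated on palf_reach_time_interval(100 * 1000L, last_purge_throttling_time_us_), i.e. at most one purge-throttling task every 100 ms during reconfirm. A standalone sketch of that interval-gating pattern (illustrative only, not the PALF helper itself):

    #include <chrono>
    #include <cstdint>

    // reached() returns true at most once per interval_us_, mirroring how
    // purge_throttling_() rate-limits submit_purge_throttling_task().
    class IntervalGate {
    public:
      explicit IntervalGate(std::int64_t interval_us) : interval_us_(interval_us), last_us_(-1) {}

      bool reached()
      {
        const std::int64_t now_us = std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::steady_clock::now().time_since_epoch()).count();
        if (last_us_ < 0 || now_us - last_us_ >= interval_us_) {
          last_us_ = now_us;   // record the last time the gate opened
          return true;
        }
        return false;
      }

    private:
      std::int64_t interval_us_;
      std::int64_t last_us_;
    };

    // Usage: IntervalGate purge_gate(100 * 1000);  // 100 ms
    //        if (purge_gate.reached()) { /* submit the purge throttling task */ }
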
@@ -63,12 +63,12 @@ public:
TO_STRING_KV(K_(palf_id), K_(self), "state", state_to_string(state_), K_(new_proposal_id), K_(prepare_log_ack_list), \
K_(curr_paxos_follower_list), K_(majority_cnt), K_(majority_max_log_server), \
K_(majority_max_accept_pid), K_(majority_max_lsn), K_(saved_end_lsn), K_(last_submit_prepare_req_time_us), \
K_(last_fetch_log_time_us), K_(last_record_sw_start_id), K_(last_notify_fetch_time_us), KP(this));
K_(last_fetch_log_time_us), K_(last_record_sw_start_id), K_(last_notify_fetch_time_us), K_(last_purge_throttling_time_us), KP(this));
private:
int init_reconfirm_();
int submit_prepare_log_();
int wait_all_log_flushed_();
bool is_fetch_log_finished_() const;
bool is_fetch_log_finished_();
bool is_confirm_log_finished_() const;
int try_fetch_log_();
int update_follower_end_lsn_(const common::ObAddr &server, const LSN &committed_end_lsn);
@@ -187,6 +187,7 @@ private:
int64_t wait_slide_print_time_us_;
int64_t wait_majority_time_us_;
int64_t last_notify_fetch_time_us_;
int64_t last_purge_throttling_time_us_;
bool is_inited_;
private:
DISALLOW_COPY_AND_ASSIGN(LogReconfirm);
@@ -2251,7 +2251,7 @@ bool LogSlidingWindow::is_all_log_flushed_()
if (OB_SUCCESS != (tmp_ret = lsn_allocator_.get_curr_end_lsn(curr_end_lsn))) {
PALF_LOG_RET(WARN, tmp_ret, "get_curr_end_lsn failed", K(tmp_ret), K_(palf_id), K_(self));
} else if (max_flushed_end_lsn < curr_end_lsn) {
PALF_LOG_RET(WARN, OB_ERR_UNEXPECTED, "there is some log has not been flushed", K_(palf_id), K_(self), K(curr_end_lsn),
PALF_LOG_RET(WARN, OB_EAGAIN, "there is some log has not been flushed", K_(palf_id), K_(self), K(curr_end_lsn),
K(max_flushed_end_lsn), K_(max_flushed_lsn));
} else {
bool_ret = true;
@@ -723,6 +723,7 @@ int PalfEnvImpl::try_recycle_blocks()
"warn_percent(%)", disk_opts_for_recycling_blocks.log_disk_utilization_threshold_,
"limit_size(MB)", (total_size_to_recycle_blocks*disk_opts_for_recycling_blocks.log_disk_utilization_limit_threshold_)/100/MB,
"limit_percent(%)", disk_opts_for_recycling_blocks.log_disk_utilization_limit_threshold_,
"total_unrecyclable_size_byte(MB)", total_unrecyclable_size_byte/MB,
"maximum_used_size(MB)", maximum_used_size/MB,
"maximum_log_stream", palf_id,
"oldest_log_stream", oldest_palf_id,
@@ -755,26 +756,24 @@ bool PalfEnvImpl::GetTotalUsedDiskSpace::operator() (const LSKey &ls_key, IPalfH
bool_ret = false;
} else {
constexpr int64_t MB = 1024 * 1024;
const int64_t used_size = palf_handle_impl->get_total_used_disk_space();
if (used_size >= maximum_used_size_) {
maximum_used_size_ = used_size;
palf_id_ = ls_key.id_;
}
total_used_disk_space_ += palf_handle_impl->get_total_used_disk_space();
LSN base_lsn;
int ret = OB_SUCCESS;
if (OB_FAIL(palf_handle_impl->get_base_lsn(base_lsn))) {
PALF_LOG(WARN, "failed to get_base_lsn", K(ls_key));
int64_t used_size = 0;
int64_t unrecyclable_size = 0;
if (OB_FAIL(palf_handle_impl->get_total_used_disk_space(used_size, unrecyclable_size))) {
PALF_LOG(WARN, "failed to get_total_used_disk_space", K(ls_key));
ret_code_ = ret;
bool_ret = false;
} else {
const int64_t unrecyclable_meta_size = (PALF_META_BLOCK_SIZE + MAX_INFO_BLOCK_SIZE);
total_unrecyclable_disk_space_ += (palf_handle_impl->get_end_lsn() - base_lsn + unrecyclable_meta_size);
if (used_size >= maximum_used_size_) {
maximum_used_size_ = used_size;
palf_id_ = ls_key.id_;
}
total_used_disk_space_ += used_size;
total_unrecyclable_disk_space_ += unrecyclable_size;
PALF_LOG(TRACE, "get_total_used_disk_space success", K(ls_key),
"total_used_disk_space(MB):", total_used_disk_space_/MB,
"total_unrecyclable_disk_space(MB):", total_unrecyclable_disk_space_/MB,
"end_lsn", palf_handle_impl->get_end_lsn(),
"base_lsn", base_lsn);
"end_lsn", palf_handle_impl->get_end_lsn());
}
}
return bool_ret;
@@ -132,8 +132,9 @@ public:
ObSpinLockGuard guard(disk_opts_lock_);
return Status::SHRINKING_STATUS == status_;
}
static constexpr int64_t MB = 1024*1024ll;
TO_STRING_KV(K_(disk_opts_for_stopping_writing), K_(disk_opts_for_recycling_blocks), K_(status),
K_(cur_unrecyclable_log_disk_size));
"cur_unrecyclable_log_disk_size(MB)", cur_unrecyclable_log_disk_size_/MB);

private:
int update_disk_options_not_guarded_by_lock_(const PalfDiskOptions &new_opts);
@@ -3355,15 +3355,15 @@ int PalfHandleImpl::ack_config_log(const common::ObAddr &server,
return ret;
}

int64_t PalfHandleImpl::get_total_used_disk_space() const
int PalfHandleImpl::get_total_used_disk_space(int64_t &total_used_disk_space, int64_t &unrecyclable_disk_space) const
{
int64_t total_used_disk_space = 0;
int ret = OB_SUCCESS;
if (OB_FAIL(log_engine_.get_total_used_disk_space(total_used_disk_space))) {
total_used_disk_space = 0;
unrecyclable_disk_space = 0;
if (OB_FAIL(log_engine_.get_total_used_disk_space(total_used_disk_space, unrecyclable_disk_space))) {
PALF_LOG(WARN, "get_total_used_disk_space failed", K(ret), KPC(this));
} else {
}
return total_used_disk_space;
return ret;
}

int PalfHandleImpl::advance_reuse_lsn(const LSN &flush_log_end_lsn)
@@ -502,7 +502,7 @@ public:
virtual const share::SCN get_max_scn() const = 0;
virtual const share::SCN get_end_scn() const = 0;
virtual int get_last_rebuild_lsn(LSN &last_rebuild_lsn) const = 0;
virtual int64_t get_total_used_disk_space() const = 0;
virtual int get_total_used_disk_space(int64_t &total_used_disk_space, int64_t &unrecyclable_disk_space) const = 0;
virtual const LSN &get_base_lsn_used_for_block_gc() const = 0;
// @desc: get ack_info_array and degraded_list for judging to degrade/upgrade
// @params [in] member_ts_array: ack info array of all paxos members
|
||||
return sw_.get_last_slide_scn();
|
||||
}
|
||||
int get_last_rebuild_lsn(LSN &last_rebuild_lsn) const override final;
|
||||
int64_t get_total_used_disk_space() const;
|
||||
int get_total_used_disk_space(int64_t &total_used_disk_space, int64_t &unrecyclable_disk_space) const;
|
||||
// return the smallest recycable lsn
|
||||
const LSN &get_base_lsn_used_for_block_gc() const override final
|
||||
{
|
||||
|
@@ -185,8 +185,9 @@ struct PalfThrottleOptions
// size of available log disk when writing throttling triggered
inline int64_t get_available_size_after_limit() const;
inline bool need_throttling() const;
TO_STRING_KV(K_(total_disk_space), K_(stopping_writing_percentage),
K_(trigger_percentage), K_(unrecyclable_disk_space));
static constexpr int64_t MB = 1024*1024ll;
TO_STRING_KV("total_disk_space", total_disk_space_ / MB, K_(stopping_writing_percentage),
K_(trigger_percentage), "unrecyclable_disk_space(MB)", unrecyclable_disk_space_ / MB);
public:
int64_t total_disk_space_;
int64_t stopping_writing_percentage_;