support multi LogIOWorker

HaHaJeff
2023-06-05 02:57:22 +00:00
committed by ob-robot
parent 305786a690
commit 24bf0c60d8
25 changed files with 941 additions and 611 deletions

View File

@ -431,10 +431,6 @@ int ObSimpleLogClusterTestEnv::update_disk_options(const int64_t server_id, cons
int ret = OB_SUCCESS;
const int64_t MB = 1024 * 1024;
PalfOptions opts;
opts.disk_options_.log_disk_usage_limit_size_ = file_block_num * PALF_PHY_BLOCK_SIZE;
opts.disk_options_.log_disk_utilization_threshold_ = 80;
opts.disk_options_.log_disk_utilization_limit_threshold_ = 95;
opts.disk_options_.log_disk_throttling_percentage_ = 100;
auto cluster = get_cluster();
if (server_id >= 0 && server_id < cluster.size()) {
ObTenantEnv::set_tenant(cluster[server_id]->get_tenant_base());
@ -443,7 +439,12 @@ int ObSimpleLogClusterTestEnv::update_disk_options(const int64_t server_id, cons
ret = OB_NOT_SUPPORTED;
} else {
auto palf_env_impl = dynamic_cast<PalfEnvImpl*>(srv->get_palf_env());
ret = palf_env_impl->update_options(opts);
if (OB_FAIL(palf_env_impl->get_options(opts))) {
PALF_LOG(ERROR, "get_optiosn failed", K(ret), K(server_id));
} else {
opts.disk_options_.log_disk_usage_limit_size_ = file_block_num * PALF_PHY_BLOCK_SIZE;
ret = palf_env_impl->update_options(opts);
}
}
}
return ret;
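Note: the hunk above switches the test from overwriting the whole PalfOptions to a read-modify-write of only the disk limit. A minimal standalone sketch of that pattern, with a simplified options struct standing in for the real PalfOptions/PalfEnvImpl:

#include <cstdint>

struct MockDiskOptions { int64_t log_disk_usage_limit_size_ = 0; };
struct MockOptions { MockDiskOptions disk_options_; };

struct MockPalfEnv {
  MockOptions opts_;
  int get_options(MockOptions &opts) const { opts = opts_; return 0; }
  int update_options(const MockOptions &opts) { opts_ = opts; return 0; }
};

int update_disk_limit(MockPalfEnv &env, int64_t file_block_num, int64_t block_size)
{
  MockOptions opts;
  int ret = env.get_options(opts);  // read the current options first ...
  if (0 == ret) {                   // ... so unrelated fields survive the update
    opts.disk_options_.log_disk_usage_limit_size_ = file_block_num * block_size;
    ret = env.update_options(opts);
  }
  return ret;
}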

View File

@ -289,6 +289,7 @@ int ObSimpleLogServer::init_log_service_()
opts.disk_options_.log_disk_utilization_threshold_ = 80;
opts.disk_options_.log_disk_utilization_limit_threshold_ = 95;
opts.disk_options_.log_disk_throttling_percentage_ = 100;
opts.disk_options_.log_writer_parallelism_ = 2;
std::string clog_dir = clog_dir_ + "/tenant_1";
allocator_ = OB_NEW(ObTenantMutilAllocator, "TestBase", node_id_);
ObMemAttr attr(1, "SimpleLog");

View File

@ -540,8 +540,10 @@ TEST_F(TestObSimpleLogClusterBasicFunc, io_reducer_basic)
int64_t leader_idx = 0;
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
leader.palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_.batch_io_task_mgr_.has_batched_size_ = 0;
leader.palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_.batch_io_task_mgr_.handle_count_ = 0;
LogIOWorker *iow = leader.palf_handle_impl_->log_engine_.log_io_worker_;
iow->batch_io_task_mgr_.has_batched_size_ = 0;
iow->batch_io_task_mgr_.handle_count_ = 0;
std::vector<PalfHandleImplGuard*> palf_list;
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
int64_t lag_follower_idx = (leader_idx + 1) % node_cnt_;
@ -552,8 +554,8 @@ TEST_F(TestObSimpleLogClusterBasicFunc, io_reducer_basic)
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10000, leader_idx, 120));
const LSN max_lsn = leader.palf_handle_impl_->get_max_lsn();
wait_lsn_until_flushed(max_lsn, leader);
const int64_t has_batched_size = leader.palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_.batch_io_task_mgr_.has_batched_size_;
const int64_t handle_count = leader.palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_.batch_io_task_mgr_.handle_count_;
const int64_t has_batched_size = iow->batch_io_task_mgr_.has_batched_size_;
const int64_t handle_count = iow->batch_io_task_mgr_.handle_count_;
const int64_t log_id = leader.palf_handle_impl_->sw_.get_max_log_id();
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "batched_size", K(has_batched_size), K(log_id));
@ -567,8 +569,9 @@ TEST_F(TestObSimpleLogClusterBasicFunc, io_reducer_basic)
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "follower is lagged", K(max_lsn), K(lag_follower_max_lsn));
lag_follower_max_lsn = lag_follower.palf_handle_impl_->sw_.max_flushed_end_lsn_;
}
const int64_t follower_has_batched_size = lag_follower.palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_.batch_io_task_mgr_.has_batched_size_;
const int64_t follower_handle_count = lag_follower.palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_.batch_io_task_mgr_.handle_count_;
LogIOWorker *iow_follower = lag_follower.palf_handle_impl_->log_engine_.log_io_worker_;
const int64_t follower_has_batched_size = iow_follower->batch_io_task_mgr_.has_batched_size_;
const int64_t follower_handle_count = iow_follower->batch_io_task_mgr_.handle_count_;
EXPECT_EQ(OB_SUCCESS, revert_cluster_palf_handle_guard(palf_list));
int64_t cost_ts = ObTimeUtility::current_time() - start_ts;

View File

@ -467,7 +467,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_1, leader_idx_1, leader_1));
EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx_1, palf_env));
LogIOWorker *log_io_worker = &palf_env->palf_env_impl_.log_io_worker_wrapper_.user_log_io_worker_;
LogIOWorker *log_io_worker = leader_1.palf_handle_impl_->log_engine_.log_io_worker_;
int64_t prev_log_id_1 = 0;
int64_t prev_has_batched_size = 0;
@ -520,6 +520,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
IOTaskVerify io_task_verify_2(id_2, log_engine->palf_epoch_);
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_2, leader_idx_2, leader_2));
{
LogIOWorker *log_io_worker = leader_2.palf_handle_impl_->log_engine_.log_io_worker_;
// Tasks with an aggregation degree of 1 are ignored
EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_2));
EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1, id_1, 110));
@ -588,6 +589,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
IOTaskVerify io_task_verify_3(id_3, log_engine->palf_epoch_);
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_3, leader_idx_3, leader_3));
{
LogIOWorker *log_io_worker = leader_3.palf_handle_impl_->log_engine_.log_io_worker_;
EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_3));
EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1, id_1, 110));
EXPECT_EQ(OB_SUCCESS, submit_log(leader_2, 1, id_2, 110));
@ -635,6 +637,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
IOTaskVerify io_task_verify_4(id_4, log_engine->palf_epoch_);
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_4, leader_idx_4, leader_4));
{
LogIOWorker *log_io_worker = leader_4.palf_handle_impl_->log_engine_.log_io_worker_;
EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_4));
EXPECT_EQ(OB_SUCCESS, submit_log(leader_4, 10, id_4, 110));
sleep(1);
@ -650,9 +653,13 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
sleep(1);
leader_4.palf_handle_impl_->log_engine_.palf_epoch_++;
io_task_cond_4.cond_.signal();
PALF_LOG(INFO, "after signal");
LSN log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
PALF_LOG(INFO, "after signal", K(max_lsn), K(log_tail));
wait_lsn_until_flushed(max_lsn, leader_4);
EXPECT_EQ(max_lsn, leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_);
sleep(1);
log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
PALF_LOG(INFO, "after flused case 4", K(max_lsn), K(log_tail));
EXPECT_EQ(max_lsn, log_tail);
}
// Test truncate
@ -666,6 +673,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
TruncateLogCbCtx ctx(LSN(0));
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_5, leader_idx_5, leader_5));
{
LogIOWorker *log_io_worker = leader_5.palf_handle_impl_->log_engine_.log_io_worker_;
EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_5));
EXPECT_EQ(OB_SUCCESS, submit_log(leader_5, 10, id_5, 110));
LSN max_lsn = leader_5.palf_handle_impl_->sw_.get_max_lsn();
@ -693,6 +701,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
IOTaskVerify io_task_verify_6(id_6, log_engine->palf_epoch_);
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_6, leader_idx_6, leader_6));
{
LogIOWorker *log_io_worker = leader_6.palf_handle_impl_->log_engine_.log_io_worker_;
{
EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 15, id_6, MAX_LOG_BODY_SIZE));
sleep(2);
@ -731,7 +740,8 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func)
io_task_cond_6.cond_.signal();
//EXPECT_EQ(max_lsn, leader_6.palf_handle_.palf_handle_impl_->log_engine_.log_storage_.log_tail_);
wait_lsn_until_flushed(max_lsn3, leader_6);
EXPECT_EQ(max_lsn3, leader_6.palf_handle_impl_->log_engine_.log_storage_.log_tail_);
LSN log_tail = leader_6.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
EXPECT_EQ(max_lsn3, log_tail);
}
PALF_LOG(INFO, "end io_reducer_basic_func");

View File

@ -210,7 +210,7 @@ TEST_F(TestObSimpleLogClusterRebuild, test_old_leader_rebuild)
// submit a cond task before unblocking net to stop truncating task
IOTaskCond cond(id, rebuild_server->palf_env_impl_->last_palf_epoch_);
LogIOWorker *io_worker = &rebuild_server->palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_;
LogIOWorker *io_worker = rebuild_server->palf_handle_impl_->log_engine_.log_io_worker_;
io_worker->submit_io_task(&cond);
// after unblocking net, old leader will do rebuild

View File

@ -1487,7 +1487,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator_with_flashback)
EXPECT_EQ(true, iterator.iterator_impl_.curr_entry_is_raw_write_);
// The following two logs need to be read from disk, but controlled replay will not emit them
EXPECT_FALSE(iterator_end_lsn == iterator.iterator_storage_.end_lsn_);
// EXPECT_FALSE(iterator_end_lsn == iterator.iterator_storage_.end_lsn_);
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn2, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(false, iterate_end_by_replayable_point);
EXPECT_EQ(true, iterator.iterator_impl_.curr_entry_is_raw_write_);
@ -1503,7 +1503,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator_with_flashback)
LSN last_lsn = raw_write_leader.palf_handle_impl_->get_max_lsn();
SCN last_scn = raw_write_leader.palf_handle_impl_->get_max_scn();
LogIOWorker *io_worker = &raw_write_leader.palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_;
LogIOWorker *io_worker = raw_write_leader.palf_handle_impl_->log_engine_.log_io_worker_;
IOTaskCond cond(id_raw_write, raw_write_leader.palf_env_impl_->last_palf_epoch_);
io_worker->submit_io_task(&cond);
std::vector<LSN> lsns;

View File

@ -73,8 +73,9 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_sys)
palf_env_impl.disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(unrecyclable_size);
PalfThrottleOptions throttle_options;
palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
LogIOWorker *log_io_worker = &(palf_env_impl.log_io_worker_wrapper_.user_log_io_worker_);
LogWritingThrottle &throttle = log_io_worker->throttle_;
LogIOWorker *log_io_worker = leader.palf_handle_impl_->log_engine_.log_io_worker_;
LogWritingThrottle *throttle = log_io_worker->throttle_;
// sys log stream no need throttling
PalfThrottleOptions invalid_throttle_options;
PALF_LOG(INFO, "prepare for throttling");
@ -90,9 +91,9 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_sys)
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 2 * MB));
max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn, leader);
ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_);
ASSERT_EQ(false, throttle.need_throttling_with_options_());
ASSERT_EQ(false, throttle.need_throttling_());
ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_);
ASSERT_EQ(false, throttle->need_throttling_with_options_not_guarded_by_lock_());
ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
int64_t break_ts = common::ObClockGenerator::getClock();
ASSERT_EQ(true, (break_ts - cur_ts) < 1 * 1000 * 1000);
@ -133,8 +134,8 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic)
palf_env_impl.disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(unrecyclable_size);
PalfThrottleOptions throttle_options;
palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
LogIOWorker *log_io_worker = &(palf_env_impl.log_io_worker_wrapper_.user_log_io_worker_);
LogWritingThrottle &throttle = log_io_worker->throttle_;
LogIOWorker *log_io_worker = leader.palf_handle_impl_->log_engine_.log_io_worker_;
LogWritingThrottle *throttle = log_io_worker->throttle_;
PalfThrottleOptions invalid_throttle_options;
PALF_LOG(INFO, "[CASE 1]: test no need throttling while unrecyclable_log_disk_size is no more than trigger_size");
@ -142,10 +143,10 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic)
LSN max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
wait_lsn_until_flushed(max_lsn_1, leader);
ASSERT_EQ(true, throttle.last_update_ts_ != OB_INVALID_TIMESTAMP);
ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_);
ASSERT_EQ(false, throttle.need_throttling_with_options_());
ASSERT_EQ(false, throttle.need_throttling_());
ASSERT_EQ(true, throttle->last_update_ts_ != OB_INVALID_TIMESTAMP);
ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_);
ASSERT_EQ(false, throttle->need_throttling_with_options_not_guarded_by_lock_());
ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
PALF_LOG(INFO, "[CASE 2] no need throttling while log_disk_throttling_percentage_ is off");
usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
@ -153,9 +154,9 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic)
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 31, id, 1 * MB));
max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn_1, leader);
ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_);
ASSERT_EQ(false, throttle.need_throttling_with_options_());
ASSERT_EQ(false, throttle.need_throttling_());
ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_);
ASSERT_EQ(false, throttle->need_throttling_with_options_not_guarded_by_lock_());
ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
PALF_LOG(INFO, "[CASE 3] need throttling");
LogEntryHeader log_entry_header;
@ -173,67 +174,67 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic)
max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn_1, leader);
PALF_LOG(INFO, "case 3: after submit_log", K(before_lsn), K(end_lsn), K(max_lsn_1));
ASSERT_EQ(throttle_options, throttle.throttling_options_);
ASSERT_EQ(true, throttle.need_throttling_());
ASSERT_EQ(true, throttle.stat_.has_ever_throttled());
ASSERT_EQ(log_size, throttle.stat_.total_throttling_size_);
ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_);
ASSERT_EQ(0, throttle.stat_.total_skipped_task_cnt_);
ASSERT_EQ(0, throttle.stat_.total_skipped_size_);
ASSERT_EQ(log_size, throttle.appended_log_size_cur_round_);
ASSERT_EQ(5, throttle.submitted_seq_);
ASSERT_EQ(5, throttle.handled_seq_);
ASSERT_EQ(throttle_options, throttle->throttling_options_);
ASSERT_EQ(true, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_);
ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_);
ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_);
ASSERT_EQ(0, throttle->stat_.total_skipped_size_);
ASSERT_EQ(log_size, throttle->appended_log_size_cur_round_);
ASSERT_EQ(5, log_io_worker->purge_throttling_task_submitted_seq_);
ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_);
PALF_LOG(INFO, "[CASE 4] no need throttling because trigger_percentage is set to 100");
PALF_LOG(INFO, "[CASE 4] no need throttling because trigger_percentage is set to 100", KPC(throttle));
palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * MB));
max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn_1, leader);
ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_);
ASSERT_EQ(false, throttle.need_throttling_());
ASSERT_EQ(true, throttle.stat_.has_ever_throttled());
ASSERT_EQ(log_size, throttle.stat_.total_throttling_size_);
ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_);
ASSERT_EQ(0, throttle.stat_.total_skipped_task_cnt_);
ASSERT_EQ(0, throttle.stat_.total_skipped_size_);
ASSERT_EQ(0, throttle.appended_log_size_cur_round_);
ASSERT_EQ(5, throttle.submitted_seq_);
ASSERT_EQ(5, throttle.handled_seq_);
ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_);
ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_);
ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_);
ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_);
ASSERT_EQ(0, throttle->stat_.total_skipped_size_);
ASSERT_EQ(0, throttle->appended_log_size_cur_round_);
ASSERT_EQ(5, log_io_worker->purge_throttling_task_submitted_seq_);
ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_);
PALF_LOG(INFO, "[CASE 5] no need throttling because log_disk_size is set to 500M");
PALF_LOG(INFO, "[CASE 5] no need throttling because log_disk_size is set to 500M", KPC(throttle));
palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 40;
usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * MB));
max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn_1, leader);
ASSERT_EQ(throttle_options, throttle.throttling_options_);
ASSERT_EQ(true, throttle.need_throttling_());
ASSERT_EQ(true, throttle.stat_.has_ever_throttled());
ASSERT_EQ(throttle_options, throttle->throttling_options_);
ASSERT_EQ(true, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
PALF_LOG(INFO, "[CASE 5] no need throttling because log_disk_size is set to 500M", K(throttle));
ASSERT_EQ(log_size, throttle.stat_.total_throttling_size_);
ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_);
ASSERT_EQ(0, throttle.stat_.total_skipped_task_cnt_);
ASSERT_EQ(0, throttle.stat_.total_skipped_size_);
ASSERT_EQ(log_size, throttle.appended_log_size_cur_round_);
ASSERT_EQ(5, throttle.submitted_seq_);
ASSERT_EQ(5, throttle.handled_seq_);
ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_);
ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_);
ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_);
ASSERT_EQ(0, throttle->stat_.total_skipped_size_);
ASSERT_EQ(log_size, throttle->appended_log_size_cur_round_);
ASSERT_EQ(5, log_io_worker->purge_throttling_task_submitted_seq_);
ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_);
palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ = 500 * MB;
usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * MB));
max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn_1, leader);
ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_);
ASSERT_EQ(false, throttle.need_throttling_());
ASSERT_EQ(true, throttle.stat_.has_ever_throttled());
ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle.stat_.stop_ts_);
ASSERT_EQ(log_size, throttle.stat_.total_throttling_size_);
ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_);
ASSERT_EQ(0, throttle.stat_.total_skipped_task_cnt_);
ASSERT_EQ(0, throttle.stat_.total_skipped_size_);
ASSERT_EQ(0, throttle.appended_log_size_cur_round_);
ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_);
ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle->stat_.stop_ts_);
ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_);
ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_);
ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_);
ASSERT_EQ(0, throttle->stat_.total_skipped_size_);
ASSERT_EQ(0, throttle->appended_log_size_cur_round_);
max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
wait_lsn_until_flushed(max_lsn_1, leader);
@ -245,8 +246,8 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic)
LogEngine *log_engine = &leader.palf_handle_impl_->log_engine_;
IOTaskCond io_task_cond_1(id, log_engine->palf_epoch_);
EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_1));
ASSERT_EQ(6, throttle.submitted_seq_);
ASSERT_EQ(5, throttle.handled_seq_);
ASSERT_EQ(6, log_io_worker->purge_throttling_task_submitted_seq_);
ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_);
leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 1 * MB));
max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn();
@ -254,25 +255,25 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic)
wait_lsn_until_submitted(max_lsn_1, leader);
IOTaskCond io_task_cond_2(id, log_engine->palf_epoch_);
EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_2));
ASSERT_EQ(7, throttle.submitted_seq_);
ASSERT_EQ(5, throttle.handled_seq_);
ASSERT_EQ(7, log_io_worker->purge_throttling_task_submitted_seq_);
ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_);
io_task_cond_1.cond_.signal();
wait_lsn_until_flushed(max_lsn_1, leader);
usleep(10 * 1000);
ASSERT_EQ(7, throttle.submitted_seq_);
ASSERT_EQ(6, throttle.handled_seq_);
ASSERT_EQ(7, log_io_worker->purge_throttling_task_submitted_seq_);
ASSERT_EQ(6, log_io_worker->purge_throttling_task_handled_seq_);
io_task_cond_2.cond_.signal();
usleep(10 * 1000);
ASSERT_EQ(7, throttle.submitted_seq_);
ASSERT_EQ(7, throttle.handled_seq_);
ASSERT_EQ(7, log_io_worker->purge_throttling_task_submitted_seq_);
ASSERT_EQ(7, log_io_worker->purge_throttling_task_handled_seq_);
palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
ASSERT_EQ(true, throttle.need_throttling_());
ASSERT_EQ(true, throttle.stat_.has_ever_throttled());
ASSERT_EQ(true, throttle.stat_.start_ts_ > cur_ts);
ASSERT_EQ(0, throttle.stat_.total_throttling_size_);
ASSERT_EQ(0, throttle.stat_.total_throttling_task_cnt_);
ASSERT_EQ(10, throttle.stat_.total_skipped_task_cnt_);
ASSERT_EQ(10 * (log_size), throttle.stat_.total_skipped_size_);
ASSERT_EQ(true, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
ASSERT_EQ(true, throttle->stat_.start_ts_ > cur_ts);
ASSERT_EQ(0, throttle->stat_.total_throttling_size_);
ASSERT_EQ(0, throttle->stat_.total_throttling_task_cnt_);
ASSERT_EQ(10, throttle->stat_.total_skipped_task_cnt_);
ASSERT_EQ(10 * (log_size), throttle->stat_.total_skipped_size_);
int64_t cur_has_batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_;
// no io reduce during writing throttling
ASSERT_EQ(cur_has_batched_size, prev_has_batched_size);
@ -287,8 +288,8 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic)
wait_lsn_until_flushed(max_lsn_2, leader);
int64_t break_ts = common::ObClockGenerator::getClock();
EXPECT_EQ(true, (break_ts-cur_ts) < 1 * 1000 * 1000);
ASSERT_EQ(false, throttle.need_throttling_());
ASSERT_EQ(true, throttle.stat_.has_ever_throttled());
ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_));
ASSERT_EQ(true, throttle->stat_.has_ever_throttled());
palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_throttling_percentage_ = throttling_percentage;
@ -321,6 +322,7 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic)
palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 40;
usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
cur_ts = common::ObClockGenerator::getClock();
leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE;
ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 2 * MB));

View File

@ -728,23 +728,21 @@ int MockTenantModuleEnv::start_()
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(ERROR, "fail to switch to sys tenant", KP(log_service));
} else {
palf::LogIOWorkerConfig log_io_worker_config;
log_io_worker_config.io_worker_num_ = 1;
log_io_worker_config.io_queue_capcity_ = 100 * 1024;
log_io_worker_config.batch_width_ = 8;
log_io_worker_config.batch_depth_ = palf::PALF_SLIDING_WINDOW_SIZE;
palf::LogIOWorker &use_io_worker = log_service->palf_env_->palf_env_impl_.log_io_worker_wrapper_.user_log_io_worker_;
if (OB_FAIL(use_io_worker.init(log_io_worker_config, tenant_id,
log_service->palf_env_->palf_env_impl_.cb_thread_pool_.get_tg_id(),
log_service->palf_env_->palf_env_impl_.log_alloc_mgr_,
&log_service->palf_env_->palf_env_impl_))) {
STORAGE_LOG(ERROR, "fail to init user_io_worker", KP(log_service));
} else if (OB_FAIL(use_io_worker.start())) {
STORAGE_LOG(ERROR, "fail to init start", KP(log_service));
palf::PalfEnvImpl *palf_env_impl = &log_service->palf_env_->palf_env_impl_;
palf::LogIOWorkerWrapper &log_iow_wrapper = palf_env_impl->log_io_worker_wrapper_;
palf::LogIOWorkerConfig new_config;
const int64_t mock_tenant_id = 1;
palf_env_impl->init_log_io_worker_config_(1, mock_tenant_id, new_config);
new_config.io_worker_num_ = 4;
log_iow_wrapper.destory_and_free_log_io_workers_();
if (OB_FAIL(log_iow_wrapper.create_and_init_log_io_workers_(
new_config, mock_tenant_id, palf_env_impl->cb_thread_pool_.get_tg_id(), palf_env_impl->log_alloc_mgr_, palf_env_impl))) {
STORAGE_LOG(WARN, "failed to create_and_init_log_io_workers_", K(new_config));
} else if (FALSE_IT(log_iow_wrapper.log_writer_parallelism_ = new_config.io_worker_num_)) {
} else if (FALSE_IT(log_iow_wrapper.is_user_tenant_ = true)) {
} else if (OB_FAIL(log_iow_wrapper.start_())) {
STORAGE_LOG(WARN, "failed to start_ log_iow_wrapper", K(new_config));
} else {
//set this to stop user_io_worker
log_service->palf_env_->palf_env_impl_.log_io_worker_wrapper_.is_user_tenant_ = true;
}
}
}

View File

@ -144,6 +144,7 @@ ob_set_subtarget(ob_logservice palf
palf/log_updater.cpp
palf/log_io_utils.cpp
palf/log_io_worker_wrapper.cpp
palf/log_throttle.cpp
)
ob_set_subtarget(ob_logservice palf_election

View File

@ -521,6 +521,7 @@ int ObLogService::update_palf_options_except_disk_usage_limit_size()
palf_opts.compress_options_.enable_transport_compress_ = tenant_config->log_transport_compress_all;
palf_opts.compress_options_.transport_compress_func_ = compressor_type;
palf_opts.rebuild_replica_log_lag_threshold_ = tenant_config->_rebuild_replica_log_lag_threshold;
palf_opts.disk_options_.log_writer_parallelism_ = tenant_config->_log_writer_parallelism;
if (OB_FAIL(palf_env_->update_options(palf_opts))) {
CLOG_LOG(WARN, "palf update_options failed", K(MTL_ID()), K(ret), K(palf_opts));
} else {

View File

@ -20,6 +20,7 @@
#include "share/ob_throttling_utils.h" //ObThrottlingUtils
#include "log_io_task.h" // LogIOTask
#include "palf_env_impl.h" // PalfEnvImpl
#include "log_throttle.h" // LogWritingThrottle
namespace oceanbase
{
@ -27,195 +28,6 @@ using namespace common;
using namespace share;
namespace palf
{
void LogThrottlingStat::reset()
{
start_ts_ = OB_INVALID_TIMESTAMP;
stop_ts_ = OB_INVALID_TIMESTAMP;
total_throttling_interval_ = 0;
total_throttling_size_ = 0;
total_throttling_task_cnt_ = 0;
total_skipped_size_ = 0;
total_skipped_task_cnt_ = 0;
max_throttling_interval_ = 0;
}
void LogWritingThrottle::reset()
{
last_update_ts_ = OB_INVALID_TIMESTAMP;
need_writing_throttling_notified_ = false;
ATOMIC_SET(&submitted_seq_, 0);
ATOMIC_SET(&handled_seq_, 0);
appended_log_size_cur_round_ = 0;
decay_factor_ = 0;
throttling_options_.reset();
stat_.reset();
}
int LogWritingThrottle::update_throttling_options(IPalfEnvImpl *palf_env_impl)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(palf_env_impl)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("palf_env_impl is NULL", KPC(this));
} else {
int64_t cur_ts = ObClockGenerator::getClock();
bool unused_has_freed_up_space = false;
if ((cur_ts > last_update_ts_ + UPDATE_INTERVAL_US)
&& OB_FAIL(update_throtting_options_(palf_env_impl, unused_has_freed_up_space))) {
LOG_WARN("failed to update_throttling_info", KPC(this));
}
}
return ret;
}
int LogWritingThrottle::throttling(const int64_t throttling_size, IPalfEnvImpl *palf_env_impl)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(palf_env_impl)) {
ret = OB_INVALID_ARGUMENT;
} else if (throttling_size < 0) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid throttling_size", K(throttling_size), KPC(this));
} else if (0 == throttling_size) {
//no need throttling
} else {
if (need_throttling_()) {
const int64_t cur_unrecyclable_size = throttling_options_.unrecyclable_disk_space_ + appended_log_size_cur_round_;
const int64_t trigger_base_log_disk_size = throttling_options_.total_disk_space_ * throttling_options_.trigger_percentage_ / 100;
int64_t time_interval = 0;
if (OB_FAIL(ObThrottlingUtils::get_throttling_interval(THROTTLING_CHUNK_SIZE, throttling_size, trigger_base_log_disk_size,
cur_unrecyclable_size, decay_factor_, time_interval))) {
LOG_WARN("failed to get_throttling_interval", KPC(this));
}
int64_t remain_interval_us = time_interval;
bool has_freed_up_space = false;
while (OB_SUCC(ret) && remain_interval_us > 0) {
const int64_t real_interval = MIN(remain_interval_us, DETECT_INTERVAL_US);
usleep(real_interval);
remain_interval_us -= real_interval;
if (remain_interval_us <= 0) {
//do nothing
} else if (OB_FAIL(update_throtting_options_(palf_env_impl, has_freed_up_space))) {
LOG_WARN("failed to update_throttling_info_", KPC(this), K(time_interval), K(remain_interval_us));
} else if (!need_throttling_() || has_freed_up_space) {
LOG_TRACE("no need throttling or log disk has been freed up", KPC(this), K(time_interval), K(remain_interval_us));
break;
}
}
stat_.after_throttling(time_interval - remain_interval_us, throttling_size);
} else if (need_throttling_with_options_()) {
stat_.after_throttling(0, throttling_size);
}
if (stat_.has_ever_throttled()) {
if (REACH_TIME_INTERVAL(2 * 1000 * 1000L)) {
PALF_LOG(INFO, "[LOG DISK THROTTLING] [STAT]", KPC(this));
}
}
}
return ret;
}
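Note: the throttling loop above sleeps in DETECT_INTERVAL_US slices rather than one long usleep, re-checking between slices so a wait can end early when options change or disk space is freed. A standalone sketch of that idea (the predicate parameter is illustrative, not the real PALF API):

#include <unistd.h>
#include <algorithm>
#include <cstdint>
#include <functional>

// Sleep 'total_interval_us' in small slices, re-checking 'keep_throttling'
// between slices so the wait can be cut short.
void throttled_sleep(int64_t total_interval_us,
                     const std::function<bool()> &keep_throttling)
{
  const int64_t DETECT_INTERVAL_US = 30 * 1000;  // 30ms per slice
  int64_t remain_us = total_interval_us;
  while (remain_us > 0) {
    const int64_t slice = std::min(remain_us, DETECT_INTERVAL_US);
    usleep(static_cast<useconds_t>(slice));
    remain_us -= slice;
    if (remain_us > 0 && !keep_throttling()) {
      break;  // throttling switched off or log disk was freed up
    }
  }
}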
int LogWritingThrottle::after_append_log(const int64_t log_size, const int64_t seq)
{
appended_log_size_cur_round_ += log_size;
int ret = OB_SUCCESS;
if (OB_UNLIKELY(log_size < 0 || seq < 0)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "invalid argument", K(seq), K(log_size));
} else if (seq > 0) {
if (seq > ATOMIC_LOAD(&handled_seq_) && seq <= ATOMIC_LOAD(&submitted_seq_)) {
ATOMIC_SET(&handled_seq_, seq);
} else {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "unexpected seq", KPC(this), K(seq));
}
} else {/*do nothing*/}
return ret;
}
int LogWritingThrottle::update_throtting_options_(IPalfEnvImpl *palf_env_impl, bool &has_freed_up_space)
{
int ret = OB_SUCCESS;
const int64_t cur_ts = ObClockGenerator::getClock();
if (ATOMIC_LOAD(&need_writing_throttling_notified_)) {
PalfThrottleOptions new_throttling_options;
if (OB_FAIL(palf_env_impl->get_throttling_options(new_throttling_options))) {
PALF_LOG(WARN, "failed to get_writing_throttling_option");
} else if (OB_UNLIKELY(!new_throttling_options.is_valid())) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(WARN, "options is invalid", K(new_throttling_options), KPC(this));
} else {
const bool need_throttling = new_throttling_options.need_throttling();
const int64_t new_available_size_after_limit = new_throttling_options.get_available_size_after_limit();
bool need_update_decay_factor = false;
bool need_start_throttling = false;
if (need_throttling) {
if (!throttling_options_.need_throttling()) {
need_start_throttling = true;
need_update_decay_factor = true;
} else {
need_update_decay_factor = (throttling_options_.get_available_size_after_limit() != new_available_size_after_limit);
}
if (need_update_decay_factor) {
if (OB_FAIL(ObThrottlingUtils::calc_decay_factor(new_available_size_after_limit, THROTTLING_DURATION_US,
THROTTLING_CHUNK_SIZE, decay_factor_))) {
PALF_LOG(ERROR, "failed to calc_decay_factor", K(throttling_options_), "duration(s)",
THROTTLING_DURATION_US / (1000 * 1000), K(THROTTLING_CHUNK_SIZE));
} else {
PALF_LOG(INFO, "[LOG DISK THROTTLING] success to calc_decay_factor", K(decay_factor_), K(throttling_options_),
K(new_throttling_options), "duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE),
KPC(this));
}
}
if (OB_SUCC(ret)) {
// update other field
has_freed_up_space = new_throttling_options.unrecyclable_disk_space_ < throttling_options_.unrecyclable_disk_space_;
bool has_unrecyclable_space_changed = new_throttling_options.unrecyclable_disk_space_ != throttling_options_.unrecyclable_disk_space_;
if (has_unrecyclable_space_changed || need_start_throttling) {
// reset appended_log_size_cur_round_ when unrecyclable_disk_space_ changed
appended_log_size_cur_round_ = 0;
}
throttling_options_ = new_throttling_options;
if (need_start_throttling) {
const LogThrottlingStat old_stat = stat_;
stat_.start_throttling();
PALF_LOG(INFO, "[LOG DISK THROTTLING] [START]", K(old_stat), KPC(this),
"duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE));
}
}
} else {
if (throttling_options_.need_throttling()) {
PALF_LOG(INFO, "[LOG DISK THROTTLING] [STOP]", KPC(this),
"duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE));
clean_up_();
stat_.stop_throttling();
}
}
}
} else {
if (throttling_options_.need_throttling()) {
PALF_LOG(INFO, "[LOG DISK THROTTLING] [STOP] no need throttling any more", KPC(this),
"duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE));
clean_up_();
stat_.stop_throttling();
}
}
if (OB_SUCC(ret)) {
last_update_ts_ = cur_ts;
}
return ret;
}
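Note: update_throtting_options_ recomputes the decay factor only on the transition into throttling, or when the writable space above the trigger point changes. A condensed sketch of just that condition (field names simplified from PalfThrottleOptions):

#include <cstdint>

struct Opts {
  int64_t available_size_after_limit_ = 0;
  bool need_throttling_ = false;
};

bool need_update_decay_factor(const Opts &cur, const Opts &incoming)
{
  bool need_update = false;
  if (incoming.need_throttling_) {
    if (!cur.need_throttling_) {
      need_update = true;  // just entered throttling, no factor computed yet
    } else {
      // the budget above the trigger point changed mid-round
      need_update = (cur.available_size_after_limit_ != incoming.available_size_after_limit_);
    }
  }
  return need_update;
}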
void LogWritingThrottle::clean_up_()
{
//do not reset submitted_seq_ && handled_seq_ && last_update_ts_ && stat_
appended_log_size_cur_round_ = 0;
decay_factor_ = 0;
throttling_options_.reset();
}
LogIOWorker::LogIOWorker()
: log_io_worker_num_(-1),
cb_thread_pool_tg_id_(-1),
@ -224,7 +36,11 @@ LogIOWorker::LogIOWorker()
do_task_count_(0),
print_log_interval_(OB_INVALID_TIMESTAMP),
last_working_time_(OB_INVALID_TIMESTAMP),
throttle_(NULL),
log_io_worker_queue_size_stat_("[PALF STAT LOG IO WORKER QUEUE SIZE]", PALF_STAT_PRINT_INTERVAL_US),
purge_throttling_task_submitted_seq_(0),
purge_throttling_task_handled_seq_(0),
need_ignoring_throttling_(false),
is_inited_(false)
{
}
@ -236,8 +52,10 @@ LogIOWorker::~LogIOWorker()
int LogIOWorker::init(const LogIOWorkerConfig &config,
const int64_t tenant_id,
int cb_thread_pool_tg_id,
const int cb_thread_pool_tg_id,
ObIAllocator *allocator,
LogWritingThrottle *throttle,
const bool need_ignore_throttle,
IPalfEnvImpl *palf_env_impl)
{
int ret = OB_SUCCESS;
@ -245,10 +63,10 @@ int LogIOWorker::init(const LogIOWorkerConfig &config,
ret = OB_INIT_TWICE;
PALF_LOG(ERROR, "LogIOWorker has been inited", K(ret));
} else if (false == config.is_valid() || 0 >= cb_thread_pool_tg_id || OB_ISNULL(allocator)
|| OB_ISNULL(palf_env_impl)) {
|| OB_ISNULL(throttle) || OB_ISNULL(palf_env_impl)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "invalid argument!!!", K(ret), K(config), K(cb_thread_pool_tg_id), KP(allocator),
KP(palf_env_impl));
KP(throttle), KP(palf_env_impl));
} else if (OB_FAIL(queue_.init(config.io_queue_capcity_, "IOWorkerLQ", tenant_id))) {
PALF_LOG(ERROR, "io task queue init failed", K(ret), K(config));
} else if (OB_FAIL(batch_io_task_mgr_.init(config.batch_width_,
@ -261,10 +79,22 @@ int LogIOWorker::init(const LogIOWorkerConfig &config,
cb_thread_pool_tg_id_ = cb_thread_pool_tg_id;
palf_env_impl_ = palf_env_impl;
PALF_REPORT_INFO_KV(K_(log_io_worker_num), K_(cb_thread_pool_tg_id));
throttle_ = throttle;
log_io_worker_queue_size_stat_.set_extra_info(EXTRA_INFOS);
is_inited_ = true;
PALF_LOG(INFO, "LogIOWorker init success", K(ret), K(config), K(cb_thread_pool_tg_id),
KPC(palf_env_impl));
purge_throttling_task_submitted_seq_ = 0;
purge_throttling_task_handled_seq_ = 0;
need_ignoring_throttling_ = need_ignore_throttle;
need_purging_throttling_func_ = [this](){
return has_purge_throttling_tasks_();
};
if (!need_purging_throttling_func_.is_valid()) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(ERROR, "generate need_purging_throttling_func_ failed!!!", K(ret), K(config));
} else {
is_inited_ = true;
PALF_LOG(INFO, "LogIOWorker init success", K(ret), K(config), K(cb_thread_pool_tg_id),
KPC(palf_env_impl));
}
}
if (OB_FAIL(ret) && OB_INIT_TWICE != ret) {
destroy();
@ -277,7 +107,11 @@ void LogIOWorker::destroy()
(void)stop();
(void)wait();
is_inited_ = false;
need_ignoring_throttling_ = false;
purge_throttling_task_handled_seq_ = 0;
purge_throttling_task_submitted_seq_ = 0;
last_working_time_ = OB_INVALID_TIMESTAMP;
throttle_ = NULL;
cb_thread_pool_tg_id_ = -1;
palf_env_impl_ = NULL;
log_io_worker_num_ = -1;
@ -294,15 +128,25 @@ int LogIOWorker::submit_io_task(LogIOTask *io_task)
ret = OB_INVALID_ARGUMENT;
} else {
const bool need_purge_throttling = io_task->need_purge_throttling();
if (need_purge_throttling) {
ObSpinLockGuard guard(throttling_lock_);
const int64_t submit_seq = throttle_.inc_and_fetch_submitted_seq();
// When 'need_ignoring_throttling_' is true, there is no need to allocate a purge throttling sequence
if (!need_ignoring_throttling_ && need_purge_throttling) {
// Must hold the lock, otherwise:
// 1. At T1, sequence 8 is allocated to LogIOTask1;
// 2. At T2, sequence 9 is allocated to LogIOTask2;
// 3. At T3, LogIOTask2 is pushed into queue_;
// 4. At T4, LogIOTask1 is pushed into queue_.
//
// After handle_io_task_with_throttling_, purge_throttling_task_handled_seq_ must be updated,
// and in the interleaving above it would move backwards.
//
// Holding the lock also makes it safe to roll back 'purge_throttling_task_submitted_seq_'
// when pushing the LogIOTask into queue_ fails.
SpinLockGuard guard(lock_);
const int64_t submit_seq = inc_and_fetch_purge_throttling_submitted_seq_();
(void)io_task->set_submit_seq(submit_seq);
PALF_LOG(INFO, "submit flush meta task success", KPC(io_task));
if (OB_FAIL(queue_.push(io_task))) {
PALF_LOG(WARN, "fail to push io task into queue", K(ret), KP(io_task));
//rollback submit_seq
(void)throttle_.dec_submitted_seq();
dec_purge_throttling_submitted_seq_();
}
} else {
if (OB_FAIL(queue_.push(io_task))) {
@ -319,8 +163,8 @@ int LogIOWorker::notify_need_writing_throttling(const bool &need_throttling)
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else {
(void)throttle_.notify_need_writing_throttling(need_throttling);
} else if (!need_ignoring_throttling_) {
(void)throttle_->notify_need_writing_throttling(need_throttling);
}
return ret;
}
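Note: the comment in submit_io_task above describes the interleaving the lock prevents. A simplified model of the rule, with std::mutex in place of the SpinLock: sequence allocation and enqueue happen in one critical section, so tasks enter the queue in sequence order and a failed push can be rolled back safely.

#include <cstdint>
#include <mutex>
#include <queue>

struct PurgeSubmitter {
  std::mutex lock_;
  std::queue<int64_t> queue_;
  int64_t submitted_seq_ = 0;

  // Allocate a sequence and push under the same lock; without it, a task
  // with seq 9 could land in the queue before the task with seq 8, and the
  // consumer's handled watermark would move backwards.
  bool submit()
  {
    std::lock_guard<std::mutex> guard(lock_);
    const int64_t seq = ++submitted_seq_;
    queue_.push(seq);
    // in the real code the push can fail, and submitted_seq_ is then
    // decremented while the lock is still held
    return true;
  }
};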
@ -336,17 +180,29 @@ int LogIOWorker::handle_io_task_with_throttling_(LogIOTask *io_task)
int ret = OB_SUCCESS;
const int64_t throttling_size = io_task->get_io_size();
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = throttle_.throttling(throttling_size, palf_env_impl_))) {
if (!need_ignoring_throttling_
&& OB_SUCCESS != (tmp_ret = throttle_->throttling(throttling_size, need_purging_throttling_func_, palf_env_impl_))) {
LOG_ERROR_RET(tmp_ret, "failed to do_throttling", K(throttling_size));
}
const int64_t submit_seq = io_task->get_submit_seq();
if (OB_FAIL(io_task->do_task(cb_thread_pool_tg_id_, palf_env_impl_))) {
PALF_LOG(WARN, "LogIOTask do_task falied");
} else {
if (OB_SUCCESS != (tmp_ret = throttle_.after_append_log(throttling_size, submit_seq))) {
} else if (!need_ignoring_throttling_) {
const int64_t handled_seq = ATOMIC_LOAD(&purge_throttling_task_handled_seq_);
const int64_t submitted_seq = ATOMIC_LOAD(&purge_throttling_task_submitted_seq_);
// NB: for LogIOFlushLogTask, the submit_seq is always zero.
if (submit_seq > handled_seq && submit_seq <= submitted_seq) {
ATOMIC_SET(&purge_throttling_task_handled_seq_, submit_seq);
}
if (submitted_seq < handled_seq) {
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED,
"unexpected error, purge_throttling_task_submitted_seq_ is less than purge_throttling_task_handled_seq_",
KPC(this));
}
if (OB_SUCCESS != (tmp_ret = throttle_->after_append_log(throttling_size))) {
LOG_ERROR_RET(tmp_ret, "after_append failed", KP(io_task));
}
PALF_LOG(TRACE, "handle_io_task_ success", K(submit_seq));
PALF_LOG(TRACE, "handle_io_task_ success", K(submit_seq), KPC(this));
}
return ret;
}
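Note: with multiple workers sharing one LogWritingThrottle, the throttle can no longer inspect per-worker state directly, so each worker hands it a callback ('need_purging_throttling_func_') answering "do I have pending purge tasks". A sketch of that decoupling, with std::function standing in for ObFunction:

#include <functional>

using NeedPurgingThrottlingFunc = std::function<bool()>;

struct SharedThrottle {
  bool options_say_throttle_ = false;

  // Throttle only while no purge task is pending on the calling worker,
  // so a pending purge task is drained instead of being slept on.
  bool need_throttling(const NeedPurgingThrottlingFunc &has_pending_purge) const
  {
    return options_say_throttle_ && !has_pending_purge();
  }
};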
@ -420,7 +276,7 @@ bool LogIOWorker::need_reduce_(LogIOTask *io_task)
break;
}
//do not reduce io when writing throttling is on
return (bool_ret && (!throttle_.need_writing_throttling_notified()));
return (bool_ret && !need_ignoring_throttling_ && !throttle_->need_writing_throttling_notified());
}
int LogIOWorker::reduce_io_task_(void *task)
@ -475,7 +331,8 @@ int LogIOWorker::reduce_io_task_(void *task)
int LogIOWorker::update_throttling_options_()
{
int ret = OB_SUCCESS;
if (OB_FAIL(throttle_.update_throttling_options(palf_env_impl_))) {
// For sys log stream, no need to update throttling options.
if (!need_ignoring_throttling_ && OB_FAIL(throttle_->update_throttling_options(palf_env_impl_))) {
LOG_WARN("failed to update_throttling_options");
}
return ret;
@ -623,5 +480,27 @@ int LogIOWorker::BatchLogIOFlushLogTaskMgr::find_usable_batch_io_task_(
}
return ret;
}
int64_t LogIOWorker::inc_and_fetch_purge_throttling_submitted_seq_()
{
return ATOMIC_AAF(&purge_throttling_task_submitted_seq_, 1);
}
void LogIOWorker::dec_purge_throttling_submitted_seq_()
{
ATOMIC_DEC(&purge_throttling_task_submitted_seq_);
}
bool LogIOWorker::has_purge_throttling_tasks_() const
{
const int64_t handled_seq = ATOMIC_LOAD(&purge_throttling_task_handled_seq_);
const int64_t submitted_seq = ATOMIC_LOAD(&purge_throttling_task_submitted_seq_);
if (submitted_seq < handled_seq) {
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED,
"unexpected error, purge_throttling_task_submitted_seq_ is less than purge_throttling_task_handled_seq_",
KPC(this));
}
return submitted_seq != handled_seq;
}
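Note: the pair of watermarks above is the whole pending-work protocol: purge work exists exactly when the counters differ. A standalone model (field names mirror the patch, but this is an illustration, not the real LogIOWorker):

#include <atomic>
#include <cassert>
#include <cstdint>

struct PurgeWatermark {
  std::atomic<int64_t> submitted_{0};
  std::atomic<int64_t> handled_{0};

  bool has_pending() const
  {
    const int64_t handled = handled_.load();
    const int64_t submitted = submitted_.load();
    assert(submitted >= handled);  // handled may never overtake submitted
    return submitted != handled;
  }

  void on_handled(int64_t submit_seq)
  {
    // LogIOFlushLogTask carries seq 0 and is skipped; only purge-throttling
    // tasks advance the handled watermark.
    if (submit_seq > handled_.load() && submit_seq <= submitted_.load()) {
      handled_.store(submit_seq);
    }
  }
};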
} // end namespace palf
} // end namespace oceanbase

View File

@ -17,15 +17,17 @@
#include "lib/queue/ob_lighty_queue.h" // ObLightyQueue
#include "lib/utility/ob_macro_utils.h" // DISALLOW_COPY_AND_ASSIGN
#include "lib/utility/ob_print_utils.h" // TO_STRING_KV
#include "lib/lock/ob_spin_lock.h" // SpinLock
#include "lib/thread/thread_mgr_interface.h" // TGTaskHandler
#include "lib/container/ob_fixed_array.h" // ObSEArrayy
#include "lib/hash/ob_array_hash_map.h" // ObArrayHashMap
#include "lib/atomic/ob_atomic.h" // ATOMIC_LOAD
#include "lib/function/ob_function.h" // ObFunction
#include "share/ob_thread_pool.h" // ObThreadPool
#include "common/ob_clock_generator.h" //ObClockGenerator
#include "common/ob_clock_generator.h" // ObClockGenerator
#include "log_io_task.h" // LogBatchIOFlushLogTask
#include "log_define.h" // ALF_SLIDING_WINDOW_SIZE
#include "log_define.h" // PALF_SLIDING_WINDOW_SIZE
#include "palf_options.h" // PalfThrottleOptions
#include "log_throttle.h" // LogWritingThrottle
namespace oceanbase
{
namespace common
@ -64,145 +66,6 @@ struct LogIOWorkerConfig
int64_t batch_depth_;
TO_STRING_KV(K_(io_worker_num), K_(io_queue_capcity), K_(batch_width), K_(batch_depth));
};
class LogThrottlingStat
{
public:
LogThrottlingStat() {reset();}
~LogThrottlingStat() {reset();}
void reset();
inline bool has_ever_throttled() const;
inline void after_throttling(const int64_t throttling_interval, const int64_t throttling_size);
inline void start_throttling();
inline void stop_throttling();
TO_STRING_KV(K_(start_ts),
K_(stop_ts),
K_(total_throttling_interval),
K_(total_throttling_size),
K_(total_throttling_task_cnt),
K_(total_skipped_size),
K_(total_skipped_task_cnt),
K_(max_throttling_interval));
private:
int64_t start_ts_;
int64_t stop_ts_;
int64_t total_throttling_interval_;
int64_t total_throttling_size_;
int64_t total_throttling_task_cnt_;
//log_size of tasks need for throttling but overlooked
int64_t total_skipped_size_;
//count of tasks need for throttling but overlooked
int64_t total_skipped_task_cnt_;
int64_t max_throttling_interval_;
};
inline void LogThrottlingStat::after_throttling(const int64_t throttling_interval,
const int64_t throttling_size)
{
if (0 == throttling_interval) {
total_skipped_size_ += throttling_size;
total_skipped_task_cnt_++;
} else {
total_throttling_interval_ += throttling_interval;
total_throttling_size_ += throttling_size;
total_throttling_task_cnt_++;
max_throttling_interval_ = MAX(max_throttling_interval_, throttling_interval);
}
}
inline bool LogThrottlingStat::has_ever_throttled() const
{
return common::OB_INVALID_TIMESTAMP != start_ts_;
}
inline void LogThrottlingStat::start_throttling()
{
reset();
start_ts_ = common::ObClockGenerator::getClock();
}
inline void LogThrottlingStat::stop_throttling()
{
stop_ts_ = common::ObClockGenerator::getClock();
}
class LogWritingThrottle
{
public:
LogWritingThrottle() {reset();}
~LogWritingThrottle() {reset();}
void reset();
//invoked by gc thread
inline void notify_need_writing_throttling(const bool is_need);
inline bool need_writing_throttling_notified() const;
inline int64_t inc_and_fetch_submitted_seq();
inline void dec_submitted_seq();
int update_throttling_options(IPalfEnvImpl *palf_env_impl);
int throttling(const int64_t io_size, IPalfEnvImpl *palf_env_impl);
int after_append_log(const int64_t log_size, const int64_t seq);
TO_STRING_KV(K_(last_update_ts),
K_(need_writing_throttling_notified),
K_(submitted_seq),
K_(handled_seq),
K_(appended_log_size_cur_round),
K_(decay_factor),
K_(throttling_options),
K_(stat));
private:
int update_throtting_options_(IPalfEnvImpl *palf_env_impl, bool &has_recycled_log_disk);
inline bool need_throttling_with_options_() const;
inline bool need_throttling_() const;
//reset throttling related member
void clean_up_();
private:
static const int64_t UPDATE_INTERVAL_US = 500 * 1000L;//500ms
const int64_t DETECT_INTERVAL_US = 30 * 1000L;//30ms
const int64_t THROTTLING_DURATION_US = 1800 * 1000 * 1000L;//1800s
const int64_t THROTTLING_CHUNK_SIZE = MAX_LOG_BUFFER_SIZE;
//ts of lastest updating writing throttling info
int64_t last_update_ts_;
//ts when next log can be appended
//log_size can be appended during current round, will be reset when unrecyclable_size changed
// notified by gc, local meta may not be ready
mutable bool need_writing_throttling_notified_;
// local meta is ready after need_writing_throttling_locally_ set true
//used for meta flush task
//max_seq of task ever submitted
mutable int64_t submitted_seq_;
//max_seq of task ever handled
mutable int64_t handled_seq_;
int64_t appended_log_size_cur_round_;
double decay_factor_;
//append_speed during current round, Bytes per usecond
PalfThrottleOptions throttling_options_;
LogThrottlingStat stat_;
};
inline void LogWritingThrottle::notify_need_writing_throttling(const bool is_need) {
ATOMIC_SET(&need_writing_throttling_notified_, is_need);
}
inline bool LogWritingThrottle::need_writing_throttling_notified() const {
return ATOMIC_LOAD(&need_writing_throttling_notified_);
}
inline bool LogWritingThrottle::need_throttling_with_options_() const
{
return ATOMIC_LOAD(&need_writing_throttling_notified_) && throttling_options_.need_throttling();
}
inline bool LogWritingThrottle::need_throttling_() const
{
return need_throttling_with_options_() && ATOMIC_LOAD(&handled_seq_) >= ATOMIC_LOAD(&submitted_seq_);
}
int64_t LogWritingThrottle::inc_and_fetch_submitted_seq()
{
return ATOMIC_AAF(&submitted_seq_, 1);
}
void LogWritingThrottle::dec_submitted_seq()
{
ATOMIC_DEC(&submitted_seq_);
}
class LogIOWorker : public share::ObThreadPool
{
@ -211,8 +74,10 @@ public:
~LogIOWorker();
int init(const LogIOWorkerConfig &config,
const int64_t tenant_id,
int cb_thread_pool_tg_id,
const int cb_thread_pool_tg_id,
ObIAllocator *allocator,
LogWritingThrottle *throttle,
const bool need_ignore_throttle,
IPalfEnvImpl *palf_env_impl);
void destroy();
@ -222,7 +87,7 @@ public:
int notify_need_writing_throttling(const bool &need_throtting);
static constexpr int64_t MAX_THREAD_NUM = 1;
TO_STRING_KV(K_(log_io_worker_num), K_(cb_thread_pool_tg_id));
TO_STRING_KV(K_(log_io_worker_num), K_(cb_thread_pool_tg_id), K_(purge_throttling_task_handled_seq), K_(purge_throttling_task_submitted_seq));
private:
bool need_reduce_(LogIOTask *task);
int reduce_io_task_(void *task);
@ -230,11 +95,12 @@ private:
int handle_io_task_with_throttling_(LogIOTask *io_task);
int update_throttling_options_();
int run_loop_();
int64_t inc_and_fetch_purge_throttling_submitted_seq_();
void dec_purge_throttling_submitted_seq_();
bool has_purge_throttling_tasks_() const;
private:
static constexpr int64_t QUEUE_WAIT_TIME = 100 * 1000;
private:
typedef common::ObSpinLock SpinLock;
typedef common::ObSpinLockGuard SpinLockGuard;
class BatchLogIOFlushLogTaskMgr {
public:
@ -256,6 +122,8 @@ private:
int64_t usable_count_;
int64_t batch_width_;
};
typedef common::ObSpinLock SpinLock;
typedef common::ObSpinLockGuard SpinLockGuard;
// TODO: io_task_queue is used to store all LogIOTask objects, and the LogIOWorker
// will consume it. Nowadays, the io_task_queue is single consumer and multi
@ -271,9 +139,18 @@ private:
int64_t do_task_count_;
int64_t print_log_interval_;
int64_t last_working_time_;
LogWritingThrottle throttle_;
SpinLock throttling_lock_;
LogWritingThrottle *throttle_;
ObMiniStat::ObStatItem log_io_worker_queue_size_stat_;
// Each LogIOTask except LogIOFlushLogTask holds a unique sequence; when 'purge_throttling_task_submitted_seq_' minus
// 'purge_throttling_task_handled_seq_' is greater than zero, purge throttling.
// 1. 'purge_throttling_task_submitted_seq_' is incremented only when submitting a LogIOTask which needs to purge throttling.
// 2. 'purge_throttling_task_handled_seq_' is set to the task's submit sequence after the LogIOTask is handled successfully.
mutable int64_t purge_throttling_task_submitted_seq_;
mutable int64_t purge_throttling_task_handled_seq_;
// Ignore throttling unconditionally (e.g., the sys log stream needs no throttling)
bool need_ignoring_throttling_;
NeedPurgingThrottlingFunc need_purging_throttling_func_;
SpinLock lock_;
bool is_inited_;
};
} // end namespace palf

View File

@ -13,6 +13,7 @@
#define USING_LOG_PREFIX PALF
#include "log_io_worker_wrapper.h"
#include "lib/ob_define.h"
#include "share/rc/ob_tenant_base.h" // mtl_free
#include "log_define.h"
namespace oceanbase
@ -21,10 +22,13 @@ namespace palf
{
LogIOWorkerWrapper::LogIOWorkerWrapper()
: is_inited_(false),
is_user_tenant_(false),
sys_log_io_worker_(),
user_log_io_worker_() {}
: is_user_tenant_(false),
log_writer_parallelism_(-1),
log_io_workers_(NULL),
throttle_(),
round_robin_idx_(-1),
is_inited_(false) {}
LogIOWorkerWrapper::~LogIOWorkerWrapper()
{
@ -34,51 +38,58 @@ LogIOWorkerWrapper::~LogIOWorkerWrapper()
void LogIOWorkerWrapper::destroy()
{
is_inited_ = false;
round_robin_idx_ = -1;
throttle_.reset();
destory_and_free_log_io_workers_();
// reset after destory_and_free_log_io_workers_
log_writer_parallelism_ = -1;
is_user_tenant_ = false;
sys_log_io_worker_.destroy();
user_log_io_worker_.destroy();
}
int LogIOWorkerWrapper::init(const LogIOWorkerConfig &config,
const int64_t tenant_id,
int cb_thread_pool_tg_id,
ObIAllocator *allocaotor,
ObIAllocator *allocator,
IPalfEnvImpl *palf_env_impl)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) {
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("LogIOWorkerWrapper has inited twice", K(config), K(tenant_id));
} else if (!config.is_valid() || OB_UNLIKELY(!is_valid_tenant_id(tenant_id))
|| 0 >= cb_thread_pool_tg_id || OB_ISNULL(allocator) || OB_ISNULL(palf_env_impl)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", K(tenant_id));
} else if(FALSE_IT(is_user_tenant_ = is_user_tenant(tenant_id))) {
} else if (OB_FAIL(sys_log_io_worker_.init(config,
tenant_id,
cb_thread_pool_tg_id,
allocaotor,
palf_env_impl))) {
LOG_WARN("failed to init sys_log_io_worker", K(ret));
} else if (is_user_tenant_ && OB_FAIL(user_log_io_worker_.init(config,
tenant_id,
cb_thread_pool_tg_id,
allocaotor, palf_env_impl))) {
sys_log_io_worker_.destroy();
LOG_WARN("failed to init user_log_io_worker");
LOG_WARN("invalid tenant_id", K(config), K(tenant_id), K(cb_thread_pool_tg_id), KP(allocator),
KP(palf_env_impl));
} else if (OB_FAIL(create_and_init_log_io_workers_(config, tenant_id, cb_thread_pool_tg_id,
allocator, palf_env_impl))) {
LOG_WARN("init_log_io_workers_ failed", K(config));
} else {
is_user_tenant_ = is_user_tenant(tenant_id);
log_writer_parallelism_ = config.io_worker_num_;
throttle_.reset();
round_robin_idx_ = 0;
is_inited_ = true;
LOG_INFO("success to init LogIOWorkerWrapper", K(tenant_id), KPC(this));
LOG_INFO("success to init LogIOWorkerWrapper", K(config), K(tenant_id), KPC(this));
}
if (OB_FAIL(ret) && OB_INIT_TWICE != ret) {
destory_and_free_log_io_workers_();
}
return ret;
}
LogIOWorker *LogIOWorkerWrapper::get_log_io_worker(const int64_t palf_id)
{
return is_sys_palf_id(palf_id) ? &sys_log_io_worker_ : &user_log_io_worker_;
int64_t index = palf_id_to_index_(palf_id);
LogIOWorker *iow = log_io_workers_ + index;
PALF_LOG(INFO, "get_log_io_worker success", KPC(this), K(palf_id), K(index), KP(iow));
return iow;
}
int LogIOWorkerWrapper::start()
{
int ret = OB_SUCCESS;
if (OB_FAIL(sys_log_io_worker_.start())) {
LOG_WARN("failed to start sys_log_io_worker");
} else if (is_user_tenant_ && OB_FAIL(user_log_io_worker_.start())) {
LOG_WARN("failed to start user_log_io_worker");
if (OB_FAIL(start_())) {
LOG_WARN("failed to start log_io_workers_");
} else {
LOG_INFO("success to start LogIOWorkerWrapper", KPC(this));
}
@ -87,22 +98,16 @@ int LogIOWorkerWrapper::start()
void LogIOWorkerWrapper::stop()
{
LOG_INFO("LogIOWorkerWrapper starts stopping", KPC(this));
sys_log_io_worker_.stop();
if (is_user_tenant_) {
user_log_io_worker_.stop();
}
LOG_INFO("LogIOWorkerWrapper has finished stopping", KPC(this));
PALF_LOG(INFO, "LogIOWorkerWrapper starts stopping", KPC(this));
stop_();
PALF_LOG(INFO, "LogIOWorkerWrapper has finished stopping", KPC(this));
}
void LogIOWorkerWrapper::wait()
{
LOG_INFO("LogIOWorkerWrapper starts waiting", KPC(this));
sys_log_io_worker_.wait();
if (is_user_tenant_) {
user_log_io_worker_.wait();
}
LOG_INFO("LogIOWorkerWrapper has finished waiting", KPC(this));
PALF_LOG(INFO, " LogIOWorkerWrapper starts waiting", KPC(this));
wait_();
PALF_LOG(INFO, "LogIOWorkerWrapper has finished waiting", KPC(this));
}
int LogIOWorkerWrapper::notify_need_writing_throttling(const bool &need_throttling)
@ -112,9 +117,8 @@ int LogIOWorkerWrapper::notify_need_writing_throttling(const bool &need_throttli
ret = OB_NOT_INIT;
} else if (!is_user_tenant_) {
//need no nothing
} else if (OB_FAIL(user_log_io_worker_.notify_need_writing_throttling(need_throttling))) {
LOG_WARN("failed to notify_need_writing_throttling", K(need_throttling));
} else {
throttle_.notify_need_writing_throttling(need_throttling);
if (need_throttling) {
LOG_INFO("success to notify_need_writing_throttling True");
}
@ -124,10 +128,112 @@ int LogIOWorkerWrapper::notify_need_writing_throttling(const bool &need_throttli
int64_t LogIOWorkerWrapper::get_last_working_time() const
{
const int64_t sys_last_working_time = sys_log_io_worker_.get_last_working_time();
const int64_t user_last_working_time = user_log_io_worker_.get_last_working_time();
return MAX(sys_last_working_time, user_last_working_time);
int64_t last_working_time = OB_INVALID_TIMESTAMP;
if (IS_NOT_INIT) {
PALF_LOG_RET(ERROR, OB_NOT_INIT, "LogIOWorkerWrapper not inited", KPC(this));
} else {
for (int64_t i = 0; i < log_writer_parallelism_; i++) {
last_working_time = MAX(last_working_time, log_io_workers_[i].get_last_working_time());
}
}
return last_working_time;
}
int LogIOWorkerWrapper::create_and_init_log_io_workers_(const LogIOWorkerConfig &config,
const int64_t tenant_id,
const int cb_thread_pool_tg_id,
ObIAllocator *allocator,
IPalfEnvImpl *palf_env_impl)
{
int ret = OB_SUCCESS;
const int64_t log_writer_parallelism = config.io_worker_num_;
log_io_workers_ = reinterpret_cast<LogIOWorker *>(share::mtl_malloc(
(log_writer_parallelism) * sizeof(LogIOWorker), "LogIOWS"));
if (NULL == log_io_workers_) {
ret = OB_ALLOCATE_MEMORY_FAILED;
PALF_LOG(WARN, "allocate memory failed", K(log_writer_parallelism));
}
for (int64_t i = 0; i < log_writer_parallelism && OB_SUCC(ret); i++) {
LogIOWorker *iow = log_io_workers_ + i;
iow = new(iow)LogIOWorker();
// NB: the first LogIOWorker (used by the sys log stream) needs to ignore throttling
bool need_ignoring_throttling = (i == SYS_LOG_IO_WORKER_INDEX);
if (OB_FAIL(iow->init(config, tenant_id, cb_thread_pool_tg_id, allocator,
&throttle_, need_ignoring_throttling, palf_env_impl))) {
PALF_LOG(WARN, "init LogIOWorker failed", K(i), K(config), K(tenant_id),
K(cb_thread_pool_tg_id), KP(allocator), KP(palf_env_impl));
} else {
PALF_LOG(INFO, "init LogIOWorker success", K(i), K(config), K(tenant_id),
K(cb_thread_pool_tg_id), KP(allocator), KP(palf_env_impl), KP(iow),
KP(log_io_workers_));
}
}
return ret;
}
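Note: the worker array above is raw memory from the tenant allocator plus placement new, so teardown must run the destructors by hand before freeing. A generic sketch of that pairing, with malloc/free standing in for share::mtl_malloc/mtl_free:

#include <cstdint>
#include <cstdlib>
#include <new>

struct Worker { /* threads, queues, ... */ };

Worker *create_workers(int64_t n)
{
  Worker *arr = static_cast<Worker *>(std::malloc(n * sizeof(Worker)));
  if (arr != NULL) {
    for (int64_t i = 0; i < n; ++i) {
      new (arr + i) Worker();  // construct each element in place
    }
  }
  return arr;
}

void destroy_workers(Worker *arr, int64_t n)
{
  if (arr != NULL) {
    for (int64_t i = 0; i < n; ++i) {
      arr[i].~Worker();        // explicit destructor, mirrors iow->~LogIOWorker()
    }
    std::free(arr);
  }
}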
int LogIOWorkerWrapper::start_()
{
int ret = OB_SUCCESS;
for (int64_t i = 0; i < log_writer_parallelism_ && OB_SUCC(ret); i++) {
LogIOWorker *iow = log_io_workers_ + i;
if (OB_FAIL(iow->start())) {
PALF_LOG(WARN, "start LogIOWorker failed", K(i));
}
}
return ret;
}
void LogIOWorkerWrapper::stop_()
{
for (int64_t i = 0; i < log_writer_parallelism_; i++) {
LogIOWorker *iow = log_io_workers_ + i;
iow->stop();
}
}
void LogIOWorkerWrapper::wait_()
{
for (int64_t i = 0; i < log_writer_parallelism_; i++) {
LogIOWorker *iow = log_io_workers_ + i;
iow->wait();
}
}
void LogIOWorkerWrapper::destory_and_free_log_io_workers_()
{
if (NULL != log_io_workers_) {
for (int64_t i = 0; i < log_writer_parallelism_; i++) {
LogIOWorker *iow = log_io_workers_ + i;
iow->stop();
iow->wait();
iow->destroy();
iow->~LogIOWorker();
}
share::mtl_free(log_io_workers_);
log_io_workers_ = NULL;
}
PALF_LOG(INFO, "destory_and_free_log_io_workers_ success", KPC(this));
}
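// A minimal, self-contained sketch of the lifecycle pattern used by
// create_and_init_log_io_workers_ / destory_and_free_log_io_workers_ above:
// one raw allocation, per-element placement new, explicit destructor calls,
// then a single free. 'DemoWorker' and plain malloc/free are hypothetical
// stand-ins for LogIOWorker and share::mtl_malloc/mtl_free, for illustration
// only (assumes <new> and <cstdlib> are reachable via the existing includes).
struct DemoWorker
{
DemoWorker() : stopped_(false) {}
~DemoWorker() {}
void stop() { stopped_ = true; }
bool stopped_;
};
inline DemoWorker *demo_create_workers(const int64_t n)
{
DemoWorker *buf = static_cast<DemoWorker *>(::malloc(n * sizeof(DemoWorker)));
for (int64_t i = 0; NULL != buf && i < n; i++) {
new (buf + i) DemoWorker(); // construct each element in place
}
return buf;
}
inline void demo_destroy_workers(DemoWorker *buf, const int64_t n)
{
if (NULL != buf) {
for (int64_t i = 0; i < n; i++) {
buf[i].stop(); // mirror stop()/wait()/destroy() before destruction
buf[i].~DemoWorker(); // run the destructor explicitly
}
::free(buf); // free the raw buffer once
}
}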
int64_t LogIOWorkerWrapper::palf_id_to_index_(const int64_t palf_id)
{
int64_t index = -1;
// For sys log stream, index set to 0.
if (is_sys_palf_id(palf_id)) {
index = SYS_LOG_IO_WORKER_INDEX;
} else {
const int64_t hash_factor = log_writer_parallelism_ - 1;
if (hash_factor <= 0 && is_user_tenant_) {
PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "unexpected error, log_writer_parallelism_ must be greater than 1 when it's user tenant",
KPC(this), K(palf_id));
OB_ASSERT(false);
}
// NB: SYS_LOG_IO_WORKER_INDEX is 0; non-sys log streams must not use that LogIOWorker.
index = (round_robin_idx_++ % hash_factor) + 1;
PALF_LOG(INFO, "palf_id_to_index_ success", KPC(this), K(palf_id), K(index));
OB_ASSERT(index < log_writer_parallelism_);
}
return index;
}
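// Worked example of the mapping above. Assuming a user tenant with
// log_writer_parallelism_ == 4 (one sys worker plus three user workers),
// the sys log stream always maps to index 0, and user log streams are
// assigned 1, 2, 3, 1, 2, 3, ... in creation order. A hypothetical
// standalone version of the same rule, for illustration only:
inline int64_t demo_palf_id_to_index(const bool is_sys_id,
int64_t &round_robin_idx,
const int64_t parallelism)
{
// user log streams round-robin over the index range [1, parallelism)
return is_sys_id ? 0 : (round_robin_idx++ % (parallelism - 1)) + 1;
}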
}//end of namespace palf
}//end of namespace oceanbase

View File

@ -13,6 +13,7 @@
#ifndef OCEANBASE_LOGSERVICE_LOG_IO_WORKER_WRAPPER_
#define OCEANBASE_LOGSERVICE_LOG_IO_WORKER_WRAPPER_
#include "log_throttle.h"
#include "log_io_worker.h"
namespace oceanbase
@ -24,7 +25,6 @@ class LogIOWorkerWrapper
public:
LogIOWorkerWrapper();
~LogIOWorkerWrapper();
int init(const LogIOWorkerConfig &config,
const int64_t tenant_id,
int cb_thread_pool_tg_id,
@ -34,18 +34,35 @@ public:
int start();
void stop();
void wait();
// NB: currently, this interface can only be called from create_palf_handle_impl; otherwise,
// round_robin_idx_ will not be correct.
LogIOWorker *get_log_io_worker(const int64_t palf_id);
int notify_need_writing_throttling(const bool &need_throtting);
int64_t get_last_working_time() const;
TO_STRING_KV(K_(is_inited), K_(is_user_tenant), K_(sys_log_io_worker), K_(user_log_io_worker));
TO_STRING_KV(K_(is_inited), K_(is_user_tenant), K_(log_writer_parallelism), KP_(log_io_workers), K_(round_robin_idx));
private:
int create_and_init_log_io_workers_(const LogIOWorkerConfig &config,
const int64_t tenant_id,
int cb_thread_pool_tg_id,
ObIAllocator *allocator,
IPalfEnvImpl *palf_env_impl);
int start_();
void stop_();
void wait_();
void destory_and_free_log_io_workers_();
int64_t palf_id_to_index_(const int64_t palf_id);
constexpr static int64_t SYS_LOG_IO_WORKER_INDEX = 0;
private:
bool is_inited_;
bool is_user_tenant_;
//for log stream NO.1
LogIOWorker sys_log_io_worker_;
//for log streams except NO.1
LogIOWorker user_log_io_worker_;
// 'log_writer_parallelism_' includes the LogIOWorker used for the sys log stream.
int64_t log_writer_parallelism_;
// Layout of log_io_workers_: | sys log IO worker | others |
LogIOWorker *log_io_workers_;
LogWritingThrottle throttle_;
int64_t round_robin_idx_;
bool is_inited_;
};
}//end of namespace palf

View File

@ -0,0 +1,223 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX PALF
#include "log_throttle.h"
#include "share/ob_throttling_utils.h" //ObThrottlingUtils
#include "palf_env_impl.h" // PalfEnvImpl
namespace oceanbase
{
using namespace common;
using namespace share;
namespace palf
{
void LogThrottlingStat::reset()
{
start_ts_ = OB_INVALID_TIMESTAMP;
stop_ts_ = OB_INVALID_TIMESTAMP;
total_throttling_interval_ = 0;
total_throttling_size_ = 0;
total_throttling_task_cnt_ = 0;
total_skipped_size_ = 0;
total_skipped_task_cnt_ = 0;
max_throttling_interval_ = 0;
}
void LogWritingThrottle::reset()
{
last_update_ts_ = OB_INVALID_TIMESTAMP;
need_writing_throttling_notified_ = false;
appended_log_size_cur_round_ = 0;
decay_factor_ = 0;
throttling_options_.reset();
stat_.reset();
}
int LogWritingThrottle::update_throttling_options(IPalfEnvImpl *palf_env_impl)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(palf_env_impl)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("palf_env_impl is NULL", KPC(this));
} else {
bool unused_has_freed_up_space = false;
if (check_need_update_throtting_options_guarded_by_lock_()
&& OB_FAIL(update_throtting_options_guarded_by_lock_(palf_env_impl, unused_has_freed_up_space))) {
LOG_WARN("failed to update_throttling_info", KPC(this));
}
}
return ret;
}
int LogWritingThrottle::throttling(const int64_t throttling_size,
const NeedPurgingThrottlingFunc &need_purging_throttling_func,
IPalfEnvImpl *palf_env_impl)
{
int ret = OB_SUCCESS;
if (!need_purging_throttling_func.is_valid()
|| OB_ISNULL(palf_env_impl)) {
ret = OB_INVALID_ARGUMENT;
} else if (throttling_size < 0) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid throttling_size", K(throttling_size), KPC(this));
} else if (0 == throttling_size) {
//no need throttling
} else {
// acquire the lock first.
lock_.lock();
if (need_throttling_not_guarded_by_lock_(need_purging_throttling_func)) {
const int64_t cur_unrecyclable_size = throttling_options_.unrecyclable_disk_space_ + appended_log_size_cur_round_;
const int64_t trigger_base_log_disk_size = throttling_options_.total_disk_space_ * throttling_options_.trigger_percentage_ / 100;
int64_t time_interval = 0;
if (OB_FAIL(ObThrottlingUtils::get_throttling_interval(THROTTLING_CHUNK_SIZE, throttling_size, trigger_base_log_disk_size,
cur_unrecyclable_size, decay_factor_, time_interval))) {
LOG_WARN("failed to get_throttling_interval", KPC(this));
}
int64_t remain_interval_us = time_interval;
bool has_freed_up_space = false;
// lock_ is released while usleep is in progress; therefore, accesses to shared members
// of LogWritingThrottle in the following code block must be guarded by the lock.
lock_.unlock();
while (OB_SUCC(ret) && remain_interval_us > 0) {
const int64_t real_interval = MIN(remain_interval_us, DETECT_INTERVAL_US);
usleep(real_interval);
remain_interval_us -= real_interval;
if (remain_interval_us <= 0) {
//do nothing
} else if (OB_FAIL(update_throtting_options_guarded_by_lock_(palf_env_impl, has_freed_up_space))) {
LOG_WARN("failed to update_throttling_info_", KPC(this), K(time_interval), K(remain_interval_us));
} else if (!need_throttling_not_guarded_by_lock_(need_purging_throttling_func)
|| has_freed_up_space) {
LOG_TRACE("no need throttling or log disk has been freed up", KPC(this), K(time_interval), K(remain_interval_us), K(has_freed_up_space));
break;
}
}
// lock_ is held again after usleep; therefore, accesses to shared members of LogWritingThrottle no longer need extra guarding.
lock_.lock();
stat_.after_throttling(time_interval - remain_interval_us, throttling_size);
} else if (need_throttling_with_options_not_guarded_by_lock_()) {
stat_.after_throttling(0, throttling_size);
}
if (stat_.has_ever_throttled()) {
if (REACH_TIME_INTERVAL(2 * 1000 * 1000L)) {
PALF_LOG(INFO, "[LOG DISK THROTTLING] [STAT]", KPC(this));
}
}
// release the lock last.
lock_.unlock();
}
return ret;
}
int LogWritingThrottle::after_append_log(const int64_t log_size)
{
int ret = OB_SUCCESS;
SpinLockGuard guard(lock_);
if (OB_UNLIKELY(log_size < 0)) {
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "invalid argument", K(log_size));
} else {
appended_log_size_cur_round_ += log_size;
}
return ret;
}
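// A minimal sketch of the call protocol a writer thread is expected to follow
// (a hypothetical helper; the real call sites live in LogIOWorker): sleep via
// throttling() before issuing the write, then report the written size via
// after_append_log() so the next round sees the updated usage.
inline int demo_write_with_throttle(LogWritingThrottle &throttle,
const NeedPurgingThrottlingFunc &need_purge_func,
IPalfEnvImpl *palf_env_impl,
const int64_t io_size)
{
int ret = OB_SUCCESS;
if (OB_FAIL(throttle.throttling(io_size, need_purge_func, palf_env_impl))) {
PALF_LOG(WARN, "throttling failed", K(ret), K(io_size));
} else {
// ... issue the actual disk write of io_size bytes here ...
ret = throttle.after_append_log(io_size);
}
return ret;
}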
int LogWritingThrottle::update_throtting_options_guarded_by_lock_(IPalfEnvImpl *palf_env_impl, bool &has_freed_up_space)
{
int ret = OB_SUCCESS;
SpinLockGuard guard(lock_);
const int64_t cur_ts = ObClockGenerator::getClock();
if (ATOMIC_LOAD(&need_writing_throttling_notified_)) {
PalfThrottleOptions new_throttling_options;
if (OB_FAIL(palf_env_impl->get_throttling_options(new_throttling_options))) {
PALF_LOG(WARN, "failed to get_writing_throttling_option");
} else if (OB_UNLIKELY(!new_throttling_options.is_valid())) {
ret = OB_ERR_UNEXPECTED;
PALF_LOG(WARN, "options is invalid", K(new_throttling_options), KPC(this));
} else {
const bool need_throttling = new_throttling_options.need_throttling();
const int64_t new_available_size_after_limit = new_throttling_options.get_available_size_after_limit();
bool need_update_decay_factor = false;
bool need_start_throttling = false;
if (need_throttling) {
if (!throttling_options_.need_throttling()) {
need_start_throttling = true;
need_update_decay_factor = true;
} else {
need_update_decay_factor = (throttling_options_.get_available_size_after_limit() != new_available_size_after_limit);
}
if (need_update_decay_factor) {
if (OB_FAIL(ObThrottlingUtils::calc_decay_factor(new_available_size_after_limit, THROTTLING_DURATION_US,
THROTTLING_CHUNK_SIZE, decay_factor_))) {
PALF_LOG(ERROR, "failed to calc_decay_factor", K(throttling_options_), "duration(s)",
THROTTLING_DURATION_US / (1000 * 1000), K(THROTTLING_CHUNK_SIZE));
} else {
PALF_LOG(INFO, "[LOG DISK THROTTLING] success to calc_decay_factor", K(decay_factor_), K(throttling_options_),
K(new_throttling_options), "duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE));
}
}
if (OB_SUCC(ret)) {
// update other fields
has_freed_up_space = new_throttling_options.unrecyclable_disk_space_ < throttling_options_.unrecyclable_disk_space_;
bool has_unrecyclable_space_changed = new_throttling_options.unrecyclable_disk_space_ != throttling_options_.unrecyclable_disk_space_;
if (has_unrecyclable_space_changed || need_start_throttling) {
// reset appended_log_size_cur_round_ when unrecyclable_disk_space_ changed
appended_log_size_cur_round_ = 0;
}
throttling_options_ = new_throttling_options;
if (need_start_throttling) {
stat_.start_throttling();
PALF_LOG(INFO, "[LOG DISK THROTTLING] [START]", KPC(this),
"duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE));
}
}
} else {
if (throttling_options_.need_throttling()) {
PALF_LOG(INFO, "[LOG DISK THROTTLING] [STOP]", KPC(this),
"duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE));
clean_up_not_guarded_by_lock_();
stat_.stop_throttling();
}
}
}
} else {
if (throttling_options_.need_throttling()) {
PALF_LOG(INFO, "[LOG DISK THROTTLING] [STOP] no need throttling any more", KPC(this),
"duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE));
clean_up_not_guarded_by_lock_();
stat_.stop_throttling();
}
}
if (OB_SUCC(ret)) {
last_update_ts_ = cur_ts;
}
return ret;
}
void LogWritingThrottle::clean_up_not_guarded_by_lock_()
{
// do not reset last_update_ts_ or stat_
appended_log_size_cur_round_ = 0;
decay_factor_ = 0;
throttling_options_.reset();
}
bool LogWritingThrottle::check_need_update_throtting_options_guarded_by_lock_()
{
SpinLockGuard guard(lock_);
int64_t cur_ts = ObClockGenerator::getClock();
return cur_ts > last_update_ts_ + UPDATE_INTERVAL_US;
}
} // end namespace palf
} // end namespace oceanbase

View File

@ -0,0 +1,163 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_LOGSERVICE_LOG_THROTTLE_H
#define OCEANBASE_LOGSERVICE_LOG_THROTTLE_H
#include "lib/utility/ob_macro_utils.h" // DISALLOW_COPY_AND_ASSIGN
#include "lib/utility/ob_print_utils.h" // TO_STRING_KV
#include "lib/lock/ob_spin_lock.h" // SpinLock
#include "common/ob_clock_generator.h" // ObClockGenerator
#include "lib/function/ob_function.h" // ObFunction
#include "log_define.h" // MAX_LOG_BUFFER_SIZE
#include "palf_options.h" // PalfThrottleOptions
namespace oceanbase
{
namespace palf
{
class IPalfEnvImpl;
typedef ObFunction<bool()> NeedPurgingThrottlingFunc;
class LogThrottlingStat
{
public:
LogThrottlingStat() {reset();}
~LogThrottlingStat() {reset();}
void reset();
inline bool has_ever_throttled() const;
inline void after_throttling(const int64_t throttling_interval, const int64_t throttling_size);
inline void start_throttling();
inline void stop_throttling();
TO_STRING_KV(K_(start_ts),
K_(stop_ts),
K_(total_throttling_interval),
K_(total_throttling_size),
K_(total_throttling_task_cnt),
K_(total_skipped_size),
K_(total_skipped_task_cnt),
K_(max_throttling_interval));
private:
int64_t start_ts_;
int64_t stop_ts_;
int64_t total_throttling_interval_;
int64_t total_throttling_size_;
int64_t total_throttling_task_cnt_;
// log size of tasks that needed throttling but were skipped
int64_t total_skipped_size_;
// count of tasks that needed throttling but were skipped
int64_t total_skipped_task_cnt_;
int64_t max_throttling_interval_;
};
inline void LogThrottlingStat::after_throttling(const int64_t throttling_interval,
const int64_t throttling_size)
{
if (0 == throttling_interval) {
total_skipped_size_ += throttling_size;
total_skipped_task_cnt_++;
} else {
total_throttling_interval_ += throttling_interval;
total_throttling_size_ += throttling_size;
total_throttling_task_cnt_++;
max_throttling_interval_ = MAX(max_throttling_interval_, throttling_interval);
}
}
inline bool LogThrottlingStat::has_ever_throttled() const
{
return common::OB_INVALID_TIMESTAMP != start_ts_;
}
inline void LogThrottlingStat::start_throttling()
{
reset();
start_ts_ = common::ObClockGenerator::getClock();
}
inline void LogThrottlingStat::stop_throttling()
{
stop_ts_ = common::ObClockGenerator::getClock();
}
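// Usage sketch of the stat lifecycle (mirrors the unit test for LogThrottlingStat):
// start_throttling() resets the counters and stamps start_ts_, after_throttling()
// accounts a task into either the throttled or the skipped bucket depending on
// whether an interval was actually slept, and stop_throttling() stamps stop_ts_
// while keeping the totals. The values below are arbitrary, for illustration only.
inline void demo_stat_round(LogThrottlingStat &stat)
{
stat.start_throttling();
stat.after_throttling(1000/*interval us*/, 4096/*bytes*/); // a throttled task
stat.after_throttling(0/*no sleep*/, 4096/*bytes*/);       // a skipped task
stat.stop_throttling();
}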
class LogWritingThrottle
{
public:
LogWritingThrottle() {reset();}
~LogWritingThrottle() {reset();}
void reset();
//invoked by gc thread
inline void notify_need_writing_throttling(const bool is_need);
inline bool need_writing_throttling_notified() const;
int update_throttling_options(IPalfEnvImpl *palf_env_impl);
int throttling(const int64_t io_size,
const NeedPurgingThrottlingFunc &need_purging_throttling_func,
IPalfEnvImpl *palf_env_impl);
int after_append_log(const int64_t log_size);
TO_STRING_KV(K_(last_update_ts),
K_(need_writing_throttling_notified),
K_(appended_log_size_cur_round),
K_(decay_factor),
K_(throttling_options),
K_(stat));
private:
int update_throtting_options_guarded_by_lock_(IPalfEnvImpl *palf_env_impl, bool &has_recycled_log_disk);
inline bool need_throttling_with_options_not_guarded_by_lock_() const;
inline bool need_throttling_not_guarded_by_lock_(const NeedPurgingThrottlingFunc &need_purge_throttling) const;
//reset throttling related member
void clean_up_not_guarded_by_lock_();
bool check_need_update_throtting_options_guarded_by_lock_();
private:
typedef common::ObSpinLock SpinLock;
typedef common::ObSpinLockGuard SpinLockGuard;
static const int64_t UPDATE_INTERVAL_US = 500 * 1000L;//500ms
const int64_t DETECT_INTERVAL_US = 30 * 1000L;//30ms
const int64_t THROTTLING_DURATION_US = 1800 * 1000 * 1000L;//1800s
const int64_t THROTTLING_CHUNK_SIZE = MAX_LOG_BUFFER_SIZE;
// ts of the latest update of the writing throttling info
int64_t last_update_ts_;
// notified by gc; local meta may not be ready
mutable bool need_writing_throttling_notified_;
// log size appended during the current round; reset whenever unrecyclable_disk_space_ changes
int64_t appended_log_size_cur_round_;
double decay_factor_;
PalfThrottleOptions throttling_options_;
LogThrottlingStat stat_;
mutable SpinLock lock_;
};
inline void LogWritingThrottle::notify_need_writing_throttling(const bool is_need) {
ATOMIC_SET(&need_writing_throttling_notified_, is_need);
}
inline bool LogWritingThrottle::need_writing_throttling_notified() const {
return ATOMIC_LOAD(&need_writing_throttling_notified_);
}
inline bool LogWritingThrottle::need_throttling_with_options_not_guarded_by_lock_() const
{
return ATOMIC_LOAD(&need_writing_throttling_notified_) && throttling_options_.need_throttling();
}
inline bool LogWritingThrottle::need_throttling_not_guarded_by_lock_(
const NeedPurgingThrottlingFunc &need_purge_throttling) const
{
// Throttling is needed only when both conditions hold:
// 1. writing throttling has been notified and triggered for this tenant;
// 2. there is no pending task that requires purging the throttling.
return need_throttling_with_options_not_guarded_by_lock_() && !need_purge_throttling();
}
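// Illustration: a NeedPurgingThrottlingFunc is any bool() callable, so a lambda
// capturing caller state is enough (the unit test builds one the same way).
// This helper and its parameter name are hypothetical, for illustration only:
inline NeedPurgingThrottlingFunc demo_make_need_purge_func(const bool *need_purge_flag)
{
NeedPurgingThrottlingFunc func = [need_purge_flag]() {
return NULL != need_purge_flag && *need_purge_flag;
};
return func;
}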
} // end namespace palf
} // end namespace oceanbase
#endif

View File

@ -198,11 +198,6 @@ int PalfEnvImpl::init(
{
int ret = OB_SUCCESS;
int pret = 0;
// TODO by runlin: configurable
log_io_worker_config_.io_worker_num_ = 1;
log_io_worker_config_.io_queue_capcity_ = 100 * 1024;
log_io_worker_config_.batch_width_ = 8;
log_io_worker_config_.batch_depth_ = PALF_SLIDING_WINDOW_SIZE;
const int64_t io_cb_num = PALF_SLIDING_WINDOW_SIZE * 128;
if (is_inited_) {
ret = OB_INIT_TWICE;
@ -212,6 +207,10 @@ int PalfEnvImpl::init(
ret = OB_INVALID_ARGUMENT;
PALF_LOG(ERROR, "invalid arguments", K(ret), KP(transport), K(base_dir), K(self), KP(transport),
KP(log_alloc_mgr), KP(log_block_pool), KP(monitor));
} else if (OB_FAIL(init_log_io_worker_config_(options.disk_options_.log_writer_parallelism_,
tenant_id,
log_io_worker_config_))) {
PALF_LOG(WARN, "init_log_io_worker_config_ failed", K(options));
} else if (OB_FAIL(fetch_log_engine_.init(this, log_alloc_mgr))) {
PALF_LOG(ERROR, "FetchLogEngine init failed", K(ret));
} else if (OB_FAIL(log_rpc_.init(self, cluster_id, tenant_id, transport))) {
@ -1242,5 +1241,40 @@ int PalfEnvImpl::get_throttling_options(PalfThrottleOptions &options)
return ret;
}
int PalfEnvImpl::init_log_io_worker_config_(const int log_writer_parallelism,
const int64_t tenant_id,
LogIOWorkerConfig &config)
{
int ret = OB_SUCCESS;
// log_writer_parallelism is only meaningful for a user tenant.
// To support writing throttling, the sys log stream must have a dedicated LogIOWorker.
const int64_t real_log_writer_parallelism = is_user_tenant(tenant_id) ? (log_writer_parallelism + 1) : 1;
auto tmp_upper_align_div = [](const int64_t num, const int64_t align) -> int64_t {
return (num + align - 1) / align;
};
constexpr int64_t default_io_queue_cap = 100 * 1024;
constexpr int64_t default_io_batch_width = 8;
// Due to the uniqueness of log stream IDs, hash distribution naturally balances
// the load in a single-unit environment; therefore, we set the minimum io queue
// capacity to PALF_SLIDING_WINDOW_SIZE * 2 and set default_min_batch_width to 1.
// TODO by zjf225077:
// to support load balancing in a multi-unit environment, LogIOWorker needs a
// load factor to keep the number of log streams on each LogIOWorker balanced.
constexpr int64_t default_min_io_queue_cap = PALF_SLIDING_WINDOW_SIZE * 2;
constexpr int64_t default_min_batch_width = 1;
// Assume that at most 100 * 1024 I/O tasks exist simultaneously in a single PalfEnvImpl.
config.io_worker_num_ = real_log_writer_parallelism;
config.io_queue_capcity_ = MAX(default_min_io_queue_cap,
tmp_upper_align_div(default_io_queue_cap, real_log_writer_parallelism));
config.batch_width_ = MAX(default_min_batch_width,
tmp_upper_align_div(default_io_batch_width, real_log_writer_parallelism));
config.batch_depth_ = PALF_SLIDING_WINDOW_SIZE;
PALF_LOG(INFO, "init_log_io_worker_config_ success", K(config), K(tenant_id), K(log_writer_parallelism));
return ret;
}
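// Worked example of the formulas above, assuming a user tenant with the
// default log_writer_parallelism == 3 and, hypothetically,
// PALF_SLIDING_WINDOW_SIZE == 8192:
//   real_log_writer_parallelism = 3 + 1 = 4 (one extra worker for the sys log stream)
//   io_queue_capcity_ = MAX(8192 * 2, (100 * 1024 + 4 - 1) / 4) = MAX(16384, 25600) = 25600
//   batch_width_      = MAX(1, (8 + 4 - 1) / 4) = MAX(1, 2) = 2
//   batch_depth_      = PALF_SLIDING_WINDOW_SIZE = 8192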
} // end namespace palf
} // end namespace oceanbase

View File

@ -324,6 +324,10 @@ private:
int check_tmp_log_dir_exist_(bool &exist) const;
int remove_stale_incomplete_palf_();
int init_log_io_worker_config_(const int log_writer_parallelism,
const int64_t tenant_id,
LogIOWorkerConfig &config);
private:
typedef common::RWLock RWLock;
typedef RWLock::RLockGuard RLockGuard;

View File

@ -38,6 +38,7 @@ void PalfDiskOptions::reset()
log_disk_utilization_limit_threshold_ = -1;
log_disk_utilization_threshold_ = -1;
log_disk_throttling_percentage_ = -1;
log_writer_parallelism_ = -1;
}
bool PalfDiskOptions::is_valid() const
@ -47,7 +48,8 @@ bool PalfDiskOptions::is_valid() const
&& 1 <=log_disk_utilization_limit_threshold_ && 100 >= log_disk_utilization_limit_threshold_
&& log_disk_utilization_limit_threshold_ > log_disk_utilization_threshold_
&& log_disk_throttling_percentage_ >= MIN_WRITING_THTOTTLING_TRIGGER_PERCENTAGE
&& log_disk_throttling_percentage_ <= 100;
&& log_disk_throttling_percentage_ <= 100
&& log_writer_parallelism_ >= 1 && log_writer_parallelism_ <= 8;
}
bool PalfDiskOptions::operator==(const PalfDiskOptions &palf_disk_options) const
@ -55,7 +57,9 @@ bool PalfDiskOptions::operator==(const PalfDiskOptions &palf_disk_options) const
return log_disk_usage_limit_size_ == palf_disk_options.log_disk_usage_limit_size_
&& log_disk_utilization_threshold_ == palf_disk_options.log_disk_utilization_threshold_
&& log_disk_utilization_limit_threshold_ == palf_disk_options.log_disk_utilization_limit_threshold_
&& log_disk_throttling_percentage_ == palf_disk_options.log_disk_throttling_percentage_;
&& log_disk_throttling_percentage_ == palf_disk_options.log_disk_throttling_percentage_
&& log_writer_parallelism_ == palf_disk_options.log_writer_parallelism_;
}
PalfDiskOptions &PalfDiskOptions::operator=(const PalfDiskOptions &other)
@ -64,6 +68,7 @@ PalfDiskOptions &PalfDiskOptions::operator=(const PalfDiskOptions &other)
log_disk_utilization_threshold_ = other.log_disk_utilization_threshold_;
log_disk_utilization_limit_threshold_ = other.log_disk_utilization_limit_threshold_;
log_disk_throttling_percentage_ = other.log_disk_throttling_percentage_;
log_writer_parallelism_ = other.log_writer_parallelism_;
return *this;
}

View File

@ -19,15 +19,19 @@ namespace oceanbase
{
namespace palf
{
// 1. total log disk size;
// 2. reuse percentage;
// 3. stop-writing percentage;
// The following disk options can be set on Palf:
// 1. log_disk_usage_limit_size_, the total log disk space.
// 2. log_disk_utilization_threshold_, the log disk utilization threshold before log files are reused.
// 3. log_disk_utilization_limit_threshold_, the maximum log disk usage percentage before submitting or receiving logs stops.
// 4. log_disk_throttling_percentage_, the log disk usage threshold at which the writing limit is triggered.
// 5. log_writer_parallelism_, the number of parallel log writer threads used to write redo log entries to disk.
struct PalfDiskOptions
{
PalfDiskOptions() : log_disk_usage_limit_size_(-1),
log_disk_utilization_threshold_(-1),
log_disk_utilization_limit_threshold_(-1),
log_disk_throttling_percentage_(-1)
log_disk_throttling_percentage_(-1),
log_writer_parallelism_(-1)
{}
~PalfDiskOptions() { reset(); }
static constexpr int64_t MB = 1024*1024ll;
@ -39,10 +43,12 @@ struct PalfDiskOptions
int log_disk_utilization_threshold_;
int log_disk_utilization_limit_threshold_;
int64_t log_disk_throttling_percentage_;
int log_writer_parallelism_;
TO_STRING_KV("log_disk_size(MB)", log_disk_usage_limit_size_ / MB,
"log_disk_utilization_threshold(%)", log_disk_utilization_threshold_,
"log_disk_utilization_limit_threshold(%)", log_disk_utilization_limit_threshold_,
"log_disk_throttling_percentage(%)", log_disk_throttling_percentage_);
"log_disk_throttling_percentage(%)", log_disk_throttling_percentage_,
"log_writer_parallelism", log_writer_parallelism_);
};

View File

@ -835,6 +835,12 @@ int ObMultiTenant::create_tenant(const ObTenantMeta &meta, bool write_slog, cons
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(OTC_MGR.add_tenant_config(tenant_id))) {
LOG_ERROR("add tenant config fail", K(tenant_id), K(ret));
}
}
if (OB_SUCC(ret)) {
CREATE_WITH_TEMP_ENTITY(RESOURCE_OWNER, tenant->id()) {
WITH_ENTITY(&tenant->ctx()) {
@ -847,8 +853,6 @@ int ObMultiTenant::create_tenant(const ObTenantMeta &meta, bool write_slog, cons
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(OTC_MGR.add_tenant_config(tenant_id))) {
LOG_ERROR("add tenant config fail", K(tenant_id), K(ret));
#ifdef OMT_UNITTEST
} else if (!is_virtual_tenant_id(tenant_id) &&
OB_FAIL(OTC_MGR.got_version(tenant_id, common::ObSystemConfig::INIT_VERSION))) {

View File

@ -705,6 +705,14 @@ int ObTenant::construct_mtl_init_ctx(const ObTenantMeta &meta, share::ObTenantMo
mtl_init_ctx_->palf_options_.disk_options_.log_disk_utilization_threshold_ = 80;
mtl_init_ctx_->palf_options_.disk_options_.log_disk_utilization_limit_threshold_ = 95;
mtl_init_ctx_->palf_options_.disk_options_.log_disk_throttling_percentage_ = 100;
mtl_init_ctx_->palf_options_.disk_options_.log_writer_parallelism_ = 3;
ObTenantConfig *config = TENANT_CONF(id_);
if (OB_ISNULL(config)) {
ret = is_virtual_tenant_id(id_) ? OB_SUCCESS : OB_ENTRY_NOT_EXIST;
} else {
mtl_init_ctx_->palf_options_.disk_options_.log_writer_parallelism_ = config->_log_writer_parallelism;
}
LOG_INFO("construct_mtl_init_ctx success", "palf_options", mtl_init_ctx_->palf_options_.disk_options_);
}
return ret;
}

View File

@ -599,6 +599,10 @@ DEF_TIME(standby_db_fetch_log_rpc_timeout, OB_TENANT_PARAMETER, "15s",
"When the rpc timeout, the log transport service switches to another server of the log restore source tenant to fetch logs. "
"Range: [2s, +∞)",
ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(_log_writer_parallelism, OB_TENANT_PARAMETER, "3",
"[1,8]",
"the number of parallel log writer threads that can be used to write redo log entries to disk. ",
ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::STATIC_EFFECTIVE));
// ========================= LogService Config End =====================
DEF_INT(resource_hard_limit, OB_CLUSTER_PARAMETER, "100", "[100, 10000]",

View File

@ -294,6 +294,7 @@ _io_callback_thread_count
_large_query_io_percentage
_lcl_op_interval
_load_tde_encrypt_engine
_log_writer_parallelism
_max_elr_dependent_trx_count
_max_malloc_sample_interval
_max_schema_slot_num

View File

@ -38,9 +38,13 @@ public:
virtual void SetUp();
virtual void TearDown();
protected:
bool g_need_purging_throttling;
NeedPurgingThrottlingFunc g_need_purging_throttling_func;
};
TestPalfThrottling::TestPalfThrottling() {}
TestPalfThrottling::TestPalfThrottling() : g_need_purging_throttling(false) {
g_need_purging_throttling_func = [this](){ return g_need_purging_throttling; };
}
TestPalfThrottling::~TestPalfThrottling()
{
@ -60,7 +64,8 @@ void TestPalfThrottling::TearDown()
//ObMallocAllocator::get_instance()->recycle_tenant_allocator(1001);
}
TEST(TestPalfThrottling, test_palf_options)
TEST_F(TestPalfThrottling, test_palf_options)
{
char buf[64] = {0};
memset(buf, 0, 64);
@ -76,11 +81,13 @@ TEST(TestPalfThrottling, test_palf_options)
wrapper.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ = total_disk_size;
wrapper.disk_opts_for_stopping_writing_.log_disk_utilization_threshold_ = 80;
wrapper.disk_opts_for_stopping_writing_.log_disk_utilization_limit_threshold_ = utilization_limit_threshold;
wrapper.disk_opts_for_stopping_writing_.log_writer_parallelism_ = 1;
int64_t unrecyclable_size = 0;
wrapper.set_cur_unrecyclable_log_disk_size(unrecyclable_size);
ASSERT_EQ(false, wrapper.need_throttling());
unrecyclable_size = total_disk_size * 70 /100;
wrapper.set_cur_unrecyclable_log_disk_size(unrecyclable_size);
PALF_LOG(INFO, "test_palf_options trace", K(wrapper));
ASSERT_EQ(true, wrapper.need_throttling());
//test PalfThrottleOptions
PalfThrottleOptions throttling_options;
@ -97,7 +104,7 @@ TEST(TestPalfThrottling, test_palf_options)
ASSERT_EQ(total_disk_size * (utilization_limit_threshold - throttling_percentage)/100, throttling_options.get_available_size_after_limit());
}
TEST(TestPalfThrottling, test_throttling_stat)
TEST_F(TestPalfThrottling, test_throttling_stat)
{
LogThrottlingStat stat;
ASSERT_EQ(false, stat.has_ever_throttled());
@ -125,7 +132,7 @@ TEST(TestPalfThrottling, test_throttling_stat)
ASSERT_EQ(0, stat.max_throttling_interval_);
}
TEST(TestPalfThrottling, test_log_write_throttle)
TEST_F(TestPalfThrottling, test_log_write_throttle)
{
int64_t total_disk_size = 1024 * 1024 * 1024L;
int64_t utilization_limit_threshold = 95;
@ -142,7 +149,7 @@ TEST(TestPalfThrottling, test_log_write_throttle)
palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
LogWritingThrottle throttle;
ASSERT_EQ(false, throttle.need_throttling_());
ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func));
ASSERT_EQ(false, throttle.need_writing_throttling_notified());
throttle.notify_need_writing_throttling(true);
ASSERT_EQ(true, throttle.need_writing_throttling_notified());
@ -150,13 +157,12 @@ TEST(TestPalfThrottling, test_log_write_throttle)
throttle.notify_need_writing_throttling(false);
ASSERT_EQ(false, throttle.need_writing_throttling_notified());
ASSERT_EQ(OB_INVALID_ARGUMENT, throttle.after_append_log(-1, 0));
ASSERT_EQ(OB_INVALID_ARGUMENT, throttle.after_append_log(0, -1));
ASSERT_EQ(OB_INVALID_ARGUMENT, throttle.after_append_log(-1));
//test throttling only after notified
PALF_LOG(INFO, "case 1: test no need throttling while notify_need_writing_throttling is false");
throttle.update_throttling_options(&palf_env_impl);
throttle.throttling(1024, &palf_env_impl);
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
PalfThrottleOptions invalid_throttle_options;
ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_);
ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle.last_update_ts_);
@ -164,7 +170,7 @@ TEST(TestPalfThrottling, test_log_write_throttle)
// test update interval 500ms
PALF_LOG(INFO, "case 2: test update interval");
throttle.throttling(1024, &palf_env_impl);
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_);
ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle.last_update_ts_);
ASSERT_EQ(false, throttle.need_writing_throttling_notified_);
@ -174,8 +180,8 @@ TEST(TestPalfThrottling, test_log_write_throttle)
usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
throttle.update_throttling_options(&palf_env_impl);
throttle.notify_need_writing_throttling(true);
throttle.throttling(1024, &palf_env_impl);
ASSERT_EQ(false, throttle.need_throttling_());
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func));
ASSERT_EQ(false, throttle.stat_.has_ever_throttled());
@ -185,9 +191,9 @@ TEST(TestPalfThrottling, test_log_write_throttle)
palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100;
usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
throttle.update_throttling_options(&palf_env_impl);
throttle.throttling(1024, &palf_env_impl);
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
ASSERT_EQ(false, throttle.need_throttling_());
ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func));
ASSERT_EQ(false, throttle.stat_.has_ever_throttled());
//test need throttling after update
@ -197,109 +203,96 @@ TEST(TestPalfThrottling, test_log_write_throttle)
palf_env_impl.disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(unrecyclable_size);
usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
throttle.update_throttling_options(&palf_env_impl);
throttle.throttling(1024, &palf_env_impl);
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options);
ASSERT_EQ(throttle_options, throttle.throttling_options_);
ASSERT_EQ(true, throttle.need_throttling_());
ASSERT_EQ(true, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func));
ASSERT_EQ(true, throttle.stat_.has_ever_throttled());
ASSERT_EQ(1024, throttle.stat_.total_throttling_size_);
ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_);
ASSERT_EQ(0, throttle.stat_.total_skipped_task_cnt_);
ASSERT_EQ(0, throttle.stat_.total_skipped_size_);
throttle.after_append_log(1024, 0);
throttle.after_append_log(1024);
ASSERT_EQ(1024, throttle.appended_log_size_cur_round_);
ASSERT_EQ(0, throttle.submitted_seq_);
ASSERT_EQ(0, throttle.handled_seq_);
//test no need throttling with flush meta task in queue
//.1 flush log task
g_need_purging_throttling = true;
PALF_LOG(INFO, "case 5: test no need throttling while flush meta task ", K(throttle));
throttle.inc_and_fetch_submitted_seq();
throttle.inc_and_fetch_submitted_seq();
throttle.inc_and_fetch_submitted_seq();
ASSERT_EQ(3, throttle.submitted_seq_);
throttle.throttling(1024, &palf_env_impl);
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
ASSERT_EQ(throttle_options, throttle.throttling_options_);
ASSERT_EQ(false, throttle.need_throttling_());
// the pending meta task requires purging throttling
ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func));
ASSERT_EQ(1024, throttle.stat_.total_throttling_size_);
ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_);
ASSERT_EQ(1, throttle.stat_.total_skipped_task_cnt_);
ASSERT_EQ(1024, throttle.stat_.total_skipped_size_);
throttle.after_append_log(1024, 0);
throttle.after_append_log(1024);
ASSERT_EQ(2048, throttle.appended_log_size_cur_round_);
ASSERT_EQ(3, throttle.submitted_seq_);
ASSERT_EQ(0, throttle.handled_seq_);
//.2 flush meta task
PALF_LOG(INFO, "case 6: test no need throttling and flush meta task ", K(throttle));
throttle.throttling(1024, &palf_env_impl);
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
ASSERT_EQ(throttle_options, throttle.throttling_options_);
ASSERT_EQ(true, throttle.need_throttling_with_options_());
ASSERT_EQ(false, throttle.need_throttling_());
ASSERT_EQ(true, throttle.need_throttling_with_options_not_guarded_by_lock_());
ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func));
ASSERT_EQ(1024, throttle.stat_.total_throttling_size_);
ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_);
ASSERT_EQ(2, throttle.stat_.total_skipped_task_cnt_);
ASSERT_EQ(2048, throttle.stat_.total_skipped_size_);
throttle.after_append_log(1024, 1);
throttle.after_append_log(1024);
ASSERT_EQ(3072, throttle.appended_log_size_cur_round_);
ASSERT_EQ(3, throttle.submitted_seq_);
ASSERT_EQ(1, throttle.handled_seq_);
throttle.throttling(1024, &palf_env_impl);
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
ASSERT_EQ(throttle_options, throttle.throttling_options_);
ASSERT_EQ(true, throttle.need_throttling_with_options_());
ASSERT_EQ(false, throttle.need_throttling_());
ASSERT_EQ(true, throttle.need_throttling_with_options_not_guarded_by_lock_());
ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func));
ASSERT_EQ(1024, throttle.stat_.total_throttling_size_);
ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_);
ASSERT_EQ(3, throttle.stat_.total_skipped_task_cnt_);
ASSERT_EQ(3072, throttle.stat_.total_skipped_size_);
throttle.after_append_log(1024, 3);
throttle.after_append_log(1024);
ASSERT_EQ(4096, throttle.appended_log_size_cur_round_);
ASSERT_EQ(3, throttle.submitted_seq_);
ASSERT_EQ(3, throttle.handled_seq_);
PALF_LOG(INFO, "case 7: need throttling after all flush meta task handled", K(throttle));
throttle.throttling(1024, &palf_env_impl);
g_need_purging_throttling = false;
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
ASSERT_EQ(throttle_options, throttle.throttling_options_);
ASSERT_EQ(true, throttle.decay_factor_ > 0.0);
ASSERT_EQ(true, throttle.need_throttling_());
ASSERT_EQ(true, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func));
ASSERT_EQ(2048, throttle.stat_.total_throttling_size_);
ASSERT_EQ(2, throttle.stat_.total_throttling_task_cnt_);
ASSERT_EQ(3, throttle.stat_.total_skipped_task_cnt_);
ASSERT_EQ(3072, throttle.stat_.total_skipped_size_);
throttle.after_append_log(1024, 3);
throttle.after_append_log(1024);
ASSERT_EQ(5120, throttle.appended_log_size_cur_round_);
ASSERT_EQ(3, throttle.submitted_seq_);
ASSERT_EQ(3, throttle.handled_seq_);
//test notify_need_writing_throttling(false) changed
PALF_LOG(INFO, "case 8: no need to throttle after notify_need_throttling(false)", K(throttle));
usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
throttle.notify_need_writing_throttling(false);
throttle.update_throttling_options(&palf_env_impl);
throttle.throttling(1024, &palf_env_impl);
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_);
ASSERT_EQ(false, throttle.need_throttling_with_options_());
ASSERT_EQ(false, throttle.need_throttling_());
ASSERT_EQ(false, throttle.need_throttling_with_options_not_guarded_by_lock_());
ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func));
ASSERT_EQ(2048, throttle.stat_.total_throttling_size_);
ASSERT_EQ(2, throttle.stat_.total_throttling_task_cnt_);
ASSERT_EQ(3, throttle.stat_.total_skipped_task_cnt_);
ASSERT_EQ(3072, throttle.stat_.total_skipped_size_);
ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle.stat_.stop_ts_);
throttle.after_append_log(1024, 0);
throttle.after_append_log(1024);
ASSERT_EQ(1024, throttle.appended_log_size_cur_round_);
ASSERT_EQ(0, throttle.decay_factor_);
ASSERT_EQ(3, throttle.submitted_seq_);
ASSERT_EQ(3, throttle.handled_seq_);
//test need write throttling again
PALF_LOG(INFO, "case 9: need to throttle after notify_need_throttling(true)", K(throttle));
usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
throttle.notify_need_writing_throttling(true);
throttle.update_throttling_options(&palf_env_impl);
throttle.throttling(1024, &palf_env_impl);
ASSERT_EQ(true, throttle.need_throttling_with_options_());
ASSERT_EQ(true, throttle.need_throttling_());
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
ASSERT_EQ(true, throttle.need_throttling_with_options_not_guarded_by_lock_());
ASSERT_EQ(true, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func));
ASSERT_EQ(throttle_options, throttle.throttling_options_);
ASSERT_EQ(true, throttle.decay_factor_ > 0.0);
ASSERT_EQ(1024, throttle.stat_.total_throttling_size_);
@ -308,10 +301,8 @@ TEST(TestPalfThrottling, test_log_write_throttle)
ASSERT_EQ(0, throttle.stat_.total_skipped_size_);
ASSERT_EQ(true, throttle.stat_.total_throttling_interval_ > 0);
ASSERT_EQ(true, OB_INVALID_TIMESTAMP == throttle.stat_.stop_ts_);
throttle.after_append_log(1024, 0);
throttle.after_append_log(1024);
ASSERT_EQ(1024, throttle.appended_log_size_cur_round_);
ASSERT_EQ(3, throttle.submitted_seq_);
ASSERT_EQ(3, throttle.handled_seq_);
double old_decay_factor = throttle.decay_factor_;
//
@ -320,9 +311,9 @@ TEST(TestPalfThrottling, test_log_write_throttle)
palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 55;
palf_env_impl.get_throttling_options(throttle_options);
throttle.update_throttling_options(&palf_env_impl);
throttle.throttling(1024, &palf_env_impl);
ASSERT_EQ(true, throttle.need_throttling_with_options_());
ASSERT_EQ(true, throttle.need_throttling_());
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
ASSERT_EQ(true, throttle.need_throttling_with_options_not_guarded_by_lock_());
ASSERT_EQ(true, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func));
ASSERT_EQ(throttle_options, throttle.throttling_options_);
ASSERT_EQ(true, throttle.decay_factor_ > 0.0);
ASSERT_EQ(true, throttle.decay_factor_ != old_decay_factor);
@ -331,10 +322,8 @@ TEST(TestPalfThrottling, test_log_write_throttle)
ASSERT_EQ(0, throttle.stat_.total_skipped_task_cnt_);
ASSERT_EQ(0, throttle.stat_.total_skipped_size_);
ASSERT_EQ(true, OB_INVALID_TIMESTAMP == throttle.stat_.stop_ts_);
throttle.after_append_log(1024, 0);
throttle.after_append_log(1024);
ASSERT_EQ(2048, throttle.appended_log_size_cur_round_);
ASSERT_EQ(3, throttle.submitted_seq_);
ASSERT_EQ(3, throttle.handled_seq_);
//test reset appended_log_size_cur_round_
PALF_LOG(INFO, "case 11: test reset appended_log_size_cur_round_ after unrecyclable_size changes", K(throttle));
@ -343,9 +332,9 @@ TEST(TestPalfThrottling, test_log_write_throttle)
unrecyclable_size = total_disk_size * 65/100;
palf_env_impl.disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(unrecyclable_size);
throttle.update_throttling_options(&palf_env_impl);
throttle.throttling(1024, &palf_env_impl);
ASSERT_EQ(true, throttle.need_throttling_with_options_());
ASSERT_EQ(true, throttle.need_throttling_());
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
ASSERT_EQ(true, throttle.need_throttling_with_options_not_guarded_by_lock_());
ASSERT_EQ(true, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func));
palf_env_impl.get_throttling_options(throttle_options);
ASSERT_EQ(throttle_options, throttle.throttling_options_);
ASSERT_EQ(true, throttle.decay_factor_ > 0.0);
@ -356,12 +345,9 @@ TEST(TestPalfThrottling, test_log_write_throttle)
ASSERT_EQ(0, throttle.stat_.total_skipped_size_);
ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle.stat_.start_ts_);
ASSERT_EQ(true, OB_INVALID_TIMESTAMP == throttle.stat_.stop_ts_);
throttle.after_append_log(1024, 0);
throttle.after_append_log(1024);
ASSERT_EQ(1024, throttle.appended_log_size_cur_round_);
ASSERT_EQ(3, throttle.submitted_seq_);
ASSERT_EQ(3, throttle.handled_seq_);
ASSERT_EQ(OB_ERR_UNEXPECTED, throttle.after_append_log(0, 4));
ASSERT_EQ(OB_ERR_UNEXPECTED, throttle.after_append_log(0, 2));
ASSERT_EQ(OB_SUCCESS, throttle.after_append_log(0));
//test stop write throttling when trigger percentage changed
PALF_LOG(INFO, "case 12: test stop write throttling when trigger percentage changed", K(throttle));
@ -369,9 +355,9 @@ TEST(TestPalfThrottling, test_log_write_throttle)
palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 80;
palf_env_impl.get_throttling_options(throttle_options);
throttle.update_throttling_options(&palf_env_impl);
throttle.throttling(1024, &palf_env_impl);
ASSERT_EQ(false, throttle.need_throttling_with_options_());
ASSERT_EQ(false, throttle.need_throttling_());
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
ASSERT_EQ(false, throttle.need_throttling_with_options_not_guarded_by_lock_());
ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func));
ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_);
ASSERT_EQ(0, throttle.decay_factor_);
ASSERT_EQ(3072, throttle.stat_.total_throttling_size_);
@ -380,17 +366,15 @@ TEST(TestPalfThrottling, test_log_write_throttle)
ASSERT_EQ(0, throttle.stat_.total_skipped_size_);
ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle.stat_.stop_ts_);
ASSERT_EQ(0, throttle.appended_log_size_cur_round_);
ASSERT_EQ(3, throttle.submitted_seq_);
ASSERT_EQ(3, throttle.handled_seq_);
PALF_LOG(INFO, "case 12: test stop writing throttling when unrecyclable size fallbacks", K(throttle));
usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 60;
throttle.notify_need_writing_throttling(true);
throttle.update_throttling_options(&palf_env_impl);
throttle.throttling(1024, &palf_env_impl);
ASSERT_EQ(true, throttle.need_throttling_with_options_());
ASSERT_EQ(true, throttle.need_throttling_());
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
ASSERT_EQ(true, throttle.need_throttling_with_options_not_guarded_by_lock_());
ASSERT_EQ(true, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func));
palf_env_impl.get_throttling_options(throttle_options);
ASSERT_EQ(throttle_options, throttle.throttling_options_);
ASSERT_EQ(true, throttle.decay_factor_ > 0.0);
@ -400,17 +384,15 @@ TEST(TestPalfThrottling, test_log_write_throttle)
ASSERT_EQ(0, throttle.stat_.total_skipped_size_);
ASSERT_EQ(true, throttle.stat_.total_throttling_interval_ > 0);
ASSERT_EQ(true, OB_INVALID_TIMESTAMP == throttle.stat_.stop_ts_);
throttle.after_append_log(1024, 0);
throttle.after_append_log(1024);
ASSERT_EQ(1024, throttle.appended_log_size_cur_round_);
ASSERT_EQ(3, throttle.submitted_seq_);
ASSERT_EQ(3, throttle.handled_seq_);
usleep(LogWritingThrottle::UPDATE_INTERVAL_US);
unrecyclable_size = total_disk_size * 45/100;
palf_env_impl.disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(unrecyclable_size);
throttle.update_throttling_options(&palf_env_impl);
throttle.throttling(1024, &palf_env_impl);
ASSERT_EQ(false, throttle.need_throttling_with_options_());
throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl);
ASSERT_EQ(false, throttle.need_throttling_with_options_not_guarded_by_lock_());
}