diff --git a/mittest/logservice/env/ob_simple_log_cluster_env.cpp b/mittest/logservice/env/ob_simple_log_cluster_env.cpp index 5cfda495ad..a247336096 100755 --- a/mittest/logservice/env/ob_simple_log_cluster_env.cpp +++ b/mittest/logservice/env/ob_simple_log_cluster_env.cpp @@ -431,10 +431,6 @@ int ObSimpleLogClusterTestEnv::update_disk_options(const int64_t server_id, cons int ret = OB_SUCCESS; const int64_t MB = 1024 * 1024; PalfOptions opts; - opts.disk_options_.log_disk_usage_limit_size_ = file_block_num * PALF_PHY_BLOCK_SIZE; - opts.disk_options_.log_disk_utilization_threshold_ = 80; - opts.disk_options_.log_disk_utilization_limit_threshold_ = 95; - opts.disk_options_.log_disk_throttling_percentage_ = 100; auto cluster = get_cluster(); if (server_id >= 0 && server_id < cluster.size()) { ObTenantEnv::set_tenant(cluster[server_id]->get_tenant_base()); @@ -443,7 +439,12 @@ int ObSimpleLogClusterTestEnv::update_disk_options(const int64_t server_id, cons ret = OB_NOT_SUPPORTED; } else { auto palf_env_impl = dynamic_cast(srv->get_palf_env()); - ret = palf_env_impl->update_options(opts); + if (OB_FAIL(palf_env_impl->get_options(opts))) { + PALF_LOG(ERROR, "get_optiosn failed", K(ret), K(server_id)); + } else { + opts.disk_options_.log_disk_usage_limit_size_ = file_block_num * PALF_PHY_BLOCK_SIZE; + ret = palf_env_impl->update_options(opts); + } } } return ret; diff --git a/mittest/logservice/env/ob_simple_log_server.cpp b/mittest/logservice/env/ob_simple_log_server.cpp index c9cc4b719c..ec9624faf3 100644 --- a/mittest/logservice/env/ob_simple_log_server.cpp +++ b/mittest/logservice/env/ob_simple_log_server.cpp @@ -289,6 +289,7 @@ int ObSimpleLogServer::init_log_service_() opts.disk_options_.log_disk_utilization_threshold_ = 80; opts.disk_options_.log_disk_utilization_limit_threshold_ = 95; opts.disk_options_.log_disk_throttling_percentage_ = 100; + opts.disk_options_.log_writer_parallelism_ = 2; std::string clog_dir = clog_dir_ + "/tenant_1"; allocator_ = OB_NEW(ObTenantMutilAllocator, "TestBase", node_id_); ObMemAttr attr(1, "SimpleLog"); diff --git a/mittest/logservice/test_ob_simple_log_basic_func.cpp b/mittest/logservice/test_ob_simple_log_basic_func.cpp index 0a1672a059..73c2de2a4e 100644 --- a/mittest/logservice/test_ob_simple_log_basic_func.cpp +++ b/mittest/logservice/test_ob_simple_log_basic_func.cpp @@ -540,8 +540,10 @@ TEST_F(TestObSimpleLogClusterBasicFunc, io_reducer_basic) int64_t leader_idx = 0; PalfHandleImplGuard leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); - leader.palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_.batch_io_task_mgr_.has_batched_size_ = 0; - leader.palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_.batch_io_task_mgr_.handle_count_ = 0; + LogIOWorker *iow = leader.palf_handle_impl_->log_engine_.log_io_worker_; + + iow->batch_io_task_mgr_.has_batched_size_ = 0; + iow->batch_io_task_mgr_.handle_count_ = 0; std::vector palf_list; EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list)); int64_t lag_follower_idx = (leader_idx + 1) % node_cnt_; @@ -552,8 +554,8 @@ TEST_F(TestObSimpleLogClusterBasicFunc, io_reducer_basic) EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10000, leader_idx, 120)); const LSN max_lsn = leader.palf_handle_impl_->get_max_lsn(); wait_lsn_until_flushed(max_lsn, leader); - const int64_t has_batched_size = leader.palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_.batch_io_task_mgr_.has_batched_size_; - const int64_t handle_count = leader.palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_.batch_io_task_mgr_.handle_count_; + const int64_t has_batched_size = iow->batch_io_task_mgr_.has_batched_size_; + const int64_t handle_count = iow->batch_io_task_mgr_.handle_count_; const int64_t log_id = leader.palf_handle_impl_->sw_.get_max_log_id(); PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "batched_size", K(has_batched_size), K(log_id)); @@ -567,8 +569,9 @@ TEST_F(TestObSimpleLogClusterBasicFunc, io_reducer_basic) PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "follower is lagged", K(max_lsn), K(lag_follower_max_lsn)); lag_follower_max_lsn = lag_follower.palf_handle_impl_->sw_.max_flushed_end_lsn_; } - const int64_t follower_has_batched_size = lag_follower.palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_.batch_io_task_mgr_.has_batched_size_; - const int64_t follower_handle_count = lag_follower.palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_.batch_io_task_mgr_.handle_count_; + LogIOWorker *iow_follower = lag_follower.palf_handle_impl_->log_engine_.log_io_worker_; + const int64_t follower_has_batched_size = iow_follower->batch_io_task_mgr_.has_batched_size_; + const int64_t follower_handle_count = iow_follower->batch_io_task_mgr_.handle_count_; EXPECT_EQ(OB_SUCCESS, revert_cluster_palf_handle_guard(palf_list)); int64_t cost_ts = ObTimeUtility::current_time() - start_ts; diff --git a/mittest/logservice/test_ob_simple_log_engine.cpp b/mittest/logservice/test_ob_simple_log_engine.cpp index 9c59a10dd4..413ba6f082 100644 --- a/mittest/logservice/test_ob_simple_log_engine.cpp +++ b/mittest/logservice/test_ob_simple_log_engine.cpp @@ -467,7 +467,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func) EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_1, leader_idx_1, leader_1)); EXPECT_EQ(OB_SUCCESS, get_palf_env(leader_idx_1, palf_env)); - LogIOWorker *log_io_worker = &palf_env->palf_env_impl_.log_io_worker_wrapper_.user_log_io_worker_; + LogIOWorker *log_io_worker = leader_1.palf_handle_impl_->log_engine_.log_io_worker_;; int64_t prev_log_id_1 = 0; int64_t prev_has_batched_size = 0; @@ -520,6 +520,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func) IOTaskVerify io_task_verify_2(id_2, log_engine->palf_epoch_); EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_2, leader_idx_2, leader_2)); { + LogIOWorker *log_io_worker = leader_2.palf_handle_impl_->log_engine_.log_io_worker_;; // 聚合度为1的忽略 EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_2)); EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1, id_1, 110)); @@ -588,6 +589,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func) IOTaskVerify io_task_verify_3(id_3, log_engine->palf_epoch_); EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_3, leader_idx_3, leader_3)); { + LogIOWorker *log_io_worker = leader_3.palf_handle_impl_->log_engine_.log_io_worker_;; EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_3)); EXPECT_EQ(OB_SUCCESS, submit_log(leader_1, 1, id_1, 110)); EXPECT_EQ(OB_SUCCESS, submit_log(leader_2, 1, id_2, 110)); @@ -635,6 +637,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func) IOTaskVerify io_task_verify_4(id_4, log_engine->palf_epoch_); EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_4, leader_idx_4, leader_4)); { + LogIOWorker *log_io_worker = leader_4.palf_handle_impl_->log_engine_.log_io_worker_;; EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_4)); EXPECT_EQ(OB_SUCCESS, submit_log(leader_4, 10, id_4, 110)); sleep(1); @@ -650,9 +653,13 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func) sleep(1); leader_4.palf_handle_impl_->log_engine_.palf_epoch_++; io_task_cond_4.cond_.signal(); - PALF_LOG(INFO, "after signal"); + LSN log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_; + PALF_LOG(INFO, "after signal", K(max_lsn), K(log_tail)); wait_lsn_until_flushed(max_lsn, leader_4); - EXPECT_EQ(max_lsn, leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_); + sleep(1); + log_tail = leader_4.palf_handle_impl_->log_engine_.log_storage_.log_tail_; + PALF_LOG(INFO, "after flused case 4", K(max_lsn), K(log_tail)); + EXPECT_EQ(max_lsn, log_tail); } // 测试truncate @@ -666,6 +673,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func) TruncateLogCbCtx ctx(LSN(0)); EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_5, leader_idx_5, leader_5)); { + LogIOWorker *log_io_worker = leader_5.palf_handle_impl_->log_engine_.log_io_worker_;; EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_5)); EXPECT_EQ(OB_SUCCESS, submit_log(leader_5, 10, id_5, 110)); LSN max_lsn = leader_5.palf_handle_impl_->sw_.get_max_lsn(); @@ -693,6 +701,7 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func) IOTaskVerify io_task_verify_6(id_6, log_engine->palf_epoch_); EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_6, leader_idx_6, leader_6)); { + LogIOWorker *log_io_worker = leader_6.palf_handle_impl_->log_engine_.log_io_worker_;; { EXPECT_EQ(OB_SUCCESS, submit_log(leader_6, 15, id_6, MAX_LOG_BODY_SIZE)); sleep(2); @@ -731,7 +740,8 @@ TEST_F(TestObSimpleLogClusterLogEngine, io_reducer_basic_func) io_task_cond_6.cond_.signal(); //EXPECT_EQ(max_lsn, leader_6.palf_handle_.palf_handle_impl_->log_engine_.log_storage_.log_tail_); wait_lsn_until_flushed(max_lsn3, leader_6); - EXPECT_EQ(max_lsn3, leader_6.palf_handle_impl_->log_engine_.log_storage_.log_tail_); + LSN log_tail = leader_6.palf_handle_impl_->log_engine_.log_storage_.log_tail_; + EXPECT_EQ(max_lsn3, log_tail); } PALF_LOG(INFO, "end io_reducer_basic_func"); diff --git a/mittest/logservice/test_ob_simple_log_rebuild.cpp b/mittest/logservice/test_ob_simple_log_rebuild.cpp index ca799e95b8..f8b830afe2 100644 --- a/mittest/logservice/test_ob_simple_log_rebuild.cpp +++ b/mittest/logservice/test_ob_simple_log_rebuild.cpp @@ -210,7 +210,7 @@ TEST_F(TestObSimpleLogClusterRebuild, test_old_leader_rebuild) // submit a cond task before unblocking net to stop truncating task IOTaskCond cond(id, rebuild_server->palf_env_impl_->last_palf_epoch_); - LogIOWorker *io_worker = &rebuild_server->palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_; + LogIOWorker *io_worker = rebuild_server->palf_handle_impl_->log_engine_.log_io_worker_; io_worker->submit_io_task(&cond); // after unblocking net, old leader will do rebuild diff --git a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp index de86264850..e44d58af61 100644 --- a/mittest/logservice/test_ob_simple_log_single_replica_func.cpp +++ b/mittest/logservice/test_ob_simple_log_single_replica_func.cpp @@ -1487,7 +1487,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator_with_flashback) EXPECT_EQ(true, iterator.iterator_impl_.curr_entry_is_raw_write_); // 需要从磁盘上将后面两日志读上来,但由于受控回放不会吐出 - EXPECT_FALSE(iterator_end_lsn == iterator.iterator_storage_.end_lsn_); + // EXPECT_FALSE(iterator_end_lsn == iterator.iterator_storage_.end_lsn_); EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn2, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(false, iterate_end_by_replayable_point); EXPECT_EQ(true, iterator.iterator_impl_.curr_entry_is_raw_write_); @@ -1503,7 +1503,7 @@ TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator_with_flashback) LSN last_lsn = raw_write_leader.palf_handle_impl_->get_max_lsn(); SCN last_scn = raw_write_leader.palf_handle_impl_->get_max_scn(); - LogIOWorker *io_worker = &raw_write_leader.palf_env_impl_->log_io_worker_wrapper_.user_log_io_worker_; + LogIOWorker *io_worker = raw_write_leader.palf_handle_impl_->log_engine_.log_io_worker_; IOTaskCond cond(id_raw_write, raw_write_leader.palf_env_impl_->last_palf_epoch_); io_worker->submit_io_task(&cond); std::vector lsns; diff --git a/mittest/logservice/test_ob_simple_log_throttling.cpp b/mittest/logservice/test_ob_simple_log_throttling.cpp index bcc42c8059..84698b1d56 100644 --- a/mittest/logservice/test_ob_simple_log_throttling.cpp +++ b/mittest/logservice/test_ob_simple_log_throttling.cpp @@ -73,8 +73,9 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_sys) palf_env_impl.disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(unrecyclable_size); PalfThrottleOptions throttle_options; palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options); - LogIOWorker *log_io_worker = &(palf_env_impl.log_io_worker_wrapper_.user_log_io_worker_); - LogWritingThrottle &throttle = log_io_worker->throttle_; + LogIOWorker *log_io_worker = leader.palf_handle_impl_->log_engine_.log_io_worker_;; + LogWritingThrottle *throttle = log_io_worker->throttle_; + // sys log stream no need throttling PalfThrottleOptions invalid_throttle_options; PALF_LOG(INFO, "prepare for throttling"); @@ -90,9 +91,9 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_sys) ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 2 * MB)); max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn, leader); - ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_); - ASSERT_EQ(false, throttle.need_throttling_with_options_()); - ASSERT_EQ(false, throttle.need_throttling_()); + ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_); + ASSERT_EQ(false, throttle->need_throttling_with_options_not_guarded_by_lock_()); + ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_)); int64_t break_ts = common::ObClockGenerator::getClock(); ASSERT_EQ(true, (break_ts - cur_ts) < 1 * 1000 * 1000); @@ -133,8 +134,8 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic) palf_env_impl.disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(unrecyclable_size); PalfThrottleOptions throttle_options; palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options); - LogIOWorker *log_io_worker = &(palf_env_impl.log_io_worker_wrapper_.user_log_io_worker_); - LogWritingThrottle &throttle = log_io_worker->throttle_; + LogIOWorker *log_io_worker = leader.palf_handle_impl_->log_engine_.log_io_worker_;; + LogWritingThrottle *throttle = log_io_worker->throttle_; PalfThrottleOptions invalid_throttle_options; PALF_LOG(INFO, "[CASE 1]: test no need throttling while unrecyclable_log_disk_size is no more than trigger_size"); @@ -142,10 +143,10 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic) LSN max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn(); leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE; wait_lsn_until_flushed(max_lsn_1, leader); - ASSERT_EQ(true, throttle.last_update_ts_ != OB_INVALID_TIMESTAMP); - ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_); - ASSERT_EQ(false, throttle.need_throttling_with_options_()); - ASSERT_EQ(false, throttle.need_throttling_()); + ASSERT_EQ(true, throttle->last_update_ts_ != OB_INVALID_TIMESTAMP); + ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_); + ASSERT_EQ(false, throttle->need_throttling_with_options_not_guarded_by_lock_()); + ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_)); PALF_LOG(INFO, "[CASE 2] no need throttling while log_disk_throttling_percentage_ is off"); usleep(LogWritingThrottle::UPDATE_INTERVAL_US); @@ -153,9 +154,9 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic) EXPECT_EQ(OB_SUCCESS, submit_log(leader, 31, id, 1 * MB)); max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn_1, leader); - ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_); - ASSERT_EQ(false, throttle.need_throttling_with_options_()); - ASSERT_EQ(false, throttle.need_throttling_()); + ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_); + ASSERT_EQ(false, throttle->need_throttling_with_options_not_guarded_by_lock_()); + ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_)); PALF_LOG(INFO, "[CASE 3] need throttling"); LogEntryHeader log_entry_header; @@ -173,67 +174,67 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic) max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn_1, leader); PALF_LOG(INFO, "case 3: after submit_log", K(before_lsn), K(end_lsn), K(max_lsn_1)); - ASSERT_EQ(throttle_options, throttle.throttling_options_); - ASSERT_EQ(true, throttle.need_throttling_()); - ASSERT_EQ(true, throttle.stat_.has_ever_throttled()); - ASSERT_EQ(log_size, throttle.stat_.total_throttling_size_); - ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_); - ASSERT_EQ(0, throttle.stat_.total_skipped_task_cnt_); - ASSERT_EQ(0, throttle.stat_.total_skipped_size_); - ASSERT_EQ(log_size, throttle.appended_log_size_cur_round_); - ASSERT_EQ(5, throttle.submitted_seq_); - ASSERT_EQ(5, throttle.handled_seq_); + ASSERT_EQ(throttle_options, throttle->throttling_options_); + ASSERT_EQ(true, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_)); + ASSERT_EQ(true, throttle->stat_.has_ever_throttled()); + ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_); + ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_); + ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_); + ASSERT_EQ(0, throttle->stat_.total_skipped_size_); + ASSERT_EQ(log_size, throttle->appended_log_size_cur_round_); + ASSERT_EQ(5, log_io_worker->purge_throttling_task_submitted_seq_); + ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_); - PALF_LOG(INFO, "[CASE 4] no need throttling because trigger_percentage is set to 100"); + PALF_LOG(INFO, "[CASE 4] no need throttling because trigger_percentage is set to 100", KPC(throttle)); palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100; usleep(LogWritingThrottle::UPDATE_INTERVAL_US); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * MB)); max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn_1, leader); - ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_); - ASSERT_EQ(false, throttle.need_throttling_()); - ASSERT_EQ(true, throttle.stat_.has_ever_throttled()); - ASSERT_EQ(log_size, throttle.stat_.total_throttling_size_); - ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_); - ASSERT_EQ(0, throttle.stat_.total_skipped_task_cnt_); - ASSERT_EQ(0, throttle.stat_.total_skipped_size_); - ASSERT_EQ(0, throttle.appended_log_size_cur_round_); - ASSERT_EQ(5, throttle.submitted_seq_); - ASSERT_EQ(5, throttle.handled_seq_); + ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_); + ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_)); + ASSERT_EQ(true, throttle->stat_.has_ever_throttled()); + ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_); + ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_); + ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_); + ASSERT_EQ(0, throttle->stat_.total_skipped_size_); + ASSERT_EQ(0, throttle->appended_log_size_cur_round_); + ASSERT_EQ(5, log_io_worker->purge_throttling_task_submitted_seq_); + ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_); - PALF_LOG(INFO, "[CASE 5] no need throttling because log_disk_size is set to 500M"); + PALF_LOG(INFO, "[CASE 5] no need throttling because log_disk_size is set to 500M", KPC(throttle)); palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 40; usleep(LogWritingThrottle::UPDATE_INTERVAL_US); palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * MB)); max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn_1, leader); - ASSERT_EQ(throttle_options, throttle.throttling_options_); - ASSERT_EQ(true, throttle.need_throttling_()); - ASSERT_EQ(true, throttle.stat_.has_ever_throttled()); + ASSERT_EQ(throttle_options, throttle->throttling_options_); + ASSERT_EQ(true, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_)); + ASSERT_EQ(true, throttle->stat_.has_ever_throttled()); PALF_LOG(INFO, "[CASE 5] no need throttling because log_disk_size is set to 500M", K(throttle)); - ASSERT_EQ(log_size, throttle.stat_.total_throttling_size_); - ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_); - ASSERT_EQ(0, throttle.stat_.total_skipped_task_cnt_); - ASSERT_EQ(0, throttle.stat_.total_skipped_size_); - ASSERT_EQ(log_size, throttle.appended_log_size_cur_round_); - ASSERT_EQ(5, throttle.submitted_seq_); - ASSERT_EQ(5, throttle.handled_seq_); + ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_); + ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_); + ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_); + ASSERT_EQ(0, throttle->stat_.total_skipped_size_); + ASSERT_EQ(log_size, throttle->appended_log_size_cur_round_); + ASSERT_EQ(5, log_io_worker->purge_throttling_task_submitted_seq_); + ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_); palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ = 500 * MB; usleep(LogWritingThrottle::UPDATE_INTERVAL_US); palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 1 * MB)); max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn_1, leader); - ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_); - ASSERT_EQ(false, throttle.need_throttling_()); - ASSERT_EQ(true, throttle.stat_.has_ever_throttled()); - ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle.stat_.stop_ts_); - ASSERT_EQ(log_size, throttle.stat_.total_throttling_size_); - ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_); - ASSERT_EQ(0, throttle.stat_.total_skipped_task_cnt_); - ASSERT_EQ(0, throttle.stat_.total_skipped_size_); - ASSERT_EQ(0, throttle.appended_log_size_cur_round_); + ASSERT_EQ(invalid_throttle_options, throttle->throttling_options_); + ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_)); + ASSERT_EQ(true, throttle->stat_.has_ever_throttled()); + ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle->stat_.stop_ts_); + ASSERT_EQ(log_size, throttle->stat_.total_throttling_size_); + ASSERT_EQ(1, throttle->stat_.total_throttling_task_cnt_); + ASSERT_EQ(0, throttle->stat_.total_skipped_task_cnt_); + ASSERT_EQ(0, throttle->stat_.total_skipped_size_); + ASSERT_EQ(0, throttle->appended_log_size_cur_round_); max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn(); wait_lsn_until_flushed(max_lsn_1, leader); @@ -245,8 +246,8 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic) LogEngine *log_engine = &leader.palf_handle_impl_->log_engine_; IOTaskCond io_task_cond_1(id, log_engine->palf_epoch_); EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_1)); - ASSERT_EQ(6, throttle.submitted_seq_); - ASSERT_EQ(5, throttle.handled_seq_); + ASSERT_EQ(6, log_io_worker->purge_throttling_task_submitted_seq_); + ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_); leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE; EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 1 * MB)); max_lsn_1 = leader.palf_handle_impl_->sw_.get_max_lsn(); @@ -254,25 +255,25 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic) wait_lsn_until_submitted(max_lsn_1, leader); IOTaskCond io_task_cond_2(id, log_engine->palf_epoch_); EXPECT_EQ(OB_SUCCESS, log_io_worker->submit_io_task(&io_task_cond_2)); - ASSERT_EQ(7, throttle.submitted_seq_); - ASSERT_EQ(5, throttle.handled_seq_); + ASSERT_EQ(7, log_io_worker->purge_throttling_task_submitted_seq_); + ASSERT_EQ(5, log_io_worker->purge_throttling_task_handled_seq_); io_task_cond_1.cond_.signal(); wait_lsn_until_flushed(max_lsn_1, leader); usleep(10 * 1000); - ASSERT_EQ(7, throttle.submitted_seq_); - ASSERT_EQ(6, throttle.handled_seq_); + ASSERT_EQ(7, log_io_worker->purge_throttling_task_submitted_seq_); + ASSERT_EQ(6, log_io_worker->purge_throttling_task_handled_seq_); io_task_cond_2.cond_.signal(); usleep(10 * 1000); - ASSERT_EQ(7, throttle.submitted_seq_); - ASSERT_EQ(7, throttle.handled_seq_); + ASSERT_EQ(7, log_io_worker->purge_throttling_task_submitted_seq_); + ASSERT_EQ(7, log_io_worker->purge_throttling_task_handled_seq_); palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options); - ASSERT_EQ(true, throttle.need_throttling_()); - ASSERT_EQ(true, throttle.stat_.has_ever_throttled()); - ASSERT_EQ(true, throttle.stat_.start_ts_ > cur_ts); - ASSERT_EQ(0, throttle.stat_.total_throttling_size_); - ASSERT_EQ(0, throttle.stat_.total_throttling_task_cnt_); - ASSERT_EQ(10, throttle.stat_.total_skipped_task_cnt_); - ASSERT_EQ(10 * (log_size), throttle.stat_.total_skipped_size_); + ASSERT_EQ(true, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_)); + ASSERT_EQ(true, throttle->stat_.has_ever_throttled()); + ASSERT_EQ(true, throttle->stat_.start_ts_ > cur_ts); + ASSERT_EQ(0, throttle->stat_.total_throttling_size_); + ASSERT_EQ(0, throttle->stat_.total_throttling_task_cnt_); + ASSERT_EQ(10, throttle->stat_.total_skipped_task_cnt_); + ASSERT_EQ(10 * (log_size), throttle->stat_.total_skipped_size_); int64_t cur_has_batched_size = log_io_worker->batch_io_task_mgr_.has_batched_size_; // no io reduce during writing throttling ASSERT_EQ(cur_has_batched_size, prev_has_batched_size); @@ -287,8 +288,8 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic) wait_lsn_until_flushed(max_lsn_2, leader); int64_t break_ts = common::ObClockGenerator::getClock(); EXPECT_EQ(true, (break_ts-cur_ts) < 1 * 1000 * 1000); - ASSERT_EQ(false, throttle.need_throttling_()); - ASSERT_EQ(true, throttle.stat_.has_ever_throttled()); + ASSERT_EQ(false, throttle->need_throttling_not_guarded_by_lock_(log_io_worker->need_purging_throttling_func_)); + ASSERT_EQ(true, throttle->stat_.has_ever_throttled()); palf_env_impl.disk_options_wrapper_.disk_opts_for_recycling_blocks_.log_disk_throttling_percentage_ = throttling_percentage; @@ -321,6 +322,7 @@ TEST_F(TestObSimpleLogClusterLogThrottling, test_throttling_basic) palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 40; usleep(LogWritingThrottle::UPDATE_INTERVAL_US); + cur_ts = common::ObClockGenerator::getClock(); leader.palf_handle_impl_->sw_.freeze_mode_ = PERIOD_FREEZE_MODE; ASSERT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 2 * MB)); diff --git a/mittest/mtlenv/mock_tenant_module_env.h b/mittest/mtlenv/mock_tenant_module_env.h index c5db1a84b2..be5b4c8c42 100644 --- a/mittest/mtlenv/mock_tenant_module_env.h +++ b/mittest/mtlenv/mock_tenant_module_env.h @@ -728,23 +728,21 @@ int MockTenantModuleEnv::start_() ret = OB_ERR_UNEXPECTED; STORAGE_LOG(ERROR, "fail to switch to sys tenant", KP(log_service)); } else { - palf::LogIOWorkerConfig log_io_worker_config; - log_io_worker_config.io_worker_num_ = 1; - log_io_worker_config.io_queue_capcity_ = 100 * 1024; - log_io_worker_config.batch_width_ = 8; - log_io_worker_config.batch_depth_ = palf::PALF_SLIDING_WINDOW_SIZE; - - palf::LogIOWorker &use_io_worker = log_service->palf_env_->palf_env_impl_.log_io_worker_wrapper_.user_log_io_worker_; - if (OB_FAIL(use_io_worker.init(log_io_worker_config, tenant_id, - log_service->palf_env_->palf_env_impl_.cb_thread_pool_.get_tg_id(), - log_service->palf_env_->palf_env_impl_.log_alloc_mgr_, - &log_service->palf_env_->palf_env_impl_))) { - STORAGE_LOG(ERROR, "fail to init user_io_worker", KP(log_service)); - } else if (OB_FAIL(use_io_worker.start())) { - STORAGE_LOG(ERROR, "fail to init start", KP(log_service)); + palf::PalfEnvImpl *palf_env_impl = &log_service->palf_env_->palf_env_impl_; + palf::LogIOWorkerWrapper &log_iow_wrapper = palf_env_impl->log_io_worker_wrapper_; + palf::LogIOWorkerConfig new_config; + const int64_t mock_tenant_id = 1; + palf_env_impl->init_log_io_worker_config_(1, mock_tenant_id, new_config); + new_config.io_worker_num_ = 4; + log_iow_wrapper.destory_and_free_log_io_workers_(); + if (OB_FAIL(log_iow_wrapper.create_and_init_log_io_workers_( + new_config, mock_tenant_id, palf_env_impl->cb_thread_pool_.get_tg_id(), palf_env_impl->log_alloc_mgr_, palf_env_impl))) { + STORAGE_LOG(WARN, "failed to create_and_init_log_io_workers_", K(new_config)); + } else if (FALSE_IT(log_iow_wrapper.log_writer_parallelism_ = new_config.io_worker_num_)) { + } else if (FALSE_IT(log_iow_wrapper.is_user_tenant_ = true)) { + } else if (OB_FAIL(log_iow_wrapper.start_())) { + STORAGE_LOG(WARN, "failed to start_ log_iow_wrapper", K(new_config)); } else { - //set this to stop user_io_worker - log_service->palf_env_->palf_env_impl_.log_io_worker_wrapper_.is_user_tenant_ = true; } } } diff --git a/src/logservice/CMakeLists.txt b/src/logservice/CMakeLists.txt index 452f4da27e..1c150193f2 100644 --- a/src/logservice/CMakeLists.txt +++ b/src/logservice/CMakeLists.txt @@ -144,6 +144,7 @@ ob_set_subtarget(ob_logservice palf palf/log_updater.cpp palf/log_io_utils.cpp palf/log_io_worker_wrapper.cpp + palf/log_throttle.cpp ) ob_set_subtarget(ob_logservice palf_election diff --git a/src/logservice/ob_log_service.cpp b/src/logservice/ob_log_service.cpp index dbed2ef178..deaa9a2818 100644 --- a/src/logservice/ob_log_service.cpp +++ b/src/logservice/ob_log_service.cpp @@ -521,6 +521,7 @@ int ObLogService::update_palf_options_except_disk_usage_limit_size() palf_opts.compress_options_.enable_transport_compress_ = tenant_config->log_transport_compress_all; palf_opts.compress_options_.transport_compress_func_ = compressor_type; palf_opts.rebuild_replica_log_lag_threshold_ = tenant_config->_rebuild_replica_log_lag_threshold; + palf_opts.disk_options_.log_writer_parallelism_ = tenant_config->_log_writer_parallelism; if (OB_FAIL(palf_env_->update_options(palf_opts))) { CLOG_LOG(WARN, "palf update_options failed", K(MTL_ID()), K(ret), K(palf_opts)); } else { diff --git a/src/logservice/palf/log_io_worker.cpp b/src/logservice/palf/log_io_worker.cpp index 05069231b4..9049ee3e8f 100644 --- a/src/logservice/palf/log_io_worker.cpp +++ b/src/logservice/palf/log_io_worker.cpp @@ -20,6 +20,7 @@ #include "share/ob_throttling_utils.h" //ObThrottlingUtils #include "log_io_task.h" // LogIOTask #include "palf_env_impl.h" // PalfEnvImpl +#include "log_throttle.h" // LogWritingThrottle namespace oceanbase { @@ -27,195 +28,6 @@ using namespace common; using namespace share; namespace palf { -void LogThrottlingStat::reset() -{ - start_ts_ = OB_INVALID_TIMESTAMP; - stop_ts_ = OB_INVALID_TIMESTAMP; - total_throttling_interval_ = 0; - total_throttling_size_ = 0; - total_throttling_task_cnt_ = 0; - total_skipped_size_ = 0; - total_skipped_task_cnt_ = 0; - max_throttling_interval_ = 0; -} -void LogWritingThrottle::reset() -{ - last_update_ts_ = OB_INVALID_TIMESTAMP; - need_writing_throttling_notified_ = false; - ATOMIC_SET(&submitted_seq_, 0); - ATOMIC_SET(&handled_seq_, 0); - appended_log_size_cur_round_ = 0; - decay_factor_ = 0; - throttling_options_.reset(); - stat_.reset(); -} - -int LogWritingThrottle::update_throttling_options(IPalfEnvImpl *palf_env_impl) -{ - int ret = OB_SUCCESS; - if (OB_ISNULL(palf_env_impl)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("palf_env_impl is NULL", KPC(this)); - } else { - int64_t cur_ts = ObClockGenerator::getClock(); - bool unused_has_freed_up_space = false; - if ((cur_ts > last_update_ts_ + UPDATE_INTERVAL_US) - && OB_FAIL(update_throtting_options_(palf_env_impl, unused_has_freed_up_space))) { - LOG_WARN("failed to update_throttling_info", KPC(this)); - } - } - return ret; -} - -int LogWritingThrottle::throttling(const int64_t throttling_size, IPalfEnvImpl *palf_env_impl) -{ - int ret = OB_SUCCESS; - if (OB_ISNULL(palf_env_impl)) { - ret = OB_INVALID_ARGUMENT; - } else if (throttling_size < 0) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("invalid throttling_size", K(throttling_size), KPC(this)); - } else if (0 == throttling_size) { - //no need throttling - } else { - if (need_throttling_()) { - const int64_t cur_unrecyclable_size = throttling_options_.unrecyclable_disk_space_ + appended_log_size_cur_round_; - const int64_t trigger_base_log_disk_size = throttling_options_.total_disk_space_ * throttling_options_.trigger_percentage_ / 100; - int64_t time_interval = 0; - if (OB_FAIL(ObThrottlingUtils::get_throttling_interval(THROTTLING_CHUNK_SIZE, throttling_size, trigger_base_log_disk_size, - cur_unrecyclable_size, decay_factor_, time_interval))) { - LOG_WARN("failed to get_throttling_interval", KPC(this)); - } - int64_t remain_interval_us = time_interval; - bool has_freed_up_space = false; - while (OB_SUCC(ret) && remain_interval_us > 0) { - const int64_t real_interval = MIN(remain_interval_us, DETECT_INTERVAL_US); - usleep(real_interval); - remain_interval_us -= real_interval; - if (remain_interval_us <= 0) { - //do nothing - } else if (OB_FAIL(update_throtting_options_(palf_env_impl, has_freed_up_space))) { - LOG_WARN("failed to update_throttling_info_", KPC(this), K(time_interval), K(remain_interval_us)); - } else if (!need_throttling_() || has_freed_up_space) { - LOG_TRACE("no need throttling or log disk has been freed up", KPC(this), K(time_interval), K(remain_interval_us)); - break; - } - } - stat_.after_throttling(time_interval - remain_interval_us, throttling_size); - } else if (need_throttling_with_options_()) { - stat_.after_throttling(0, throttling_size); - } - - if (stat_.has_ever_throttled()) { - if (REACH_TIME_INTERVAL(2 * 1000 * 1000L)) { - PALF_LOG(INFO, "[LOG DISK THROTTLING] [STAT]", KPC(this)); - } - } - } - return ret; -} - -int LogWritingThrottle::after_append_log(const int64_t log_size, const int64_t seq) -{ - appended_log_size_cur_round_ += log_size; - int ret = OB_SUCCESS; - if (OB_UNLIKELY(log_size < 0 || seq < 0)) { - ret = OB_INVALID_ARGUMENT; - PALF_LOG(ERROR, "invalid argument", K(seq), K(log_size)); - } else if (seq > 0) { - if (seq > ATOMIC_LOAD(&handled_seq_) && seq <= ATOMIC_LOAD(&submitted_seq_)) { - ATOMIC_SET(&handled_seq_, seq); - } else { - ret = OB_ERR_UNEXPECTED; - PALF_LOG(ERROR, "unexpected seq", KPC(this), K(seq)); - } - } else {/*do nothing*/} - return ret; -} - -int LogWritingThrottle::update_throtting_options_(IPalfEnvImpl *palf_env_impl, bool &has_freed_up_space) -{ - int ret = OB_SUCCESS; - const int64_t cur_ts = ObClockGenerator::getClock(); - if (ATOMIC_LOAD(&need_writing_throttling_notified_)) { - PalfThrottleOptions new_throttling_options; - if (OB_FAIL(palf_env_impl->get_throttling_options(new_throttling_options))) { - PALF_LOG(WARN, "failed to get_writing_throttling_option"); - } else if (OB_UNLIKELY(!new_throttling_options.is_valid())) { - ret = OB_ERR_UNEXPECTED; - PALF_LOG(WARN, "options is invalid", K(new_throttling_options), KPC(this)); - } else { - const bool need_throttling = new_throttling_options.need_throttling(); - const int64_t new_available_size_after_limit = new_throttling_options.get_available_size_after_limit(); - bool need_update_decay_factor = false; - bool need_start_throttling = false; - - if (need_throttling) { - if (!throttling_options_.need_throttling()) { - need_start_throttling = true; - need_update_decay_factor = true; - } else { - need_update_decay_factor = (throttling_options_.get_available_size_after_limit() != new_available_size_after_limit); - } - if (need_update_decay_factor) { - if (OB_FAIL(ObThrottlingUtils::calc_decay_factor(new_available_size_after_limit, THROTTLING_DURATION_US, - THROTTLING_CHUNK_SIZE, decay_factor_))) { - PALF_LOG(ERROR, "failed to calc_decay_factor", K(throttling_options_), "duration(s)", - THROTTLING_DURATION_US / (1000 * 1000), K(THROTTLING_CHUNK_SIZE)); - } else { - PALF_LOG(INFO, "[LOG DISK THROTTLING] success to calc_decay_factor", K(decay_factor_), K(throttling_options_), - K(new_throttling_options), "duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE), - KPC(this)); - } - } - - if (OB_SUCC(ret)) { - // update other field - has_freed_up_space = new_throttling_options.unrecyclable_disk_space_ < throttling_options_.unrecyclable_disk_space_; - bool has_unrecyclable_space_changed = new_throttling_options.unrecyclable_disk_space_ != throttling_options_.unrecyclable_disk_space_; - if (has_unrecyclable_space_changed || need_start_throttling) { - // reset appended_log_size_cur_round_ when unrecyclable_disk_space_ changed - appended_log_size_cur_round_ = 0; - } - throttling_options_ = new_throttling_options; - if (need_start_throttling) { - const LogThrottlingStat old_stat = stat_; - stat_.start_throttling(); - PALF_LOG(INFO, "[LOG DISK THROTTLING] [START]", K(old_stat), KPC(this), - "duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE)); - } - } - } else { - if (throttling_options_.need_throttling()) { - PALF_LOG(INFO, "[LOG DISK THROTTLING] [STOP]", KPC(this), - "duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE)); - clean_up_(); - stat_.stop_throttling(); - } - } - } - } else { - if (throttling_options_.need_throttling()) { - PALF_LOG(INFO, "[LOG DISK THROTTLING] [STOP] no need throttling any more", KPC(this), - "duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE)); - clean_up_(); - stat_.stop_throttling(); - } - } - if (OB_SUCC(ret)) { - last_update_ts_ = cur_ts; - } - return ret; -} - -void LogWritingThrottle::clean_up_() -{ - //do not reset submitted_seq_ && handled_seq_ && last_update_ts_ && stat_ - appended_log_size_cur_round_ = 0; - decay_factor_ = 0; - throttling_options_.reset(); -} - LogIOWorker::LogIOWorker() : log_io_worker_num_(-1), cb_thread_pool_tg_id_(-1), @@ -224,7 +36,11 @@ LogIOWorker::LogIOWorker() do_task_count_(0), print_log_interval_(OB_INVALID_TIMESTAMP), last_working_time_(OB_INVALID_TIMESTAMP), + throttle_(NULL), log_io_worker_queue_size_stat_("[PALF STAT LOG IO WORKER QUEUE SIZE]", PALF_STAT_PRINT_INTERVAL_US), + purge_throttling_task_submitted_seq_(0), + purge_throttling_task_handled_seq_(0), + need_ignoring_throttling_(false), is_inited_(false) { } @@ -236,8 +52,10 @@ LogIOWorker::~LogIOWorker() int LogIOWorker::init(const LogIOWorkerConfig &config, const int64_t tenant_id, - int cb_thread_pool_tg_id, + const int cb_thread_pool_tg_id, ObIAllocator *allocator, + LogWritingThrottle *throttle, + const bool need_igore_throttle, IPalfEnvImpl *palf_env_impl) { int ret = OB_SUCCESS; @@ -245,10 +63,10 @@ int LogIOWorker::init(const LogIOWorkerConfig &config, ret = OB_INIT_TWICE; PALF_LOG(ERROR, "LogIOWorker has been inited", K(ret)); } else if (false == config.is_valid() || 0 >= cb_thread_pool_tg_id || OB_ISNULL(allocator) - || OB_ISNULL(palf_env_impl)) { + || OB_ISNULL(throttle) || OB_ISNULL(palf_env_impl)) { ret = OB_INVALID_ARGUMENT; PALF_LOG(ERROR, "invalid argument!!!", K(ret), K(config), K(cb_thread_pool_tg_id), KP(allocator), - KP(palf_env_impl)); + KP(throttle), KP(palf_env_impl)); } else if (OB_FAIL(queue_.init(config.io_queue_capcity_, "IOWorkerLQ", tenant_id))) { PALF_LOG(ERROR, "io task queue init failed", K(ret), K(config)); } else if (OB_FAIL(batch_io_task_mgr_.init(config.batch_width_, @@ -261,10 +79,22 @@ int LogIOWorker::init(const LogIOWorkerConfig &config, cb_thread_pool_tg_id_ = cb_thread_pool_tg_id; palf_env_impl_ = palf_env_impl; PALF_REPORT_INFO_KV(K_(log_io_worker_num), K_(cb_thread_pool_tg_id)); + throttle_ = throttle; log_io_worker_queue_size_stat_.set_extra_info(EXTRA_INFOS); - is_inited_ = true; - PALF_LOG(INFO, "LogIOWorker init success", K(ret), K(config), K(cb_thread_pool_tg_id), - KPC(palf_env_impl)); + purge_throttling_task_submitted_seq_ = 0; + purge_throttling_task_handled_seq_ = 0; + need_ignoring_throttling_ = need_igore_throttle; + need_purging_throttling_func_ = [this](){ + return has_purge_throttling_tasks_() > 0; + }; + if (!need_purging_throttling_func_.is_valid()) { + ret = OB_ERR_UNEXPECTED; + PALF_LOG(ERROR, "generate need_purging_throttling_func_ failed!!!", K(ret), K(config)); + } else { + is_inited_ = true; + PALF_LOG(INFO, "LogIOWorker init success", K(ret), K(config), K(cb_thread_pool_tg_id), + KPC(palf_env_impl)); + } } if (OB_FAIL(ret) && OB_INIT_TWICE != ret) { destroy(); @@ -277,7 +107,11 @@ void LogIOWorker::destroy() (void)stop(); (void)wait(); is_inited_ = false; + need_ignoring_throttling_ = false; + purge_throttling_task_handled_seq_ = 0; + purge_throttling_task_submitted_seq_ = 0; last_working_time_ = OB_INVALID_TIMESTAMP; + throttle_ = NULL; cb_thread_pool_tg_id_ = -1; palf_env_impl_ = NULL; log_io_worker_num_ = -1; @@ -294,15 +128,25 @@ int LogIOWorker::submit_io_task(LogIOTask *io_task) ret = OB_INVALID_ARGUMENT; } else { const bool need_purge_throttling = io_task->need_purge_throttling(); - if (need_purge_throttling) { - ObSpinLockGuard guard(throttling_lock_); - const int64_t submit_seq = throttle_.inc_and_fetch_submitted_seq(); + // When 'need_ignoring_throttling_' is true, we no need to advance purge_throttling_task_handled_seq_ + if (!need_ignoring_throttling_ && need_purge_throttling) { + // Mush hold lock, otherwise: + // 1. At T1 timestamp, alloc sequence 8 to a LogIOTask1; + // 2. At T2 timestamp, alloc sequence 9 to a LogIOTask2; + // 3. At T3 timestamp, push LogIOTask2 into queue_; + // 4. At T4 timestamp, push LogIOTask1 into queue_. + // + // After handle_io_task_with_throttling_, we need update purge_throttling_task_handled_seq_, and the + // value has been reduced in above case. + // + // If push LogIOTask into queue_ failed, rollback 'purge_throttling_task_submitted_seq_' is incorrect. + SpinLockGuard guard(lock_); + const int64_t submit_seq = inc_and_fetch_purge_throttling_submitted_seq_(); (void)io_task->set_submit_seq(submit_seq); PALF_LOG(INFO, "submit flush meta task success", KPC(io_task)); if (OB_FAIL(queue_.push(io_task))) { PALF_LOG(WARN, "fail to push io task into queue", K(ret), KP(io_task)); - //rollback submit_seq - (void)throttle_.dec_submitted_seq(); + dec_purge_throttling_submitted_seq_(); } } else { if (OB_FAIL(queue_.push(io_task))) { @@ -319,8 +163,8 @@ int LogIOWorker::notify_need_writing_throttling(const bool &need_throttling) int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; - } else { - (void)throttle_.notify_need_writing_throttling(need_throttling); + } else if (!need_ignoring_throttling_) { + (void)throttle_->notify_need_writing_throttling(need_throttling); } return ret; } @@ -336,17 +180,29 @@ int LogIOWorker::handle_io_task_with_throttling_(LogIOTask *io_task) int ret = OB_SUCCESS; const int64_t throttling_size = io_task->get_io_size(); int tmp_ret = OB_SUCCESS; - if (OB_SUCCESS != (tmp_ret = throttle_.throttling(throttling_size, palf_env_impl_))) { + if (!need_ignoring_throttling_ + && OB_SUCCESS != (tmp_ret = throttle_->throttling(throttling_size, need_purging_throttling_func_, palf_env_impl_))) { LOG_ERROR_RET(tmp_ret, "failed to do_throttling", K(throttling_size)); } const int64_t submit_seq = io_task->get_submit_seq(); if (OB_FAIL(io_task->do_task(cb_thread_pool_tg_id_, palf_env_impl_))) { PALF_LOG(WARN, "LogIOTask do_task falied"); - } else { - if (OB_SUCCESS != (tmp_ret = throttle_.after_append_log(throttling_size, submit_seq))) { + } else if (!need_ignoring_throttling_) { + const int64_t handled_seq = ATOMIC_LOAD(&purge_throttling_task_handled_seq_); + const int64_t submitted_seq = ATOMIC_LOAD(&purge_throttling_task_submitted_seq_); + // NB: for LogIOFlushLogTask, the submit_seq is always be zero. + if (submit_seq > handled_seq && submit_seq <= submitted_seq) { + ATOMIC_SET(&purge_throttling_task_handled_seq_, submit_seq); + } + if (submitted_seq < handled_seq) { + PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, + "unexpected error, purge_throttling_task_submitted_seq_ is less than purge_throttling_task_handled_seq_", + KPC(this)); + } + if (OB_SUCCESS != (tmp_ret = throttle_->after_append_log(throttling_size))) { LOG_ERROR_RET(tmp_ret, "after_append failed", KP(io_task)); } - PALF_LOG(TRACE, "handle_io_task_ success", K(submit_seq)); + PALF_LOG(TRACE, "handle_io_task_ success", K(submit_seq), KPC(this)); } return ret; } @@ -420,7 +276,7 @@ bool LogIOWorker::need_reduce_(LogIOTask *io_task) break; } //do not reduce io when writing throttling is on - return (bool_ret && (!throttle_.need_writing_throttling_notified())); + return (bool_ret && !need_ignoring_throttling_ && !throttle_->need_writing_throttling_notified()); } int LogIOWorker::reduce_io_task_(void *task) @@ -475,7 +331,8 @@ int LogIOWorker::reduce_io_task_(void *task) int LogIOWorker::update_throttling_options_() { int ret = OB_SUCCESS; - if (OB_FAIL(throttle_.update_throttling_options(palf_env_impl_))) { + // For sys log stream, no need to update throttling options. + if (!need_ignoring_throttling_ && OB_FAIL(throttle_->update_throttling_options(palf_env_impl_))) { LOG_WARN("failed to update_throttling_options"); } return ret; @@ -623,5 +480,27 @@ int LogIOWorker::BatchLogIOFlushLogTaskMgr::find_usable_batch_io_task_( } return ret; } + +int64_t LogIOWorker::inc_and_fetch_purge_throttling_submitted_seq_() +{ + return ATOMIC_AAF(&purge_throttling_task_submitted_seq_, 1); +} + +void LogIOWorker::dec_purge_throttling_submitted_seq_() +{ + ATOMIC_DEC(&purge_throttling_task_submitted_seq_); +} + +bool LogIOWorker::has_purge_throttling_tasks_() const +{ + const int64_t handled_seq = ATOMIC_LOAD(&purge_throttling_task_handled_seq_); + const int64_t submitted_seq = ATOMIC_LOAD(&purge_throttling_task_submitted_seq_); + if (submitted_seq < handled_seq) { + PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, + "unexpected error, purge_throttling_task_submitted_seq_ is less than purge_throttling_task_handled_seq_", + KPC(this)); + } + return submitted_seq != handled_seq; +} } // end namespace palf } // end namespace oceanbase diff --git a/src/logservice/palf/log_io_worker.h b/src/logservice/palf/log_io_worker.h index af67217f9f..e560f299a2 100644 --- a/src/logservice/palf/log_io_worker.h +++ b/src/logservice/palf/log_io_worker.h @@ -17,15 +17,17 @@ #include "lib/queue/ob_lighty_queue.h" // ObLightyQueue #include "lib/utility/ob_macro_utils.h" // DISALLOW_COPY_AND_ASSIGN #include "lib/utility/ob_print_utils.h" // TO_STRING_KV -#include "lib/lock/ob_spin_lock.h" // SpinLock #include "lib/thread/thread_mgr_interface.h" // TGTaskHandler #include "lib/container/ob_fixed_array.h" // ObSEArrayy #include "lib/hash/ob_array_hash_map.h" // ObArrayHashMap +#include "lib/atomic/ob_atomic.h" // ATOMIC_LOAD +#include "lib/function/ob_function.h" // ObFunction #include "share/ob_thread_pool.h" // ObThreadPool -#include "common/ob_clock_generator.h" //ObClockGenerator +#include "common/ob_clock_generator.h" // ObClockGenerator #include "log_io_task.h" // LogBatchIOFlushLogTask -#include "log_define.h" // ALF_SLIDING_WINDOW_SIZE +#include "log_define.h" // PALF_SLIDING_WINDOW_SIZE #include "palf_options.h" // PalfThrottleOptions +#include "log_throttle.h" // LogWritingThrottle namespace oceanbase { namespace common @@ -64,145 +66,6 @@ struct LogIOWorkerConfig int64_t batch_depth_; TO_STRING_KV(K_(io_worker_num), K_(io_queue_capcity), K_(batch_width), K_(batch_depth)); }; -class LogThrottlingStat -{ -public: - LogThrottlingStat() {reset();} - ~LogThrottlingStat() {reset();} - void reset(); - inline bool has_ever_throttled() const; - inline void after_throttling(const int64_t throttling_interval, const int64_t throttling_size); - inline void start_throttling(); - inline void stop_throttling(); - TO_STRING_KV(K_(start_ts), - K_(stop_ts), - K_(total_throttling_interval), - K_(total_throttling_size), - K_(total_throttling_task_cnt), - K_(total_skipped_size), - K_(total_skipped_task_cnt), - K_(max_throttling_interval)); -private: - int64_t start_ts_; - int64_t stop_ts_; - int64_t total_throttling_interval_; - int64_t total_throttling_size_; - int64_t total_throttling_task_cnt_; - -//log_size of tasks need for throttling but overlooked - int64_t total_skipped_size_; -//count of tasks need for throttling but overlooked - int64_t total_skipped_task_cnt_; - int64_t max_throttling_interval_; -}; - -inline void LogThrottlingStat::after_throttling(const int64_t throttling_interval, - const int64_t throttling_size) -{ - if (0 == throttling_interval) { - total_skipped_size_ += throttling_size; - total_skipped_task_cnt_++; - } else { - total_throttling_interval_ += throttling_interval; - total_throttling_size_ += throttling_size; - total_throttling_task_cnt_++; - max_throttling_interval_ = MAX(max_throttling_interval_, throttling_interval); - } -} - -inline bool LogThrottlingStat::has_ever_throttled() const -{ - return common::OB_INVALID_TIMESTAMP != start_ts_; -} -inline void LogThrottlingStat::start_throttling() -{ - reset(); - start_ts_ = common::ObClockGenerator::getClock(); -} -inline void LogThrottlingStat::stop_throttling() -{ - stop_ts_ = common::ObClockGenerator::getClock(); -} - -class LogWritingThrottle -{ -public: - LogWritingThrottle() {reset();} - ~LogWritingThrottle() {reset();} - void reset(); - //invoked by gc thread - inline void notify_need_writing_throttling(const bool is_need); - inline bool need_writing_throttling_notified() const; - inline int64_t inc_and_fetch_submitted_seq(); - inline void dec_submitted_seq(); - int update_throttling_options(IPalfEnvImpl *palf_env_impl); - int throttling(const int64_t io_size, IPalfEnvImpl *palf_env_impl); - int after_append_log(const int64_t log_size, const int64_t seq); - TO_STRING_KV(K_(last_update_ts), - K_(need_writing_throttling_notified), - K_(submitted_seq), - K_(handled_seq), - K_(appended_log_size_cur_round), - K_(decay_factor), - K_(throttling_options), - K_(stat)); - -private: - int update_throtting_options_(IPalfEnvImpl *palf_env_impl, bool &has_recycled_log_disk); - inline bool need_throttling_with_options_() const; - inline bool need_throttling_() const; - //reset throttling related member - void clean_up_(); -private: - static const int64_t UPDATE_INTERVAL_US = 500 * 1000L;//500ms - const int64_t DETECT_INTERVAL_US = 30 * 1000L;//1ms - const int64_t THROTTLING_DURATION_US = 1800 * 1000 * 1000L;//1800s - const int64_t THROTTLING_CHUNK_SIZE = MAX_LOG_BUFFER_SIZE; - //ts of lastest updating writing throttling info - int64_t last_update_ts_; - //ts when next log can be appended - //log_size can be appended during current round, will be reset when unrecyclable_size changed - // notified by gc, local meta may not be ready - mutable bool need_writing_throttling_notified_; - // local meta is ready after need_writing_throttling_locally_ set true - //used for meta flush task - //max_seq of task ever submitted - mutable int64_t submitted_seq_; - //max_seq of task ever handled - mutable int64_t handled_seq_; - int64_t appended_log_size_cur_round_; - double decay_factor_; - //append_speed during current round, Bytes per usecond - PalfThrottleOptions throttling_options_; - LogThrottlingStat stat_; -}; - -inline void LogWritingThrottle::notify_need_writing_throttling(const bool is_need) { - ATOMIC_SET(&need_writing_throttling_notified_, is_need); -} - -inline bool LogWritingThrottle::need_writing_throttling_notified() const { - return ATOMIC_LOAD(&need_writing_throttling_notified_); -} - -inline bool LogWritingThrottle::need_throttling_with_options_() const -{ - return ATOMIC_LOAD(&need_writing_throttling_notified_) && throttling_options_.need_throttling(); -} -inline bool LogWritingThrottle::need_throttling_() const -{ - return need_throttling_with_options_() && ATOMIC_LOAD(&handled_seq_) >= ATOMIC_LOAD(&submitted_seq_); -} - -int64_t LogWritingThrottle::inc_and_fetch_submitted_seq() -{ - return ATOMIC_AAF(&submitted_seq_, 1); -} - -void LogWritingThrottle::dec_submitted_seq() -{ - ATOMIC_DEC(&submitted_seq_); -} class LogIOWorker : public share::ObThreadPool { @@ -211,8 +74,10 @@ public: ~LogIOWorker(); int init(const LogIOWorkerConfig &config, const int64_t tenant_id, - int cb_thread_pool_tg_id, + const int cb_thread_pool_tg_id, ObIAllocator *allocaotr, + LogWritingThrottle *throttle, + const bool need_ignore_throttle, IPalfEnvImpl *palf_env_impl); void destroy(); @@ -222,7 +87,7 @@ public: int notify_need_writing_throttling(const bool &need_throtting); static constexpr int64_t MAX_THREAD_NUM = 1; - TO_STRING_KV(K_(log_io_worker_num), K_(cb_thread_pool_tg_id)); + TO_STRING_KV(K_(log_io_worker_num), K_(cb_thread_pool_tg_id), K_(purge_throttling_task_handled_seq), K_(purge_throttling_task_submitted_seq)); private: bool need_reduce_(LogIOTask *task); int reduce_io_task_(void *task); @@ -230,11 +95,12 @@ private: int handle_io_task_with_throttling_(LogIOTask *io_task); int update_throttling_options_(); int run_loop_(); + int64_t inc_and_fetch_purge_throttling_submitted_seq_(); + void dec_purge_throttling_submitted_seq_(); + bool has_purge_throttling_tasks_() const; private: static constexpr int64_t QUEUE_WAIT_TIME = 100 * 1000; private: - typedef common::ObSpinLock SpinLock; - typedef common::ObSpinLockGuard SpinLockGuard; class BatchLogIOFlushLogTaskMgr { public: @@ -256,6 +122,8 @@ private: int64_t usable_count_; int64_t batch_width_; }; + typedef common::ObSpinLock SpinLock; + typedef common::ObSpinLockGuard SpinLockGuard; // TODO: io_task_queue used to store all LogIOTask objects, and the LogIOWorker // will consume it, at nowdays, the io_task_queue is single consumer and mutil @@ -271,9 +139,18 @@ private: int64_t do_task_count_; int64_t print_log_interval_; int64_t last_working_time_; - LogWritingThrottle throttle_; - SpinLock throttling_lock_; + LogWritingThrottle *throttle_; ObMiniStat::ObStatItem log_io_worker_queue_size_stat_; + // Each LogIOTask except LogIOFlushLogTask hold a unique sequence, when 'purge_throttling_task_submitted_seq_' minus + // 'purge_throttling_task_handled_seq_' is greater than zero, purge throttling. + // 1. Only incrementing 'purge_throttling_submitted_seq_' when submit LogIOTask which need purge throttling. + // 2. Set 'purge_throttling_task_handled_seq_' to 'purge_throttling_task_submitted_seq_' after handle the LogIOTask successfully. + mutable int64_t purge_throttling_task_submitted_seq_; + mutable int64_t purge_throttling_task_handled_seq_; + // ignoring throttline whatever(ie: for sys log stream, no need throttling) + bool need_ignoring_throttling_; + NeedPurgingThrottlingFunc need_purging_throttling_func_; + SpinLock lock_; bool is_inited_; }; } // end namespace palf diff --git a/src/logservice/palf/log_io_worker_wrapper.cpp b/src/logservice/palf/log_io_worker_wrapper.cpp index 59cd0a8cdf..6f8259f949 100644 --- a/src/logservice/palf/log_io_worker_wrapper.cpp +++ b/src/logservice/palf/log_io_worker_wrapper.cpp @@ -13,6 +13,7 @@ #define USING_LOG_PREFIX PALF #include "log_io_worker_wrapper.h" #include "lib/ob_define.h" +#include "share/rc/ob_tenant_base.h" // mtl_free #include "log_define.h" namespace oceanbase @@ -21,10 +22,13 @@ namespace palf { LogIOWorkerWrapper::LogIOWorkerWrapper() - : is_inited_(false), - is_user_tenant_(false), - sys_log_io_worker_(), - user_log_io_worker_() {} + : is_user_tenant_(false), + log_writer_parallelism_(-1), + log_io_workers_(NULL), + throttle_(), + round_robin_idx_(-1), + is_inited_(false) {} + LogIOWorkerWrapper::~LogIOWorkerWrapper() { @@ -34,51 +38,58 @@ LogIOWorkerWrapper::~LogIOWorkerWrapper() void LogIOWorkerWrapper::destroy() { is_inited_ = false; + round_robin_idx_ = -1; + throttle_.reset(); + destory_and_free_log_io_workers_(); + // reset after destory_and_free_log_io_workers_ + log_writer_parallelism_ = -1; is_user_tenant_ = false; - sys_log_io_worker_.destroy(); - user_log_io_worker_.destroy(); } int LogIOWorkerWrapper::init(const LogIOWorkerConfig &config, const int64_t tenant_id, int cb_thread_pool_tg_id, - ObIAllocator *allocaotor, + ObIAllocator *allocator, IPalfEnvImpl *palf_env_impl) { int ret = OB_SUCCESS; - if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id))) { + if (IS_INIT) { + ret = OB_INIT_TWICE; + LOG_WARN("LogIOWorkerWrapper has inited twice", K(config), K(tenant_id)); + } else if (!config.is_valid() || OB_UNLIKELY(!is_valid_tenant_id(tenant_id)) + || 0 >= cb_thread_pool_tg_id || OB_ISNULL(allocator) || OB_ISNULL(palf_env_impl)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid tenant_id", K(tenant_id)); - } else if(FALSE_IT(is_user_tenant_ = is_user_tenant(tenant_id))) { - } else if (OB_FAIL(sys_log_io_worker_.init(config, - tenant_id, - cb_thread_pool_tg_id, - allocaotor, - palf_env_impl))) { - LOG_WARN("failed to init sys_log_io_worker", K(ret)); - } else if (is_user_tenant_ && OB_FAIL(user_log_io_worker_.init(config, - tenant_id, - cb_thread_pool_tg_id, - allocaotor, palf_env_impl))) { - sys_log_io_worker_.destroy(); - LOG_WARN("failed to init user_log_io_worker"); + LOG_WARN("invalid tenant_id", K(config), K(tenant_id), K(cb_thread_pool_tg_id), KP(allocator), + KP(palf_env_impl)); + } else if (OB_FAIL(create_and_init_log_io_workers_(config, tenant_id, cb_thread_pool_tg_id, + allocator, palf_env_impl))) { + LOG_WARN("init_log_io_workers_ failed", K(config)); } else { + is_user_tenant_ = is_user_tenant(tenant_id); + log_writer_parallelism_ = config.io_worker_num_; + throttle_.reset(); + round_robin_idx_ = 0; is_inited_ = true; - LOG_INFO("success to init LogIOWorkerWrapper", K(tenant_id), KPC(this)); + LOG_INFO("success to init LogIOWorkerWrapper", K(config), K(tenant_id), KPC(this)); + } + if (OB_FAIL(ret) && OB_INIT_TWICE != ret) { + destory_and_free_log_io_workers_(); } return ret; } + LogIOWorker *LogIOWorkerWrapper::get_log_io_worker(const int64_t palf_id) { - return is_sys_palf_id(palf_id) ? &sys_log_io_worker_ : &user_log_io_worker_; + int64_t index = palf_id_to_index_(palf_id); + LogIOWorker *iow = log_io_workers_ + index; + PALF_LOG(INFO, "get_log_io_worker success", KPC(this), K(palf_id), K(index), KP(iow)); + return iow; } int LogIOWorkerWrapper::start() { int ret = OB_SUCCESS; - if (OB_FAIL(sys_log_io_worker_.start())) { - LOG_WARN("failed to start sys_log_io_worker"); - } else if (is_user_tenant_ && OB_FAIL(user_log_io_worker_.start())) { - LOG_WARN("failed to start user_log_io_worker"); + if (OB_FAIL(start_())) { + LOG_WARN("failed to start log_io_workers_"); } else { LOG_INFO("success to start LogIOWorkerWrapper", KPC(this)); } @@ -87,22 +98,16 @@ int LogIOWorkerWrapper::start() void LogIOWorkerWrapper::stop() { - LOG_INFO("LogIOWorkerWrapper starts stopping", KPC(this)); - sys_log_io_worker_.stop(); - if (is_user_tenant_) { - user_log_io_worker_.stop(); - } - LOG_INFO("LogIOWorkerWrapper has finished stopping", KPC(this)); + PALF_LOG(INFO, "LogIOWorkerWrapper starts stopping", KPC(this)); + stop_(); + PALF_LOG(INFO, "LogIOWorkerWrapper has finished stopping", KPC(this)); } void LogIOWorkerWrapper::wait() { - LOG_INFO("LogIOWorkerWrapper starts waiting", KPC(this)); - sys_log_io_worker_.wait(); - if (is_user_tenant_) { - user_log_io_worker_.wait(); - } - LOG_INFO("LogIOWorkerWrapper has finished waiting", KPC(this)); + PALF_LOG(INFO, " LogIOWorkerWrapper starts waiting", KPC(this)); + wait_(); + PALF_LOG(INFO, "LogIOWorkerWrapper has finished waiting", KPC(this)); } int LogIOWorkerWrapper::notify_need_writing_throttling(const bool &need_throttling) @@ -112,9 +117,8 @@ int LogIOWorkerWrapper::notify_need_writing_throttling(const bool &need_throttli ret = OB_NOT_INIT; } else if (!is_user_tenant_) { //need no nothing - } else if (OB_FAIL(user_log_io_worker_.notify_need_writing_throttling(need_throttling))) { - LOG_WARN("failed to notify_need_writing_throttling", K(need_throttling)); } else { + throttle_.notify_need_writing_throttling(need_throttling); if (need_throttling) { LOG_INFO("success to notify_need_writing_throttling True"); } @@ -124,10 +128,112 @@ int LogIOWorkerWrapper::notify_need_writing_throttling(const bool &need_throttli int64_t LogIOWorkerWrapper::get_last_working_time() const { - const int64_t sys_last_working_time = sys_log_io_worker_.get_last_working_time(); - const int64_t user_last_working_time = user_log_io_worker_.get_last_working_time(); - return MAX(sys_last_working_time, user_last_working_time); + int64_t last_working_time = OB_INVALID_TIMESTAMP; + if (IS_NOT_INIT) { + PALF_LOG_RET(ERROR, OB_NOT_INIT, "LogIOWorkerWrapper not inited", KPC(this)); + } else { + for (int64_t i = 0; i < log_writer_parallelism_; i++) { + last_working_time = MAX(last_working_time, log_io_workers_[i].get_last_working_time()); + } + } + return last_working_time; } +int LogIOWorkerWrapper::create_and_init_log_io_workers_(const LogIOWorkerConfig &config, + const int64_t tenant_id, + const int cb_thread_pool_tg_id, + ObIAllocator *allocator, + IPalfEnvImpl *palf_env_impl) +{ + int ret = OB_SUCCESS; + const int64_t log_writer_parallelism = config.io_worker_num_; + log_io_workers_ = reinterpret_cast(share::mtl_malloc( + (log_writer_parallelism) * sizeof(LogIOWorker), "LogIOWS")); + if (NULL == log_io_workers_) { + ret = OB_ALLOCATE_MEMORY_FAILED; + PALF_LOG(WARN, "allocate memory failed", K(log_writer_parallelism)); + } + for (int64_t i = 0; i < log_writer_parallelism && OB_SUCC(ret); i++) { + LogIOWorker *iow = log_io_workers_ + i; + iow = new(iow)LogIOWorker(); + // NB: the first LogIOWorker need ignoring throttling + bool need_ignoring_throttling = (i == SYS_LOG_IO_WORKER_INDEX); + if (OB_FAIL(iow->init(config, tenant_id, cb_thread_pool_tg_id, allocator, + &throttle_, need_ignoring_throttling, palf_env_impl))) { + PALF_LOG(WARN, "init LogIOWorker failed", K(i), K(config), K(tenant_id), + K(cb_thread_pool_tg_id), KP(allocator), KP(palf_env_impl)); + } else { + PALF_LOG(INFO, "init LogIOWorker success", K(i), K(config), K(tenant_id), + K(cb_thread_pool_tg_id), KP(allocator), KP(palf_env_impl), KP(iow), + KP(log_io_workers_)); + } + } + return ret; +} + +int LogIOWorkerWrapper::start_() +{ + int ret = OB_SUCCESS; + for (int64_t i = 0; i < log_writer_parallelism_ && OB_SUCC(ret); i++) { + LogIOWorker *iow = log_io_workers_ + i; + if (OB_FAIL(iow->start())) { + PALF_LOG(WARN, "start LogIOWorker failed", K(i)); + } + } + return ret; +} + +void LogIOWorkerWrapper::stop_() +{ + for (int64_t i = 0; i < log_writer_parallelism_; i++) { + LogIOWorker *iow = log_io_workers_ + i; + iow->stop(); + } +} + +void LogIOWorkerWrapper::wait_() +{ + for (int64_t i = 0; i < log_writer_parallelism_; i++) { + LogIOWorker *iow = log_io_workers_ + i; + iow->wait(); + } +} + +void LogIOWorkerWrapper::destory_and_free_log_io_workers_() +{ + PALF_LOG(INFO, "destory_log_io_workers_ success", KPC(this)); + if (NULL != log_io_workers_) { + for (int64_t i = 0; i < log_writer_parallelism_; i++) { + LogIOWorker *iow = log_io_workers_ + i; + iow->stop(); + iow->wait(); + iow->destroy(); + iow->~LogIOWorker(); + } + share::mtl_free(log_io_workers_); + log_io_workers_ = NULL; + } +} + +int64_t LogIOWorkerWrapper::palf_id_to_index_(const int64_t palf_id) +{ + int64_t index = -1; + // For sys log stream, index set to 0. + if (is_sys_palf_id(palf_id)) { + index = SYS_LOG_IO_WORKER_INDEX; + } else { + const int64_t hash_factor = log_writer_parallelism_ - 1; + if (hash_factor <= 0 && is_user_tenant_) { + PALF_LOG_RET(ERROR, OB_ERR_UNEXPECTED, "unexpected error, log_writer_parallelism_ must be greater than 1 when it's user tenant", + KPC(this), K(palf_id)); + OB_ASSERT(false); + } + // NB: SYS_LOG_IO_WORKER_INDEX is 0, others should not use this LogIOWorker. + index = (round_robin_idx_++ % hash_factor) + 1; + PALF_LOG(INFO, "palf_id_to_index_ success", KPC(this), K(palf_id), K(index)); + OB_ASSERT(index < log_writer_parallelism_); + } + return index; +} }//end of namespace palf }//end of namespace oceanbase diff --git a/src/logservice/palf/log_io_worker_wrapper.h b/src/logservice/palf/log_io_worker_wrapper.h index b26b6d2227..fd554d14ea 100644 --- a/src/logservice/palf/log_io_worker_wrapper.h +++ b/src/logservice/palf/log_io_worker_wrapper.h @@ -13,6 +13,7 @@ #ifndef OCEANBASE_LOGSERVIVE_LOG_IO_WORKER_WRAPPER_ #define OCEANBASE_LOGSERVIVE_LOG_IO_WORKER_WRAPPER_ +#include "log_throttle.h" #include "log_io_worker.h" namespace oceanbase @@ -24,7 +25,6 @@ class LogIOWorkerWrapper public: LogIOWorkerWrapper(); ~LogIOWorkerWrapper(); - int init(const LogIOWorkerConfig &config, const int64_t tenant_id, int cb_thread_pool_tg_id, @@ -34,18 +34,35 @@ public: int start(); void stop(); void wait(); + // NB: nowdays, this interface can only be called when create_palf_handle_impl!!! otherwise, round_robin_idx_ + // will not be correct. LogIOWorker *get_log_io_worker(const int64_t palf_id); int notify_need_writing_throttling(const bool &need_throtting); int64_t get_last_working_time() const; - TO_STRING_KV(K_(is_inited), K_(is_user_tenant), K_(sys_log_io_worker), K_(user_log_io_worker)); + TO_STRING_KV(K_(is_inited), K_(is_user_tenant), K_(log_writer_parallelism), KP(log_io_workers_), K_(round_robin_idx)); + +private: + int create_and_init_log_io_workers_(const LogIOWorkerConfig &config, + const int64_t tenant_id, + int cb_thread_pool_tg_id, + ObIAllocator *allocaotr, + IPalfEnvImpl *palf_env_impl); + int start_(); + void stop_(); + void wait_(); + void destory_and_free_log_io_workers_(); + int64_t palf_id_to_index_(const int64_t palf_id); + constexpr static int64_t SYS_LOG_IO_WORKER_INDEX = 0; private: - bool is_inited_; bool is_user_tenant_; - //for log stream NO.1 - LogIOWorker sys_log_io_worker_; - //for log streams except NO.1 - LogIOWorker user_log_io_worker_; + // 'log_writer_parallelism_' has include LogIOWorker which is used for sys log stream. + int64_t log_writer_parallelism_; + // The layout of LogIOWorker: | sys log ioworker | others | + LogIOWorker *log_io_workers_; + LogWritingThrottle throttle_; + int64_t round_robin_idx_; + bool is_inited_; }; }//end of namespace palf diff --git a/src/logservice/palf/log_throttle.cpp b/src/logservice/palf/log_throttle.cpp new file mode 100644 index 0000000000..6f2d9ecbcc --- /dev/null +++ b/src/logservice/palf/log_throttle.cpp @@ -0,0 +1,223 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#define USING_LOG_PREFIX PALF + +#include "log_throttle.h" +#include "share/ob_throttling_utils.h" //ObThrottlingUtils +#include "palf_env_impl.h" // PalfEnvImpl + +namespace oceanbase +{ +using namespace common; +using namespace share; +namespace palf +{ +void LogThrottlingStat::reset() +{ + start_ts_ = OB_INVALID_TIMESTAMP; + stop_ts_ = OB_INVALID_TIMESTAMP; + total_throttling_interval_ = 0; + total_throttling_size_ = 0; + total_throttling_task_cnt_ = 0; + total_skipped_size_ = 0; + total_skipped_task_cnt_ = 0; + max_throttling_interval_ = 0; +} +void LogWritingThrottle::reset() +{ + last_update_ts_ = OB_INVALID_TIMESTAMP; + need_writing_throttling_notified_ = false; + appended_log_size_cur_round_ = 0; + decay_factor_ = 0; + throttling_options_.reset(); + stat_.reset(); +} + +int LogWritingThrottle::update_throttling_options(IPalfEnvImpl *palf_env_impl) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(palf_env_impl)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("palf_env_impl is NULL", KPC(this)); + } else { + bool unused_has_freed_up_space = false; + if (check_need_update_throtting_options_guarded_by_lock_() + && OB_FAIL(update_throtting_options_guarded_by_lock_(palf_env_impl, unused_has_freed_up_space))) { + LOG_WARN("failed to update_throttling_info", KPC(this)); + } + } + return ret; +} + +int LogWritingThrottle::throttling(const int64_t throttling_size, + const NeedPurgingThrottlingFunc &need_purging_throttling_func, + IPalfEnvImpl *palf_env_impl) +{ + int ret = OB_SUCCESS; + if (!need_purging_throttling_func.is_valid() + || OB_ISNULL(palf_env_impl)) { + ret = OB_INVALID_ARGUMENT; + } else if (throttling_size < 0) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("invalid throttling_size", K(throttling_size), KPC(this)); + } else if (0 == throttling_size) { + //no need throttling + } else { + // hold lock firstly. + lock_.lock(); + if (need_throttling_not_guarded_by_lock_(need_purging_throttling_func)) { + const int64_t cur_unrecyclable_size = throttling_options_.unrecyclable_disk_space_ + appended_log_size_cur_round_; + const int64_t trigger_base_log_disk_size = throttling_options_.total_disk_space_ * throttling_options_.trigger_percentage_ / 100; + int64_t time_interval = 0; + if (OB_FAIL(ObThrottlingUtils::get_throttling_interval(THROTTLING_CHUNK_SIZE, throttling_size, trigger_base_log_disk_size, + cur_unrecyclable_size, decay_factor_, time_interval))) { + LOG_WARN("failed to get_throttling_interval", KPC(this)); + } + int64_t remain_interval_us = time_interval; + bool has_freed_up_space = false; + // release lock_ in progress of usleep, therefore, accessing shared members in LogWritingThrottle need be guarded by lock + // in following code block. + lock_.unlock(); + while (OB_SUCC(ret) && remain_interval_us > 0) { + const int64_t real_interval = MIN(remain_interval_us, DETECT_INTERVAL_US); + usleep(real_interval); + remain_interval_us -= real_interval; + if (remain_interval_us <= 0) { + //do nothing + } else if (OB_FAIL(update_throtting_options_guarded_by_lock_(palf_env_impl, has_freed_up_space))) { + LOG_WARN("failed to update_throttling_info_", KPC(this), K(time_interval), K(remain_interval_us)); + } else if (!need_throttling_not_guarded_by_lock_(need_purging_throttling_func) + || has_freed_up_space) { + LOG_TRACE("no need throttling or log disk has been freed up", KPC(this), K(time_interval), K(remain_interval_us), K(has_freed_up_space)); + break; + } + } + // hold lock_ after ulseep, therefore, accessing shared members in LogWritingThrottle no need be guarded by lock. + lock_.lock(); + stat_.after_throttling(time_interval - remain_interval_us, throttling_size); + } else if (need_throttling_with_options_not_guarded_by_lock_()) { + stat_.after_throttling(0, throttling_size); + } + + if (stat_.has_ever_throttled()) { + if (REACH_TIME_INTERVAL(2 * 1000 * 1000L)) { + PALF_LOG(INFO, "[LOG DISK THROTTLING] [STAT]", KPC(this)); + } + } + // release lock lastly. + lock_.unlock(); + } + return ret; +} + +int LogWritingThrottle::after_append_log(const int64_t log_size) +{ + SpinLockGuard guard(lock_); + appended_log_size_cur_round_ += log_size; + int ret = OB_SUCCESS; + if (OB_UNLIKELY(log_size < 0)) { + ret = OB_INVALID_ARGUMENT; + PALF_LOG(ERROR, "invalid argument", K(log_size)); + } else {/*do nothing*/} + return ret; +} + +int LogWritingThrottle::update_throtting_options_guarded_by_lock_(IPalfEnvImpl *palf_env_impl, bool &has_freed_up_space) +{ + int ret = OB_SUCCESS; + SpinLockGuard guard(lock_); + const int64_t cur_ts = ObClockGenerator::getClock(); + if (ATOMIC_LOAD(&need_writing_throttling_notified_)) { + PalfThrottleOptions new_throttling_options; + if (OB_FAIL(palf_env_impl->get_throttling_options(new_throttling_options))) { + PALF_LOG(WARN, "failed to get_writing_throttling_option"); + } else if (OB_UNLIKELY(!new_throttling_options.is_valid())) { + ret = OB_ERR_UNEXPECTED; + PALF_LOG(WARN, "options is invalid", K(new_throttling_options), KPC(this)); + } else { + const bool need_throttling = new_throttling_options.need_throttling(); + const int64_t new_available_size_after_limit = new_throttling_options.get_available_size_after_limit(); + bool need_update_decay_factor = false; + bool need_start_throttling = false; + + if (need_throttling) { + if (!throttling_options_.need_throttling()) { + need_start_throttling = true; + need_update_decay_factor = true; + } else { + need_update_decay_factor = (throttling_options_.get_available_size_after_limit() != new_available_size_after_limit); + } + if (need_update_decay_factor) { + if (OB_FAIL(ObThrottlingUtils::calc_decay_factor(new_available_size_after_limit, THROTTLING_DURATION_US, + THROTTLING_CHUNK_SIZE, decay_factor_))) { + PALF_LOG(ERROR, "failed to calc_decay_factor", K(throttling_options_), "duration(s)", + THROTTLING_DURATION_US / (1000 * 1000), K(THROTTLING_CHUNK_SIZE)); + } else { + PALF_LOG(INFO, "[LOG DISK THROTTLING] success to calc_decay_factor", K(decay_factor_), K(throttling_options_), + K(new_throttling_options), "duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE)); + } + } + + if (OB_SUCC(ret)) { + // update other field + has_freed_up_space = new_throttling_options.unrecyclable_disk_space_ < throttling_options_.unrecyclable_disk_space_; + bool has_unrecyclable_space_changed = new_throttling_options.unrecyclable_disk_space_ != throttling_options_.unrecyclable_disk_space_; + if (has_unrecyclable_space_changed || need_start_throttling) { + // reset appended_log_size_cur_round_ when unrecyclable_disk_space_ changed + appended_log_size_cur_round_ = 0; + } + throttling_options_ = new_throttling_options; + if (need_start_throttling) { + stat_.start_throttling(); + PALF_LOG(INFO, "[LOG DISK THROTTLING] [START]", KPC(this), + "duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE)); + } + } + } else { + if (throttling_options_.need_throttling()) { + PALF_LOG(INFO, "[LOG DISK THROTTLING] [STOP]", KPC(this), + "duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE)); + clean_up_not_guarded_by_lock_(); + stat_.stop_throttling(); + } + } + } + } else { + if (throttling_options_.need_throttling()) { + PALF_LOG(INFO, "[LOG DISK THROTTLING] [STOP] no need throttling any more", KPC(this), + "duration(s)", THROTTLING_DURATION_US / (1000 * 1000L), K(THROTTLING_CHUNK_SIZE)); + clean_up_not_guarded_by_lock_(); + stat_.stop_throttling(); + } + } + if (OB_SUCC(ret)) { + last_update_ts_ = cur_ts; + } + return ret; +} + +void LogWritingThrottle::clean_up_not_guarded_by_lock_() +{ + //do not reset submitted_seq_ && handled_seq_ && last_update_ts_ && stat_ + appended_log_size_cur_round_ = 0; + decay_factor_ = 0; + throttling_options_.reset(); +} + +bool LogWritingThrottle::check_need_update_throtting_options_guarded_by_lock_() +{ + SpinLockGuard guard(lock_); + int64_t cur_ts = ObClockGenerator::getClock(); + return cur_ts > last_update_ts_ + UPDATE_INTERVAL_US; +} +} // end namespace palf +} // end namespace oceanbase diff --git a/src/logservice/palf/log_throttle.h b/src/logservice/palf/log_throttle.h new file mode 100644 index 0000000000..f06033ef37 --- /dev/null +++ b/src/logservice/palf/log_throttle.h @@ -0,0 +1,163 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OCEANBASE_LOGSERVIVE_LOG_THROTTLE_H +#define OCEANBASE_LOGSERVIVE_LOG_THROTTLE_H + +#include "lib/utility/ob_macro_utils.h" // DISALLOW_COPY_AND_ASSIGN +#include "lib/utility/ob_print_utils.h" // TO_STRING_KV +#include "lib/lock/ob_spin_lock.h" // SpinLock +#include "common/ob_clock_generator.h" // ObClockGenerator +#include "lib/function/ob_function.h" // ObFunction +#include "log_define.h" // MAX_LOG_BUFFER_SIZE +#include "palf_options.h" // PalfThrottleOptions + +namespace oceanbase +{ +namespace palf +{ +class IPalfEnvImpl; +typedef ObFunction NeedPurgingThrottlingFunc; +class LogThrottlingStat +{ +public: + LogThrottlingStat() {reset();} + ~LogThrottlingStat() {reset();} + void reset(); + inline bool has_ever_throttled() const; + inline void after_throttling(const int64_t throttling_interval, const int64_t throttling_size); + inline void start_throttling(); + inline void stop_throttling(); + TO_STRING_KV(K_(start_ts), + K_(stop_ts), + K_(total_throttling_interval), + K_(total_throttling_size), + K_(total_throttling_task_cnt), + K_(total_skipped_size), + K_(total_skipped_task_cnt), + K_(max_throttling_interval)); +private: + int64_t start_ts_; + int64_t stop_ts_; + int64_t total_throttling_interval_; + int64_t total_throttling_size_; + int64_t total_throttling_task_cnt_; + +//log_size of tasks need for throttling but overlooked + int64_t total_skipped_size_; +//count of tasks need for throttling but overlooked + int64_t total_skipped_task_cnt_; + int64_t max_throttling_interval_; +}; + +inline void LogThrottlingStat::after_throttling(const int64_t throttling_interval, + const int64_t throttling_size) +{ + if (0 == throttling_interval) { + total_skipped_size_ += throttling_size; + total_skipped_task_cnt_++; + } else { + total_throttling_interval_ += throttling_interval; + total_throttling_size_ += throttling_size; + total_throttling_task_cnt_++; + max_throttling_interval_ = MAX(max_throttling_interval_, throttling_interval); + } +} + +inline bool LogThrottlingStat::has_ever_throttled() const +{ + return common::OB_INVALID_TIMESTAMP != start_ts_; +} +inline void LogThrottlingStat::start_throttling() +{ + reset(); + start_ts_ = common::ObClockGenerator::getClock(); +} +inline void LogThrottlingStat::stop_throttling() +{ + stop_ts_ = common::ObClockGenerator::getClock(); +} + +class LogWritingThrottle +{ +public: + LogWritingThrottle() {reset();} + ~LogWritingThrottle() {reset();} + void reset(); + //invoked by gc thread + inline void notify_need_writing_throttling(const bool is_need); + inline bool need_writing_throttling_notified() const; + int update_throttling_options(IPalfEnvImpl *palf_env_impl); + int throttling(const int64_t io_size, + const NeedPurgingThrottlingFunc &need_purging_throttling_func, + IPalfEnvImpl *palf_env_impl); + int after_append_log(const int64_t log_size); + TO_STRING_KV(K_(last_update_ts), + K_(need_writing_throttling_notified), + K_(appended_log_size_cur_round), + K_(decay_factor), + K_(throttling_options), + K_(stat)); + +private: + int update_throtting_options_guarded_by_lock_(IPalfEnvImpl *palf_env_impl, bool &has_recycled_log_disk); + inline bool need_throttling_with_options_not_guarded_by_lock_() const; + inline bool need_throttling_not_guarded_by_lock_(const NeedPurgingThrottlingFunc &need_purge_throttling) const; + //reset throttling related member + void clean_up_not_guarded_by_lock_(); + bool check_need_update_throtting_options_guarded_by_lock_(); +private: + typedef common::ObSpinLock SpinLock; + typedef common::ObSpinLockGuard SpinLockGuard; + static const int64_t UPDATE_INTERVAL_US = 500 * 1000L;//500ms + const int64_t DETECT_INTERVAL_US = 30 * 1000L;//30ms + const int64_t THROTTLING_DURATION_US = 1800 * 1000 * 1000L;//1800s + const int64_t THROTTLING_CHUNK_SIZE = MAX_LOG_BUFFER_SIZE; + //ts of lastest updating writing throttling info + int64_t last_update_ts_; + //ts when next log can be appended + //log_size can be appended during current round, will be reset when unrecyclable_size changed + // notified by gc, local meta may not be ready + mutable bool need_writing_throttling_notified_; + int64_t appended_log_size_cur_round_; + double decay_factor_; + //append_speed during current round, Bytes per usecond + PalfThrottleOptions throttling_options_; + LogThrottlingStat stat_; + mutable SpinLock lock_; +}; + +inline void LogWritingThrottle::notify_need_writing_throttling(const bool is_need) { + ATOMIC_SET(&need_writing_throttling_notified_, is_need); +} + +inline bool LogWritingThrottle::need_writing_throttling_notified() const { + return ATOMIC_LOAD(&need_writing_throttling_notified_); +} + +inline bool LogWritingThrottle::need_throttling_with_options_not_guarded_by_lock_() const +{ + return ATOMIC_LOAD(&need_writing_throttling_notified_) && throttling_options_.need_throttling(); +} + +inline bool LogWritingThrottle::need_throttling_not_guarded_by_lock_( + const NeedPurgingThrottlingFunc &need_purge_throttling) const +{ + // Only need throttle under following conditions: + // 1. when the tenant writing throttling has been enabled and has been triggered; + // 2. when there is no need to purge throttle. + return need_throttling_with_options_not_guarded_by_lock_() && !need_purge_throttling(); +} + +} // end namespace palf +} // end namespace oceanbase +#endif diff --git a/src/logservice/palf/palf_env_impl.cpp b/src/logservice/palf/palf_env_impl.cpp index 3f9a07179c..3e3b9268d2 100644 --- a/src/logservice/palf/palf_env_impl.cpp +++ b/src/logservice/palf/palf_env_impl.cpp @@ -198,11 +198,6 @@ int PalfEnvImpl::init( { int ret = OB_SUCCESS; int pret = 0; - // TODO by runlin: configurable - log_io_worker_config_.io_worker_num_ = 1; - log_io_worker_config_.io_queue_capcity_ = 100 * 1024; - log_io_worker_config_.batch_width_ = 8; - log_io_worker_config_.batch_depth_ = PALF_SLIDING_WINDOW_SIZE; const int64_t io_cb_num = PALF_SLIDING_WINDOW_SIZE * 128; if (is_inited_) { ret = OB_INIT_TWICE; @@ -212,6 +207,10 @@ int PalfEnvImpl::init( ret = OB_INVALID_ARGUMENT; PALF_LOG(ERROR, "invalid arguments", K(ret), KP(transport), K(base_dir), K(self), KP(transport), KP(log_alloc_mgr), KP(log_block_pool), KP(monitor)); + } else if (OB_FAIL(init_log_io_worker_config_(options.disk_options_.log_writer_parallelism_, + tenant_id, + log_io_worker_config_))) { + PALF_LOG(WARN, "init_log_io_worker_config_ failed", K(options)); } else if (OB_FAIL(fetch_log_engine_.init(this, log_alloc_mgr))) { PALF_LOG(ERROR, "FetchLogEngine init failed", K(ret)); } else if (OB_FAIL(log_rpc_.init(self, cluster_id, tenant_id, transport))) { @@ -1242,5 +1241,40 @@ int PalfEnvImpl::get_throttling_options(PalfThrottleOptions &options) return ret; } +int PalfEnvImpl::init_log_io_worker_config_(const int log_writer_parallelism, + const int64_t tenant_id, + LogIOWorkerConfig &config) +{ + int ret = OB_SUCCESS; + // log_writer_parallelism only valid when it's user tenant. + // to support writing throttling, sys log stream must has dependent LogIOWorker. + const int64_t real_log_writer_parallelism = is_user_tenant(tenant_id) ? (log_writer_parallelism + 1) : 1; + auto tmp_upper_align_div = [](const int64_t num, const int64_t align) -> int64_t { + return (num + align - 1) / align; + }; + + constexpr int64_t default_io_queue_cap = 100 * 1024; + constexpr int64_t default_io_batch_width = 8; + // Due to the uniqueness of the log stream ID, using a hash distribution method can + // naturally balance the load in a single-unit environment, therefore, we set the + // min io queue capacity to PALF_SLIDING_WINDOW_SIZE * 2, and set default_io_batch_width + // to 1. + // TODO by zjf225077: + // to support load balance in a multi-unit environment, LogIOWorker needs to use + // a load factor to ensure that the number of log streams on each LogIOWorker is in + // a balanced state. + constexpr int64_t default_min_io_queue_cap = PALF_SLIDING_WINDOW_SIZE * 2; + constexpr int64_t default_min_batch_width = 1; + // Assume that a maximum of 100 * 1024 I/O tasks exist simultaneously in single PalfEnvImpl + config.io_worker_num_ = real_log_writer_parallelism; + config.io_queue_capcity_ = MAX(default_min_io_queue_cap, + tmp_upper_align_div(default_io_queue_cap, real_log_writer_parallelism)); + config.batch_width_ = MAX(default_min_batch_width, + tmp_upper_align_div(default_io_batch_width, real_log_writer_parallelism)); + config.batch_depth_ = PALF_SLIDING_WINDOW_SIZE; + PALF_LOG(INFO, "init_log_io_worker_config_ success", K(config), K(tenant_id), K(log_writer_parallelism)); + return ret; +} + } // end namespace palf } // end namespace oceanbase diff --git a/src/logservice/palf/palf_env_impl.h b/src/logservice/palf/palf_env_impl.h index 523ec31d62..464f8b450d 100644 --- a/src/logservice/palf/palf_env_impl.h +++ b/src/logservice/palf/palf_env_impl.h @@ -324,6 +324,10 @@ private: int check_tmp_log_dir_exist_(bool &exist) const; int remove_stale_incomplete_palf_(); + int init_log_io_worker_config_(const int log_writer_parallelism, + const int64_t tenant_id, + LogIOWorkerConfig &config); + private: typedef common::RWLock RWLock; typedef RWLock::RLockGuard RLockGuard; diff --git a/src/logservice/palf/palf_options.cpp b/src/logservice/palf/palf_options.cpp index ce9281d29d..2b5ccfe90b 100644 --- a/src/logservice/palf/palf_options.cpp +++ b/src/logservice/palf/palf_options.cpp @@ -38,6 +38,7 @@ void PalfDiskOptions::reset() log_disk_utilization_limit_threshold_ = -1; log_disk_utilization_threshold_ = -1; log_disk_throttling_percentage_ = -1; + log_writer_parallelism_ = -1; } bool PalfDiskOptions::is_valid() const @@ -47,7 +48,8 @@ bool PalfDiskOptions::is_valid() const && 1 <=log_disk_utilization_limit_threshold_ && 100 >= log_disk_utilization_limit_threshold_ && log_disk_utilization_limit_threshold_ > log_disk_utilization_threshold_ && log_disk_throttling_percentage_ >= MIN_WRITING_THTOTTLING_TRIGGER_PERCENTAGE - && log_disk_throttling_percentage_ <= 100; + && log_disk_throttling_percentage_ <= 100 + && log_writer_parallelism_ >= 1 && log_writer_parallelism_ <= 8; } bool PalfDiskOptions::operator==(const PalfDiskOptions &palf_disk_options) const @@ -55,7 +57,9 @@ bool PalfDiskOptions::operator==(const PalfDiskOptions &palf_disk_options) const return log_disk_usage_limit_size_ == palf_disk_options.log_disk_usage_limit_size_ && log_disk_utilization_threshold_ == palf_disk_options.log_disk_utilization_threshold_ && log_disk_utilization_limit_threshold_ == palf_disk_options.log_disk_utilization_limit_threshold_ - && log_disk_throttling_percentage_ == palf_disk_options.log_disk_throttling_percentage_; + && log_disk_throttling_percentage_ == palf_disk_options.log_disk_throttling_percentage_ + && log_writer_parallelism_ == palf_disk_options.log_writer_parallelism_; + } PalfDiskOptions &PalfDiskOptions::operator=(const PalfDiskOptions &other) @@ -64,6 +68,7 @@ PalfDiskOptions &PalfDiskOptions::operator=(const PalfDiskOptions &other) log_disk_utilization_threshold_ = other.log_disk_utilization_threshold_; log_disk_utilization_limit_threshold_ = other.log_disk_utilization_limit_threshold_; log_disk_throttling_percentage_ = other.log_disk_throttling_percentage_; + log_writer_parallelism_ = other.log_writer_parallelism_; return *this; } diff --git a/src/logservice/palf/palf_options.h b/src/logservice/palf/palf_options.h index 07935f81a3..8f2b0a6d53 100644 --- a/src/logservice/palf/palf_options.h +++ b/src/logservice/palf/palf_options.h @@ -19,15 +19,19 @@ namespace oceanbase { namespace palf { -// 1. 磁盘空间总大小; -// 2. 复用百分比; -// 3. 停写百分比; +// Following disk options can be set to Palf. +// 1. log_disk_usage_limit_size_, the total log disk space. +// 2. log_disk_utilization_threshold_, log_disklog disk utilization threshold before reuse log files. +// 3. log_disk_utilization_limit_threshold_, maximum of log disk usage percentage before stop submitting or receiving logs. +// 4. log_disk_throttling_percentage_, the threshold of the size of the log disk when writing_limit will be triggered. +// 5. log_writer_parallelism, the number of parallel log writer processes that can be used to write redo log entries to disk. struct PalfDiskOptions { PalfDiskOptions() : log_disk_usage_limit_size_(-1), log_disk_utilization_threshold_(-1), log_disk_utilization_limit_threshold_(-1), - log_disk_throttling_percentage_(-1) + log_disk_throttling_percentage_(-1), + log_writer_parallelism_(-1) {} ~PalfDiskOptions() { reset(); } static constexpr int64_t MB = 1024*1024ll; @@ -39,10 +43,12 @@ struct PalfDiskOptions int log_disk_utilization_threshold_; int log_disk_utilization_limit_threshold_; int64_t log_disk_throttling_percentage_; + int log_writer_parallelism_; TO_STRING_KV("log_disk_size(MB)", log_disk_usage_limit_size_ / MB, "log_disk_utilization_threshold(%)", log_disk_utilization_threshold_, "log_disk_utilization_limit_threshold(%)", log_disk_utilization_limit_threshold_, - "log_disk_throttling_percentage(%)", log_disk_throttling_percentage_); + "log_disk_throttling_percentage(%)", log_disk_throttling_percentage_, + "log_writer_parallelism", log_writer_parallelism_); }; diff --git a/src/observer/omt/ob_multi_tenant.cpp b/src/observer/omt/ob_multi_tenant.cpp index 99eb39d9a6..6744f2e4f6 100644 --- a/src/observer/omt/ob_multi_tenant.cpp +++ b/src/observer/omt/ob_multi_tenant.cpp @@ -835,6 +835,12 @@ int ObMultiTenant::create_tenant(const ObTenantMeta &meta, bool write_slog, cons } } + if (OB_SUCC(ret)) { + if (OB_FAIL(OTC_MGR.add_tenant_config(tenant_id))) { + LOG_ERROR("add tenant config fail", K(tenant_id), K(ret)); + } + } + if (OB_SUCC(ret)) { CREATE_WITH_TEMP_ENTITY(RESOURCE_OWNER, tenant->id()) { WITH_ENTITY(&tenant->ctx()) { @@ -847,8 +853,6 @@ int ObMultiTenant::create_tenant(const ObTenantMeta &meta, bool write_slog, cons if (OB_FAIL(ret)) { // do nothing - } else if (OB_FAIL(OTC_MGR.add_tenant_config(tenant_id))) { - LOG_ERROR("add tenant config fail", K(tenant_id), K(ret)); #ifdef OMT_UNITTEST } else if (!is_virtual_tenant_id(tenant_id) && OB_FAIL(OTC_MGR.got_version(tenant_id, common::ObSystemConfig::INIT_VERSION))) { diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index 3e7a0345c9..60ddbfcf9f 100644 --- a/src/observer/omt/ob_tenant.cpp +++ b/src/observer/omt/ob_tenant.cpp @@ -705,6 +705,14 @@ int ObTenant::construct_mtl_init_ctx(const ObTenantMeta &meta, share::ObTenantMo mtl_init_ctx_->palf_options_.disk_options_.log_disk_utilization_threshold_ = 80; mtl_init_ctx_->palf_options_.disk_options_.log_disk_utilization_limit_threshold_ = 95; mtl_init_ctx_->palf_options_.disk_options_.log_disk_throttling_percentage_ = 100; + mtl_init_ctx_->palf_options_.disk_options_.log_writer_parallelism_ = 3; + ObTenantConfig *config = TENANT_CONF(id_); + if (OB_ISNULL(config)) { + ret = is_virtual_tenant_id(id_) ? OB_SUCCESS : OB_ENTRY_NOT_EXIST; + } else { + mtl_init_ctx_->palf_options_.disk_options_.log_writer_parallelism_ = config->_log_writer_parallelism; + } + LOG_INFO("construct_mtl_init_ctx success", "palf_options", mtl_init_ctx_->palf_options_.disk_options_); } return ret; } diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index b11374ad95..705542737c 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -599,6 +599,10 @@ DEF_TIME(standby_db_fetch_log_rpc_timeout, OB_TENANT_PARAMETER, "15s", "When the rpc timeout, the log transport service switches to another server of the log restore source tenant to fetch logs. " "Range: [2s, +∞)", ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); +DEF_INT(_log_writer_parallelism, OB_TENANT_PARAMETER, "3", + "[1,8]", + "the number of parallel log writer threads that can be used to write redo log entries to disk. ", + ObParameterAttr(Section::LOGSERVICE, Source::DEFAULT, EditLevel::STATIC_EFFECTIVE)); // ========================= LogService Config End ===================== DEF_INT(resource_hard_limit, OB_CLUSTER_PARAMETER, "100", "[100, 10000]", diff --git a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result index 2e0d50bf2c..7692e41f5f 100644 --- a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result +++ b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result @@ -294,6 +294,7 @@ _io_callback_thread_count _large_query_io_percentage _lcl_op_interval _load_tde_encrypt_engine +_log_writer_parallelism _max_elr_dependent_trx_count _max_malloc_sample_interval _max_schema_slot_num diff --git a/unittest/logservice/test_palf_throttling.cpp b/unittest/logservice/test_palf_throttling.cpp index 286fb1502c..acb932f82c 100644 --- a/unittest/logservice/test_palf_throttling.cpp +++ b/unittest/logservice/test_palf_throttling.cpp @@ -38,9 +38,13 @@ public: virtual void SetUp(); virtual void TearDown(); protected: + bool g_need_purging_throttling; + NeedPurgingThrottlingFunc g_need_purging_throttling_func; }; -TestPalfThrottling::TestPalfThrottling() {} +TestPalfThrottling::TestPalfThrottling() : g_need_purging_throttling(false) { + g_need_purging_throttling_func = [this](){ return g_need_purging_throttling; }; +} TestPalfThrottling::~TestPalfThrottling() { @@ -60,7 +64,8 @@ void TestPalfThrottling::TearDown() //ObMallocAllocator::get_instance()->recycle_tenant_allocator(1001); } -TEST(TestPalfThrottling, test_palf_options) + +TEST_F(TestPalfThrottling, test_palf_options) { char buf[64] = {0}; memset(buf, 0, 64); @@ -76,11 +81,13 @@ TEST(TestPalfThrottling, test_palf_options) wrapper.disk_opts_for_stopping_writing_.log_disk_usage_limit_size_ = total_disk_size; wrapper.disk_opts_for_stopping_writing_.log_disk_utilization_threshold_ = 80; wrapper.disk_opts_for_stopping_writing_.log_disk_utilization_limit_threshold_ = utilization_limit_threshold; + wrapper.disk_opts_for_stopping_writing_.log_writer_parallelism_ = 1; int64_t unrecyclable_size = 0; wrapper.set_cur_unrecyclable_log_disk_size(unrecyclable_size); ASSERT_EQ(false, wrapper.need_throttling()); unrecyclable_size = total_disk_size * 70 /100; wrapper.set_cur_unrecyclable_log_disk_size(unrecyclable_size); + PALF_LOG(INFO, "test_palf_options trace", K(wrapper)); ASSERT_EQ(true, wrapper.need_throttling()); //test PalfThrottleOptions PalfThrottleOptions throttling_options; @@ -97,7 +104,7 @@ TEST(TestPalfThrottling, test_palf_options) ASSERT_EQ(total_disk_size * (utilization_limit_threshold - throttling_percentage)/100, throttling_options.get_available_size_after_limit()); } -TEST(TestPalfThrottling, test_throttling_stat) +TEST_F(TestPalfThrottling, test_throttling_stat) { LogThrottlingStat stat; ASSERT_EQ(false, stat.has_ever_throttled()); @@ -125,7 +132,7 @@ TEST(TestPalfThrottling, test_throttling_stat) ASSERT_EQ(0, stat.max_throttling_interval_); } -TEST(TestPalfThrottling, test_log_write_throttle) +TEST_F(TestPalfThrottling, test_log_write_throttle) { int64_t total_disk_size = 1024 * 1024 * 1024L; int64_t utilization_limit_threshold = 95; @@ -142,7 +149,7 @@ TEST(TestPalfThrottling, test_log_write_throttle) palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options); LogWritingThrottle throttle; - ASSERT_EQ(false, throttle.need_throttling_()); + ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func)); ASSERT_EQ(false, throttle.need_writing_throttling_notified()); throttle.notify_need_writing_throttling(true); ASSERT_EQ(true, throttle.need_writing_throttling_notified()); @@ -150,13 +157,12 @@ TEST(TestPalfThrottling, test_log_write_throttle) throttle.notify_need_writing_throttling(false); ASSERT_EQ(false, throttle.need_writing_throttling_notified()); - ASSERT_EQ(OB_INVALID_ARGUMENT, throttle.after_append_log(-1, 0)); - ASSERT_EQ(OB_INVALID_ARGUMENT, throttle.after_append_log(0, -1)); + ASSERT_EQ(OB_INVALID_ARGUMENT, throttle.after_append_log(-1)); //test throttling only after notified PALF_LOG(INFO, "case 1: test no need throttling while notify_need_writing_throttling is false"); throttle.update_throttling_options(&palf_env_impl); - throttle.throttling(1024, &palf_env_impl); + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); PalfThrottleOptions invalid_throttle_options; ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_); ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle.last_update_ts_); @@ -164,7 +170,7 @@ TEST(TestPalfThrottling, test_log_write_throttle) // test update interval 500ms PALF_LOG(INFO, "case 2: test update interval"); - throttle.throttling(1024, &palf_env_impl); + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_); ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle.last_update_ts_); ASSERT_EQ(false, throttle.need_writing_throttling_notified_); @@ -174,8 +180,8 @@ TEST(TestPalfThrottling, test_log_write_throttle) usleep(LogWritingThrottle::UPDATE_INTERVAL_US); throttle.update_throttling_options(&palf_env_impl); throttle.notify_need_writing_throttling(true); - throttle.throttling(1024, &palf_env_impl); - ASSERT_EQ(false, throttle.need_throttling_()); + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); + ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func)); ASSERT_EQ(false, throttle.stat_.has_ever_throttled()); @@ -185,9 +191,9 @@ TEST(TestPalfThrottling, test_log_write_throttle) palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 100; usleep(LogWritingThrottle::UPDATE_INTERVAL_US); throttle.update_throttling_options(&palf_env_impl); - throttle.throttling(1024, &palf_env_impl); + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options); - ASSERT_EQ(false, throttle.need_throttling_()); + ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func)); ASSERT_EQ(false, throttle.stat_.has_ever_throttled()); //test need throttling after update @@ -197,109 +203,96 @@ TEST(TestPalfThrottling, test_log_write_throttle) palf_env_impl.disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(unrecyclable_size); usleep(LogWritingThrottle::UPDATE_INTERVAL_US); throttle.update_throttling_options(&palf_env_impl); - throttle.throttling(1024, &palf_env_impl); + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); palf_env_impl.disk_options_wrapper_.get_throttling_options(throttle_options); ASSERT_EQ(throttle_options, throttle.throttling_options_); - ASSERT_EQ(true, throttle.need_throttling_()); + ASSERT_EQ(true, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func)); ASSERT_EQ(true, throttle.stat_.has_ever_throttled()); ASSERT_EQ(1024, throttle.stat_.total_throttling_size_); ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_); ASSERT_EQ(0, throttle.stat_.total_skipped_task_cnt_); ASSERT_EQ(0, throttle.stat_.total_skipped_size_); - throttle.after_append_log(1024, 0); + throttle.after_append_log(1024); ASSERT_EQ(1024, throttle.appended_log_size_cur_round_); - ASSERT_EQ(0, throttle.submitted_seq_); - ASSERT_EQ(0, throttle.handled_seq_); //test no need throttling with flush meta task in queue //.1 flush log task + g_need_purging_throttling = true; PALF_LOG(INFO, "case 5: test no need throttling while flush meta task ", K(throttle)); - throttle.inc_and_fetch_submitted_seq(); - throttle.inc_and_fetch_submitted_seq(); - throttle.inc_and_fetch_submitted_seq(); - ASSERT_EQ(3, throttle.submitted_seq_); - throttle.throttling(1024, &palf_env_impl); + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); ASSERT_EQ(throttle_options, throttle.throttling_options_); - ASSERT_EQ(false, throttle.need_throttling_()); + // meta task need purging throttling + ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func)); ASSERT_EQ(1024, throttle.stat_.total_throttling_size_); ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_); ASSERT_EQ(1, throttle.stat_.total_skipped_task_cnt_); ASSERT_EQ(1024, throttle.stat_.total_skipped_size_); - throttle.after_append_log(1024, 0); + throttle.after_append_log(1024); ASSERT_EQ(2048, throttle.appended_log_size_cur_round_); - ASSERT_EQ(3, throttle.submitted_seq_); - ASSERT_EQ(0, throttle.handled_seq_); //.2 flush meta task PALF_LOG(INFO, "case 6: test no need throttling and flush meta task ", K(throttle)); - throttle.throttling(1024, &palf_env_impl); + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); ASSERT_EQ(throttle_options, throttle.throttling_options_); - ASSERT_EQ(true, throttle.need_throttling_with_options_()); - ASSERT_EQ(false, throttle.need_throttling_()); + ASSERT_EQ(true, throttle.need_throttling_with_options_not_guarded_by_lock_()); + ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func)); ASSERT_EQ(1024, throttle.stat_.total_throttling_size_); ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_); ASSERT_EQ(2, throttle.stat_.total_skipped_task_cnt_); ASSERT_EQ(2048, throttle.stat_.total_skipped_size_); - throttle.after_append_log(1024, 1); + throttle.after_append_log(1024); ASSERT_EQ(3072, throttle.appended_log_size_cur_round_); - ASSERT_EQ(3, throttle.submitted_seq_); - ASSERT_EQ(1, throttle.handled_seq_); - throttle.throttling(1024, &palf_env_impl); + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); ASSERT_EQ(throttle_options, throttle.throttling_options_); - ASSERT_EQ(true, throttle.need_throttling_with_options_()); - ASSERT_EQ(false, throttle.need_throttling_()); + ASSERT_EQ(true, throttle.need_throttling_with_options_not_guarded_by_lock_()); + ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func)); ASSERT_EQ(1024, throttle.stat_.total_throttling_size_); ASSERT_EQ(1, throttle.stat_.total_throttling_task_cnt_); ASSERT_EQ(3, throttle.stat_.total_skipped_task_cnt_); ASSERT_EQ(3072, throttle.stat_.total_skipped_size_); - throttle.after_append_log(1024, 3); + throttle.after_append_log(1024); ASSERT_EQ(4096, throttle.appended_log_size_cur_round_); - ASSERT_EQ(3, throttle.submitted_seq_); - ASSERT_EQ(3, throttle.handled_seq_); PALF_LOG(INFO, "case 7: need throttling after all flush meta task handled", K(throttle)); - throttle.throttling(1024, &palf_env_impl); + g_need_purging_throttling = false; + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); ASSERT_EQ(throttle_options, throttle.throttling_options_); ASSERT_EQ(true, throttle.decay_factor_ > 0.0); - ASSERT_EQ(true, throttle.need_throttling_()); + ASSERT_EQ(true, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func)); ASSERT_EQ(2048, throttle.stat_.total_throttling_size_); ASSERT_EQ(2, throttle.stat_.total_throttling_task_cnt_); ASSERT_EQ(3, throttle.stat_.total_skipped_task_cnt_); ASSERT_EQ(3072, throttle.stat_.total_skipped_size_); - throttle.after_append_log(1024, 3); + throttle.after_append_log(1024); ASSERT_EQ(5120, throttle.appended_log_size_cur_round_); - ASSERT_EQ(3, throttle.submitted_seq_); - ASSERT_EQ(3, throttle.handled_seq_); //test notify_need_writing_throttling(false) changed PALF_LOG(INFO, "case 8: no need to throttle after notify_need_throttling(false)", K(throttle)); usleep(LogWritingThrottle::UPDATE_INTERVAL_US); throttle.notify_need_writing_throttling(false); throttle.update_throttling_options(&palf_env_impl); - throttle.throttling(1024, &palf_env_impl); + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_); - ASSERT_EQ(false, throttle.need_throttling_with_options_()); - ASSERT_EQ(false, throttle.need_throttling_()); + ASSERT_EQ(false, throttle.need_throttling_with_options_not_guarded_by_lock_()); + ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func)); ASSERT_EQ(2048, throttle.stat_.total_throttling_size_); ASSERT_EQ(2, throttle.stat_.total_throttling_task_cnt_); ASSERT_EQ(3, throttle.stat_.total_skipped_task_cnt_); ASSERT_EQ(3072, throttle.stat_.total_skipped_size_); ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle.stat_.stop_ts_); - throttle.after_append_log(1024, 0); + throttle.after_append_log(1024); ASSERT_EQ(1024, throttle.appended_log_size_cur_round_); ASSERT_EQ(0, throttle.decay_factor_); - ASSERT_EQ(3, throttle.submitted_seq_); - ASSERT_EQ(3, throttle.handled_seq_); //test need write throttling again PALF_LOG(INFO, "case 9: need to throttle after notify_need_throttling(true)", K(throttle)); usleep(LogWritingThrottle::UPDATE_INTERVAL_US); throttle.notify_need_writing_throttling(true); throttle.update_throttling_options(&palf_env_impl); - throttle.throttling(1024, &palf_env_impl); - ASSERT_EQ(true, throttle.need_throttling_with_options_()); - ASSERT_EQ(true, throttle.need_throttling_()); + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); + ASSERT_EQ(true, throttle.need_throttling_with_options_not_guarded_by_lock_()); + ASSERT_EQ(true, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func)); ASSERT_EQ(throttle_options, throttle.throttling_options_); ASSERT_EQ(true, throttle.decay_factor_ > 0.0); ASSERT_EQ(1024, throttle.stat_.total_throttling_size_); @@ -308,10 +301,8 @@ TEST(TestPalfThrottling, test_log_write_throttle) ASSERT_EQ(0, throttle.stat_.total_skipped_size_); ASSERT_EQ(true, throttle.stat_.total_throttling_interval_ > 0); ASSERT_EQ(true, OB_INVALID_TIMESTAMP == throttle.stat_.stop_ts_); - throttle.after_append_log(1024, 0); + throttle.after_append_log(1024); ASSERT_EQ(1024, throttle.appended_log_size_cur_round_); - ASSERT_EQ(3, throttle.submitted_seq_); - ASSERT_EQ(3, throttle.handled_seq_); double old_decay_factor = throttle.decay_factor_; // @@ -320,9 +311,9 @@ TEST(TestPalfThrottling, test_log_write_throttle) palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 55; palf_env_impl.get_throttling_options(throttle_options); throttle.update_throttling_options(&palf_env_impl); - throttle.throttling(1024, &palf_env_impl); - ASSERT_EQ(true, throttle.need_throttling_with_options_()); - ASSERT_EQ(true, throttle.need_throttling_()); + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); + ASSERT_EQ(true, throttle.need_throttling_with_options_not_guarded_by_lock_()); + ASSERT_EQ(true, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func)); ASSERT_EQ(throttle_options, throttle.throttling_options_); ASSERT_EQ(true, throttle.decay_factor_ > 0.0); ASSERT_EQ(true, throttle.decay_factor_ != old_decay_factor); @@ -331,10 +322,8 @@ TEST(TestPalfThrottling, test_log_write_throttle) ASSERT_EQ(0, throttle.stat_.total_skipped_task_cnt_); ASSERT_EQ(0, throttle.stat_.total_skipped_size_); ASSERT_EQ(true, OB_INVALID_TIMESTAMP == throttle.stat_.stop_ts_); - throttle.after_append_log(1024, 0); + throttle.after_append_log(1024); ASSERT_EQ(2048, throttle.appended_log_size_cur_round_); - ASSERT_EQ(3, throttle.submitted_seq_); - ASSERT_EQ(3, throttle.handled_seq_); //test reset appended_log_size_cur_round_ PALF_LOG(INFO, "case 11: test reset appended_log_size_cur_round_ after unrecyclable_size changes", K(throttle)); @@ -343,9 +332,9 @@ TEST(TestPalfThrottling, test_log_write_throttle) unrecyclable_size = total_disk_size * 65/100; palf_env_impl.disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(unrecyclable_size); throttle.update_throttling_options(&palf_env_impl); - throttle.throttling(1024, &palf_env_impl); - ASSERT_EQ(true, throttle.need_throttling_with_options_()); - ASSERT_EQ(true, throttle.need_throttling_()); + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); + ASSERT_EQ(true, throttle.need_throttling_with_options_not_guarded_by_lock_()); + ASSERT_EQ(true, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func)); palf_env_impl.get_throttling_options(throttle_options); ASSERT_EQ(throttle_options, throttle.throttling_options_); ASSERT_EQ(true, throttle.decay_factor_ > 0.0); @@ -356,12 +345,9 @@ TEST(TestPalfThrottling, test_log_write_throttle) ASSERT_EQ(0, throttle.stat_.total_skipped_size_); ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle.stat_.start_ts_); ASSERT_EQ(true, OB_INVALID_TIMESTAMP == throttle.stat_.stop_ts_); - throttle.after_append_log(1024, 0); + throttle.after_append_log(1024); ASSERT_EQ(1024, throttle.appended_log_size_cur_round_); - ASSERT_EQ(3, throttle.submitted_seq_); - ASSERT_EQ(3, throttle.handled_seq_); - ASSERT_EQ(OB_ERR_UNEXPECTED, throttle.after_append_log(0, 4)); - ASSERT_EQ(OB_ERR_UNEXPECTED, throttle.after_append_log(0, 2)); + ASSERT_EQ(OB_SUCCESS, throttle.after_append_log(0)); //test stop write throttling when trigger percentage changed PALF_LOG(INFO, "case 12: test stop write throttling when trigger percentage changed", K(throttle)); @@ -369,9 +355,9 @@ TEST(TestPalfThrottling, test_log_write_throttle) palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 80; palf_env_impl.get_throttling_options(throttle_options); throttle.update_throttling_options(&palf_env_impl); - throttle.throttling(1024, &palf_env_impl); - ASSERT_EQ(false, throttle.need_throttling_with_options_()); - ASSERT_EQ(false, throttle.need_throttling_()); + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); + ASSERT_EQ(false, throttle.need_throttling_with_options_not_guarded_by_lock_()); + ASSERT_EQ(false, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func)); ASSERT_EQ(invalid_throttle_options, throttle.throttling_options_); ASSERT_EQ(0, throttle.decay_factor_); ASSERT_EQ(3072, throttle.stat_.total_throttling_size_); @@ -380,17 +366,15 @@ TEST(TestPalfThrottling, test_log_write_throttle) ASSERT_EQ(0, throttle.stat_.total_skipped_size_); ASSERT_EQ(true, OB_INVALID_TIMESTAMP != throttle.stat_.stop_ts_); ASSERT_EQ(0, throttle.appended_log_size_cur_round_); - ASSERT_EQ(3, throttle.submitted_seq_); - ASSERT_EQ(3, throttle.handled_seq_); PALF_LOG(INFO, "case 12: test stop writing throttling when unrecyclable size fallbacks", K(throttle)); usleep(LogWritingThrottle::UPDATE_INTERVAL_US); palf_env_impl.disk_options_wrapper_.disk_opts_for_stopping_writing_.log_disk_throttling_percentage_ = 60; throttle.notify_need_writing_throttling(true); throttle.update_throttling_options(&palf_env_impl); - throttle.throttling(1024, &palf_env_impl); - ASSERT_EQ(true, throttle.need_throttling_with_options_()); - ASSERT_EQ(true, throttle.need_throttling_()); + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); + ASSERT_EQ(true, throttle.need_throttling_with_options_not_guarded_by_lock_()); + ASSERT_EQ(true, throttle.need_throttling_not_guarded_by_lock_(g_need_purging_throttling_func)); palf_env_impl.get_throttling_options(throttle_options); ASSERT_EQ(throttle_options, throttle.throttling_options_); ASSERT_EQ(true, throttle.decay_factor_ > 0.0); @@ -400,17 +384,15 @@ TEST(TestPalfThrottling, test_log_write_throttle) ASSERT_EQ(0, throttle.stat_.total_skipped_size_); ASSERT_EQ(true, throttle.stat_.total_throttling_interval_ > 0); ASSERT_EQ(true, OB_INVALID_TIMESTAMP == throttle.stat_.stop_ts_); - throttle.after_append_log(1024, 0); + throttle.after_append_log(1024); ASSERT_EQ(1024, throttle.appended_log_size_cur_round_); - ASSERT_EQ(3, throttle.submitted_seq_); - ASSERT_EQ(3, throttle.handled_seq_); usleep(LogWritingThrottle::UPDATE_INTERVAL_US); unrecyclable_size = total_disk_size * 45/100; palf_env_impl.disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(unrecyclable_size); throttle.update_throttling_options(&palf_env_impl); - throttle.throttling(1024, &palf_env_impl); - ASSERT_EQ(false, throttle.need_throttling_with_options_()); + throttle.throttling(1024, g_need_purging_throttling_func, &palf_env_impl); + ASSERT_EQ(false, throttle.need_throttling_with_options_not_guarded_by_lock_()); }