/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #include #include #define USING_LOG_PREFIX STORAGE #define protected public #define private public #include "lib/lock/ob_spin_rwlock.h" // SpinRWLock #include "lib/time/ob_time_utility.h" #include "lib/random/ob_random.h" #include "lib/utility/utility.h" #include "lib/task/ob_timer.h" using namespace oceanbase::common; using namespace std; namespace oceanbase { namespace storage { class TestSpeedLimit : public ::testing::Test { public: TestSpeedLimit() : timer_(), task_(*this) {} virtual ~TestSpeedLimit() = default; virtual void SetUp() override { int ret = OB_SUCCESS; if (OB_FAIL(timer_.init("Flush"))) { LOG_INFO("fail to init timer", K(ret)); } else if (OB_FAIL(timer_.schedule(task_, FLUSH_INTERVAL, true))) { LOG_INFO("fail to schedule checkpoint task", K(ret)); } } virtual void TearDown() override { timer_.wait(); timer_.stop(); } static void SetUpTestCase() {} static void TearDownTestCase() {} static const int64_t MEM_SLICE_SIZE = 2 * 1024 * 1024; //Bytes per usecond static const int64_t ADVANCE_CLOCK_INTERVAL = 50;//50us static const int64_t MIN_INTERVAL = 20000; static const int64_t SLEEP_INTERVAL_PER_TIME = 20 * 1000; static const int64_t SPEED_LIMIT_MAX_SLEEP_TIME = 20 * 1000 * 1000; static const int64_t FLUSH_INTERVAL = 5 * 1000 * 1000; static const int64_t INSERT_THREADS = 1000; static const int64_t WRITE_DURATION = 1000L; //adjust when test void write(int64_t size); void release(int64_t size); void set_param(int64_t trigger_percentage, int64_t lastest_memstore_threshold, int64_t writing_throttling_maximum_duration) { trigger_percentage_ = trigger_percentage; lastest_memstore_threshold_ = lastest_memstore_threshold; writing_throttling_maximum_duration_ = writing_throttling_maximum_duration; } bool need_do_writing_throttle() const { int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100; int64_t cur_mem_hold = ATOMIC_LOAD(&hold_); bool need_do_writing_throttle = cur_mem_hold > trigger_mem_limit; return need_do_writing_throttle; } int64_t calc_mem_limit(int64_t cur_mem_hold, int64_t trigger_mem_limit, int64_t dt) const { double cur_chunk_seq = static_cast(((cur_mem_hold - trigger_mem_limit) + MEM_SLICE_SIZE - 1)/ (MEM_SLICE_SIZE)); int64_t mem_can_be_assigned = 0; int64_t allocate_size_in_the_page = MEM_SLICE_SIZE - (cur_mem_hold - trigger_mem_limit) % MEM_SLICE_SIZE; int64_t accumulate_interval = 0; int64_t the_page_interval = 0; while (accumulate_interval < dt) { the_page_interval = static_cast(decay_factor_ * cur_chunk_seq * cur_chunk_seq * cur_chunk_seq) * allocate_size_in_the_page / MEM_SLICE_SIZE; accumulate_interval += the_page_interval; mem_can_be_assigned += (accumulate_interval > dt ? allocate_size_in_the_page - (accumulate_interval - dt) * allocate_size_in_the_page / the_page_interval : allocate_size_in_the_page); allocate_size_in_the_page = MEM_SLICE_SIZE; cur_chunk_seq += double(1); } return mem_can_be_assigned; } int64_t expected_wait_time(int64_t seq) const { int64_t expected_wait_time = 0; if (clock_ < seq) { int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100; int64_t can_assign_in_next_period = calc_mem_limit(hold_, trigger_mem_limit, ADVANCE_CLOCK_INTERVAL); expected_wait_time = (seq - clock_) * ADVANCE_CLOCK_INTERVAL / can_assign_in_next_period; } return expected_wait_time; } void speed_limit(int64_t cur_mem_hold, int64_t alloc_size) { int ret = OB_SUCCESS; int64_t trigger_mem_limit = 0; bool need_speed_limit = false; int64_t seq = 0; int64_t throttling_interval = 0; if (trigger_percentage_ < 100) { if (OB_UNLIKELY(cur_mem_hold < 0 || alloc_size <= 0 || lastest_memstore_threshold_ <= 0 || trigger_percentage_ <= 0)) { COMMON_LOG(ERROR, "invalid arguments", K(cur_mem_hold), K(alloc_size), K(lastest_memstore_threshold_), K(trigger_percentage_)); } else if (cur_mem_hold > (trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100)) { need_speed_limit = true; if (OB_FAIL(check_and_calc_decay_factor())) { COMMON_LOG(WARN, "failed to check_and_calc_decay_factor", K(cur_mem_hold), K(alloc_size)); } } COMMON_LOG(INFO, "CCTT 2"); advance_clock(need_speed_limit); seq = ATOMIC_AAF(&max_seq_, alloc_size); get_seq() = seq; tl_need_speed_limit() = need_speed_limit; if (need_speed_limit && REACH_TIME_INTERVAL(1 * 1000 * 1000L)) { COMMON_LOG(INFO, "report write throttle info", K(alloc_size), K(throttling_interval), "max_seq_", ATOMIC_LOAD(&max_seq_), K(clock_), K(cur_mem_hold), K(seq)); } } } int check_and_calc_decay_factor() { int ret = OB_SUCCESS; int64_t available_mem = (100 - trigger_percentage_) * lastest_memstore_threshold_ / 100; double N = static_cast(available_mem) / static_cast(MEM_SLICE_SIZE); decay_factor_ = (static_cast(writing_throttling_maximum_duration_) - N * static_cast(MIN_INTERVAL))/ static_cast((((N*(N+1)*N*(N+1)))/4)); decay_factor_ = decay_factor_ < 0 ? 0 : decay_factor_; COMMON_LOG(INFO, "recalculate decay factor", K(trigger_percentage_), K(decay_factor_), K(writing_throttling_maximum_duration_), K(available_mem), K(N)); return ret; } bool check_clock_over_seq(int64_t req) { advance_clock(true); RLockGuard guard(rwlock_); if (clock_ < req) { COMMON_LOG(INFO, "CCTT 1", K(clock_), K(req)); } return req <= clock_; } void advance_clock(bool need_speed_limit) { int64_t cur_ts = ObTimeUtility::current_time(); int64_t old_ts = last_update_ts_; if ((cur_ts - last_update_ts_ > ADVANCE_CLOCK_INTERVAL) && old_ts == ATOMIC_CAS(&last_update_ts_, old_ts, cur_ts)) { WLockGuard guard(rwlock_); int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100; int64_t mem_limit = (need_speed_limit ? calc_mem_limit(hold_, trigger_mem_limit, cur_ts - old_ts) : trigger_mem_limit - hold_); clock_ = std::min(max_seq_, clock_ + mem_limit); COMMON_LOG(INFO, "current clock is ", K(clock_), K(max_seq_), K(mem_limit), K(hold_), K(cur_ts - old_ts)); } } double decay_factor_; int64_t trigger_percentage_; int64_t lastest_memstore_threshold_; int64_t hold_; int64_t writing_throttling_maximum_duration_; typedef common::SpinRWLock RWLock; typedef common::SpinRLockGuard RLockGuard; typedef common::SpinWLockGuard WLockGuard; RWLock rwlock_; int64_t max_seq_; int64_t clock_; int64_t last_update_ts_; class ObTraversalFlushTask : public common::ObTimerTask { public: ObTraversalFlushTask(TestSpeedLimit &test_speed_limit) : test_speed_limit_(test_speed_limit) {} virtual ~ObTraversalFlushTask() {} virtual void runTimerTask() { test_speed_limit_.release(3000 * 1000 * 1000L); } private: TestSpeedLimit &test_speed_limit_; }; common::ObTimer timer_; ObTraversalFlushTask task_; }; void TestSpeedLimit::write(int64_t size) { speed_limit(hold_, size); (void)ATOMIC_AAF(&hold_, size); // mock write latency int64_t sleep_time = ObRandom::rand(30, 1000); ob_usleep(sleep_time); bool &need_speed_limit = tl_need_speed_limit(); int64_t &seq = get_seq(); int64_t left_interval = SPEED_LIMIT_MAX_SLEEP_TIME; bool has_sleep = false; int64_t has_sleep_time = 0; int time = 0; bool need_sleep = need_speed_limit; while (need_sleep && !check_clock_over_seq(seq) && (left_interval > 0)) { int64_t wait_time = expected_wait_time(seq); //LOG_INFO("CCTT 3", K(need_sleep), K(left_interval), K(wait_time), K(seq), K(clock_), K(max_seq_)); if (wait_time == 0) { break; } uint32_t sleep_interval = static_cast(min(min(left_interval, SLEEP_INTERVAL_PER_TIME), wait_time)); ob_usleep(sleep_interval); has_sleep_time += sleep_interval; time++; left_interval -= sleep_interval; has_sleep = true; need_sleep = need_do_writing_throttle(); } if (need_speed_limit && TC_REACH_TIME_INTERVAL(100L * 1000L)) { LOG_INFO("throttle situation", K(has_sleep_time), K(time), K(seq)); } } void TestSpeedLimit::release(int64_t size) { if (hold_ > (lastest_memstore_threshold_ * (trigger_percentage_ + 30) / 100)) { (void)ATOMIC_AAF(&hold_, -size); } } TEST_F(TestSpeedLimit, test_speed_limit) { set_param(60, 9663676400, 3600000000); std::vector insert_threads; int insert_thread_num = 5; for (int i = 0; i < insert_thread_num; ++i) { insert_threads.push_back(std::thread([this]() { const int64_t start_time = ObTimeUtility::current_time(); while (ObTimeUtility::current_time() - start_time <= WRITE_DURATION) { write(1600); }})); } for (int i = 0; i < insert_thread_num; ++i) { insert_threads[i].join(); } } } } int main(int argc, char **argv) { system("rm -f test_speed_limit.log*"); OB_LOGGER.set_file_name("test_speed_limit.log", true); OB_LOGGER.set_log_level("INFO"); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); }