301 lines
		
	
	
		
			9.9 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			301 lines
		
	
	
		
			9.9 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /**
 | |
|  * Copyright (c) 2021 OceanBase
 | |
|  * OceanBase CE is licensed under Mulan PubL v2.
 | |
|  * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | |
|  * You may obtain a copy of Mulan PubL v2 at:
 | |
|  *          http://license.coscl.org.cn/MulanPubL-2.0
 | |
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | |
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | |
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | |
|  * See the Mulan PubL v2 for more details.
 | |
|  */
 | |
| 
 | |
| #include <gtest/gtest.h>
 | |
| #include <thread>
 | |
| 
 | |
| #define USING_LOG_PREFIX STORAGE
 | |
| 
 | |
| #define protected public
 | |
| #define private public
 | |
| #include "lib/lock/ob_spin_rwlock.h"           // SpinRWLock
 | |
| #include "lib/time/ob_time_utility.h"
 | |
| #include "lib/random/ob_random.h"
 | |
| #include "lib/utility/utility.h"
 | |
| #include "lib/task/ob_timer.h"
 | |
| 
 | |
| 
 | |
| using namespace oceanbase::common;
 | |
| using namespace std;
 | |
| 
 | |
| namespace oceanbase
 | |
| {
 | |
| 
 | |
| namespace storage
 | |
| {
 | |
| 
 | |
| class TestSpeedLimit : public ::testing::Test
 | |
| {
 | |
| public:
 | |
|   TestSpeedLimit()
 | |
|     : timer_(),
 | |
|       task_(*this)
 | |
|       {}
 | |
|   virtual ~TestSpeedLimit() = default;
 | |
| 
 | |
|   virtual void SetUp() override
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     if (OB_FAIL(timer_.init("Flush"))) {
 | |
|       LOG_INFO("fail to init timer", K(ret));
 | |
|     } else if (OB_FAIL(timer_.schedule(task_, FLUSH_INTERVAL, true))) {
 | |
|       LOG_INFO("fail to schedule checkpoint task", K(ret));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   virtual void TearDown() override
 | |
|   {
 | |
|     timer_.wait();
 | |
|     timer_.stop();
 | |
|   }
 | |
| 
 | |
|   static void SetUpTestCase() {}
 | |
|   static void TearDownTestCase() {}
 | |
| 
 | |
|   static const int64_t MEM_SLICE_SIZE = 2 * 1024 * 1024; //Bytes per usecond
 | |
|   static const int64_t ADVANCE_CLOCK_INTERVAL = 50;//50us
 | |
|   static const int64_t MIN_INTERVAL = 20000;
 | |
|   static const int64_t SLEEP_INTERVAL_PER_TIME = 20 * 1000;
 | |
|   static const int64_t SPEED_LIMIT_MAX_SLEEP_TIME = 20 * 1000 * 1000;
 | |
|   static const int64_t FLUSH_INTERVAL = 5 * 1000 * 1000;
 | |
|   static const int64_t INSERT_THREADS = 1000;
 | |
|   static const int64_t WRITE_DURATION = 1000L; //adjust when test
 | |
| 
 | |
|   void write(int64_t size);
 | |
| 
 | |
|   void release(int64_t size);
 | |
| 
 | |
|   void set_param(int64_t trigger_percentage, int64_t lastest_memstore_threshold, int64_t writing_throttling_maximum_duration)
 | |
|   {
 | |
|     trigger_percentage_ = trigger_percentage;
 | |
|     lastest_memstore_threshold_ = lastest_memstore_threshold;
 | |
|     writing_throttling_maximum_duration_ = writing_throttling_maximum_duration;
 | |
|   }
 | |
| 
 | |
|   bool need_do_writing_throttle() const
 | |
|   {
 | |
|     int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100;
 | |
|     int64_t cur_mem_hold = ATOMIC_LOAD(&hold_);
 | |
|     bool need_do_writing_throttle = cur_mem_hold > trigger_mem_limit;
 | |
|     return need_do_writing_throttle;
 | |
|   }
 | |
| 
 | |
|   int64_t calc_mem_limit(int64_t cur_mem_hold, int64_t trigger_mem_limit, int64_t dt) const
 | |
|   {
 | |
|     double cur_chunk_seq = static_cast<double>(((cur_mem_hold - trigger_mem_limit) + MEM_SLICE_SIZE - 1)/ (MEM_SLICE_SIZE));
 | |
|     int64_t mem_can_be_assigned = 0;
 | |
| 
 | |
|     int64_t allocate_size_in_the_page = MEM_SLICE_SIZE - (cur_mem_hold - trigger_mem_limit) % MEM_SLICE_SIZE;
 | |
|     int64_t accumulate_interval = 0;
 | |
|     int64_t the_page_interval = 0;
 | |
|     while (accumulate_interval < dt) {
 | |
|       the_page_interval = static_cast<int64_t>(decay_factor_ * cur_chunk_seq * cur_chunk_seq * cur_chunk_seq) * allocate_size_in_the_page / MEM_SLICE_SIZE;
 | |
|       accumulate_interval += the_page_interval;
 | |
| 
 | |
|       mem_can_be_assigned += (accumulate_interval > dt ?
 | |
|                               allocate_size_in_the_page - (accumulate_interval - dt) * allocate_size_in_the_page / the_page_interval :
 | |
|                               allocate_size_in_the_page);
 | |
|       allocate_size_in_the_page = MEM_SLICE_SIZE;
 | |
|       cur_chunk_seq += double(1);
 | |
|     }
 | |
| 
 | |
|     return mem_can_be_assigned;
 | |
|   }
 | |
| 
 | |
|   int64_t expected_wait_time(int64_t seq) const
 | |
|   {
 | |
|     int64_t expected_wait_time = 0;
 | |
|     if (clock_ < seq) {
 | |
|       int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100;
 | |
|       int64_t can_assign_in_next_period = calc_mem_limit(hold_, trigger_mem_limit, ADVANCE_CLOCK_INTERVAL);
 | |
|       expected_wait_time = (seq - clock_) * ADVANCE_CLOCK_INTERVAL / can_assign_in_next_period;
 | |
|     }
 | |
| 
 | |
|     return expected_wait_time;
 | |
|   }
 | |
| 
 | |
|   void speed_limit(int64_t cur_mem_hold, int64_t alloc_size)
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     int64_t trigger_mem_limit = 0;
 | |
|     bool need_speed_limit = false;
 | |
|     int64_t seq = 0;
 | |
|     int64_t throttling_interval = 0;
 | |
|     if (trigger_percentage_ < 100) {
 | |
|       if (OB_UNLIKELY(cur_mem_hold < 0 || alloc_size <= 0 || lastest_memstore_threshold_ <= 0 || trigger_percentage_ <= 0)) {
 | |
|         COMMON_LOG(ERROR, "invalid arguments", K(cur_mem_hold), K(alloc_size), K(lastest_memstore_threshold_), K(trigger_percentage_));
 | |
|       } else if (cur_mem_hold > (trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100)) {
 | |
|         need_speed_limit = true;
 | |
|         if (OB_FAIL(check_and_calc_decay_factor())) {
 | |
|           COMMON_LOG(WARN, "failed to check_and_calc_decay_factor", K(cur_mem_hold), K(alloc_size));
 | |
|         }
 | |
|       }
 | |
|       COMMON_LOG(INFO, "CCTT 2");
 | |
|       advance_clock(need_speed_limit);
 | |
|       seq = ATOMIC_AAF(&max_seq_, alloc_size);
 | |
|       get_seq() = seq;
 | |
|       tl_need_speed_limit() = need_speed_limit;
 | |
| 
 | |
|       if (need_speed_limit && REACH_TIME_INTERVAL(1 * 1000 * 1000L)) {
 | |
|         COMMON_LOG(INFO, "report write throttle info", K(alloc_size), K(throttling_interval),
 | |
|                     "max_seq_", ATOMIC_LOAD(&max_seq_), K(clock_),
 | |
|                     K(cur_mem_hold), K(seq));
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   int check_and_calc_decay_factor()
 | |
|   {
 | |
|     int ret = OB_SUCCESS;
 | |
|     int64_t available_mem = (100 - trigger_percentage_) * lastest_memstore_threshold_ / 100;
 | |
|     double N =  static_cast<double>(available_mem) / static_cast<double>(MEM_SLICE_SIZE);
 | |
|     decay_factor_ = (static_cast<double>(writing_throttling_maximum_duration_) - N * static_cast<double>(MIN_INTERVAL))/ static_cast<double>((((N*(N+1)*N*(N+1)))/4));
 | |
|     decay_factor_ = decay_factor_ < 0 ? 0 : decay_factor_;
 | |
|     COMMON_LOG(INFO, "recalculate decay factor", K(trigger_percentage_),
 | |
|               K(decay_factor_), K(writing_throttling_maximum_duration_), K(available_mem), K(N));
 | |
| 
 | |
|     return ret;
 | |
|   }
 | |
| 
 | |
|   bool check_clock_over_seq(int64_t req)
 | |
|   {
 | |
|     advance_clock(true);
 | |
|     RLockGuard guard(rwlock_);
 | |
|     if (clock_ < req) {
 | |
|       COMMON_LOG(INFO, "CCTT 1", K(clock_), K(req));
 | |
|     }
 | |
| 
 | |
|     return req <= clock_;
 | |
|   }
 | |
| 
 | |
|   void advance_clock(bool need_speed_limit)
 | |
|   {
 | |
|     int64_t cur_ts = ObTimeUtility::current_time();
 | |
|     int64_t old_ts = last_update_ts_;
 | |
|     if ((cur_ts - last_update_ts_ > ADVANCE_CLOCK_INTERVAL) &&
 | |
|         old_ts == ATOMIC_CAS(&last_update_ts_, old_ts, cur_ts)) {
 | |
|       WLockGuard guard(rwlock_);
 | |
|       int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100;
 | |
|       int64_t mem_limit = (need_speed_limit ? calc_mem_limit(hold_, trigger_mem_limit, cur_ts - old_ts) : trigger_mem_limit - hold_);
 | |
|       clock_ = std::min(max_seq_, clock_ + mem_limit);
 | |
|       COMMON_LOG(INFO, "current clock is ", K(clock_), K(max_seq_), K(mem_limit), K(hold_), K(cur_ts - old_ts));
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   double decay_factor_;
 | |
|   int64_t trigger_percentage_;
 | |
|   int64_t lastest_memstore_threshold_;
 | |
|   int64_t hold_;
 | |
|   int64_t writing_throttling_maximum_duration_;
 | |
| 
 | |
|   typedef common::SpinRWLock RWLock;
 | |
|   typedef common::SpinRLockGuard  RLockGuard;
 | |
|   typedef common::SpinWLockGuard  WLockGuard;
 | |
|   RWLock rwlock_;
 | |
|   int64_t max_seq_;
 | |
|   int64_t clock_;
 | |
|   int64_t last_update_ts_;
 | |
| 
 | |
|   class ObTraversalFlushTask : public common::ObTimerTask
 | |
|   {
 | |
|   public:
 | |
|     ObTraversalFlushTask(TestSpeedLimit &test_speed_limit)
 | |
|       : test_speed_limit_(test_speed_limit) {}
 | |
|     virtual ~ObTraversalFlushTask() {}
 | |
|     virtual void runTimerTask()
 | |
|     {
 | |
|       test_speed_limit_.release(3000 * 1000 * 1000L);
 | |
|     }
 | |
|   private:
 | |
|     TestSpeedLimit &test_speed_limit_;
 | |
|   };
 | |
|   common::ObTimer timer_;
 | |
|   ObTraversalFlushTask task_;
 | |
| };
 | |
| 
 | |
| void TestSpeedLimit::write(int64_t size)
 | |
| {
 | |
|   speed_limit(hold_, size);
 | |
| 
 | |
|   (void)ATOMIC_AAF(&hold_, size);
 | |
|   // mock write latency
 | |
|   int64_t sleep_time = ObRandom::rand(30, 1000);
 | |
|   ob_usleep(sleep_time);
 | |
| 
 | |
|   bool &need_speed_limit = tl_need_speed_limit();
 | |
|   int64_t &seq = get_seq();
 | |
|   int64_t left_interval = SPEED_LIMIT_MAX_SLEEP_TIME;
 | |
|   bool has_sleep = false;
 | |
|   int64_t has_sleep_time = 0;
 | |
|   int time = 0;
 | |
|   bool need_sleep = need_speed_limit;
 | |
| 
 | |
|   while (need_sleep &&
 | |
|          !check_clock_over_seq(seq) &&
 | |
|         (left_interval > 0)) {
 | |
|     int64_t wait_time = expected_wait_time(seq);
 | |
|     //LOG_INFO("CCTT 3", K(need_sleep), K(left_interval), K(wait_time), K(seq), K(clock_), K(max_seq_));
 | |
|     if (wait_time == 0) {
 | |
|       break;
 | |
|     }
 | |
|     uint32_t sleep_interval =
 | |
|       static_cast<uint32_t>(min(min(left_interval, SLEEP_INTERVAL_PER_TIME), wait_time));
 | |
|     ob_usleep<common::ObWaitEventIds::STORAGE_WRITING_THROTTLE_SLEEP>(sleep_interval);
 | |
|     has_sleep_time += sleep_interval;
 | |
|     time++;
 | |
|     left_interval -= sleep_interval;
 | |
|     has_sleep = true;
 | |
|     need_sleep = need_do_writing_throttle();
 | |
|   }
 | |
| 
 | |
|   if (need_speed_limit && TC_REACH_TIME_INTERVAL(100L * 1000L)) {
 | |
|     LOG_INFO("throttle situation", K(has_sleep_time), K(time), K(seq));
 | |
|   }
 | |
| }
 | |
| 
 | |
| void TestSpeedLimit::release(int64_t size)
 | |
| {
 | |
|   if (hold_ > (lastest_memstore_threshold_ * (trigger_percentage_ + 30) / 100)) {
 | |
|     (void)ATOMIC_AAF(&hold_, -size);
 | |
|   }
 | |
| }
 | |
| 
 | |
| TEST_F(TestSpeedLimit, test_speed_limit)
 | |
| {
 | |
|   set_param(60, 9663676400, 3600000000);
 | |
|   std::vector<std::thread> insert_threads;
 | |
| 
 | |
|   int insert_thread_num = 5;
 | |
|   for (int i = 0; i < insert_thread_num; ++i) {
 | |
|     insert_threads.push_back(std::thread([this]() {
 | |
|       const int64_t start_time = ObTimeUtility::current_time();
 | |
|       while (ObTimeUtility::current_time() - start_time <= WRITE_DURATION) {
 | |
|         write(1600);
 | |
|       }}));
 | |
|   }
 | |
| 
 | |
|   for (int i = 0; i < insert_thread_num; ++i) {
 | |
|     insert_threads[i].join();
 | |
|   }
 | |
| }
 | |
| 
 | |
| }
 | |
| }
 | |
| 
 | |
| int main(int argc, char **argv)
 | |
| {
 | |
|   system("rm -f test_speed_limit.log*");
 | |
|   OB_LOGGER.set_file_name("test_speed_limit.log", true);
 | |
|   OB_LOGGER.set_log_level("INFO");
 | |
|   testing::InitGoogleTest(&argc, argv);
 | |
|   return RUN_ALL_TESTS();
 | |
| } | 
