Files
oceanbase/mittest/mtlenv/storage/test_speed_limit.cpp

301 lines
9.9 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#include <thread>
#define USING_LOG_PREFIX STORAGE
#define protected public
#define private public
#include "lib/lock/ob_spin_rwlock.h" // SpinRWLock
#include "lib/time/ob_time_utility.h"
#include "lib/random/ob_random.h"
#include "lib/utility/utility.h"
#include "lib/task/ob_timer.h"
using namespace oceanbase::common;
using namespace std;
namespace oceanbase
{
namespace storage
{
class TestSpeedLimit : public ::testing::Test
{
public:
TestSpeedLimit()
: timer_(),
task_(*this)
{}
virtual ~TestSpeedLimit() = default;
virtual void SetUp() override
{
int ret = OB_SUCCESS;
if (OB_FAIL(timer_.init("Flush"))) {
LOG_INFO("fail to init timer", K(ret));
} else if (OB_FAIL(timer_.schedule(task_, FLUSH_INTERVAL, true))) {
LOG_INFO("fail to schedule checkpoint task", K(ret));
}
}
virtual void TearDown() override
{
timer_.wait();
timer_.stop();
}
static void SetUpTestCase() {}
static void TearDownTestCase() {}
static const int64_t MEM_SLICE_SIZE = 2 * 1024 * 1024; //Bytes per usecond
static const int64_t ADVANCE_CLOCK_INTERVAL = 50;//50us
static const int64_t MIN_INTERVAL = 20000;
static const int64_t SLEEP_INTERVAL_PER_TIME = 20 * 1000;
static const int64_t SPEED_LIMIT_MAX_SLEEP_TIME = 20 * 1000 * 1000;
static const int64_t FLUSH_INTERVAL = 5 * 1000 * 1000;
static const int64_t INSERT_THREADS = 1000;
static const int64_t WRITE_DURATION = 1000L; //adjust when test
void write(int64_t size);
void release(int64_t size);
void set_param(int64_t trigger_percentage, int64_t lastest_memstore_threshold, int64_t writing_throttling_maximum_duration)
{
trigger_percentage_ = trigger_percentage;
lastest_memstore_threshold_ = lastest_memstore_threshold;
writing_throttling_maximum_duration_ = writing_throttling_maximum_duration;
}
bool need_do_writing_throttle() const
{
int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100;
int64_t cur_mem_hold = ATOMIC_LOAD(&hold_);
bool need_do_writing_throttle = cur_mem_hold > trigger_mem_limit;
return need_do_writing_throttle;
}
int64_t calc_mem_limit(int64_t cur_mem_hold, int64_t trigger_mem_limit, int64_t dt) const
{
double cur_chunk_seq = static_cast<double>(((cur_mem_hold - trigger_mem_limit) + MEM_SLICE_SIZE - 1)/ (MEM_SLICE_SIZE));
int64_t mem_can_be_assigned = 0;
int64_t allocate_size_in_the_page = MEM_SLICE_SIZE - (cur_mem_hold - trigger_mem_limit) % MEM_SLICE_SIZE;
int64_t accumulate_interval = 0;
int64_t the_page_interval = 0;
while (accumulate_interval < dt) {
the_page_interval = static_cast<int64_t>(decay_factor_ * cur_chunk_seq * cur_chunk_seq * cur_chunk_seq) * allocate_size_in_the_page / MEM_SLICE_SIZE;
accumulate_interval += the_page_interval;
mem_can_be_assigned += (accumulate_interval > dt ?
allocate_size_in_the_page - (accumulate_interval - dt) * allocate_size_in_the_page / the_page_interval :
allocate_size_in_the_page);
allocate_size_in_the_page = MEM_SLICE_SIZE;
cur_chunk_seq += double(1);
}
return mem_can_be_assigned;
}
int64_t expected_wait_time(int64_t seq) const
{
int64_t expected_wait_time = 0;
if (clock_ < seq) {
int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100;
int64_t can_assign_in_next_period = calc_mem_limit(hold_, trigger_mem_limit, ADVANCE_CLOCK_INTERVAL);
expected_wait_time = (seq - clock_) * ADVANCE_CLOCK_INTERVAL / can_assign_in_next_period;
}
return expected_wait_time;
}
void speed_limit(int64_t cur_mem_hold, int64_t alloc_size)
{
int ret = OB_SUCCESS;
int64_t trigger_mem_limit = 0;
bool need_speed_limit = false;
int64_t seq = 0;
int64_t throttling_interval = 0;
if (trigger_percentage_ < 100) {
if (OB_UNLIKELY(cur_mem_hold < 0 || alloc_size <= 0 || lastest_memstore_threshold_ <= 0 || trigger_percentage_ <= 0)) {
COMMON_LOG(ERROR, "invalid arguments", K(cur_mem_hold), K(alloc_size), K(lastest_memstore_threshold_), K(trigger_percentage_));
} else if (cur_mem_hold > (trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100)) {
need_speed_limit = true;
if (OB_FAIL(check_and_calc_decay_factor())) {
COMMON_LOG(WARN, "failed to check_and_calc_decay_factor", K(cur_mem_hold), K(alloc_size));
}
}
COMMON_LOG(INFO, "CCTT 2");
advance_clock(need_speed_limit);
seq = ATOMIC_AAF(&max_seq_, alloc_size);
get_seq() = seq;
tl_need_speed_limit() = need_speed_limit;
if (need_speed_limit && REACH_TIME_INTERVAL(1 * 1000 * 1000L)) {
COMMON_LOG(INFO, "report write throttle info", K(alloc_size), K(throttling_interval),
"max_seq_", ATOMIC_LOAD(&max_seq_), K(clock_),
K(cur_mem_hold), K(seq));
}
}
}
int check_and_calc_decay_factor()
{
int ret = OB_SUCCESS;
int64_t available_mem = (100 - trigger_percentage_) * lastest_memstore_threshold_ / 100;
double N = static_cast<double>(available_mem) / static_cast<double>(MEM_SLICE_SIZE);
decay_factor_ = (static_cast<double>(writing_throttling_maximum_duration_) - N * static_cast<double>(MIN_INTERVAL))/ static_cast<double>((((N*(N+1)*N*(N+1)))/4));
decay_factor_ = decay_factor_ < 0 ? 0 : decay_factor_;
COMMON_LOG(INFO, "recalculate decay factor", K(trigger_percentage_),
K(decay_factor_), K(writing_throttling_maximum_duration_), K(available_mem), K(N));
return ret;
}
bool check_clock_over_seq(int64_t req)
{
advance_clock(true);
RLockGuard guard(rwlock_);
if (clock_ < req) {
COMMON_LOG(INFO, "CCTT 1", K(clock_), K(req));
}
return req <= clock_;
}
void advance_clock(bool need_speed_limit)
{
int64_t cur_ts = ObTimeUtility::current_time();
int64_t old_ts = last_update_ts_;
if ((cur_ts - last_update_ts_ > ADVANCE_CLOCK_INTERVAL) &&
old_ts == ATOMIC_CAS(&last_update_ts_, old_ts, cur_ts)) {
WLockGuard guard(rwlock_);
int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100;
int64_t mem_limit = (need_speed_limit ? calc_mem_limit(hold_, trigger_mem_limit, cur_ts - old_ts) : trigger_mem_limit - hold_);
clock_ = std::min(max_seq_, clock_ + mem_limit);
COMMON_LOG(INFO, "current clock is ", K(clock_), K(max_seq_), K(mem_limit), K(hold_), K(cur_ts - old_ts));
}
}
double decay_factor_;
int64_t trigger_percentage_;
int64_t lastest_memstore_threshold_;
int64_t hold_;
int64_t writing_throttling_maximum_duration_;
typedef common::SpinRWLock RWLock;
typedef common::SpinRLockGuard RLockGuard;
typedef common::SpinWLockGuard WLockGuard;
RWLock rwlock_;
int64_t max_seq_;
int64_t clock_;
int64_t last_update_ts_;
class ObTraversalFlushTask : public common::ObTimerTask
{
public:
ObTraversalFlushTask(TestSpeedLimit &test_speed_limit)
: test_speed_limit_(test_speed_limit) {}
virtual ~ObTraversalFlushTask() {}
virtual void runTimerTask()
{
test_speed_limit_.release(3000 * 1000 * 1000L);
}
private:
TestSpeedLimit &test_speed_limit_;
};
common::ObTimer timer_;
ObTraversalFlushTask task_;
};
void TestSpeedLimit::write(int64_t size)
{
speed_limit(hold_, size);
(void)ATOMIC_AAF(&hold_, size);
// mock write latency
int64_t sleep_time = ObRandom::rand(30, 1000);
ob_usleep(sleep_time);
bool &need_speed_limit = tl_need_speed_limit();
int64_t &seq = get_seq();
int64_t left_interval = SPEED_LIMIT_MAX_SLEEP_TIME;
bool has_sleep = false;
int64_t has_sleep_time = 0;
int time = 0;
bool need_sleep = need_speed_limit;
while (need_sleep &&
!check_clock_over_seq(seq) &&
(left_interval > 0)) {
int64_t wait_time = expected_wait_time(seq);
//LOG_INFO("CCTT 3", K(need_sleep), K(left_interval), K(wait_time), K(seq), K(clock_), K(max_seq_));
if (wait_time == 0) {
break;
}
uint32_t sleep_interval =
static_cast<uint32_t>(min(min(left_interval, SLEEP_INTERVAL_PER_TIME), wait_time));
ob_usleep<common::ObWaitEventIds::STORAGE_WRITING_THROTTLE_SLEEP>(sleep_interval);
has_sleep_time += sleep_interval;
time++;
left_interval -= sleep_interval;
has_sleep = true;
need_sleep = need_do_writing_throttle();
}
if (need_speed_limit && TC_REACH_TIME_INTERVAL(100L * 1000L)) {
LOG_INFO("throttle situation", K(has_sleep_time), K(time), K(seq));
}
}
void TestSpeedLimit::release(int64_t size)
{
if (hold_ > (lastest_memstore_threshold_ * (trigger_percentage_ + 30) / 100)) {
(void)ATOMIC_AAF(&hold_, -size);
}
}
TEST_F(TestSpeedLimit, test_speed_limit)
{
set_param(60, 9663676400, 3600000000);
std::vector<std::thread> insert_threads;
int insert_thread_num = 5;
for (int i = 0; i < insert_thread_num; ++i) {
insert_threads.push_back(std::thread([this]() {
const int64_t start_time = ObTimeUtility::current_time();
while (ObTimeUtility::current_time() - start_time <= WRITE_DURATION) {
write(1600);
}}));
}
for (int i = 0; i < insert_thread_num; ++i) {
insert_threads[i].join();
}
}
}
}
int main(int argc, char **argv)
{
system("rm -f test_speed_limit.log*");
OB_LOGGER.set_file_name("test_speed_limit.log", true);
OB_LOGGER.set_log_level("INFO");
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}