301 lines
9.9 KiB
C++
301 lines
9.9 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#include <gtest/gtest.h>
|
|
#include <thread>
|
|
|
|
#define USING_LOG_PREFIX STORAGE
|
|
|
|
#define protected public
|
|
#define private public
|
|
#include "lib/lock/ob_spin_rwlock.h" // SpinRWLock
|
|
#include "lib/time/ob_time_utility.h"
|
|
#include "lib/random/ob_random.h"
|
|
#include "lib/utility/utility.h"
|
|
#include "lib/task/ob_timer.h"
|
|
|
|
|
|
using namespace oceanbase::common;
|
|
using namespace std;
|
|
|
|
namespace oceanbase
|
|
{
|
|
|
|
namespace storage
|
|
{
|
|
|
|
class TestSpeedLimit : public ::testing::Test
|
|
{
|
|
public:
|
|
TestSpeedLimit()
|
|
: timer_(),
|
|
task_(*this)
|
|
{}
|
|
virtual ~TestSpeedLimit() = default;
|
|
|
|
virtual void SetUp() override
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (OB_FAIL(timer_.init("Flush"))) {
|
|
LOG_INFO("fail to init timer", K(ret));
|
|
} else if (OB_FAIL(timer_.schedule(task_, FLUSH_INTERVAL, true))) {
|
|
LOG_INFO("fail to schedule checkpoint task", K(ret));
|
|
}
|
|
}
|
|
|
|
virtual void TearDown() override
|
|
{
|
|
timer_.wait();
|
|
timer_.stop();
|
|
}
|
|
|
|
static void SetUpTestCase() {}
|
|
static void TearDownTestCase() {}
|
|
|
|
static const int64_t MEM_SLICE_SIZE = 2 * 1024 * 1024; //Bytes per usecond
|
|
static const int64_t ADVANCE_CLOCK_INTERVAL = 50;//50us
|
|
static const int64_t MIN_INTERVAL = 20000;
|
|
static const int64_t SLEEP_INTERVAL_PER_TIME = 20 * 1000;
|
|
static const int64_t SPEED_LIMIT_MAX_SLEEP_TIME = 20 * 1000 * 1000;
|
|
static const int64_t FLUSH_INTERVAL = 5 * 1000 * 1000;
|
|
static const int64_t INSERT_THREADS = 1000;
|
|
static const int64_t WRITE_DURATION = 1000L; //adjust when test
|
|
|
|
void write(int64_t size);
|
|
|
|
void release(int64_t size);
|
|
|
|
void set_param(int64_t trigger_percentage, int64_t lastest_memstore_threshold, int64_t writing_throttling_maximum_duration)
|
|
{
|
|
trigger_percentage_ = trigger_percentage;
|
|
lastest_memstore_threshold_ = lastest_memstore_threshold;
|
|
writing_throttling_maximum_duration_ = writing_throttling_maximum_duration;
|
|
}
|
|
|
|
bool need_do_writing_throttle() const
|
|
{
|
|
int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100;
|
|
int64_t cur_mem_hold = ATOMIC_LOAD(&hold_);
|
|
bool need_do_writing_throttle = cur_mem_hold > trigger_mem_limit;
|
|
return need_do_writing_throttle;
|
|
}
|
|
|
|
int64_t calc_mem_limit(int64_t cur_mem_hold, int64_t trigger_mem_limit, int64_t dt) const
|
|
{
|
|
double cur_chunk_seq = static_cast<double>(((cur_mem_hold - trigger_mem_limit) + MEM_SLICE_SIZE - 1)/ (MEM_SLICE_SIZE));
|
|
int64_t mem_can_be_assigned = 0;
|
|
|
|
int64_t allocate_size_in_the_page = MEM_SLICE_SIZE - (cur_mem_hold - trigger_mem_limit) % MEM_SLICE_SIZE;
|
|
int64_t accumulate_interval = 0;
|
|
int64_t the_page_interval = 0;
|
|
while (accumulate_interval < dt) {
|
|
the_page_interval = static_cast<int64_t>(decay_factor_ * cur_chunk_seq * cur_chunk_seq * cur_chunk_seq) * allocate_size_in_the_page / MEM_SLICE_SIZE;
|
|
accumulate_interval += the_page_interval;
|
|
|
|
mem_can_be_assigned += (accumulate_interval > dt ?
|
|
allocate_size_in_the_page - (accumulate_interval - dt) * allocate_size_in_the_page / the_page_interval :
|
|
allocate_size_in_the_page);
|
|
allocate_size_in_the_page = MEM_SLICE_SIZE;
|
|
cur_chunk_seq += double(1);
|
|
}
|
|
|
|
return mem_can_be_assigned;
|
|
}
|
|
|
|
int64_t expected_wait_time(int64_t seq) const
|
|
{
|
|
int64_t expected_wait_time = 0;
|
|
if (clock_ < seq) {
|
|
int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100;
|
|
int64_t can_assign_in_next_period = calc_mem_limit(hold_, trigger_mem_limit, ADVANCE_CLOCK_INTERVAL);
|
|
expected_wait_time = (seq - clock_) * ADVANCE_CLOCK_INTERVAL / can_assign_in_next_period;
|
|
}
|
|
|
|
return expected_wait_time;
|
|
}
|
|
|
|
void speed_limit(int64_t cur_mem_hold, int64_t alloc_size)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
int64_t trigger_mem_limit = 0;
|
|
bool need_speed_limit = false;
|
|
int64_t seq = 0;
|
|
int64_t throttling_interval = 0;
|
|
if (trigger_percentage_ < 100) {
|
|
if (OB_UNLIKELY(cur_mem_hold < 0 || alloc_size <= 0 || lastest_memstore_threshold_ <= 0 || trigger_percentage_ <= 0)) {
|
|
COMMON_LOG(ERROR, "invalid arguments", K(cur_mem_hold), K(alloc_size), K(lastest_memstore_threshold_), K(trigger_percentage_));
|
|
} else if (cur_mem_hold > (trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100)) {
|
|
need_speed_limit = true;
|
|
if (OB_FAIL(check_and_calc_decay_factor())) {
|
|
COMMON_LOG(WARN, "failed to check_and_calc_decay_factor", K(cur_mem_hold), K(alloc_size));
|
|
}
|
|
}
|
|
COMMON_LOG(INFO, "CCTT 2");
|
|
advance_clock(need_speed_limit);
|
|
seq = ATOMIC_AAF(&max_seq_, alloc_size);
|
|
get_seq() = seq;
|
|
tl_need_speed_limit() = need_speed_limit;
|
|
|
|
if (need_speed_limit && REACH_TIME_INTERVAL(1 * 1000 * 1000L)) {
|
|
COMMON_LOG(INFO, "report write throttle info", K(alloc_size), K(throttling_interval),
|
|
"max_seq_", ATOMIC_LOAD(&max_seq_), K(clock_),
|
|
K(cur_mem_hold), K(seq));
|
|
}
|
|
}
|
|
}
|
|
|
|
int check_and_calc_decay_factor()
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
int64_t available_mem = (100 - trigger_percentage_) * lastest_memstore_threshold_ / 100;
|
|
double N = static_cast<double>(available_mem) / static_cast<double>(MEM_SLICE_SIZE);
|
|
decay_factor_ = (static_cast<double>(writing_throttling_maximum_duration_) - N * static_cast<double>(MIN_INTERVAL))/ static_cast<double>((((N*(N+1)*N*(N+1)))/4));
|
|
decay_factor_ = decay_factor_ < 0 ? 0 : decay_factor_;
|
|
COMMON_LOG(INFO, "recalculate decay factor", K(trigger_percentage_),
|
|
K(decay_factor_), K(writing_throttling_maximum_duration_), K(available_mem), K(N));
|
|
|
|
return ret;
|
|
}
|
|
|
|
bool check_clock_over_seq(int64_t req)
|
|
{
|
|
advance_clock(true);
|
|
RLockGuard guard(rwlock_);
|
|
if (clock_ < req) {
|
|
COMMON_LOG(INFO, "CCTT 1", K(clock_), K(req));
|
|
}
|
|
|
|
return req <= clock_;
|
|
}
|
|
|
|
void advance_clock(bool need_speed_limit)
|
|
{
|
|
int64_t cur_ts = ObTimeUtility::current_time();
|
|
int64_t old_ts = last_update_ts_;
|
|
if ((cur_ts - last_update_ts_ > ADVANCE_CLOCK_INTERVAL) &&
|
|
old_ts == ATOMIC_CAS(&last_update_ts_, old_ts, cur_ts)) {
|
|
WLockGuard guard(rwlock_);
|
|
int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage_ / 100;
|
|
int64_t mem_limit = (need_speed_limit ? calc_mem_limit(hold_, trigger_mem_limit, cur_ts - old_ts) : trigger_mem_limit - hold_);
|
|
clock_ = std::min(max_seq_, clock_ + mem_limit);
|
|
COMMON_LOG(INFO, "current clock is ", K(clock_), K(max_seq_), K(mem_limit), K(hold_), K(cur_ts - old_ts));
|
|
}
|
|
}
|
|
|
|
double decay_factor_;
|
|
int64_t trigger_percentage_;
|
|
int64_t lastest_memstore_threshold_;
|
|
int64_t hold_;
|
|
int64_t writing_throttling_maximum_duration_;
|
|
|
|
typedef common::SpinRWLock RWLock;
|
|
typedef common::SpinRLockGuard RLockGuard;
|
|
typedef common::SpinWLockGuard WLockGuard;
|
|
RWLock rwlock_;
|
|
int64_t max_seq_;
|
|
int64_t clock_;
|
|
int64_t last_update_ts_;
|
|
|
|
class ObTraversalFlushTask : public common::ObTimerTask
|
|
{
|
|
public:
|
|
ObTraversalFlushTask(TestSpeedLimit &test_speed_limit)
|
|
: test_speed_limit_(test_speed_limit) {}
|
|
virtual ~ObTraversalFlushTask() {}
|
|
virtual void runTimerTask()
|
|
{
|
|
test_speed_limit_.release(3000 * 1000 * 1000L);
|
|
}
|
|
private:
|
|
TestSpeedLimit &test_speed_limit_;
|
|
};
|
|
common::ObTimer timer_;
|
|
ObTraversalFlushTask task_;
|
|
};
|
|
|
|
void TestSpeedLimit::write(int64_t size)
|
|
{
|
|
speed_limit(hold_, size);
|
|
|
|
(void)ATOMIC_AAF(&hold_, size);
|
|
// mock write latency
|
|
int64_t sleep_time = ObRandom::rand(30, 1000);
|
|
ob_usleep(sleep_time);
|
|
|
|
bool &need_speed_limit = tl_need_speed_limit();
|
|
int64_t &seq = get_seq();
|
|
int64_t left_interval = SPEED_LIMIT_MAX_SLEEP_TIME;
|
|
bool has_sleep = false;
|
|
int64_t has_sleep_time = 0;
|
|
int time = 0;
|
|
bool need_sleep = need_speed_limit;
|
|
|
|
while (need_sleep &&
|
|
!check_clock_over_seq(seq) &&
|
|
(left_interval > 0)) {
|
|
int64_t wait_time = expected_wait_time(seq);
|
|
//LOG_INFO("CCTT 3", K(need_sleep), K(left_interval), K(wait_time), K(seq), K(clock_), K(max_seq_));
|
|
if (wait_time == 0) {
|
|
break;
|
|
}
|
|
uint32_t sleep_interval =
|
|
static_cast<uint32_t>(min(min(left_interval, SLEEP_INTERVAL_PER_TIME), wait_time));
|
|
ob_usleep<common::ObWaitEventIds::STORAGE_WRITING_THROTTLE_SLEEP>(sleep_interval);
|
|
has_sleep_time += sleep_interval;
|
|
time++;
|
|
left_interval -= sleep_interval;
|
|
has_sleep = true;
|
|
need_sleep = need_do_writing_throttle();
|
|
}
|
|
|
|
if (need_speed_limit && TC_REACH_TIME_INTERVAL(100L * 1000L)) {
|
|
LOG_INFO("throttle situation", K(has_sleep_time), K(time), K(seq));
|
|
}
|
|
}
|
|
|
|
void TestSpeedLimit::release(int64_t size)
|
|
{
|
|
if (hold_ > (lastest_memstore_threshold_ * (trigger_percentage_ + 30) / 100)) {
|
|
(void)ATOMIC_AAF(&hold_, -size);
|
|
}
|
|
}
|
|
|
|
TEST_F(TestSpeedLimit, test_speed_limit)
|
|
{
|
|
set_param(60, 9663676400, 3600000000);
|
|
std::vector<std::thread> insert_threads;
|
|
|
|
int insert_thread_num = 5;
|
|
for (int i = 0; i < insert_thread_num; ++i) {
|
|
insert_threads.push_back(std::thread([this]() {
|
|
const int64_t start_time = ObTimeUtility::current_time();
|
|
while (ObTimeUtility::current_time() - start_time <= WRITE_DURATION) {
|
|
write(1600);
|
|
}}));
|
|
}
|
|
|
|
for (int i = 0; i < insert_thread_num; ++i) {
|
|
insert_threads[i].join();
|
|
}
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
int main(int argc, char **argv)
|
|
{
|
|
system("rm -f test_speed_limit.log*");
|
|
OB_LOGGER.set_file_name("test_speed_limit.log", true);
|
|
OB_LOGGER.set_log_level("INFO");
|
|
testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
} |