BUGFIX: make write throttle perform as expected
This commit is contained in:
@ -328,18 +328,25 @@ bool ObFifoArena::check_clock_over_seq(const int64_t req)
|
||||
return req <= clock;
|
||||
}
|
||||
|
||||
int64_t ObFifoArena::get_clock()
|
||||
{
|
||||
advance_clock();
|
||||
return clock_;
|
||||
}
|
||||
|
||||
void ObFifoArena::advance_clock()
|
||||
{
|
||||
int64_t cur_ts = ObTimeUtility::current_time();
|
||||
int64_t old_ts = last_update_ts_;
|
||||
if ((cur_ts - last_update_ts_ > ADVANCE_CLOCK_INTERVAL) &&
|
||||
const int64_t advance_us = cur_ts - old_ts;
|
||||
if ((advance_us > ADVANCE_CLOCK_INTERVAL) &&
|
||||
old_ts == ATOMIC_CAS(&last_update_ts_, old_ts, cur_ts)) {
|
||||
int64_t trigger_percentage = get_writing_throttling_trigger_percentage_();
|
||||
int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100;
|
||||
int64_t cur_mem_hold = ATOMIC_LOAD(&hold_);
|
||||
int64_t mem_limit = calc_mem_limit(cur_mem_hold, trigger_mem_limit, ADVANCE_CLOCK_INTERVAL);
|
||||
int64_t clock = ATOMIC_LOAD(&clock_);
|
||||
int64_t max_seq = ATOMIC_LOAD(&max_seq_);
|
||||
const int64_t trigger_percentage = get_writing_throttling_trigger_percentage_();
|
||||
const int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100;
|
||||
const int64_t cur_mem_hold = ATOMIC_LOAD(&hold_);
|
||||
const int64_t mem_limit = calc_mem_limit(cur_mem_hold, trigger_mem_limit, advance_us);
|
||||
const int64_t clock = ATOMIC_LOAD(&clock_);
|
||||
const int64_t max_seq = ATOMIC_LOAD(&max_seq_);
|
||||
ATOMIC_SET(&clock_, min(max_seq, clock + mem_limit));
|
||||
if (REACH_TIME_INTERVAL(100 * 1000L)) {
|
||||
COMMON_LOG(INFO, "current clock is ",
|
||||
@ -449,39 +456,44 @@ void ObFifoArena::set_memstore_threshold(int64_t memstore_threshold)
|
||||
template<int64_t N>
|
||||
struct INTEGER_WRAPPER
|
||||
{
|
||||
INTEGER_WRAPPER() : v_(N) {}
|
||||
INTEGER_WRAPPER() : v_(N), tenant_id_(0) {}
|
||||
int64_t v_;
|
||||
uint64_t tenant_id_;
|
||||
};
|
||||
|
||||
int64_t ObFifoArena::get_writing_throttling_trigger_percentage_() const
|
||||
{
|
||||
RLOCAL(INTEGER_WRAPPER<DEFAULT_TRIGGER_PERCENTAGE>, wrapper);
|
||||
int64_t &trigger_percentage = (&wrapper)->v_;
|
||||
if (TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { // 5s
|
||||
int64_t &trigger_v = (&wrapper)->v_;
|
||||
uint64_t &tenant_id = (&wrapper)->tenant_id_;
|
||||
if (tenant_id != attr_.tenant_id_ || TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { // 5s
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(attr_.tenant_id_));
|
||||
if (!tenant_config.is_valid()) {
|
||||
COMMON_LOG(INFO, "failed to get tenant config", K(attr_));
|
||||
} else {
|
||||
trigger_percentage = tenant_config->writing_throttling_trigger_percentage;
|
||||
trigger_v = tenant_config->writing_throttling_trigger_percentage;
|
||||
tenant_id = attr_.tenant_id_;
|
||||
}
|
||||
}
|
||||
return trigger_percentage;
|
||||
return trigger_v;
|
||||
}
|
||||
|
||||
int64_t ObFifoArena::get_writing_throttling_maximum_duration_() const
|
||||
{
|
||||
RLOCAL(INTEGER_WRAPPER<DEFAULT_DURATION>, wrapper);
|
||||
int64_t &duration = (&wrapper)->v_;
|
||||
if (TC_REACH_TIME_INTERVAL(1 * 1000 * 1000)) { // 1s
|
||||
int64_t &duration_v = (&wrapper)->v_;
|
||||
uint64_t &tenant_id = (&wrapper)->tenant_id_;
|
||||
if (tenant_id != attr_.tenant_id_ || TC_REACH_TIME_INTERVAL(1 * 1000 * 1000)) { // 1s
|
||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(attr_.tenant_id_));
|
||||
if (!tenant_config.is_valid()) {
|
||||
//keep default
|
||||
COMMON_LOG(INFO, "failed to get tenant config", K(attr_));
|
||||
} else {
|
||||
duration = tenant_config->writing_throttling_maximum_duration;
|
||||
duration_v = tenant_config->writing_throttling_maximum_duration;
|
||||
tenant_id = attr_.tenant_id_;
|
||||
}
|
||||
}
|
||||
return duration;
|
||||
return duration_v;
|
||||
}
|
||||
|
||||
}; // end namespace allocator
|
||||
|
||||
@ -169,6 +169,7 @@ public:
|
||||
void set_memstore_threshold(int64_t memstore_threshold);
|
||||
bool need_do_writing_throttle() const;
|
||||
bool check_clock_over_seq(const int64_t seq);
|
||||
int64_t get_clock();
|
||||
int64_t expected_wait_time(const int64_t seq) const;
|
||||
int64_t get_max_cached_memstore_size() const
|
||||
{
|
||||
|
||||
@ -170,10 +170,14 @@ public:
|
||||
public:
|
||||
int set_memstore_threshold(uint64_t tenant_id);
|
||||
bool need_do_writing_throttle() const {return arena_.need_do_writing_throttle();}
|
||||
bool check_clock_over_seq(int64_t seq)
|
||||
bool check_clock_over_seq(const int64_t seq)
|
||||
{
|
||||
return arena_.check_clock_over_seq(seq);
|
||||
}
|
||||
int64_t get_clock()
|
||||
{
|
||||
return arena_.get_clock();
|
||||
}
|
||||
int64_t expected_wait_time(int64_t seq) const
|
||||
{
|
||||
return arena_.expected_wait_time(seq);
|
||||
|
||||
@ -53,9 +53,9 @@ ObStorageTableGuard::~ObStorageTableGuard()
|
||||
bool &need_speed_limit = tl_need_speed_limit();
|
||||
if (need_control_mem_ && need_speed_limit) {
|
||||
bool need_sleep = true;
|
||||
int64_t left_interval = SPEED_LIMIT_MAX_SLEEP_TIME;
|
||||
int64_t left_interval = INT64_MAX;
|
||||
if (!for_replay_) {
|
||||
left_interval = min(SPEED_LIMIT_MAX_SLEEP_TIME, store_ctx_.timeout_ - ObTimeUtility::current_time());
|
||||
left_interval = min(left_interval, store_ctx_.timeout_ - ObTimeUtility::current_time());
|
||||
}
|
||||
if (NULL != memtable_) {
|
||||
need_sleep = memtable_->is_active_memtable();
|
||||
@ -68,7 +68,8 @@ ObStorageTableGuard::~ObStorageTableGuard()
|
||||
bool has_sleep = false;
|
||||
int64_t sleep_time = 0;
|
||||
int time = 0;
|
||||
int64_t &seq = get_seq();
|
||||
const int64_t &seq = get_seq();
|
||||
int64_t clock = 0;
|
||||
if (store_ctx_.mvcc_acc_ctx_.is_write()) {
|
||||
ObGMemstoreAllocator* memstore_allocator = NULL;
|
||||
if (OB_SUCCESS != (tmp_ret = ObMemstoreAllocatorMgr::get_instance().get_tenant_memstore_allocator(
|
||||
@ -76,6 +77,7 @@ ObStorageTableGuard::~ObStorageTableGuard()
|
||||
} else if (OB_ISNULL(memstore_allocator)) {
|
||||
LOG_WARN_RET(OB_ALLOCATE_MEMORY_FAILED, "get_tenant_mutil_allocator failed", K(store_ctx_.tablet_id_), K(tmp_ret));
|
||||
} else {
|
||||
clock = memstore_allocator->get_clock();
|
||||
while (need_sleep &&
|
||||
!memstore_allocator->check_clock_over_seq(seq) &&
|
||||
(left_interval > 0)) {
|
||||
@ -106,7 +108,7 @@ ObStorageTableGuard::~ObStorageTableGuard()
|
||||
if (REACH_TIME_INTERVAL(100 * 1000L) &&
|
||||
sleep_time > 0) {
|
||||
int64_t cost_time = ObTimeUtility::current_time() - init_ts_;
|
||||
LOG_INFO("throttle situation", K(sleep_time), K(time), K(seq), K(for_replay_), K(cost_time));
|
||||
LOG_INFO("throttle situation", K(sleep_time), K(clock), K(time), K(seq), K(for_replay_), K(cost_time));
|
||||
}
|
||||
|
||||
if (for_replay_ && has_sleep) {
|
||||
|
||||
@ -63,7 +63,6 @@ private:
|
||||
private:
|
||||
static const int64_t LOG_INTERVAL_US = 10 * 1000 * 1000;
|
||||
static const int64_t GET_TS_INTERVAL = 10 * 1000;
|
||||
static const int64_t SPEED_LIMIT_MAX_SLEEP_TIME = 20 * 1000 * 1000;
|
||||
static const int64_t SLEEP_INTERVAL_PER_TIME = 20 * 1000;
|
||||
|
||||
ObTablet *tablet_;
|
||||
|
||||
Reference in New Issue
Block a user