add some fix for speed_limit
This commit is contained in:
10
deps/oblib/src/lib/alloc/alloc_func.cpp
vendored
10
deps/oblib/src/lib/alloc/alloc_func.cpp
vendored
@ -74,6 +74,16 @@ int64_t get_tenant_memory_hold(uint64_t tenant_id)
|
|||||||
return bytes;
|
return bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t get_tenant_memory_remain(uint64_t tenant_id)
|
||||||
|
{
|
||||||
|
int64_t bytes = 0;
|
||||||
|
ObMallocAllocator *allocator = ObMallocAllocator::get_instance();
|
||||||
|
if (!OB_ISNULL(allocator)) {
|
||||||
|
bytes = allocator->get_tenant_remain(tenant_id);
|
||||||
|
}
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t get_tenant_memory_hold(const uint64_t tenant_id, const uint64_t ctx_id)
|
int64_t get_tenant_memory_hold(const uint64_t tenant_id, const uint64_t ctx_id)
|
||||||
{
|
{
|
||||||
int64_t bytes = 0;
|
int64_t bytes = 0;
|
||||||
|
|||||||
1
deps/oblib/src/lib/alloc/alloc_func.h
vendored
1
deps/oblib/src/lib/alloc/alloc_func.h
vendored
@ -32,6 +32,7 @@ void set_tenant_memory_limit(uint64_t tenant_id, int64_t bytes);
|
|||||||
int64_t get_tenant_memory_limit(uint64_t tenant_id);
|
int64_t get_tenant_memory_limit(uint64_t tenant_id);
|
||||||
int64_t get_tenant_memory_hold(uint64_t tenant_id);
|
int64_t get_tenant_memory_hold(uint64_t tenant_id);
|
||||||
int64_t get_tenant_memory_hold(const uint64_t tenant_id, const uint64_t ctx_id);
|
int64_t get_tenant_memory_hold(const uint64_t tenant_id, const uint64_t ctx_id);
|
||||||
|
int64_t get_tenant_memory_remain(uint64_t tenant_id);
|
||||||
void get_tenant_mod_memory(uint64_t tenant_id, int mod_id, common::ObModItem& ObModItem);
|
void get_tenant_mod_memory(uint64_t tenant_id, int mod_id, common::ObModItem& ObModItem);
|
||||||
void ob_set_reserved_memory(const int64_t bytes);
|
void ob_set_reserved_memory(const int64_t bytes);
|
||||||
void ob_set_urgent_memory(const int64_t bytes);
|
void ob_set_urgent_memory(const int64_t bytes);
|
||||||
|
|||||||
10
deps/oblib/src/lib/alloc/ob_malloc_allocator.cpp
vendored
10
deps/oblib/src/lib/alloc/ob_malloc_allocator.cpp
vendored
@ -420,6 +420,16 @@ int64_t ObMallocAllocator::get_tenant_hold(uint64_t tenant_id)
|
|||||||
return hold;
|
return hold;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t ObMallocAllocator::get_tenant_remain(uint64_t tenant_id)
|
||||||
|
{
|
||||||
|
int64_t remain = 0;
|
||||||
|
with_resource_handle_invoke(tenant_id, [&remain](ObTenantMemoryMgr *mgr) {
|
||||||
|
remain = mgr->get_limit() - mgr->get_sum_hold() + mgr->get_cache_hold();
|
||||||
|
return OB_SUCCESS;
|
||||||
|
});
|
||||||
|
return remain;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t ObMallocAllocator::get_tenant_rpc_hold(uint64_t tenant_id)
|
int64_t ObMallocAllocator::get_tenant_rpc_hold(uint64_t tenant_id)
|
||||||
{
|
{
|
||||||
int64_t rpc_hold = 0;
|
int64_t rpc_hold = 0;
|
||||||
|
|||||||
@ -57,6 +57,7 @@ public:
|
|||||||
static int set_tenant_limit(uint64_t tenant_id, int64_t bytes);
|
static int set_tenant_limit(uint64_t tenant_id, int64_t bytes);
|
||||||
static int64_t get_tenant_limit(uint64_t tenant_id);
|
static int64_t get_tenant_limit(uint64_t tenant_id);
|
||||||
static int64_t get_tenant_hold(uint64_t tenant_id);
|
static int64_t get_tenant_hold(uint64_t tenant_id);
|
||||||
|
static int64_t get_tenant_remain(uint64_t tenant_id);
|
||||||
static int64_t get_tenant_rpc_hold(uint64_t tenant_id);
|
static int64_t get_tenant_rpc_hold(uint64_t tenant_id);
|
||||||
int64_t get_tenant_ctx_hold(const uint64_t tenant_id, const uint64_t ctx_id) const;
|
int64_t get_tenant_ctx_hold(const uint64_t tenant_id, const uint64_t ctx_id) const;
|
||||||
void get_tenant_mod_usage(uint64_t tenant_id, int mod_id, common::ObModItem& item) const;
|
void get_tenant_mod_usage(uint64_t tenant_id, int mod_id, common::ObModItem& item) const;
|
||||||
|
|||||||
@ -281,12 +281,13 @@ void ObFifoArena::destroy_page(Page* page)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ObFifoArena::need_do_writing_throttle() const
|
bool ObFifoArena::need_do_writing_throttle()
|
||||||
{
|
{
|
||||||
int64_t trigger_percentage = get_writing_throttling_trigger_percentage_();
|
int64_t trigger_percentage = get_writing_throttling_trigger_percentage_();
|
||||||
int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100;
|
int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100;
|
||||||
|
int64_t trigger_mem_remain = lastest_memstore_threshold_ * (100 - trigger_percentage) / 100;
|
||||||
int64_t cur_mem_hold = ATOMIC_LOAD(&hold_);
|
int64_t cur_mem_hold = ATOMIC_LOAD(&hold_);
|
||||||
bool need_do_writing_throttle = cur_mem_hold > trigger_mem_limit;
|
bool need_do_writing_throttle = (cur_mem_hold > trigger_mem_limit) || (get_tenant_memory_remain_() < trigger_mem_remain);
|
||||||
return need_do_writing_throttle;
|
return need_do_writing_throttle;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -295,7 +296,10 @@ void ObFifoArena::speed_limit(int64_t cur_mem_hold, int64_t alloc_size)
|
|||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
int64_t trigger_percentage = get_writing_throttling_trigger_percentage_();
|
int64_t trigger_percentage = get_writing_throttling_trigger_percentage_();
|
||||||
int64_t trigger_mem_limit = 0;
|
int64_t trigger_mem_limit = 0;
|
||||||
|
int64_t trigger_mem_remain = 0;
|
||||||
|
bool is_memstore_overused = false;
|
||||||
if (trigger_percentage < 100) {
|
if (trigger_percentage < 100) {
|
||||||
|
const int64_t tenant_memory_remain = get_tenant_memory_remain_();
|
||||||
if (OB_UNLIKELY(
|
if (OB_UNLIKELY(
|
||||||
cur_mem_hold < 0 || alloc_size <= 0 || lastest_memstore_threshold_ <= 0 || trigger_percentage <= 0)) {
|
cur_mem_hold < 0 || alloc_size <= 0 || lastest_memstore_threshold_ <= 0 || trigger_percentage <= 0)) {
|
||||||
COMMON_LOG(ERROR,
|
COMMON_LOG(ERROR,
|
||||||
@ -304,13 +308,15 @@ void ObFifoArena::speed_limit(int64_t cur_mem_hold, int64_t alloc_size)
|
|||||||
K(alloc_size),
|
K(alloc_size),
|
||||||
K(lastest_memstore_threshold_),
|
K(lastest_memstore_threshold_),
|
||||||
K(trigger_percentage));
|
K(trigger_percentage));
|
||||||
} else if (cur_mem_hold > (trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100)) {
|
} else if ((is_memstore_overused = (cur_mem_hold > (trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100)))
|
||||||
|
|| (tenant_memory_remain < (trigger_mem_remain = lastest_memstore_threshold_ - trigger_mem_limit))) {
|
||||||
int64_t alloc_duration = get_writing_throttling_maximum_duration_();
|
int64_t alloc_duration = get_writing_throttling_maximum_duration_();
|
||||||
if (OB_FAIL(throttle_info_.check_and_calc_decay_factor(
|
if (OB_FAIL(throttle_info_.check_and_calc_decay_factor(
|
||||||
lastest_memstore_threshold_, trigger_percentage, alloc_duration))) {
|
lastest_memstore_threshold_, trigger_percentage, alloc_duration))) {
|
||||||
COMMON_LOG(WARN, "failed to check_and_calc_decay_factor", K(cur_mem_hold), K(alloc_size), K(throttle_info_));
|
COMMON_LOG(WARN, "failed to check_and_calc_decay_factor", K(cur_mem_hold), K(alloc_size), K(throttle_info_));
|
||||||
} else {
|
} else {
|
||||||
int64_t throttling_interval = get_throttling_interval(cur_mem_hold, alloc_size, trigger_mem_limit);
|
const int64_t mem_overused = MAX(cur_mem_hold - trigger_mem_limit, trigger_mem_remain - tenant_memory_remain);
|
||||||
|
int64_t throttling_interval = get_throttling_interval(mem_overused, alloc_size, is_memstore_overused);
|
||||||
int64_t cur_ts = ObTimeUtility::current_time();
|
int64_t cur_ts = ObTimeUtility::current_time();
|
||||||
int64_t new_base_ts = ATOMIC_AAF(&last_base_ts_, throttling_interval);
|
int64_t new_base_ts = ATOMIC_AAF(&last_base_ts_, throttling_interval);
|
||||||
int64_t sleep_interval = new_base_ts - cur_ts;
|
int64_t sleep_interval = new_base_ts - cur_ts;
|
||||||
@ -339,7 +345,11 @@ void ObFifoArena::speed_limit(int64_t cur_mem_hold, int64_t alloc_size)
|
|||||||
"last_base_ts",
|
"last_base_ts",
|
||||||
ATOMIC_LOAD(&last_base_ts_),
|
ATOMIC_LOAD(&last_base_ts_),
|
||||||
K(cur_mem_hold),
|
K(cur_mem_hold),
|
||||||
K(throttle_info_));
|
K(trigger_mem_limit),
|
||||||
|
K(trigger_mem_remain),
|
||||||
|
K(tenant_memory_remain),
|
||||||
|
K(throttle_info_),
|
||||||
|
K(mem_overused));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else { /*do nothing*/
|
} else { /*do nothing*/
|
||||||
@ -347,18 +357,24 @@ void ObFifoArena::speed_limit(int64_t cur_mem_hold, int64_t alloc_size)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t ObFifoArena::get_throttling_interval(int64_t cur_mem_hold, int64_t alloc_size, int64_t trigger_mem_limit)
|
int64_t ObFifoArena::get_throttling_interval(const int64_t mem_overused,
|
||||||
|
const int64_t alloc_size,
|
||||||
|
const bool is_memstore_overused)
|
||||||
{
|
{
|
||||||
constexpr int64_t MIN_INTERVAL_PER_ALLOC = 20;
|
constexpr int64_t MIN_INTERVAL_PER_ALLOC = 20;
|
||||||
int64_t chunk_cnt = ((alloc_size + MEM_SLICE_SIZE - 1) / (MEM_SLICE_SIZE));
|
int64_t chunk_cnt = ((alloc_size + MEM_SLICE_SIZE - 1) / (MEM_SLICE_SIZE));
|
||||||
int64_t chunk_seq = ((cur_mem_hold - trigger_mem_limit) + MEM_SLICE_SIZE - 1) / (MEM_SLICE_SIZE);
|
int64_t chunk_seq = (mem_overused + alloc_size + MEM_SLICE_SIZE - 1)/ (MEM_SLICE_SIZE);
|
||||||
int64_t ret_interval = 0;
|
int64_t ret_interval = 0;
|
||||||
double cur_chunk_seq = 1.0;
|
double cur_chunk_seq = 1.0;
|
||||||
for (int64_t i = 0; i < chunk_cnt && cur_chunk_seq > 0.0; ++i) {
|
for (int64_t i = 0; i < chunk_cnt && cur_chunk_seq > 0.0; ++i) {
|
||||||
cur_chunk_seq = static_cast<double>(chunk_seq - i);
|
cur_chunk_seq = static_cast<double>(chunk_seq - i);
|
||||||
ret_interval += static_cast<int64_t>(throttle_info_.decay_factor_ * cur_chunk_seq * cur_chunk_seq * cur_chunk_seq);
|
ret_interval += static_cast<int64_t>(throttle_info_.decay_factor_ * cur_chunk_seq * cur_chunk_seq * cur_chunk_seq);
|
||||||
}
|
}
|
||||||
return alloc_size * ret_interval / MEM_SLICE_SIZE + MIN_INTERVAL_PER_ALLOC;
|
ret_interval = ret_interval + MIN_INTERVAL;
|
||||||
|
const int64_t calc_sleep_interval = alloc_size * ret_interval / MEM_SLICE_SIZE;
|
||||||
|
const int64_t actual_sleep_interval = is_memstore_overused ? calc_sleep_interval
|
||||||
|
: MAX(TENANT_MEMORY_EXHAUSTION_FACTOR * calc_sleep_interval, MIN_SLEEP_INTERVAL_WITH_TENANT_MEMORY_EXHAUSTION);
|
||||||
|
return actual_sleep_interval;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ObFifoArena::set_memstore_threshold(int64_t memstore_threshold)
|
void ObFifoArena::set_memstore_threshold(int64_t memstore_threshold)
|
||||||
@ -395,5 +411,17 @@ int64_t ObFifoArena::get_writing_throttling_maximum_duration_() const
|
|||||||
return duration;
|
return duration;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t ObFifoArena::get_tenant_memory_remain_()
|
||||||
|
{
|
||||||
|
const uint64_t tenant_id = attr_.tenant_id_;
|
||||||
|
int64_t cur_time = ObTimeUtility::current_time();
|
||||||
|
int64_t old_time = ATOMIC_LOAD(&last_query_time_);
|
||||||
|
if (OB_UNLIKELY((QUERY_MEM_INTERVAL + old_time) < cur_time)
|
||||||
|
&& old_time == ATOMIC_CAS(&last_query_time_, old_time, cur_time)) {
|
||||||
|
ATOMIC_STORE(&remain_, lib::get_tenant_memory_remain(tenant_id));
|
||||||
|
}
|
||||||
|
return ATOMIC_LOAD(&remain_);
|
||||||
|
}
|
||||||
|
|
||||||
}; // namespace common
|
}; // namespace common
|
||||||
}; // end namespace oceanbase
|
}; // end namespace oceanbase
|
||||||
|
|||||||
@ -171,6 +171,8 @@ public:
|
|||||||
allocated_(0),
|
allocated_(0),
|
||||||
reclaimed_(0),
|
reclaimed_(0),
|
||||||
hold_(0),
|
hold_(0),
|
||||||
|
last_query_time_(0),
|
||||||
|
remain_(INT64_MAX),
|
||||||
retired_(0),
|
retired_(0),
|
||||||
last_base_ts_(0),
|
last_base_ts_(0),
|
||||||
last_reclaimed_(0),
|
last_reclaimed_(0),
|
||||||
@ -209,7 +211,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
void set_memstore_threshold(int64_t memstore_threshold);
|
void set_memstore_threshold(int64_t memstore_threshold);
|
||||||
bool need_do_writing_throttle() const;
|
bool need_do_writing_throttle();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
ObQSync& get_qs()
|
ObQSync& get_qs()
|
||||||
@ -262,10 +264,13 @@ private:
|
|||||||
void destroy_page(Page* page);
|
void destroy_page(Page* page);
|
||||||
void shrink_cached_page(int64_t nway);
|
void shrink_cached_page(int64_t nway);
|
||||||
void speed_limit(int64_t cur_mem_hold, int64_t alloc_size);
|
void speed_limit(int64_t cur_mem_hold, int64_t alloc_size);
|
||||||
int64_t get_throttling_interval(int64_t cur_mem_hold, int64_t alloc_size, int64_t trigger_mem_limit);
|
int64_t get_throttling_interval(const int64_t overused_mem,
|
||||||
|
const int64_t alloc_size,
|
||||||
|
const bool is_memstore_overused);
|
||||||
int64_t get_actual_hold_size(Page* page);
|
int64_t get_actual_hold_size(Page* page);
|
||||||
int64_t get_writing_throttling_trigger_percentage_() const;
|
int64_t get_writing_throttling_trigger_percentage_() const;
|
||||||
int64_t get_writing_throttling_maximum_duration_() const;
|
int64_t get_writing_throttling_maximum_duration_() const;
|
||||||
|
int64_t get_tenant_memory_remain_();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static const int64_t MAX_WAIT_INTERVAL = 20 * 1000 * 1000; // 20s
|
static const int64_t MAX_WAIT_INTERVAL = 20 * 1000 * 1000; // 20s
|
||||||
@ -273,12 +278,19 @@ private:
|
|||||||
static const int64_t MIN_INTERVAL = 20000;
|
static const int64_t MIN_INTERVAL = 20000;
|
||||||
static const int64_t DEFAULT_TRIGGER_PERCENTAGE = 100;
|
static const int64_t DEFAULT_TRIGGER_PERCENTAGE = 100;
|
||||||
static const int64_t DEFAULT_DURATION = 60 * 60 * 1000 * 1000L; // us
|
static const int64_t DEFAULT_DURATION = 60 * 60 * 1000 * 1000L; // us
|
||||||
|
static const int64_t QUERY_MEM_INTERVAL = 100 * 1000L;//100ms
|
||||||
|
//sleep interval should be multipled by TENANT_MEMORY_EXHAUSTION_FACTOR when writing throttling is
|
||||||
|
//triggered by exhaustion of tenant memory
|
||||||
|
static const int64_t TENANT_MEMORY_EXHAUSTION_FACTOR = 10;
|
||||||
|
static const int64_t MIN_SLEEP_INTERVAL_WITH_TENANT_MEMORY_EXHAUSTION = 1000;
|
||||||
lib::ObMemAttr attr_;
|
lib::ObMemAttr attr_;
|
||||||
ObIAllocator* allocator_;
|
ObIAllocator* allocator_;
|
||||||
int64_t nway_;
|
int64_t nway_;
|
||||||
int64_t allocated_;
|
int64_t allocated_;
|
||||||
int64_t reclaimed_;
|
int64_t reclaimed_;
|
||||||
int64_t hold_; // for single tenant
|
int64_t hold_; // for single tenant
|
||||||
|
int64_t last_query_time_;//improve performance
|
||||||
|
int64_t remain_;//improve performance
|
||||||
int64_t retired_;
|
int64_t retired_;
|
||||||
int64_t last_base_ts_;
|
int64_t last_base_ts_;
|
||||||
|
|
||||||
|
|||||||
@ -171,7 +171,7 @@ public:
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
int set_memstore_threshold(uint64_t tenant_id);
|
int set_memstore_threshold(uint64_t tenant_id);
|
||||||
bool need_do_writing_throttle() const
|
bool need_do_writing_throttle()
|
||||||
{
|
{
|
||||||
return arena_.need_do_writing_throttle();
|
return arena_.need_do_writing_throttle();
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1895,13 +1895,14 @@ int ObTenantManager::check_memory_used(const int64_t tenant_id,
|
|||||||
} else {
|
} else {
|
||||||
double tenant_memory_hold = get_tenant_memory_hold(tenant_id);
|
double tenant_memory_hold = get_tenant_memory_hold(tenant_id);
|
||||||
double tenant_memory_limit = get_tenant_memory_limit(tenant_id);
|
double tenant_memory_limit = get_tenant_memory_limit(tenant_id);
|
||||||
|
double tenant_memory_remain = get_tenant_memory_remain(tenant_id);
|
||||||
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
|
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
|
||||||
int64_t trigger_percentage = tenant_config->writing_throttling_trigger_percentage;
|
int64_t trigger_percentage = tenant_config->writing_throttling_trigger_percentage;
|
||||||
if (!tenant_config.is_valid()) {
|
if (!tenant_config.is_valid()) {
|
||||||
COMMON_LOG(INFO, "failed to get tenant config");
|
COMMON_LOG(INFO, "failed to get tenant config");
|
||||||
} else {
|
} else {
|
||||||
if (tenant_memory_limit > tenant_memory_hold &&
|
if (tenant_memory_limit > tenant_memory_hold &&
|
||||||
(tenant_memory_limit - tenant_memory_hold < mem_memstore_limit / 100 * (100 - trigger_percentage) * 0.95)) {
|
(tenant_memory_remain < mem_memstore_limit / 100 * (100 - trigger_percentage) / 0.95)) {
|
||||||
use_too_much_memory = true;
|
use_too_much_memory = true;
|
||||||
COMMON_LOG(INFO,
|
COMMON_LOG(INFO,
|
||||||
"A minor freeze is needed by writing throttling",
|
"A minor freeze is needed by writing throttling",
|
||||||
|
|||||||
Reference in New Issue
Block a user