[FEAT MERGE] Implement Resource Throttle

ZenoWang
2023-12-12 15:12:41 +00:00
committed by ob-robot
parent 4235388eaf
commit 56e0ec6dcf
93 changed files with 3353 additions and 1667 deletions

src/share/allocator/ob_fifo_arena.cpp

@@ -16,15 +16,15 @@
#include <malloc.h>
#endif
#include "math.h"
#include "ob_memstore_allocator_mgr.h"
#include "share/ob_tenant_mgr.h"
#include "observer/omt/ob_tenant_config_mgr.h"
#include "lib/alloc/alloc_struct.h"
#include "lib/stat/ob_diagnose_info.h"
#include "share/throttle/ob_throttle_common.h"
#include "observer/omt/ob_tenant_config_mgr.h"
#include "share/allocator/ob_shared_memory_allocator_mgr.h"
#include "share/ob_tenant_mgr.h"
using namespace oceanbase::lib;
using namespace oceanbase::omt;
using namespace oceanbase::share;
namespace oceanbase
{
namespace common
@@ -42,58 +42,7 @@ int64_t ObFifoArena::Page::get_actual_hold_size()
#endif
}
void ObFifoArena::ObWriteThrottleInfo::reset()
{
decay_factor_ = 0.0;
alloc_duration_ = 0;
trigger_percentage_ = 0;
memstore_threshold_ = 0;
ATOMIC_SET(&period_throttled_count_, 0);
ATOMIC_SET(&period_throttled_time_, 0);
ATOMIC_SET(&total_throttled_count_, 0);
ATOMIC_SET(&total_throttled_time_, 0);
}
void ObFifoArena::ObWriteThrottleInfo::reset_period_stat_info()
{
ATOMIC_SET(&period_throttled_count_, 0);
ATOMIC_SET(&period_throttled_time_, 0);
}
void ObFifoArena::ObWriteThrottleInfo::record_limit_event(int64_t interval)
{
ATOMIC_INC(&period_throttled_count_);
ATOMIC_FAA(&period_throttled_time_, interval);
ATOMIC_INC(&total_throttled_count_);
ATOMIC_FAA(&total_throttled_time_, interval);
}
int ObFifoArena::ObWriteThrottleInfo::check_and_calc_decay_factor(int64_t memstore_threshold,
int64_t trigger_percentage,
int64_t alloc_duration)
{
int ret = OB_SUCCESS;
if (memstore_threshold != memstore_threshold_
|| trigger_percentage != trigger_percentage_
|| alloc_duration != alloc_duration_
|| decay_factor_ <= 0) {
memstore_threshold_ = memstore_threshold;
trigger_percentage_ = trigger_percentage;
alloc_duration_ = alloc_duration;
int64_t available_mem = (100 - trigger_percentage_) * memstore_threshold_ / 100;
double N = static_cast<double>(available_mem) / static_cast<double>(MEM_SLICE_SIZE);
double decay_factor = (static_cast<double>(alloc_duration) - N * static_cast<double>(MIN_INTERVAL)) / (N * N * (N + 1) * (N + 1) / 4);
decay_factor_ = decay_factor < 0 ? 0 : decay_factor;
COMMON_LOG(INFO, "recalculate decay factor", K(memstore_threshold_), K(trigger_percentage_),
K(decay_factor_), K(alloc_duration), K(available_mem), K(N));
if (decay_factor < 0) {
LOG_ERROR("decay factor is smaller than 0", K(decay_factor), K(alloc_duration), K(N));
}
}
return ret;
}
int ObFifoArena::init(uint64_t tenant_id)
int ObFifoArena::init()
{
int ret = OB_SUCCESS;
lib::ObMallocAllocator *allocator = lib::ObMallocAllocator::get_instance();
@@ -107,7 +56,7 @@ int ObFifoArena::init(uint64_t tenant_id)
}
if (OB_SUCC(ret)) {
attr_.tenant_id_ = tenant_id;
attr_.tenant_id_ = MTL_ID();
attr_.label_ = ObNewModIds::OB_MEMSTORE;
attr_.ctx_id_ = ctx_id;
}
@@ -161,7 +110,7 @@ void* ObFifoArena::alloc(int64_t adv_idx, Handle& handle, int64_t size)
int ret = OB_SUCCESS;
void* ptr = NULL;
int64_t rsize = size + sizeof(Page) + sizeof(Ref);
speed_limit(ATOMIC_LOAD(&hold_), size);
CriticalGuard(get_qs());
int64_t way_id = get_way_id();
int64_t idx = get_idx(adv_idx, way_id);
@@ -281,243 +230,5 @@ void ObFifoArena::destroy_page(Page* page)
}
}
bool ObFifoArena::need_do_writing_throttle() const
{
bool need_do_writing_throttle = false;
int64_t trigger_percentage = get_writing_throttling_trigger_percentage_();
if (trigger_percentage < 100) {
int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100;
int64_t cur_mem_hold = ATOMIC_LOAD(&hold_);
need_do_writing_throttle = cur_mem_hold > trigger_mem_limit;
}
return need_do_writing_throttle;
}
void ObFifoArena::speed_limit(const int64_t cur_mem_hold, const int64_t alloc_size)
{
int ret = OB_SUCCESS;
int64_t trigger_percentage = get_writing_throttling_trigger_percentage_();
int64_t trigger_mem_limit = 0;
bool need_speed_limit = false;
int64_t seq = max_seq_;
int64_t throttling_interval = 0;
if (trigger_percentage < 100) {
if (OB_UNLIKELY(cur_mem_hold < 0 || alloc_size <= 0 || lastest_memstore_threshold_ <= 0 || trigger_percentage <= 0)) {
COMMON_LOG(ERROR, "invalid arguments", K(cur_mem_hold), K(alloc_size), K(lastest_memstore_threshold_), K(trigger_percentage));
} else if (cur_mem_hold > (trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100)) {
need_speed_limit = true;
seq = ATOMIC_AAF(&max_seq_, alloc_size);
int64_t alloc_duration = get_writing_throttling_maximum_duration_();
if (OB_FAIL(throttle_info_.check_and_calc_decay_factor(lastest_memstore_threshold_, trigger_percentage, alloc_duration))) {
COMMON_LOG(WARN, "failed to check_and_calc_decay_factor", K(cur_mem_hold), K(alloc_size), K(throttle_info_));
}
}
advance_clock();
get_seq() = seq;
tl_need_speed_limit() = need_speed_limit;
share::get_thread_alloc_stat() += alloc_size;
if (need_speed_limit && REACH_TIME_INTERVAL(1 * 1000 * 1000L)) {
COMMON_LOG(INFO, "report write throttle info", K(alloc_size), K(attr_), K(throttling_interval),
"max_seq_", ATOMIC_LOAD(&max_seq_), K(clock_),
K(cur_mem_hold), K(throttle_info_), K(seq));
}
}
}
bool ObFifoArena::check_clock_over_seq(const int64_t req)
{
advance_clock();
int64_t clock = ATOMIC_LOAD(&clock_);
return req <= clock;
}
int64_t ObFifoArena::get_clock()
{
advance_clock();
return clock_;
}
void ObFifoArena::skip_clock(const int64_t skip_size)
{
int64_t ov = 0;
int64_t nv = ATOMIC_LOAD(&clock_);
while ((ov = nv) < ATOMIC_LOAD(&max_seq_)
&& ov != (nv = ATOMIC_CAS(&clock_, ov, min(ATOMIC_LOAD(&max_seq_), ov + skip_size)))) {
PAUSE();
if (REACH_TIME_INTERVAL(100 * 1000L)) {
const int64_t max_seq = ATOMIC_LOAD(&max_seq_);
const int64_t cur_mem_hold = ATOMIC_LOAD(&hold_);
COMMON_LOG(INFO, "skip clock",
K(clock_), K(max_seq_), K(skip_size), K(cur_mem_hold), K(attr_.tenant_id_));
}
}
}
void ObFifoArena::advance_clock()
{
int64_t cur_ts = ObTimeUtility::current_time();
int64_t old_ts = last_update_ts_;
const int64_t advance_us = cur_ts - old_ts;
if ((advance_us > ADVANCE_CLOCK_INTERVAL) &&
old_ts == ATOMIC_CAS(&last_update_ts_, old_ts, cur_ts)) {
const int64_t trigger_percentage = get_writing_throttling_trigger_percentage_();
const int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100;
const int64_t cur_mem_hold = ATOMIC_LOAD(&hold_);
const int64_t mem_limit = calc_mem_limit(cur_mem_hold, trigger_mem_limit, advance_us);
const int64_t clock = ATOMIC_LOAD(&clock_);
const int64_t max_seq = ATOMIC_LOAD(&max_seq_);
ATOMIC_SET(&clock_, min(max_seq, clock + mem_limit));
if (REACH_TIME_INTERVAL(100 * 1000L)) {
COMMON_LOG(INFO, "current clock is ",
K(clock_), K(max_seq_), K(mem_limit), K(cur_mem_hold), K(attr_.tenant_id_));
}
}
}
int64_t ObFifoArena::expected_wait_time(const int64_t seq) const
{
int64_t expected_wait_time = 0;
int64_t trigger_percentage = get_writing_throttling_trigger_percentage_();
int64_t trigger_mem_limit = lastest_memstore_threshold_ * trigger_percentage / 100;
int64_t can_assign_in_next_period = calc_mem_limit(hold_, trigger_mem_limit, ADVANCE_CLOCK_INTERVAL);
int64_t clock = ATOMIC_LOAD(&clock_);
if (seq > clock) {
if (can_assign_in_next_period != 0) {
expected_wait_time = (seq - clock) * ADVANCE_CLOCK_INTERVAL / can_assign_in_next_period;
} else {
expected_wait_time = ADVANCE_CLOCK_INTERVAL;
}
}
return expected_wait_time;
}
// how much memory we can get after dt time.
int64_t ObFifoArena::calc_mem_limit(const int64_t cur_mem_hold, const int64_t trigger_mem_limit, const int64_t dt) const
{
int ret = OB_SUCCESS;
int64_t mem_can_be_assigned = 0;
const double decay_factor = throttle_info_.decay_factor_;
int64_t init_seq = 0;
int64_t init_page_left_size = 0;
double init_page_left_interval = 0;
double past_interval = 0;
double last_page_interval = 0;
double mid_result = 0;
double approx_max_chunk_seq = 0;
int64_t max_seq = 0;
double accumulate_interval = 0;
if (cur_mem_hold < trigger_mem_limit) {
// there is no speed limit now
// we can get all the memory before speed limit
mem_can_be_assigned = trigger_mem_limit - cur_mem_hold;
} else if (decay_factor <= 0) {
mem_can_be_assigned = 0;
LOG_WARN("we should limit speed, but the decay factor not calculate now", K(cur_mem_hold), K(trigger_mem_limit), K(dt));
} else {
init_seq = ((cur_mem_hold - trigger_mem_limit) + MEM_SLICE_SIZE - 1) / (MEM_SLICE_SIZE);
init_page_left_size = MEM_SLICE_SIZE - (cur_mem_hold - trigger_mem_limit) % MEM_SLICE_SIZE;
init_page_left_interval = (1.0 * decay_factor * pow(init_seq, 3) *
init_page_left_size / MEM_SLICE_SIZE);
past_interval = decay_factor * pow(init_seq, 2) * pow(init_seq + 1, 2) / 4;
// there is speed limit
if (init_page_left_interval > dt) {
last_page_interval = decay_factor * pow(init_seq, 3);
mem_can_be_assigned = dt / last_page_interval * MEM_SLICE_SIZE;
} else {
mid_result = 4.0 * (dt + past_interval - init_page_left_interval) / decay_factor;
approx_max_chunk_seq = pow(mid_result, 0.25);
max_seq = floor(approx_max_chunk_seq);
for (int i = 0; i < 2; i++) {
if (pow(max_seq, 2) * pow(max_seq + 1, 2) < mid_result) {
max_seq = max_seq + 1;
}
}
accumulate_interval = pow(max_seq, 2) * pow(max_seq + 1, 2) * decay_factor / 4 - past_interval + init_page_left_interval;
mem_can_be_assigned = init_page_left_size + (max_seq - init_seq) * MEM_SLICE_SIZE;
if (accumulate_interval > dt) {
last_page_interval = decay_factor * pow(max_seq, 3);
mem_can_be_assigned -= (accumulate_interval - dt) / last_page_interval * MEM_SLICE_SIZE;
}
}
// defensive code
if (pow(max_seq, 2) * pow(max_seq + 1, 2) < mid_result) {
LOG_ERROR("unexpected result", K(max_seq), K(mid_result));
}
}
// defensive code
if (mem_can_be_assigned <= 0) {
LOG_WARN("we can not get memory now", K(mem_can_be_assigned), K(decay_factor), K(cur_mem_hold), K(trigger_mem_limit), K(dt));
}
return mem_can_be_assigned;
}
int64_t ObFifoArena::get_throttling_interval(const int64_t cur_mem_hold,
const int64_t alloc_size,
const int64_t trigger_mem_limit)
{
constexpr int64_t MIN_INTERVAL_PER_ALLOC = 20;
int64_t chunk_cnt = ((alloc_size + MEM_SLICE_SIZE - 1) / (MEM_SLICE_SIZE));
int64_t chunk_seq = ((cur_mem_hold - trigger_mem_limit) + MEM_SLICE_SIZE - 1)/ (MEM_SLICE_SIZE);
int64_t ret_interval = 0;
double cur_chunk_seq = 1.0;
for (int64_t i = 0; i < chunk_cnt && cur_chunk_seq > 0.0; ++i) {
cur_chunk_seq = static_cast<double>(chunk_seq - i);
ret_interval += static_cast<int64_t>(throttle_info_.decay_factor_ * cur_chunk_seq * cur_chunk_seq * cur_chunk_seq);
}
return alloc_size * ret_interval / MEM_SLICE_SIZE + MIN_INTERVAL_PER_ALLOC;
}
void ObFifoArena::set_memstore_threshold(int64_t memstore_threshold)
{
ATOMIC_STORE(&lastest_memstore_threshold_, memstore_threshold);
}
template<int64_t N>
struct INTEGER_WRAPPER
{
INTEGER_WRAPPER() : v_(N), tenant_id_(0) {}
int64_t v_;
uint64_t tenant_id_;
};
int64_t ObFifoArena::get_writing_throttling_trigger_percentage_() const
{
RLOCAL(INTEGER_WRAPPER<DEFAULT_TRIGGER_PERCENTAGE>, wrapper);
int64_t &trigger_v = (&wrapper)->v_;
uint64_t &tenant_id = (&wrapper)->tenant_id_;
if (tenant_id != attr_.tenant_id_ || TC_REACH_TIME_INTERVAL(5 * 1000 * 1000)) { // 5s
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(attr_.tenant_id_));
if (!tenant_config.is_valid()) {
COMMON_LOG(INFO, "failed to get tenant config", K(attr_));
} else {
trigger_v = tenant_config->writing_throttling_trigger_percentage;
tenant_id = attr_.tenant_id_;
}
}
return trigger_v;
}
int64_t ObFifoArena::get_writing_throttling_maximum_duration_() const
{
RLOCAL(INTEGER_WRAPPER<DEFAULT_DURATION>, wrapper);
int64_t &duration_v = (&wrapper)->v_;
uint64_t &tenant_id = (&wrapper)->tenant_id_;
if (tenant_id != attr_.tenant_id_ || TC_REACH_TIME_INTERVAL(1 * 1000 * 1000)) { // 1s
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(attr_.tenant_id_));
if (!tenant_config.is_valid()) {
//keep default
COMMON_LOG(INFO, "failed to get tenant config", K(attr_));
} else {
duration_v = tenant_config->writing_throttling_maximum_duration;
tenant_id = attr_.tenant_id_;
}
}
return duration_v;
}
}; // end namespace common
}; // end namespace oceanbase
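
The deleted check_and_calc_decay_factor above encodes the write-throttling model that this commit moves into the shared throttle tool: once the memstore passes the trigger point, the k-th 2MB slice (MEM_SLICE_SIZE) beyond it is charged a sleep interval of decay_factor * k^3 microseconds, plus an implied per-slice MIN_INTERVAL floor, and decay_factor is chosen so that the N slices between trigger and threshold stretch over alloc_duration. Because sum_{k=1..N} k^3 = N^2 * (N+1)^2 / 4, that gives the closed form in the code; calc_mem_limit runs the model in reverse, solving decay_factor * s^2 * (s+1)^2 / 4 ≈ dt for the reachable slice index s, seeding s with pow(4 * dt / decay_factor, 0.25) and correcting for partially consumed boundary slices. A minimal standalone check of the closed form, with hypothetical numbers that are not taken from this commit:

#include <cstdint>
#include <cstdio>

int main()
{
  // Hypothetical numbers for illustration only: 1GB of headroom between the
  // trigger point and the memstore threshold, stretched over 2 hours.
  const double MEM_SLICE_SIZE = 2.0 * 1024 * 1024;            // 2MB per slice
  const double MIN_INTERVAL = 20000;                          // us, implied per-slice floor
  const double alloc_duration = 2.0 * 60 * 60 * 1000 * 1000;  // 2h in us
  const double available_mem = 1024.0 * 1024 * 1024;          // 1GB
  const double N = available_mem / MEM_SLICE_SIZE;            // slice count (512)
  // Closed form from check_and_calc_decay_factor, using
  // sum_{k=1..N} k^3 = N^2 * (N+1)^2 / 4:
  const double decay_factor =
      (alloc_duration - N * MIN_INTERVAL) / (N * N * (N + 1) * (N + 1) / 4);
  // Re-accumulate the per-slice charges and confirm they span alloc_duration.
  double total = N * MIN_INTERVAL;
  for (int64_t k = 1; k <= (int64_t)N; k++) {
    total += decay_factor * k * k * k;
  }
  printf("decay_factor=%.6f total=%.0fus duration=%.0fus\n",
         decay_factor, total, alloc_duration);  // the two totals agree
  return 0;
}

The seq/clock pair maintained by the deleted advance_clock, check_clock_over_seq and skip_clock is a byte-granularity token bucket: every allocation reserves bytes by bumping max_seq_, advance_clock grants budget toward that demand at most once per ADVANCE_CLOCK_INTERVAL (50us), and a throttled writer waits until the clock covers its reservation. A compilable sketch of just that mechanism, with a fixed grant rate standing in for calc_mem_limit:

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <thread>

// Stands in for calc_mem_limit(): bytes grantable per 50us tick (assumption).
static int64_t grant_per_tick() { return 2 * 1024 * 1024; }

struct MiniThrottle {
  std::atomic<int64_t> max_seq{0};  // bytes requested so far (max_seq_)
  std::atomic<int64_t> clock{0};    // bytes granted so far (clock_)

  // advance_clock(): grant more budget, but never run ahead of demand.
  void advance_clock() {
    const int64_t cur = clock.load();
    const int64_t next = std::min(max_seq.load(), cur + grant_per_tick());
    int64_t expected = cur;
    clock.compare_exchange_strong(expected, next);  // losers retry next tick
  }

  // Writer side: reserve `size` bytes, then wait until the clock covers our
  // sequence (the role check_clock_over_seq plays for callers).
  void alloc_and_wait(int64_t size) {
    const int64_t my_seq = max_seq.fetch_add(size) + size;
    while (clock.load() < my_seq) {
      advance_clock();
      std::this_thread::sleep_for(std::chrono::microseconds(50));
    }
  }
};

int main()
{
  MiniThrottle t;
  t.alloc_and_wait(8 * 1024 * 1024);  // waits ~4 ticks at 2MB per tick
  printf("granted %lld bytes\n", (long long)t.clock.load());
  return 0;
}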

src/share/allocator/ob_fifo_arena.h

@@ -25,10 +25,10 @@ namespace oceanbase
{
namespace common
{
class ObMemstoreAllocatorMgr;
class ObActiveList;
class ObFifoArena
{
public:
static int64_t total_hold_;
struct Page;
@@ -146,13 +146,26 @@ public:
};
public:
enum { MAX_CACHED_GROUP_COUNT = 16, MAX_CACHED_PAGE_COUNT = MAX_CACHED_GROUP_COUNT * Handle::MAX_NWAY, PAGE_SIZE = OB_MALLOC_BIG_BLOCK_SIZE + sizeof(Page) + sizeof(Ref)};
ObFifoArena(): allocator_(NULL), nway_(0), allocated_(0), reclaimed_(0), hold_(0), retired_(0), max_seq_(0), clock_(0), last_update_ts_(0),
last_reclaimed_(0), lastest_memstore_threshold_(0)
{ memset(cur_pages_, 0, sizeof(cur_pages_)); }
enum {
MAX_CACHED_GROUP_COUNT = 16,
MAX_CACHED_PAGE_COUNT = MAX_CACHED_GROUP_COUNT * Handle::MAX_NWAY,
PAGE_SIZE = OB_MALLOC_BIG_BLOCK_SIZE + sizeof(Page) + sizeof(Ref)
};
ObFifoArena()
: allocator_(NULL),
nway_(0),
allocated_(0),
reclaimed_(0),
hold_(0),
retired_(0),
last_reclaimed_(0),
lastest_memstore_threshold_(0)
{
memset(cur_pages_, 0, sizeof(cur_pages_));
}
~ObFifoArena() { reset(); }
public:
int init(uint64_t tenant_id);
int init();
void reset();
void update_nway_per_group(int64_t nway);
void* alloc(int64_t idx, Handle& handle, int64_t size);
@@ -160,23 +173,17 @@ public:
int64_t allocated() const { return ATOMIC_LOAD(&allocated_); }
int64_t retired() const { return ATOMIC_LOAD(&retired_); }
int64_t reclaimed() const { return ATOMIC_LOAD(&reclaimed_); }
int64_t hold() const {
int64_t rsize = ATOMIC_LOAD(&reclaimed_);
int64_t asize = ATOMIC_LOAD(&allocated_);
return asize - rsize;
}
uint64_t get_tenant_id() const { return attr_.tenant_id_; }
void set_memstore_threshold(int64_t memstore_threshold);
bool need_do_writing_throttle() const;
bool check_clock_over_seq(const int64_t seq);
int64_t get_clock();
int64_t expected_wait_time(const int64_t seq) const;
void skip_clock(const int64_t skip_size);
int64_t hold() const {
return hold_;
}
uint64_t get_tenant_id() const { return attr_.tenant_id_; }
int64_t get_max_cached_memstore_size() const
{
return MAX_CACHED_GROUP_COUNT * ATOMIC_LOAD(&nway_) * (PAGE_SIZE + ACHUNK_PRESERVE_SIZE);
}
private:
ObQSync& get_qs() {
static ObQSync s_qs;
@@ -185,36 +192,6 @@ private:
int64_t get_way_id() { return icpu_id() % ATOMIC_LOAD(&nway_); }
int64_t get_idx(int64_t grp_id, int64_t way_id) { return (grp_id % MAX_CACHED_GROUP_COUNT) * Handle::MAX_NWAY + way_id; }
struct ObWriteThrottleInfo {
public:
ObWriteThrottleInfo(){ reset();}
~ObWriteThrottleInfo(){}
void reset();
void reset_period_stat_info();
void record_limit_event(int64_t interval);
int check_and_calc_decay_factor(int64_t memstore_threshold,
int64_t trigger_percentage,
int64_t alloc_duration);
TO_STRING_KV(K(decay_factor_),
K(alloc_duration_),
K(trigger_percentage_),
K(memstore_threshold_),
K(period_throttled_count_),
K(period_throttled_time_),
K(total_throttled_count_),
K(total_throttled_time_));
public:
//control info
double decay_factor_;
int64_t alloc_duration_;
int64_t trigger_percentage_;
int64_t memstore_threshold_;
//stat info
int64_t period_throttled_count_;
int64_t period_throttled_time_;
int64_t total_throttled_count_;
int64_t total_throttled_time_;
};
private:
void release_ref(Ref* ref);
Page* alloc_page(int64_t size);
@@ -222,24 +199,11 @@ private:
void retire_page(int64_t way_id, Handle& handle, Page* ptr);
void destroy_page(Page* page);
void shrink_cached_page(int64_t nway);
void speed_limit(const int64_t cur_mem_hold, const int64_t alloc_size);
int64_t get_throttling_interval(const int64_t cur_mem_hold,
const int64_t alloc_size,
const int64_t trigger_mem_limit);
void advance_clock();
int64_t calc_mem_limit(const int64_t cur_mem_hold, const int64_t trigger_mem_limit, const int64_t dt) const;
int64_t get_actual_hold_size(Page* page);
int64_t get_writing_throttling_trigger_percentage_() const;
int64_t get_writing_throttling_maximum_duration_() const;
private:
static const int64_t MAX_WAIT_INTERVAL = 20 * 1000 * 1000;//20s
static const int64_t ADVANCE_CLOCK_INTERVAL = 50;// 50us
static const int64_t MEM_SLICE_SIZE = 2 * 1024 * 1024; //Bytes per usecond
static const int64_t MIN_INTERVAL = 20000;
static const int64_t DEFAULT_TRIGGER_PERCENTAGE = 100;
static const int64_t DEFAULT_DURATION = 60 * 60 * 1000 * 1000L;//us
lib::ObMemAttr attr_;
lib::ObIAllocator *allocator_;
int64_t nway_;
int64_t allocated_; // record all the memory hold by pages in history.
// increase while a page created and decrease only if a failed page destroyed.
@@ -250,13 +214,8 @@ private:
// (may be: hold_ = allocated_ - reclaimed_)
int64_t retired_; // record all the memory hold by not active pages in history.
int64_t max_seq_;
int64_t clock_;
int64_t last_update_ts_;
int64_t last_reclaimed_;
Page* cur_pages_[MAX_CACHED_PAGE_COUNT];
ObWriteThrottleInfo throttle_info_;
int64_t lastest_memstore_threshold_;//Save the latest memstore_threshold
DISALLOW_COPY_AND_ASSIGN(ObFifoArena);
};

src/share/allocator/ob_gmemstore_allocator.cpp (deleted)

@@ -1,201 +0,0 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ob_gmemstore_allocator.h"
#include "ob_memstore_allocator_mgr.h"
#include "share/rc/ob_tenant_base.h"
#include "storage/memtable/ob_memtable.h"
#include "lib/utility/ob_print_utils.h"
#include "observer/omt/ob_multi_tenant.h"
#include "observer/ob_server_struct.h"
#include "share/ob_tenant_mgr.h"
#include "storage/tx_storage/ob_tenant_freezer.h"
namespace oceanbase
{
using namespace share;
namespace common
{
int FrozenMemstoreInfoLogger::operator()(ObDLink* link)
{
int ret = OB_SUCCESS;
ObGMemstoreAllocator::AllocHandle* handle = CONTAINER_OF(link, typeof(*handle), total_list_);
memtable::ObMemtable& mt = handle->mt_;
if (handle->is_frozen()) {
if (OB_FAIL(databuff_print_obj(buf_, limit_, pos_, mt))) {
} else {
ret = databuff_printf(buf_, limit_, pos_, ",");
}
}
return ret;
}
int ActiveMemstoreInfoLogger::operator()(ObDLink* link)
{
int ret = OB_SUCCESS;
ObGMemstoreAllocator::AllocHandle* handle = CONTAINER_OF(link, typeof(*handle), total_list_);
memtable::ObMemtable& mt = handle->mt_;
if (handle->is_active()) {
if (OB_FAIL(databuff_print_obj(buf_, limit_, pos_, mt))) {
} else {
ret = databuff_printf(buf_, limit_, pos_, ",");
}
}
return ret;
}
int ObGMemstoreAllocator::AllocHandle::init(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
ObGMemstoreAllocator* host = NULL;
if (OB_FAIL(ObMemstoreAllocatorMgr::get_instance().get_tenant_memstore_allocator(tenant_id, host))) {
ret = OB_ERR_UNEXPECTED;
} else if (NULL == host){
ret = OB_ERR_UNEXPECTED;
} else {
host->init_handle(*this, tenant_id);
}
return ret;
}
void ObGMemstoreAllocator::init_handle(AllocHandle& handle, uint64_t tenant_id)
{
handle.do_reset();
handle.set_host(this);
{
int64_t nway = nway_per_group();
LockGuard guard(lock_);
hlist_.init_handle(handle);
arena_.update_nway_per_group(nway);
set_memstore_threshold_without_lock(tenant_id);
}
COMMON_LOG(TRACE, "MTALLOC.init", KP(&handle.mt_));
}
void ObGMemstoreAllocator::destroy_handle(AllocHandle& handle)
{
ObTimeGuard time_guard("ObGMemstoreAllocator::destroy_handle", 100 * 1000);
COMMON_LOG(TRACE, "MTALLOC.destroy", KP(&handle.mt_));
arena_.free(handle.arena_handle_);
time_guard.click();
{
LockGuard guard(lock_);
time_guard.click();
hlist_.destroy_handle(handle);
time_guard.click();
if (hlist_.is_empty()) {
arena_.reset();
}
time_guard.click();
}
handle.do_reset();
}
void* ObGMemstoreAllocator::alloc(AllocHandle& handle, int64_t size)
{
int ret = OB_SUCCESS;
int64_t align_size = upper_align(size, sizeof(int64_t));
uint64_t tenant_id = arena_.get_tenant_id();
bool is_out_of_mem = false;
if (!handle.is_id_valid()) {
COMMON_LOG(TRACE, "MTALLOC.first_alloc", KP(&handle.mt_));
LockGuard guard(lock_);
if (handle.is_frozen()) {
COMMON_LOG(ERROR, "cannot alloc because allocator is frozen", K(ret), K(handle.mt_));
} else if (!handle.is_id_valid()) {
handle.set_clock(arena_.retired());
hlist_.set_active(handle);
}
}
MTL_SWITCH(tenant_id) {
storage::ObTenantFreezer *freezer = nullptr;
if (is_virtual_tenant_id(tenant_id)) {
// virtual tenant should not have memstore.
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(ERROR, "virtual tenant should not have memstore", K(ret), K(tenant_id));
} else if (FALSE_IT(freezer = MTL(storage::ObTenantFreezer*))) {
} else if (OB_FAIL(freezer->check_memstore_full_internal(is_out_of_mem))) {
COMMON_LOG(ERROR, "fail to check tenant out of mem limit", K(ret), K(tenant_id));
}
}
if (OB_FAIL(ret)) {
is_out_of_mem = true;
}
if (is_out_of_mem && REACH_TIME_INTERVAL(1 * 1000 * 1000)) {
STORAGE_LOG(WARN, "this tenant is already out of memstore limit or some thing wrong.", K(tenant_id));
}
return is_out_of_mem ? nullptr : arena_.alloc(handle.id_, handle.arena_handle_, align_size);
}
void ObGMemstoreAllocator::set_frozen(AllocHandle& handle)
{
COMMON_LOG(TRACE, "MTALLOC.set_frozen", KP(&handle.mt_));
LockGuard guard(lock_);
hlist_.set_frozen(handle);
}
static int64_t calc_nway(int64_t cpu, int64_t mem)
{
return std::min(cpu, mem/20/ObFifoArena::PAGE_SIZE);
}
int64_t ObGMemstoreAllocator::nway_per_group()
{
int ret = OB_SUCCESS;
uint64_t tenant_id = arena_.get_tenant_id();
double min_cpu = 0;
double max_cpu = 0;
int64_t max_memory = 0;
int64_t min_memory = 0;
omt::ObMultiTenant *omt = GCTX.omt_;
MTL_SWITCH(tenant_id) {
storage::ObTenantFreezer *freezer = nullptr;
if (NULL == omt) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "omt should not be null", K(tenant_id), K(ret));
} else if (OB_FAIL(omt->get_tenant_cpu(tenant_id, min_cpu, max_cpu))) {
COMMON_LOG(WARN, "get tenant cpu failed", K(tenant_id), K(ret));
} else if (FALSE_IT(freezer = MTL(storage::ObTenantFreezer *))) {
} else if (OB_FAIL(freezer->get_tenant_mem_limit(min_memory, max_memory))) {
COMMON_LOG(WARN, "get tenant mem limit failed", K(tenant_id), K(ret));
}
}
return OB_SUCCESS == ret? calc_nway((int64_t)max_cpu, min_memory): 0;
}
int ObGMemstoreAllocator::set_memstore_threshold(uint64_t tenant_id)
{
LockGuard guard(lock_);
int ret = set_memstore_threshold_without_lock(tenant_id);
return ret;
}
int ObGMemstoreAllocator::set_memstore_threshold_without_lock(uint64_t tenant_id)
{
int ret = OB_SUCCESS;
int64_t memstore_threshold = INT64_MAX;
MTL_SWITCH(tenant_id) {
storage::ObTenantFreezer *freezer = nullptr;
if (FALSE_IT(freezer = MTL(storage::ObTenantFreezer *))) {
} else if (OB_FAIL(freezer->get_tenant_memstore_limit(memstore_threshold))) {
COMMON_LOG(WARN, "failed to get_tenant_memstore_limit", K(tenant_id), K(ret));
} else {
arena_.set_memstore_threshold(memstore_threshold);
}
}
return ret;
}
}; // end namespace common
}; // end namespace oceanbase

src/share/allocator/ob_handle_list.h

@@ -19,7 +19,6 @@ namespace oceanbase
{
namespace common
{
class ObFifoArena;
class ObHandleList
{
public:

src/share/allocator/ob_mds_allocator.cpp (new file)

@@ -0,0 +1,177 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ob_mds_allocator.h"
#include "share/allocator/ob_shared_memory_allocator_mgr.h"
#include "share/rc/ob_tenant_base.h"
#include "share/throttle/ob_share_throttle_define.h"
#include "storage/multi_data_source/runtime_utility/mds_tenant_service.h"
#include "storage/tx_storage/ob_tenant_freezer.h"
using namespace oceanbase::storage::mds;
namespace oceanbase {
namespace share {
int64_t ObTenantMdsAllocator::resource_unit_size()
{
static const int64_t MDS_RESOURCE_UNIT_SIZE = OB_MALLOC_NORMAL_BLOCK_SIZE; /* 8KB */
return MDS_RESOURCE_UNIT_SIZE;
}
void ObTenantMdsAllocator::init_throttle_config(int64_t &resource_limit, int64_t &trigger_percentage, int64_t &max_duration)
{
// define some default value
const int64_t MDS_LIMIT_PERCENTAGE = 5;
const int64_t MDS_THROTTLE_TRIGGER_PERCENTAGE = 60;
const int64_t MDS_THROTTLE_MAX_DURATION = 2LL * 60LL * 60LL * 1000LL * 1000LL; // 2 hours
int64_t total_memory = lib::get_tenant_memory_limit(MTL_ID());
// Use tenant config to init throttle config
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
resource_limit = total_memory * tenant_config->_mds_memory_limit_percentage / 100LL;
trigger_percentage = tenant_config->writing_throttling_trigger_percentage;
max_duration = tenant_config->writing_throttling_maximum_duration;
} else {
SHARE_LOG_RET(WARN, OB_INVALID_CONFIG, "init throttle config with default value");
resource_limit = total_memory * MDS_LIMIT_PERCENTAGE / 100;
trigger_percentage = MDS_THROTTLE_TRIGGER_PERCENTAGE;
max_duration = MDS_THROTTLE_MAX_DURATION;
}
}
void ObTenantMdsAllocator::adaptive_update_limit(const int64_t holding_size,
const int64_t config_specify_resource_limit,
int64_t &resource_limit,
int64_t &last_update_limit_ts,
bool &is_updated)
{
// do nothing
}
int ObTenantMdsAllocator::init()
{
int ret = OB_SUCCESS;
ObMemAttr mem_attr;
// TODO : @gengli new ctx id?
mem_attr.tenant_id_ = MTL_ID();
mem_attr.ctx_id_ = ObCtxIds::MDS_DATA_ID;
mem_attr.label_ = "MdsTable";
ObSharedMemAllocMgr *share_mem_alloc_mgr = MTL(ObSharedMemAllocMgr *);
throttle_tool_ = &(share_mem_alloc_mgr->share_resource_throttle_tool());
MDS_TG(10_ms);
if (IS_INIT){
ret = OB_INIT_TWICE;
SHARE_LOG(WARN, "init tenant mds allocator twice", KR(ret), KPC(this));
} else if (OB_ISNULL(throttle_tool_)) {
ret = OB_ERR_UNEXPECTED;
SHARE_LOG(WARN, "throttle tool is unexpected null", KP(throttle_tool_), KP(share_mem_alloc_mgr));
} else if (MDS_FAIL(allocator_.init(OB_MALLOC_NORMAL_BLOCK_SIZE, block_alloc_, mem_attr))) {
MDS_LOG(WARN, "init vslice allocator failed", K(ret), K(OB_MALLOC_NORMAL_BLOCK_SIZE), KP(this), K(mem_attr));
} else {
allocator_.set_nway(MDS_ALLOC_CONCURRENCY);
is_inited_ = true;
}
return ret;
}
void *ObTenantMdsAllocator::alloc(const int64_t size)
{
int64_t abs_expire_time = THIS_WORKER.get_timeout_ts();
return alloc(size, abs_expire_time);
}
void *ObTenantMdsAllocator::alloc(const int64_t size, const ObMemAttr &attr)
{
UNUSED(attr);
void *obj = alloc(size);
MDS_LOG_RET(WARN, OB_INVALID_ARGUMENT, "VSLICE Allocator does not support mark attr", KP(obj), K(size), K(attr));
return obj;
}
void *ObTenantMdsAllocator::alloc(const int64_t size, const int64_t abs_expire_time)
{
bool is_throttled = false;
// try alloc resource from throttle tool
(void)throttle_tool_->alloc_resource<ObTenantMdsAllocator>(size, abs_expire_time, is_throttled);
// if is throttled, do throttle
if (OB_UNLIKELY(is_throttled)) {
if (MTL(ObTenantFreezer *)->exist_ls_freezing()) {
(void)throttle_tool_->skip_throttle<ObTenantMdsAllocator>(size);
} else {
(void)throttle_tool_->do_throttle<ObTenantMdsAllocator>(abs_expire_time);
}
}
void *obj = allocator_.alloc(size);
MDS_LOG(DEBUG, "mds alloc ", K(size), KP(obj), K(abs_expire_time));
if (OB_NOT_NULL(obj)) {
MTL(storage::mds::ObTenantMdsService *)
->record_alloc_backtrace(obj,
__thread_mds_tag__,
__thread_mds_alloc_type__,
__thread_mds_alloc_file__,
__thread_mds_alloc_func__,
__thread_mds_alloc_line__); // for debug mem leak
}
return obj;
}
void ObTenantMdsAllocator::free(void *ptr)
{
allocator_.free(ptr);
MTL(storage::mds::ObTenantMdsService *)->erase_alloc_backtrace(ptr);
}
void ObTenantMdsAllocator::set_attr(const ObMemAttr &attr) { allocator_.set_attr(attr); }
void *ObTenantBufferCtxAllocator::alloc(const int64_t size)
{
void *obj = share::mtl_malloc(size, ObMemAttr(MTL_ID(), "MDS_CTX_DEFAULT", ObCtxIds::MDS_CTX_ID));
if (OB_NOT_NULL(obj)) {
MTL(ObTenantMdsService*)->record_alloc_backtrace(obj,
__thread_mds_tag__,
__thread_mds_alloc_type__,
__thread_mds_alloc_file__,
__thread_mds_alloc_func__,
__thread_mds_alloc_line__);// for debug mem leak
}
return obj;
}
void *ObTenantBufferCtxAllocator::alloc(const int64_t size, const ObMemAttr &attr)
{
void *obj = share::mtl_malloc(size, attr);
if (OB_NOT_NULL(obj)) {
MTL(ObTenantMdsService*)->record_alloc_backtrace(obj,
__thread_mds_tag__,
__thread_mds_alloc_type__,
__thread_mds_alloc_file__,
__thread_mds_alloc_func__,
__thread_mds_alloc_line__);// for debug mem leak
}
return obj;
}
void ObTenantBufferCtxAllocator::free(void *ptr)
{
share::mtl_free(ptr);
MTL(ObTenantMdsService*)->erase_alloc_backtrace(ptr);
}
} // namespace share
} // namespace oceanbase
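
ObTenantMdsAllocator::alloc above illustrates the allocation path the new throttled allocators share: reserve the requested bytes in the tenant's throttle tool first, and only when the reservation reports throttling either skip the wait (an LS freeze is already reclaiming memory) or sleep until the budget catches up; the underlying allocation itself is never refused by the throttle. A minimal compilable sketch of that control flow, with a stub in place of TxShareThrottleTool and an arbitrary 8MB trigger (names and numbers are illustrative only):

#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <thread>

// Stub standing in for TxShareThrottleTool.
struct FakeThrottleTool {
  int64_t used_ = 0;
  int64_t trigger_ = 8 * 1024 * 1024;  // arbitrary demo trigger
  // alloc_resource() analogue: returns true when the caller should slow down.
  bool reserve(int64_t size) { used_ += size; return used_ > trigger_; }
  void skip(int64_t /*size*/) {}  // skip_throttle() analogue
  void wait() { std::this_thread::sleep_for(std::chrono::milliseconds(1)); }  // do_throttle()
};

void *throttled_alloc(FakeThrottleTool &tool, int64_t size, bool ls_freezing)
{
  if (tool.reserve(size)) {   // over the throttle trigger?
    if (ls_freezing) {
      tool.skip(size);        // a freeze is already reclaiming memory
    } else {
      tool.wait();            // sleep until the budget catches up
    }
  }
  return std::malloc(size);   // the allocation itself is never refused here
}

int main()
{
  FakeThrottleTool tool;
  void *p = throttled_alloc(tool, 4096, false);
  std::free(p);
  return 0;
}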

src/share/allocator/ob_mds_allocator.h (new file)

@@ -0,0 +1,61 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_ALLOCATOR_OB_MDS_ALLOCATOR_H_
#define OCEANBASE_ALLOCATOR_OB_MDS_ALLOCATOR_H_
#include "lib/allocator/ob_vslice_alloc.h"
#include "share/throttle/ob_share_throttle_define.h"
namespace oceanbase {
namespace share {
class ObTenantMdsAllocator : public ObIAllocator {
private:
static const int64_t MDS_ALLOC_CONCURRENCY = 32;
public:
DEFINE_CUSTOM_FUNC_FOR_THROTTLE(Mds);
public:
ObTenantMdsAllocator() : is_inited_(false), throttle_tool_(nullptr), block_alloc_(), allocator_() {}
int init();
void destroy() { is_inited_ = false; }
void *alloc(const int64_t size, const int64_t expire_ts);
virtual void *alloc(const int64_t size) override;
virtual void *alloc(const int64_t size, const ObMemAttr &attr) override;
virtual void free(void *ptr) override;
virtual void set_attr(const ObMemAttr &attr) override;
int64_t hold() { return allocator_.hold(); }
TO_STRING_KV(K(is_inited_), KP(this), KP(throttle_tool_), KP(&block_alloc_), KP(&allocator_));
private:
bool is_inited_;
share::TxShareThrottleTool *throttle_tool_;
common::ObBlockAllocMgr block_alloc_;
common::ObVSliceAlloc allocator_;
};
struct ObTenantBufferCtxAllocator : public ObIAllocator// for now, it is just a wrapper of mtl_malloc
{
virtual void *alloc(const int64_t size) override;
virtual void *alloc(const int64_t size, const ObMemAttr &attr) override;
virtual void free(void *ptr) override;
virtual void set_attr(const ObMemAttr &) override {}
};
} // namespace share
} // namespace oceanbase
#endif

src/share/allocator/ob_memstore_allocator.cpp (new file)

@@ -0,0 +1,247 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "ob_shared_memory_allocator_mgr.h"
#include "share/rc/ob_tenant_base.h"
#include "storage/memtable/ob_memtable.h"
#include "lib/utility/ob_print_utils.h"
#include "observer/omt/ob_multi_tenant.h"
#include "observer/ob_server_struct.h"
#include "share/ob_tenant_mgr.h"
#include "storage/tx_storage/ob_tenant_freezer.h"
namespace oceanbase
{
using namespace share;
namespace share
{
int FrozenMemstoreInfoLogger::operator()(ObDLink* link)
{
int ret = OB_SUCCESS;
ObMemstoreAllocator::AllocHandle* handle = CONTAINER_OF(link, typeof(*handle), total_list_);
memtable::ObMemtable& mt = handle->mt_;
if (handle->is_frozen()) {
if (OB_FAIL(databuff_print_obj(buf_, limit_, pos_, mt))) {
} else {
ret = databuff_printf(buf_, limit_, pos_, ",");
}
}
return ret;
}
int ActiveMemstoreInfoLogger::operator()(ObDLink* link)
{
int ret = OB_SUCCESS;
ObMemstoreAllocator::AllocHandle* handle = CONTAINER_OF(link, typeof(*handle), total_list_);
memtable::ObMemtable& mt = handle->mt_;
if (handle->is_active()) {
if (OB_FAIL(databuff_print_obj(buf_, limit_, pos_, mt))) {
} else {
ret = databuff_printf(buf_, limit_, pos_, ",");
}
}
return ret;
}
int ObMemstoreAllocator::AllocHandle::init()
{
int ret = OB_SUCCESS;
uint64_t tenant_id = MTL_ID();
ObMemstoreAllocator &host = MTL(ObSharedMemAllocMgr *)->memstore_allocator();
(void)host.init_handle(*this);
return ret;
}
int ObMemstoreAllocator::init()
{
throttle_tool_ = &(MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool());
return arena_.init();
}
void ObMemstoreAllocator::init_handle(AllocHandle& handle)
{
handle.do_reset();
handle.set_host(this);
{
int64_t nway = nway_per_group();
LockGuard guard(lock_);
hlist_.init_handle(handle);
arena_.update_nway_per_group(nway);
}
COMMON_LOG(TRACE, "MTALLOC.init", KP(&handle.mt_));
}
void ObMemstoreAllocator::destroy_handle(AllocHandle& handle)
{
ObTimeGuard time_guard("ObMemstoreAllocator::destroy_handle", 100 * 1000);
COMMON_LOG(TRACE, "MTALLOC.destroy", KP(&handle.mt_));
arena_.free(handle.arena_handle_);
time_guard.click();
{
LockGuard guard(lock_);
time_guard.click();
hlist_.destroy_handle(handle);
time_guard.click();
if (hlist_.is_empty()) {
arena_.reset();
}
time_guard.click();
}
handle.do_reset();
}
void* ObMemstoreAllocator::alloc(AllocHandle& handle, int64_t size, const int64_t expire_ts)
{
int ret = OB_SUCCESS;
int64_t align_size = upper_align(size, sizeof(int64_t));
uint64_t tenant_id = arena_.get_tenant_id();
bool is_out_of_mem = false;
if (!handle.is_id_valid()) {
COMMON_LOG(TRACE, "MTALLOC.first_alloc", KP(&handle.mt_));
LockGuard guard(lock_);
if (handle.is_frozen()) {
COMMON_LOG(ERROR, "cannot alloc because allocator is frozen", K(ret), K(handle.mt_));
} else if (!handle.is_id_valid()) {
handle.set_clock(arena_.retired());
hlist_.set_active(handle);
}
}
storage::ObTenantFreezer *freezer = nullptr;
if (is_virtual_tenant_id(tenant_id)) {
// virtual tenant should not have memstore.
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(ERROR, "virtual tenant should not have memstore", K(ret), K(tenant_id));
} else if (FALSE_IT(freezer = MTL(storage::ObTenantFreezer *))) {
} else if (OB_FAIL(freezer->check_memstore_full_internal(is_out_of_mem))) {
COMMON_LOG(ERROR, "fail to check tenant out of mem limit", K(ret), K(tenant_id));
}
void *res = nullptr;
if (OB_FAIL(ret) || is_out_of_mem) {
if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) {
STORAGE_LOG(WARN, "this tenant is already out of memstore limit or something is wrong.", K(tenant_id));
}
res = nullptr;
} else {
bool is_throttled = false;
(void)throttle_tool_->alloc_resource<ObMemstoreAllocator>(align_size, expire_ts, is_throttled);
if (is_throttled) {
share::memstore_throttled_alloc() += align_size;
}
res = arena_.alloc(handle.id_, handle.arena_handle_, align_size);
}
return res;
}
void ObMemstoreAllocator::set_frozen(AllocHandle& handle)
{
COMMON_LOG(TRACE, "MTALLOC.set_frozen", KP(&handle.mt_));
LockGuard guard(lock_);
hlist_.set_frozen(handle);
}
static int64_t calc_nway(int64_t cpu, int64_t mem)
{
return std::min(cpu, mem/20/ObFifoArena::PAGE_SIZE);
}
int64_t ObMemstoreAllocator::nway_per_group()
{
int ret = OB_SUCCESS;
uint64_t tenant_id = arena_.get_tenant_id();
double min_cpu = 0;
double max_cpu = 0;
int64_t max_memory = 0;
int64_t min_memory = 0;
omt::ObMultiTenant *omt = GCTX.omt_;
MTL_SWITCH(tenant_id) {
storage::ObTenantFreezer *freezer = nullptr;
if (NULL == omt) {
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(WARN, "omt should not be null", K(tenant_id), K(ret));
} else if (OB_FAIL(omt->get_tenant_cpu(tenant_id, min_cpu, max_cpu))) {
COMMON_LOG(WARN, "get tenant cpu failed", K(tenant_id), K(ret));
} else if (FALSE_IT(freezer = MTL(storage::ObTenantFreezer *))) {
} else if (OB_FAIL(freezer->get_tenant_mem_limit(min_memory, max_memory))) {
COMMON_LOG(WARN, "get tenant mem limit failed", K(tenant_id), K(ret));
}
}
return OB_SUCCESS == ret? calc_nway((int64_t)max_cpu, min_memory): 0;
}
int ObMemstoreAllocator::set_memstore_threshold()
{
LockGuard guard(lock_);
int ret = set_memstore_threshold_without_lock();
return ret;
}
int ObMemstoreAllocator::set_memstore_threshold_without_lock()
{
int ret = OB_SUCCESS;
int64_t memstore_threshold = INT64_MAX;
storage::ObTenantFreezer *freezer = nullptr;
if (FALSE_IT(freezer = MTL(storage::ObTenantFreezer *))) {
} else if (OB_FAIL(freezer->get_tenant_memstore_limit(memstore_threshold))) {
COMMON_LOG(WARN, "failed to get_tenant_memstore_limit", K(ret));
} else {
throttle_tool_->set_resource_limit<ObMemstoreAllocator>(memstore_threshold);
}
return ret;
}
int64_t ObMemstoreAllocator::resource_unit_size()
{
static const int64_t MEMSTORE_RESOURCE_UNIT_SIZE = 2LL * 1024LL * 1024LL; /* 2MB */
return MEMSTORE_RESOURCE_UNIT_SIZE;
}
void ObMemstoreAllocator::init_throttle_config(int64_t &resource_limit,
int64_t &trigger_percentage,
int64_t &max_duration)
{
// define some default value
const int64_t MEMSTORE_LIMIT_PERCENTAGE = 50;
const int64_t MEMSTORE_THROTTLE_TRIGGER_PERCENTAGE = 60;
const int64_t MEMSTORE_THROTTLE_MAX_DURATION = 2LL * 60LL * 60LL * 1000LL * 1000LL; // 2 hours
int64_t total_memory = lib::get_tenant_memory_limit(MTL_ID());
// Use tenant config to init throttle config
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
resource_limit = total_memory * tenant_config->memstore_limit_percentage / 100LL;
trigger_percentage = tenant_config->writing_throttling_trigger_percentage;
max_duration = tenant_config->writing_throttling_maximum_duration;
} else {
COMMON_LOG_RET(WARN, OB_INVALID_CONFIG, "init throttle config with default value");
resource_limit = total_memory * MEMSTORE_LIMIT_PERCENTAGE / 100;
trigger_percentage = MEMSTORE_THROTTLE_TRIGGER_PERCENTAGE;
max_duration = MEMSTORE_THROTTLE_MAX_DURATION;
}
}
void ObMemstoreAllocator::adaptive_update_limit(const int64_t holding_size,
const int64_t config_specify_resource_limit,
int64_t &resource_limit,
int64_t &last_update_limit_ts,
bool &is_updated)
{
// do nothing
}
}; // namespace share
}; // namespace oceanbase
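
Note the contrast with the MDS and tx-data paths: ObMemstoreAllocator::alloc does not sleep inside the allocator; when the reservation reports throttling it only accumulates the size into the thread-local memstore_throttled_alloc() counter (declared in the header below), leaving the actual wait to the caller on the write path, which is not shown in this excerpt. A minimal sketch of that RLOCAL_INLINE-style per-thread counter, with plain thread_local as a stand-in:

#include <cstdint>
#include <cstdio>

// Plain thread_local standing in for RLOCAL_INLINE(int64_t, throttled_alloc).
inline int64_t &memstore_throttled_alloc_demo()
{
  thread_local int64_t throttled_alloc = 0;
  return throttled_alloc;
}

int main()
{
  memstore_throttled_alloc_demo() += 4096;  // record a throttled allocation
  printf("throttled bytes on this thread: %lld\n",
         (long long)memstore_throttled_alloc_demo());
  return 0;
}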

src/share/allocator/ob_memstore_allocator.h (renamed from ob_gmemstore_allocator.h)

@@ -15,6 +15,7 @@
#include "ob_handle_list.h"
#include "ob_fifo_arena.h"
#include "lib/lock/ob_spin_lock.h"
#include "share/throttle/ob_share_throttle_define.h"
namespace oceanbase
{
@@ -22,8 +23,17 @@ namespace memtable
{
class ObMemtable;
};
namespace common
namespace share
{
// record the throttled alloc size of memstore in this thread
OB_INLINE int64_t &memstore_throttled_alloc()
{
RLOCAL_INLINE(int64_t, throttled_alloc);
return throttled_alloc;
}
struct FrozenMemstoreInfoLogger
{
FrozenMemstoreInfoLogger(char* buf, int64_t limit): buf_(buf), limit_(limit), pos_(0) {}
@@ -44,12 +54,15 @@ struct ActiveMemstoreInfoLogger
int64_t pos_;
};
class ObGMemstoreAllocator
class ObMemstoreAllocator
{
public:
DEFINE_CUSTOM_FUNC_FOR_THROTTLE(Memstore);
typedef ObSpinLock Lock;
typedef ObSpinLockGuard LockGuard;
typedef ObGMemstoreAllocator GAlloc;
typedef ObMemstoreAllocator GAlloc;
typedef ObFifoArena Arena;
typedef ObHandleList HandleList;
typedef HandleList::Handle ListHandle;
@@ -70,7 +83,7 @@ public:
host_ = NULL;
}
int64_t get_group_id() const { return id_ < 0? INT64_MAX: (id_ % Arena::MAX_CACHED_GROUP_COUNT); }
int init(uint64_t tenant_id);
int init();
void set_host(GAlloc* host) { host_ = host; }
void destroy() {
if (NULL != host_) {
@@ -109,19 +122,18 @@ public:
};
public:
ObGMemstoreAllocator():
lock_(common::ObLatchIds::MEMSTORE_ALLOCATOR_LOCK),
hlist_(),
arena_() {}
~ObGMemstoreAllocator() {}
ObMemstoreAllocator()
: throttle_tool_(nullptr), lock_(common::ObLatchIds::MEMSTORE_ALLOCATOR_LOCK), hlist_(), arena_() {}
~ObMemstoreAllocator() {}
public:
int init(uint64_t tenant_id)
{
return arena_.init(tenant_id);
}
void init_handle(AllocHandle& handle, uint64_t tenant_id);
int init();
int start() { return OB_SUCCESS; }
void stop() {}
void wait() {}
void destroy() {}
void init_handle(AllocHandle& handle);
void destroy_handle(AllocHandle& handle);
void* alloc(AllocHandle& handle, int64_t size);
void* alloc(AllocHandle& handle, int64_t size, const int64_t expire_ts = 0);
void set_frozen(AllocHandle& handle);
template<typename Func>
int for_each(Func& f, const bool reverse=false) {
@@ -145,6 +157,7 @@ public:
int64_t get_max_cached_memstore_size() const {
return arena_.get_max_cached_memstore_size();
}
int64_t hold() const { return arena_.hold(); }
int64_t get_total_memstore_used() const { return arena_.hold(); }
int64_t get_frozen_memstore_pos() const {
int64_t hazard = hlist_.hazard();
@@ -167,34 +180,21 @@ public:
(void)for_each(logger, true /* reverse */);
}
}
public:
int set_memstore_threshold(uint64_t tenant_id);
bool need_do_writing_throttle() const {return arena_.need_do_writing_throttle();}
bool check_clock_over_seq(const int64_t seq)
{
return arena_.check_clock_over_seq(seq);
}
int64_t get_clock()
{
return arena_.get_clock();
}
int64_t expected_wait_time(int64_t seq) const
{
return arena_.expected_wait_time(seq);
}
void skip_clock(const int64_t skip_size)
{
arena_.skip_clock(skip_size);
}
int set_memstore_threshold();
private:
int64_t nway_per_group();
int set_memstore_threshold_without_lock(uint64_t tenant_id);
int set_memstore_threshold_without_lock();
private:
share::TxShareThrottleTool *throttle_tool_;
Lock lock_;
HandleList hlist_;
Arena arena_;
};
}; // end namespace common
}; // end namespace oceanbase
}; // namespace share
}; // namespace oceanbase
#endif /* OCEANBASE_ALLOCATOR_OB_GMEMSTORE_ALLOCATOR_H_ */

src/share/allocator/ob_memstore_allocator_mgr.cpp (deleted)

@@ -1,131 +0,0 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SHARE
#include "share/allocator/ob_memstore_allocator_mgr.h"
#include "share/allocator/ob_gmemstore_allocator.h"
#include "lib/alloc/alloc_struct.h"
using namespace oceanbase::lib;
using namespace oceanbase::common;
int64_t ObMemstoreAllocatorMgr::get_all_tenants_memstore_used()
{
return ATOMIC_LOAD(&ObFifoArena::total_hold_);
}
ObMemstoreAllocatorMgr::ObMemstoreAllocatorMgr()
: is_inited_(false),
allocators_(),
allocator_map_(),
malloc_allocator_(NULL),
all_tenants_memstore_used_(0)
{
set_malloc_allocator(ObMallocAllocator::get_instance());
}
ObMemstoreAllocatorMgr::~ObMemstoreAllocatorMgr()
{}
int ObMemstoreAllocatorMgr::init()
{
int ret = OB_SUCCESS;
if (OB_FAIL(allocator_map_.create(ALLOCATOR_MAP_BUCKET_NUM, ObModIds::OB_MEMSTORE_ALLOCATOR))) {
LOG_WARN("failed to create allocator_map", K(ret));
} else {
is_inited_ = true;
}
return ret;
}
int ObMemstoreAllocatorMgr::get_tenant_memstore_allocator(const uint64_t tenant_id,
TAllocator *&out_allocator)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(tenant_id <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant id", K(tenant_id), K(ret));
} else if (tenant_id < PRESERVED_TENANT_COUNT) {
if (NULL == (out_allocator = ATOMIC_LOAD(&allocators_[tenant_id]))) {
ObMemAttr attr;
attr.tenant_id_ = OB_SERVER_TENANT_ID;
attr.label_ = ObModIds::OB_MEMSTORE_ALLOCATOR;
SET_USE_500(attr);
void *buf = ob_malloc(sizeof(TAllocator), attr);
if (NULL != buf) {
TAllocator *allocator = new (buf) TAllocator();
bool cas_succeed = false;
if (OB_SUCC(ret)) {
if (OB_FAIL(allocator->init(tenant_id))) {
LOG_WARN("failed to init tenant memstore allocator", K(tenant_id), K(ret));
} else {
LOG_INFO("succ to init tenant memstore allocator", K(tenant_id), K(ret));
cas_succeed = ATOMIC_BCAS(&allocators_[tenant_id], NULL, allocator);
}
}
if (OB_FAIL(ret) || !cas_succeed) {
allocator->~TAllocator();
ob_free(buf);
out_allocator = ATOMIC_LOAD(&allocators_[tenant_id]);
} else {
out_allocator = allocator;
}
} else {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(tenant_id), K(ret));
}
}
} else if (OB_FAIL(allocator_map_.get_refactored(tenant_id, out_allocator))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("failed to get tenant memstore allocator", K(tenant_id), K(ret));
} else {
ret = OB_SUCCESS;
ObMemAttr attr;
attr.tenant_id_ = OB_SERVER_TENANT_ID;
attr.label_ = ObModIds::OB_MEMSTORE_ALLOCATOR;
void *buf = ob_malloc(sizeof(TAllocator), attr);
if (NULL != buf) {
TAllocator *new_allocator = new (buf) TAllocator();
if (OB_FAIL(new_allocator->init(tenant_id))) {
LOG_WARN("failed to init tenant memstore allocator", K(tenant_id), K(ret));
} else if (OB_FAIL(allocator_map_.set_refactored(tenant_id, new_allocator))) {
if (OB_HASH_EXIST == ret) {
if (OB_FAIL(allocator_map_.get_refactored(tenant_id, out_allocator))) {
LOG_WARN("failed to get refactor", K(tenant_id), K(ret));
}
} else {
LOG_WARN("failed to set refactor", K(tenant_id), K(ret));
}
new_allocator->~TAllocator();
ob_free(buf);
} else {
out_allocator = new_allocator;
}
} else {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(tenant_id), K(ret));
}
}
} else if (OB_ISNULL(out_allocator)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("got allocator is NULL", K(tenant_id), K(ret));
}
return ret;
}
ObMemstoreAllocatorMgr &ObMemstoreAllocatorMgr::get_instance()
{
static ObMemstoreAllocatorMgr instance_;
return instance_;
}

src/share/allocator/ob_memstore_allocator_mgr.h (deleted)

@@ -1,57 +0,0 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef _OB_SHARE_MEMSTORE_ALLOCATOR_MGR_H_
#define _OB_SHARE_MEMSTORE_ALLOCATOR_MGR_H_
#include "lib/allocator/ob_allocator.h"
#include "lib/alloc/alloc_func.h"
#include "lib/hash/ob_hashmap.h"
namespace oceanbase
{
namespace lib
{
class ObMallocAllocator;
}
namespace common
{
class ObGMemstoreAllocator;
class ObMemstoreAllocatorMgr
{
public:
typedef ObGMemstoreAllocator TAllocator;
typedef common::hash::ObHashMap<uint64_t, TAllocator *> TenantMemostoreAllocatorMap;
ObMemstoreAllocatorMgr();
virtual ~ObMemstoreAllocatorMgr();
int init();
int get_tenant_memstore_allocator(uint64_t tenant_id, TAllocator *&out_allocator);
int64_t get_all_tenants_memstore_used();
static ObMemstoreAllocatorMgr &get_instance();
public:
void set_malloc_allocator(lib::ObMallocAllocator *malloc_allocator) { malloc_allocator_ = malloc_allocator; }
private:
static const uint64_t PRESERVED_TENANT_COUNT = 10000;
static const uint64_t ALLOCATOR_MAP_BUCKET_NUM = 64;
bool is_inited_;
TAllocator *allocators_[PRESERVED_TENANT_COUNT];
TenantMemostoreAllocatorMap allocator_map_;
lib::ObMallocAllocator *malloc_allocator_;
int64_t all_tenants_memstore_used_;
private:
DISALLOW_COPY_AND_ASSIGN(ObMemstoreAllocatorMgr);
}; // end of class ObMemstoreAllocatorMgr
} // end of namespace common
} // end of namespace oceanbase
#endif /* _OB_SHARE_MEMSTORE_ALLOCATOR_MGR_H_ */

src/share/allocator/ob_shared_memory_allocator_mgr.cpp (new file)

@@ -0,0 +1,91 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SHARE
#include "ob_shared_memory_allocator_mgr.h"
namespace oceanbase {
namespace share {
#define THROTTLE_CONFIG_LOG(ALLOCATOR, LIMIT, TRIGGER_PERCENTAGE, MAX_DURATION) \
"Unit Name", \
ALLOCATOR::throttle_unit_name(), \
"Memory Limit(MB)", \
LIMIT / 1024 / 1024, \
"Throttle Trigger(MB)", \
LIMIT * TRIGGER_PERCENTAGE / 100 / 1024 / 1024, \
"Trigger Percentage", \
TRIGGER_PERCENTAGE, \
"Max Alloc Duration", \
MAX_DURATION
void ObSharedMemAllocMgr::update_throttle_config()
{
int64_t tenant_id = MTL_ID();
int64_t total_memory = lib::get_tenant_memory_limit(tenant_id);
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
int64_t share_mem_limit_percentage = tenant_config->_tx_share_memory_limit_percentage;
int64_t memstore_limit_percentage = tenant_config->memstore_limit_percentage;
int64_t tx_data_limit_percentage = tenant_config->_tx_data_memory_limit_percentage;
int64_t mds_limit_percentage = tenant_config->_mds_memory_limit_percentage;
int64_t trigger_percentage = tenant_config->writing_throttling_trigger_percentage;
int64_t max_duration = tenant_config->writing_throttling_maximum_duration;
if (0 == share_mem_limit_percentage) {
// 0 means use (memstore_limit + 10)
share_mem_limit_percentage = memstore_limit_percentage + 10;
}
int64_t share_mem_limit = total_memory * share_mem_limit_percentage / 100LL;
int64_t memstore_limit = total_memory * memstore_limit_percentage / 100LL;
int64_t tx_data_limit = total_memory * tx_data_limit_percentage / 100LL;
int64_t mds_limit = total_memory * mds_limit_percentage / 100LL;
(void)share_resource_throttle_tool_.update_throttle_config<FakeAllocatorForTxShare>(
share_mem_limit, trigger_percentage, max_duration);
(void)share_resource_throttle_tool_.update_throttle_config<ObMemstoreAllocator>(
memstore_limit, trigger_percentage, max_duration);
(void)share_resource_throttle_tool_.update_throttle_config<ObTenantTxDataAllocator>(
tx_data_limit, trigger_percentage, max_duration);
(void)share_resource_throttle_tool_.update_throttle_config<ObTenantMdsAllocator>(
mds_limit, trigger_percentage, max_duration);
SHARE_LOG(INFO,
"[Throttle] Update Config",
K(share_mem_limit_percentage),
K(memstore_limit_percentage),
K(tx_data_limit_percentage),
K(mds_limit_percentage),
K(trigger_percentage),
K(max_duration));
SHARE_LOG(INFO,
"[Throttle] Update Config",
THROTTLE_CONFIG_LOG(FakeAllocatorForTxShare, share_mem_limit, trigger_percentage, max_duration));
SHARE_LOG(INFO,
"[Throttle] Update Config",
THROTTLE_CONFIG_LOG(ObMemstoreAllocator, memstore_limit, trigger_percentage, max_duration));
SHARE_LOG(INFO,
"[Throttle] Update Config",
THROTTLE_CONFIG_LOG(ObTenantTxDataAllocator, tx_data_limit, trigger_percentage, max_duration));
SHARE_LOG(INFO,
"[Throttle] Update Config",
THROTTLE_CONFIG_LOG(ObTenantMdsAllocator, mds_limit, trigger_percentage, max_duration));
} else {
SHARE_LOG_RET(WARN, OB_INVALID_CONFIG, "invalid tenant config", K(tenant_id), K(total_memory));
}
}
#undef THROTTLE_CONFIG_LOG
} // namespace share
} // namespace oceanbase
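
To make the percentage plumbing concrete, assume a hypothetical tenant with a 16GB memory limit and the fallback values that appear in this commit (memstore_limit_percentage = 50; _tx_share_memory_limit_percentage = 0, which becomes 50 + 10 = 60; _tx_data_memory_limit_percentage = 20; _mds_memory_limit_percentage = 5; writing_throttling_trigger_percentage = 60): update_throttle_config then sets share_mem_limit to 9.6GB, memstore_limit to 8GB, tx_data_limit to 3.2GB and mds_limit to 0.8GB, and each unit begins throttling once it holds 60% of its own limit (4.8GB for the memstore).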

src/share/allocator/ob_shared_memory_allocator_mgr.h (new file)

@@ -0,0 +1,83 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_ALLOCATOR_OB_SHARED_MEMORY_ALLOCATOR_MGR_H_
#define OCEANBASE_ALLOCATOR_OB_SHARED_MEMORY_ALLOCATOR_MGR_H_
#include "share/allocator/ob_memstore_allocator.h"
#include "share/allocator/ob_tx_data_allocator.h"
#include "share/allocator/ob_mds_allocator.h"
#include "share/throttle/ob_share_resource_throttle_tool.h"
namespace oceanbase {
namespace share {
class ObSharedMemAllocMgr {
public:
ObSharedMemAllocMgr()
: share_resource_throttle_tool_(),
memstore_allocator_(),
tx_data_allocator_(),
mds_allocator_() {}
ObSharedMemAllocMgr(ObSharedMemAllocMgr &rhs) = delete;
ObSharedMemAllocMgr &operator=(ObSharedMemAllocMgr &rhs) = delete;
~ObSharedMemAllocMgr() {}
static int mtl_init(ObSharedMemAllocMgr *&shared_mem_alloc_mgr) { return shared_mem_alloc_mgr->init(); }
int init()
{
int ret = OB_SUCCESS;
if (OB_FAIL(tx_data_allocator_.init("TX_DATA_SLICE"))) {
SHARE_LOG(ERROR, "init tx data allocator failed", KR(ret));
} else if (OB_FAIL(memstore_allocator_.init())) {
SHARE_LOG(ERROR, "init memstore allocator failed", KR(ret));
} else if (OB_FAIL(mds_allocator_.init())) {
SHARE_LOG(ERROR, "init mds allocator failed", KR(ret));
} else if (OB_FAIL(
share_resource_throttle_tool_.init(&memstore_allocator_, &tx_data_allocator_, &mds_allocator_))) {
SHARE_LOG(ERROR, "init share resource throttle tool failed", KR(ret));
} else {
share_resource_throttle_tool_.enable_adaptive_limit<FakeAllocatorForTxShare>();
SHARE_LOG(INFO, "finish init mtl share mem allocator mgr", K(MTL_ID()), KP(this));
}
return ret;
}
int start() { return OB_SUCCESS; }
void stop() {}
void wait() {}
void destroy() {}
void update_throttle_config();
ObMemstoreAllocator &memstore_allocator() { return memstore_allocator_; }
ObTenantTxDataAllocator &tx_data_allocator() { return tx_data_allocator_; }
ObTenantMdsAllocator &mds_allocator() { return mds_allocator_; }
TxShareThrottleTool &share_resource_throttle_tool() { return share_resource_throttle_tool_; }
private:
void update_share_throttle_config_(const int64_t total_memory, omt::ObTenantConfigGuard &config);
void update_memstore_throttle_config_(const int64_t total_memory, omt::ObTenantConfigGuard &config);
void update_tx_data_throttle_config_(const int64_t total_memory, omt::ObTenantConfigGuard &config);
void update_mds_throttle_config_(const int64_t total_memory, omt::ObTenantConfigGuard &config);
private:
TxShareThrottleTool share_resource_throttle_tool_;
ObMemstoreAllocator memstore_allocator_;
ObTenantTxDataAllocator tx_data_allocator_;
ObTenantMdsAllocator mds_allocator_;
};
} // namespace share
} // namespace oceanbase
#endif
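
For orientation: ObSharedMemAllocMgr is registered as a tenant-local singleton through mtl_init, so code elsewhere in this commit reaches the allocators and the shared throttle tool via MTL, e.g. (as in ObMemstoreAllocator::init above):

ObMemstoreAllocator &memstore_allocator = MTL(ObSharedMemAllocMgr *)->memstore_allocator();
TxShareThrottleTool &throttle_tool = MTL(ObSharedMemAllocMgr *)->share_resource_throttle_tool();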

src/share/allocator/ob_tenant_mutil_allocator_mgr.cpp

@@ -10,13 +10,15 @@
* See the Mulan PubL v2 for more details.
*/
#include "share/allocator/ob_tenant_mutil_allocator_mgr.h"
#include "lib/allocator/ob_malloc.h"
#include "share/config/ob_server_config.h"
#include "share/allocator/ob_shared_memory_allocator_mgr.h"
#include "share/allocator/ob_tenant_mutil_allocator.h"
#include "ob_gmemstore_allocator.h"
#include "ob_memstore_allocator_mgr.h"
#include "observer/omt/ob_tenant_config_mgr.h"
#include "share/allocator/ob_tenant_mutil_allocator_mgr.h"
#include "share/config/ob_server_config.h"
#include "share/rc/ob_tenant_base.h"
#include "ob_memstore_allocator.h"
using namespace oceanbase::share;
namespace oceanbase
{
@@ -405,15 +407,14 @@ int ObTenantMutilAllocatorMgr::update_tenant_mem_limit(const share::TenantUnits
K(tenant_id), K(nway), K(new_tma_limit), K(pre_tma_limit), K(cur_memstore_limit_percent), K(tenant_config));
}
//update memstore threshold of GmemstoreAllocator
ObGMemstoreAllocator* memstore_allocator = NULL;
if (OB_TMP_FAIL(ObMemstoreAllocatorMgr::get_instance().get_tenant_memstore_allocator(tenant_id, memstore_allocator))) {
} else if (OB_ISNULL(memstore_allocator)) {
OB_LOG(WARN, "get_tenant_memstore_allocator failed", K(tenant_id));
} else if (OB_FAIL(memstore_allocator->set_memstore_threshold(tenant_id))) {
OB_LOG(WARN, "failed to set_memstore_threshold of memstore allocator", K(tenant_id), K(ret));
} else {
OB_LOG(INFO, "succ to set_memstore_threshold of memstore allocator", K(tenant_id), K(ret));
//update memstore threshold of MemstoreAllocator
MTL_SWITCH(tenant_id) {
ObMemstoreAllocator &memstore_allocator = MTL(ObSharedMemAllocMgr *)->memstore_allocator();
if (OB_FAIL(memstore_allocator.set_memstore_threshold())) {
OB_LOG(WARN, "failed to set_memstore_threshold of memstore allocator", K(tenant_id), K(ret));
} else {
OB_LOG(INFO, "succ to set_memstore_threshold of memstore allocator", K(tenant_id), K(ret));
}
}
}
}

src/share/allocator/ob_tx_data_allocator.cpp (new file)

@@ -0,0 +1,113 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SHARE
#include "ob_tx_data_allocator.h"
#include "share/allocator/ob_shared_memory_allocator_mgr.h"
#include "share/rc/ob_tenant_base.h"
#include "storage/tx/ob_tx_data_define.h"
#include "storage/tx_storage/ob_tenant_freezer.h"
namespace oceanbase {
namespace share {
int64_t ObTenantTxDataAllocator::resource_unit_size()
{
static const int64_t TX_DATA_RESOURCE_UNIT_SIZE = OB_MALLOC_NORMAL_BLOCK_SIZE; /* 8KB */
return TX_DATA_RESOURCE_UNIT_SIZE;
}
void ObTenantTxDataAllocator::init_throttle_config(int64_t &resource_limit,
int64_t &trigger_percentage,
int64_t &max_duration)
{
int64_t total_memory = lib::get_tenant_memory_limit(MTL_ID());
// Use tenant config to init throttle config
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
if (tenant_config.is_valid()) {
resource_limit = total_memory * tenant_config->_tx_data_memory_limit_percentage / 100LL;
trigger_percentage = tenant_config->writing_throttling_trigger_percentage;
max_duration = tenant_config->writing_throttling_maximum_duration;
} else {
SHARE_LOG_RET(WARN, OB_INVALID_CONFIG, "init throttle config with default value");
resource_limit = total_memory * TX_DATA_LIMIT_PERCENTAGE / 100;
trigger_percentage = TX_DATA_THROTTLE_TRIGGER_PERCENTAGE;
max_duration = TX_DATA_THROTTLE_MAX_DURATION;
}
}
void ObTenantTxDataAllocator::adaptive_update_limit(const int64_t holding_size,
const int64_t config_specify_resource_limit,
int64_t &resource_limit,
int64_t &last_update_limit_ts,
bool &is_updated)
{
// do nothing
}
int ObTenantTxDataAllocator::init(const char *label)
{
int ret = OB_SUCCESS;
ObMemAttr mem_attr;
mem_attr.label_ = label;
mem_attr.tenant_id_ = MTL_ID();
mem_attr.ctx_id_ = ObCtxIds::TX_DATA_TABLE;
ObSharedMemAllocMgr *share_mem_alloc_mgr = MTL(ObSharedMemAllocMgr *);
throttle_tool_ = &(share_mem_alloc_mgr->share_resource_throttle_tool());
if (IS_INIT){
ret = OB_INIT_TWICE;
SHARE_LOG(WARN, "init tenant mds allocator twice", KR(ret), KPC(this));
} else if (OB_ISNULL(throttle_tool_)) {
ret = OB_ERR_UNEXPECTED;
SHARE_LOG(WARN, "throttle tool is unexpected null", KP(throttle_tool_), KP(share_mem_alloc_mgr));
} else if (OB_FAIL(slice_allocator_.init(
storage::TX_DATA_SLICE_SIZE, OB_MALLOC_NORMAL_BLOCK_SIZE, block_alloc_, mem_attr))) {
SHARE_LOG(WARN, "init slice allocator failed", KR(ret));
} else {
slice_allocator_.set_nway(ObTenantTxDataAllocator::ALLOC_TX_DATA_MAX_CONCURRENCY);
is_inited_ = true;
}
return ret;
}
void ObTenantTxDataAllocator::reset()
{
is_inited_ = false;
slice_allocator_.purge_extra_cached_block(0);
}
void *ObTenantTxDataAllocator::alloc(const bool enable_throttle, const int64_t abs_expire_time)
{
// do throttle if needed
if (OB_LIKELY(enable_throttle)) {
bool is_throttled = false;
(void)throttle_tool_->alloc_resource<ObTenantTxDataAllocator>(
storage::TX_DATA_SLICE_SIZE, abs_expire_time, is_throttled);
if (OB_UNLIKELY(is_throttled)) {
if (MTL(ObTenantFreezer *)->exist_ls_freezing()) {
(void)throttle_tool_->skip_throttle<ObTenantTxDataAllocator>(storage::TX_DATA_SLICE_SIZE);
} else {
(void)throttle_tool_->do_throttle<ObTenantTxDataAllocator>(abs_expire_time);
}
}
}
// allocate memory
void *res = slice_allocator_.alloc();
return res;
}
} // namespace share
} // namespace oceanbase
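
The tx-data allocator wires up the same reserve/skip/sleep pattern sketched after ob_mds_allocator.cpp above, with TX_DATA_SLICE_SIZE as the reservation unit; callers that must not block can pass enable_throttle = false to bypass the throttle entirely. With the fallback constants from init_throttle_config and the hypothetical 16GB tenant used earlier, tx data gets a 3.2GB limit (20%) and begins throttling near 1.92GB held (60% of that limit).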

src/share/allocator/ob_tx_data_allocator.h (new file)

@@ -0,0 +1,63 @@
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_ALLOCATOR_OB_TX_DATA_ALLOCATOR_H_
#define OCEANBASE_ALLOCATOR_OB_TX_DATA_ALLOCATOR_H_
#include "lib/allocator/ob_slice_alloc.h"
#include "share/ob_delegate.h"
#include "share/throttle/ob_share_throttle_define.h"
namespace oceanbase {
namespace share {
class ObTenantTxDataAllocator {
public:
using SliceAllocator = ObSliceAlloc;
// define some default value
static const int64_t TX_DATA_LIMIT_PERCENTAGE = 20;
static const int64_t TX_DATA_THROTTLE_TRIGGER_PERCENTAGE = 60;
static const int64_t TX_DATA_THROTTLE_MAX_DURATION = 2LL * 60LL * 60LL * 1000LL * 1000LL; // 2 hours
static const int64_t ALLOC_TX_DATA_MAX_CONCURRENCY = 32;
static const uint32_t THROTTLE_TX_DATA_INTERVAL = 20 * 1000; // 20ms
// The tx data memtable will trigger a freeze if its memory use is more than 2%
static constexpr double TX_DATA_FREEZE_TRIGGER_PERCENTAGE = 2;
public:
DEFINE_CUSTOM_FUNC_FOR_THROTTLE(TxData);
public:
ObTenantTxDataAllocator()
: is_inited_(false), throttle_tool_(nullptr), block_alloc_(), slice_allocator_() {}
~ObTenantTxDataAllocator() { reset(); }
int init(const char* label);
void *alloc(const bool enable_throttle = true, const int64_t abs_expire_time = 0);
void reset();
int64_t hold() const { return block_alloc_.hold(); }
DELEGATE_WITH_RET(slice_allocator_, free, void);
TO_STRING_KV(K(is_inited_), KP(throttle_tool_), KP(&block_alloc_), KP(&slice_allocator_));
private:
bool is_inited_;
TxShareThrottleTool *throttle_tool_;
common::ObBlockAllocMgr block_alloc_;
SliceAllocator slice_allocator_;
};
} // namespace share
} // namespace oceanbase
#endif