BUGFIX: fix the usage of memstore

This commit is contained in:
obdev 2023-06-22 01:12:34 +00:00 committed by ob-robot
parent 45298490e2
commit b727235065
9 changed files with 206 additions and 61 deletions

View File

@ -159,6 +159,7 @@ public:
void free(Handle& ref);
int64_t allocated() const { return ATOMIC_LOAD(&allocated_); }
int64_t retired() const { return ATOMIC_LOAD(&retired_); }
int64_t reclaimed() const { return ATOMIC_LOAD(&reclaimed_); }
int64_t hold() const {
int64_t rsize = ATOMIC_LOAD(&reclaimed_);
int64_t asize = ATOMIC_LOAD(&allocated_);

View File

@ -63,9 +63,6 @@ int ObGMemstoreAllocator::AllocHandle::init(uint64_t tenant_id)
ret = OB_ERR_UNEXPECTED;
} else {
host->init_handle(*this, tenant_id);
if (0 == (last_freeze_timestamp_ = host->get_last_freeze_timestamp())) {
COMMON_LOG(ERROR, "unexpected value", K(last_freeze_timestamp_));
}
}
return ret;
}
@ -126,7 +123,7 @@ void* ObGMemstoreAllocator::alloc(AllocHandle& handle, int64_t size)
ret = OB_ERR_UNEXPECTED;
COMMON_LOG(ERROR, "virtual tenant should not have memstore", K(ret), K(tenant_id));
} else if (FALSE_IT(freezer = MTL(storage::ObTenantFreezer*))) {
} else if (OB_FAIL(freezer->check_tenant_out_of_memstore_limit(is_out_of_mem))) {
} else if (OB_FAIL(freezer->check_memstore_full_internal(is_out_of_mem))) {
COMMON_LOG(ERROR, "fail to check tenant out of mem limit", K(ret), K(tenant_id));
}
}

View File

@ -61,17 +61,15 @@ public:
memtable::ObMemtable& mt_;
GAlloc* host_;
ArenaHandle arena_handle_;
AllocHandle(memtable::ObMemtable& mt): mt_(mt), host_(NULL), last_freeze_timestamp_(0) {
AllocHandle(memtable::ObMemtable& mt): mt_(mt), host_(NULL) {
do_reset();
}
void do_reset() {
ListHandle::reset();
arena_handle_.reset();
host_ = NULL;
last_freeze_timestamp_ = 0;
}
int64_t get_group_id() const { return id_ < 0? INT64_MAX: (id_ % Arena::MAX_CACHED_GROUP_COUNT); }
int64_t get_last_freeze_timestamp() const { return last_freeze_timestamp_; }
int init(uint64_t tenant_id);
void set_host(GAlloc* host) { host_ = host; }
void destroy() {
@ -107,23 +105,18 @@ public:
host_->set_frozen(*this);
}
}
INHERIT_TO_STRING_KV("ListHandle", ListHandle, KP_(host), K_(arena_handle),
K_(last_freeze_timestamp));
private:
int64_t last_freeze_timestamp_;
INHERIT_TO_STRING_KV("ListHandle", ListHandle, KP_(host), K_(arena_handle));
};
public:
ObGMemstoreAllocator():
lock_(common::ObLatchIds::MEMSTORE_ALLOCATOR_LOCK),
hlist_(),
arena_(),
last_freeze_timestamp_(0) {}
arena_() {}
~ObGMemstoreAllocator() {}
public:
int init(uint64_t tenant_id)
{
update_last_freeze_timestamp();
return arena_.init(tenant_id);
}
void init_handle(AllocHandle& handle, uint64_t tenant_id);
@ -141,18 +134,25 @@ public:
return ret;
}
public:
int64_t get_mem_active_memstore_used() {
int64_t get_active_memstore_used() {
int64_t hazard = hlist_.hazard();
return hazard == INT64_MAX? 0: (arena_.allocated() - hazard);
}
int64_t get_frozen_memstore_pos() const {
int64_t get_freezable_active_memstore_used() {
int64_t hazard = hlist_.hazard();
return hazard == INT64_MAX? 0: hazard;
return hazard == INT64_MAX? 0: (arena_.retired() - hazard);
}
int64_t get_max_cached_memstore_size() const {
return arena_.get_max_cached_memstore_size();
}
int64_t get_mem_total_memstore_used() const { return arena_.hold(); }
int64_t get_total_memstore_used() const { return arena_.hold(); }
int64_t get_frozen_memstore_pos() const {
int64_t hazard = hlist_.hazard();
return hazard == INT64_MAX? 0: hazard;
}
int64_t get_memstore_reclaimed_pos() const { return arena_.reclaimed(); }
int64_t get_memstore_allocated_pos() const { return arena_.allocated(); }
int64_t get_retire_clock() const { return arena_.retired(); }
void log_frozen_memstore_info(char* buf, int64_t limit) {
if (NULL != buf && limit > 0) {
FrozenMemstoreInfoLogger logger(buf, limit);
@ -182,15 +182,6 @@ public:
{
return arena_.expected_wait_time(seq);
}
int64_t get_retire_clock() const { return arena_.retired(); }
bool exist_active_memtable_below_clock(const int64_t clock) const {
return hlist_.hazard() < clock;
}
int64_t get_last_freeze_timestamp() { return ATOMIC_LOAD(&last_freeze_timestamp_); }
void update_last_freeze_timestamp()
{
ATOMIC_STORE(&last_freeze_timestamp_, ObTimeUtility::current_time());
}
private:
int64_t nway_per_group();
int set_memstore_threshold_without_lock(uint64_t tenant_id);
@ -198,7 +189,6 @@ private:
Lock lock_;
HandleList hlist_;
Arena arena_;
int64_t last_freeze_timestamp_;
};
}; // end namespace common

View File

@ -360,7 +360,6 @@ public:
inline void set_resolve_active_memtable_left_boundary(bool flag) { ATOMIC_STORE(&resolve_active_memtable_left_boundary_, flag); }
inline bool get_resolve_active_memtable_left_boundary() { return ATOMIC_LOAD(&resolve_active_memtable_left_boundary_); }
void set_freeze_state(const int64_t state);
int get_merge_priority_info(ObMergePriorityInfo &merge_priority_info) const;
void set_minor_merged();
int64_t get_minor_merged_time() const { return minor_merged_time_; }
common::ObIAllocator &get_allocator() {return local_allocator_;}

View File

@ -91,7 +91,7 @@ int ObAccessService::check_tenant_out_of_memstore_limit_(bool &is_out_of_mem)
is_out_of_mem = false;
ObTenantFreezer *freezer = nullptr;
freezer = MTL(ObTenantFreezer *);
if (OB_FAIL(freezer->check_tenant_out_of_memstore_limit(is_out_of_mem))) {
if (OB_FAIL(freezer->check_memstore_full(is_out_of_mem))) {
LOG_WARN("check tenant out of memstore limit", K(ret));
} else {
// do nothing

View File

@ -986,6 +986,7 @@ int ObTenantFreezer::get_tenant_mem_usage_(ObTenantFreezeCtx &ctx)
int ret = OB_SUCCESS;
ObTenantMemstoreAllocator *tenant_allocator = NULL;
int64_t active_memstore_used = 0;
int64_t freezable_active_memstore_used = 0;
int64_t total_memstore_used = 0;
int64_t total_memstore_hold = 0;
int64_t max_cached_memstore_size = 0;
@ -998,13 +999,15 @@ int ObTenantFreezer::get_tenant_mem_usage_(ObTenantFreezeCtx &ctx)
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("[TenantFreezer] tenant memstore allocator is NULL", KR(ret), K(tenant_id));
} else {
active_memstore_used = tenant_allocator->get_mem_active_memstore_used();
total_memstore_used = tenant_allocator->get_mem_total_memstore_used();
active_memstore_used = tenant_allocator->get_active_memstore_used();
freezable_active_memstore_used = tenant_allocator->get_freezable_active_memstore_used();
total_memstore_used = tenant_allocator->get_total_memstore_used();
total_memstore_hold = get_tenant_memory_hold(tenant_id,
ObCtxIds::MEMSTORE_CTX_ID);
max_cached_memstore_size = tenant_allocator->get_max_cached_memstore_size();
}
ctx.active_memstore_used_ = active_memstore_used;
ctx.freezable_active_memstore_used_ = freezable_active_memstore_used;
ctx.total_memstore_used_ = total_memstore_used;
ctx.total_memstore_hold_ = total_memstore_hold;
ctx.max_cached_memstore_size_ = max_cached_memstore_size;
@ -1012,6 +1015,57 @@ int ObTenantFreezer::get_tenant_mem_usage_(ObTenantFreezeCtx &ctx)
return ret;
}
int ObTenantFreezer::get_tenant_mem_stat_(ObTenantStatistic &stat)
{
int ret = OB_SUCCESS;
ObTenantMemstoreAllocator *tenant_allocator = NULL;
int64_t active_memstore_used = 0;
int64_t total_memstore_used = 0;
int64_t total_memstore_hold = 0;
int64_t max_cached_memstore_size = 0;
int64_t memstore_allocated_pos = 0;
int64_t memstore_frozen_pos = 0;
int64_t memstore_reclaimed_pos = 0;
const uint64_t tenant_id = MTL_ID();
ObTenantFreezeCtx ctx;
tenant_info_.get_freeze_ctx(ctx);
if (OB_FAIL(get_freeze_trigger_(ctx))) {
LOG_WARN("[TenantFreezer] get tenant minor freeze trigger error", KR(ret), K(tenant_info_.tenant_id_));
} else if (OB_FAIL(allocator_mgr_->get_tenant_memstore_allocator(tenant_id,
tenant_allocator))) {
LOG_WARN("[TenantFreezer] failed to get_tenant_memstore_allocator", KR(ret), K(tenant_id));
} else if (NULL == tenant_allocator) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("[TenantFreezer] tenant memstore allocator is NULL", KR(ret), K(tenant_id));
} else {
active_memstore_used = tenant_allocator->get_active_memstore_used();
total_memstore_used = tenant_allocator->get_total_memstore_used();
total_memstore_hold = get_tenant_memory_hold(tenant_id,
ObCtxIds::MEMSTORE_CTX_ID);
max_cached_memstore_size = tenant_allocator->get_max_cached_memstore_size();
memstore_allocated_pos = tenant_allocator->get_memstore_allocated_pos();
memstore_frozen_pos = tenant_allocator->get_frozen_memstore_pos();
memstore_reclaimed_pos = tenant_allocator->get_memstore_reclaimed_pos();
}
stat.active_memstore_used_ = active_memstore_used;
stat.total_memstore_used_ = total_memstore_used;
stat.total_memstore_hold_ = total_memstore_hold;
stat.memstore_freeze_trigger_ = ctx.memstore_freeze_trigger_;
stat.memstore_limit_ = ctx.mem_memstore_limit_;
stat.tenant_memory_limit_ = get_tenant_memory_limit(tenant_id);
stat.tenant_memory_hold_ = get_tenant_memory_hold(tenant_id);
stat.kvcache_mem_ = ctx.kvcache_mem_;
stat.max_cached_memstore_size_ = max_cached_memstore_size;
stat.memstore_allocated_pos_ = memstore_allocated_pos;
stat.memstore_frozen_pos_ = memstore_frozen_pos;
stat.memstore_reclaimed_pos_ = memstore_reclaimed_pos;
return ret;
}
static inline bool is_add_overflow(int64_t first, int64_t second, int64_t &res)
{
if (first + second < 0) {
@ -1086,12 +1140,14 @@ int ObTenantFreezer::get_freeze_trigger_(ObTenantFreezeCtx &ctx)
return ret;
}
int ObTenantFreezer::check_tenant_out_of_memstore_limit(bool &is_out_of_mem)
int ObTenantFreezer::check_memstore_full_(bool &last_result,
int64_t &last_check_timestamp,
bool &is_out_of_mem,
const bool from_user)
{
int ret = OB_SUCCESS;
RLOCAL(int64_t, last_check_timestamp);
RLOCAL(bool, last_result);
int64_t current_time = ObClockGenerator::getClock();
const int64_t reserved_memstore = from_user ? REPLAY_RESERVE_MEMSTORE_BYTES : 0;
ObTenantFreezeCtx ctx;
if (!is_inited_) {
ret = OB_NOT_INIT;
@ -1110,7 +1166,7 @@ int ObTenantFreezer::check_tenant_out_of_memstore_limit(bool &is_out_of_mem)
} else if (OB_FAIL(get_tenant_mem_usage_(ctx))) {
LOG_WARN("[TenantFreezer] fail to get mem usage", KR(ret), K(tenant_info_.tenant_id_));
} else {
is_out_of_mem = (ctx.total_memstore_hold_ > ctx.mem_memstore_limit_ - REPLAY_RESERVE_MEMSTORE_BYTES);
is_out_of_mem = (ctx.total_memstore_hold_ > ctx.mem_memstore_limit_ - reserved_memstore);
}
last_check_timestamp = current_time;
}
@ -1122,6 +1178,34 @@ int ObTenantFreezer::check_tenant_out_of_memstore_limit(bool &is_out_of_mem)
return ret;
}
int ObTenantFreezer::check_memstore_full_internal(bool &is_out_of_mem)
{
int ret = OB_SUCCESS;
RLOCAL_INIT(int64_t, last_check_timestamp, 0);
RLOCAL_INIT(bool, last_result, false);
if (OB_FAIL(check_memstore_full_(last_result,
last_check_timestamp,
is_out_of_mem,
false /* does not from user */))) {
LOG_WARN("check memstore full failed", K(ret));
}
return ret;
}
int ObTenantFreezer::check_memstore_full(bool &is_out_of_mem)
{
int ret = OB_SUCCESS;
RLOCAL_INIT(int64_t, last_check_timestamp, 0);
RLOCAL_INIT(bool, last_result, false);
if (OB_FAIL(check_memstore_full_(last_result,
last_check_timestamp,
is_out_of_mem,
true /* from user */))) {
LOG_WARN("check memstore full failed", K(ret));
}
return ret;
}
bool ObTenantFreezer::tenant_need_major_freeze()
{
int ret = OB_SUCCESS;
@ -1143,7 +1227,7 @@ bool ObTenantFreezer::tenant_need_major_freeze()
if (bool_ret) {
LOG_INFO("A major freeze is needed",
"active_memstore_used_",
ctx.active_memstore_used_,
ctx.freezable_active_memstore_used_,
"memstore_freeze_trigger_limit_",
ctx.memstore_freeze_trigger_,
"tenant_id",
@ -1266,21 +1350,19 @@ int ObTenantFreezer::print_tenant_usage(
int64_t &pos)
{
int ret = OB_SUCCESS;
ObTenantFreezeCtx ctx;
ObTenantStatistic stat;
lib::ObMallocAllocator *mallocator = lib::ObMallocAllocator::get_instance();
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("[TenantFreezer] tenant manager not init", KR(ret));
} else if (FALSE_IT(tenant_info_.get_freeze_ctx(ctx))) {
} else if (OB_FAIL(get_tenant_mem_usage_(ctx))) {
LOG_WARN("[TenantFreezer] fail to get mem usage", KR(ret), K(tenant_info_.tenant_id_));
} else if (OB_FAIL(get_freeze_trigger_(ctx))) {
LOG_WARN("[TenantFreezer] get tenant minor freeze trigger error", KR(ret), K(tenant_info_.tenant_id_));
} else if (OB_FAIL(get_tenant_mem_stat_(stat))) {
LOG_WARN("[TenantFreezer] fail to get tenant mem stat", KR(ret), K(tenant_info_.tenant_id_));
} else {
ret = databuff_printf(print_buf, buf_len, pos,
"[TENANT_MEMORY] "
"tenant_id=% '9ld "
"now=% '15ld "
"active_memstore_used=% '15ld "
"total_memstore_used=% '15ld "
"total_memstore_hold=% '15ld "
@ -1289,17 +1371,24 @@ int ObTenantFreezer::print_tenant_usage(
"mem_tenant_limit=% '15ld "
"mem_tenant_hold=% '15ld "
"kv_cache_mem=% '15ld "
"max_mem_memstore_can_get_now=% '15ld\n",
"max_mem_memstore_can_get_now=% '15ld "
"memstore_alloc_pos=% '15ld "
"memstore_frozen_pos=% '15ld "
"memstore_reclaimed_pos=% '15ld\n",
tenant_info_.tenant_id_,
ctx.active_memstore_used_,
ctx.total_memstore_used_,
ctx.total_memstore_hold_,
ctx.memstore_freeze_trigger_,
ctx.mem_memstore_limit_,
get_tenant_memory_limit(tenant_info_.tenant_id_),
get_tenant_memory_hold(tenant_info_.tenant_id_),
ctx.kvcache_mem_,
ctx.max_mem_memstore_can_get_now_);
ObTimeUtility::fast_current_time(),
stat.active_memstore_used_,
stat.total_memstore_used_,
stat.total_memstore_hold_,
stat.memstore_freeze_trigger_,
stat.memstore_limit_,
stat.tenant_memory_limit_,
stat.tenant_memory_hold_,
stat.kvcache_mem_,
stat.memstore_can_get_now_,
stat.memstore_allocated_pos_,
stat.memstore_frozen_pos_,
stat.memstore_reclaimed_pos_);
}
if (!OB_ISNULL(mallocator)) {
@ -1329,10 +1418,10 @@ bool ObTenantFreezer::need_freeze_(const ObTenantFreezeCtx &ctx)
{
bool need_freeze = false;
// 1. trigger by active memstore used.
if (ctx.active_memstore_used_ > ctx.memstore_freeze_trigger_ + ctx.max_cached_memstore_size_) {
if (ctx.freezable_active_memstore_used_ > ctx.memstore_freeze_trigger_) {
need_freeze = true;
LOG_INFO("[TenantFreezer] A minor freeze is needed by active memstore used.",
K(ctx.active_memstore_used_), K(ctx.memstore_freeze_trigger_), K(ctx.max_cached_memstore_size_));
K(ctx.freezable_active_memstore_used_), K(ctx.memstore_freeze_trigger_), K(ctx.max_cached_memstore_size_));
}
return need_freeze;
}
@ -1372,7 +1461,7 @@ int ObTenantFreezer::do_minor_freeze_(const ObTenantFreezeCtx &ctx)
int tmp_ret = OB_SUCCESS;
bool rollback_freeze_cnt = false;
LOG_INFO("[TenantFreezer] A minor freeze is needed",
"active_memstore_used_", ctx.active_memstore_used_,
"active_memstore_used_", ctx.freezable_active_memstore_used_,
"memstore_freeze_trigger", ctx.memstore_freeze_trigger_,
"max_cached_memstore_size", ctx.max_cached_memstore_size_,
"mem_tenant_remain", get_tenant_memory_remain(MTL_ID()),
@ -1460,10 +1549,10 @@ void ObTenantFreezer::log_frozen_memstore_info_if_need_(const ObTenantFreezeCtx
int ret = OB_SUCCESS;
ObTenantMemstoreAllocator *tenant_allocator = NULL;
if (ctx.total_memstore_hold_ > ctx.memstore_freeze_trigger_ ||
ctx.active_memstore_used_ > ctx.memstore_freeze_trigger_) {
ctx.freezable_active_memstore_used_ > ctx.memstore_freeze_trigger_) {
// There is an unreleased memstable
LOG_INFO("[TenantFreezer] tenant have inactive memstores",
K(ctx.active_memstore_used_),
K(ctx.freezable_active_memstore_used_),
K(ctx.total_memstore_used_),
K(ctx.total_memstore_hold_),
"memstore_freeze_trigger_limit_",

View File

@ -120,8 +120,10 @@ public:
const bool force_refresh = true);
// get the tenant memstore limit.
int get_tenant_memstore_limit(int64_t &mem_limit);
// this is used to check if the tenant's memstore is out.
int check_tenant_out_of_memstore_limit(bool &is_out_of_mem);
// this is used to check if the tenant's memstore is out at user side.
int check_memstore_full(bool &is_out_of_mem);
// this is used for internal check rather than user side.
int check_memstore_full_internal(bool &is_out_of_mem);
// this check if a major freeze is needed
bool tenant_need_major_freeze();
// used to print a log.
@ -145,6 +147,10 @@ public:
ObServerConfig *get_config() { return config_; }
bool exist_ls_freezing();
private:
int check_memstore_full_(bool &last_result,
int64_t &last_check_timestamp,
bool &is_out_of_mem,
const bool from_user = true);
static int ls_freeze_(ObLS *ls,
const bool is_sync = true,
const bool force_freeze = true,
@ -173,6 +179,7 @@ private:
int post_tx_data_freeze_request_();
int post_mds_table_freeze_request_();
int get_tenant_mem_usage_(ObTenantFreezeCtx &ctx);
int get_tenant_mem_stat_(ObTenantStatistic &stat);
static int get_freeze_trigger_(ObTenantFreezeCtx &ctx);
static bool need_freeze_(const ObTenantFreezeCtx &ctx);
bool is_minor_need_slow_(const ObTenantFreezeCtx &ctx);

View File

@ -46,6 +46,7 @@ ObTenantFreezeCtx::ObTenantFreezeCtx()
max_mem_memstore_can_get_now_(0),
kvcache_mem_(0),
active_memstore_used_(0),
freezable_active_memstore_used_(0),
total_memstore_used_(0),
total_memstore_hold_(0),
max_cached_memstore_size_(0)
@ -61,11 +62,45 @@ void ObTenantFreezeCtx::reset()
max_mem_memstore_can_get_now_ = 0;
kvcache_mem_ = 0;
active_memstore_used_ = 0;
freezable_active_memstore_used_ = 0;
total_memstore_used_ = 0;
total_memstore_hold_ = 0;
max_cached_memstore_size_ = 0;
}
ObTenantStatistic::ObTenantStatistic()
: active_memstore_used_(0),
total_memstore_used_(0),
total_memstore_hold_(0),
memstore_freeze_trigger_(0),
memstore_limit_(0),
tenant_memory_limit_(0),
tenant_memory_hold_(0),
kvcache_mem_(0),
memstore_can_get_now_(0),
max_cached_memstore_size_(0),
memstore_allocated_pos_(0),
memstore_frozen_pos_(0),
memstore_reclaimed_pos_(0)
{}
void ObTenantStatistic::reset()
{
active_memstore_used_ = 0;
total_memstore_used_ = 0;
total_memstore_hold_ = 0;
memstore_freeze_trigger_ = 0;
memstore_limit_ = 0;
tenant_memory_limit_ = 0;
tenant_memory_hold_ = 0;
kvcache_mem_ = 0;
memstore_can_get_now_ = 0;
max_cached_memstore_size_ = 0;
memstore_allocated_pos_ = 0;
memstore_frozen_pos_ = 0;
memstore_reclaimed_pos_ = 0;
}
ObTenantInfo::ObTenantInfo()
: tenant_id_(INT64_MAX),
is_loaded_(false),

View File

@ -73,14 +73,41 @@ public:
int64_t kvcache_mem_;
int64_t active_memstore_used_;
int64_t freezable_active_memstore_used_;
int64_t total_memstore_used_;
int64_t total_memstore_hold_;
int64_t max_cached_memstore_size_;
private:
DISABLE_COPY_ASSIGN(ObTenantFreezeCtx);
};
struct ObTenantStatistic
{
public:
ObTenantStatistic();
~ObTenantStatistic() { reset(); }
void reset();
public:
int64_t active_memstore_used_;
int64_t total_memstore_used_;
int64_t total_memstore_hold_;
int64_t memstore_freeze_trigger_;
int64_t memstore_limit_;
int64_t tenant_memory_limit_;
int64_t tenant_memory_hold_;
int64_t kvcache_mem_;
int64_t memstore_can_get_now_;
int64_t max_cached_memstore_size_;
// these used to analysis write/frozen/release speed of tenant memstore
int64_t memstore_allocated_pos_;
int64_t memstore_frozen_pos_;
int64_t memstore_reclaimed_pos_;
private:
DISABLE_COPY_ASSIGN(ObTenantStatistic);
};
// store the tenant info, such as memory limit, memstore limit,
// slow freeze flag, freezing flag and so on.
class ObTenantInfo : public ObDLinkBase<ObTenantInfo>