Support sub_ctxs for each tenant_ctx_allocator to reduce memory fragmentation.
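
For illustration, a minimal usage sketch (the tenant id, label, and size are placeholder values; ObMemAttr, ob_malloc, and ob_free are the existing APIs extended below): setting sub_ctx_id_ routes an allocation to the matching per-sub-ctx ObjectMgr, while the default MAX_SUB_CTX_ID keeps the original path through obj_mgr_.

#include "lib/allocator/ob_malloc.h"      // ob_malloc / ob_free
#include "lib/allocator/ob_mod_define.h"  // ObSubCtxIds, ObCtxIds

using namespace oceanbase::common;
using namespace oceanbase::lib;

void sub_ctx_alloc_demo()
{
  // tenant id, label, and size below are placeholders for the sketch
  ObMemAttr attr(1001, "SubCtxDemo", ObCtxIds::DEFAULT_CTX_ID);
  attr.sub_ctx_id_ = ObSubCtxIds::TEST1;  // served by obj_mgrs_[TEST1]
  void *ptr = ob_malloc(1 << 10, attr);   // default sub_ctx_id_ would use obj_mgr_
  if (NULL != ptr) {
    ob_free(ptr);
  }
}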

tushicheng 2023-09-04 02:40:49 +00:00 committed by ob-robot
parent 5ae3de7429
commit a91a0535f3
11 changed files with 193 additions and 53 deletions

View File

@@ -22,6 +22,7 @@
#include "lib/utility/ob_macro_utils.h"
#include "lib/alloc/alloc_assist.h"
#include "lib/alloc/abit_set.h"
#include "lib/allocator/ob_mod_define.h"
#ifndef NDEBUG
#define MEMCHK_LEVEL 1
@@ -65,6 +66,7 @@ enum ObAllocPrio
OB_HIGH_ALLOC
};
struct ObLabel
{
ObLabel()
@@ -123,6 +125,7 @@ struct ObMemAttr
uint64_t tenant_id_;
ObLabel label_;
uint64_t ctx_id_;
uint64_t sub_ctx_id_;
ObAllocPrio prio_;
explicit ObMemAttr(
@@ -133,6 +136,7 @@ struct ObMemAttr
: tenant_id_(tenant_id),
label_(label),
ctx_id_(ctx_id),
sub_ctx_id_(ObSubCtxIds::MAX_SUB_CTX_ID),
prio_(prio) {}
int64_t to_string(char* buf, const int64_t buf_len) const;
bool use_500() const { return use_500_; }

View File

@@ -31,7 +31,14 @@ void *ObTenantCtxAllocator::alloc(const int64_t size, const ObMemAttr &attr)
{
abort_unless(attr.tenant_id_ == tenant_id_);
abort_unless(attr.ctx_id_ == ctx_id_);
void *ptr = common_alloc(size, attr, *this, obj_mgr_);
void *ptr = NULL;
if (OB_LIKELY(ObSubCtxIds::MAX_SUB_CTX_ID == attr.sub_ctx_id_)) {
ptr = common_alloc(size, attr, *this, obj_mgr_);
} else if (OB_UNLIKELY(attr.sub_ctx_id_ < ObSubCtxIds::MAX_SUB_CTX_ID)) {
ptr = common_alloc(size, attr, *this, obj_mgrs_[attr.sub_ctx_id_]);
} else {
LIB_LOG_RET(WARN, OB_ERR_UNEXPECTED, "allocate memory with unexpected sub_ctx_id");
}
return ptr;
}

View File

@@ -41,7 +41,9 @@ using InvokeFunc = std::function<int (const ObTenantMemoryMgr*)>;
public:
explicit ObTenantCtxAllocator(uint64_t tenant_id, uint64_t ctx_id = 0)
: resource_handle_(), ref_cnt_(0), tenant_id_(tenant_id),
ctx_id_(ctx_id), deleted_(false), obj_mgr_(*this, tenant_id_, ctx_id_),
ctx_id_(ctx_id), deleted_(false),
obj_mgr_(*this, tenant_id_, ctx_id_, INTACT_NORMAL_AOBJECT_SIZE,
common::ObCtxParallel::instance().parallel_of_ctx(ctx_id_), NULL),
idle_size_(0), head_chunk_(), chunk_cnt_(0),
chunk_freelist_mutex_(common::ObLatchIds::CHUNK_FREE_LIST_LOCK),
using_list_mutex_(common::ObLatchIds::CHUNK_USING_LIST_LOCK),
@@ -55,8 +57,16 @@ public:
attr.ctx_id_ = ctx_id;
chunk_freelist_mutex_.enable_record_stat(false);
using_list_mutex_.enable_record_stat(false);
for (int i = 0; i < ObSubCtxIds::MAX_SUB_CTX_ID; ++i) {
new (obj_mgrs_ + i) ObjectMgr(*this, tenant_id_, ctx_id_, INTACT_MIDDLE_AOBJECT_SIZE, 4, &obj_mgr_);
}
}
virtual ~ObTenantCtxAllocator()
{
for (int i = 0; i < ObSubCtxIds::MAX_SUB_CTX_ID; ++i) {
obj_mgrs_[i].~ObjectMgr();
}
}
virtual ~ObTenantCtxAllocator() {}
int set_tenant_memory_mgr()
{
int ret = common::OB_SUCCESS;
@@ -165,7 +175,17 @@ public:
int iter_label(VisitFunc func) const;
int64_t sync_wash(int64_t wash_size);
int64_t sync_wash();
bool check_has_unfree(char *first_label) { return obj_mgr_.check_has_unfree(first_label); }
bool check_has_unfree(char *first_label)
{
bool has_unfree = obj_mgr_.check_has_unfree();
if (has_unfree) {
bool tmp_has_unfree = obj_mgr_.check_has_unfree(first_label);
for (int i = 0; i < ObSubCtxIds::MAX_SUB_CTX_ID && !tmp_has_unfree; ++i) {
tmp_has_unfree = obj_mgrs_[i].check_has_unfree(first_label);
}
}
return has_unfree;
}
void update_wash_stat(int64_t related_chunks, int64_t blocks, int64_t size);
private:
int64_t inc_ref_cnt(int64_t cnt) { return ATOMIC_FAA(&ref_cnt_, cnt); }
@@ -214,6 +234,9 @@ private:
int64_t wash_related_chunks_;
int64_t washed_blocks_;
int64_t washed_size_;
union {
ObjectMgr obj_mgrs_[ObSubCtxIds::MAX_SUB_CTX_ID];
};
}; // end of class ObTenantCtxAllocator
} // end of namespace lib
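
The union wrapped around obj_mgrs_ above is the usual idiom for an array member that must not be default-constructed: union members are never implicitly constructed or destroyed, which is why the constructor placement-news each ObjectMgr with non-default arguments (note the parent block manager &obj_mgr_) and the destructor invokes ~ObjectMgr() by hand. A self-contained sketch of the idiom, using a hypothetical Widget type:

#include <new>  // placement new

// Hypothetical stand-in for ObjectMgr: no default constructor.
struct Widget {
  explicit Widget(int id) : id_(id) {}
  ~Widget() {}
  int id_;
};

class Holder {
public:
  Holder()
  {
    for (int i = 0; i < N; ++i) {
      new (widgets_ + i) Widget(i);  // construct in place with non-default args
    }
  }
  ~Holder()
  {
    for (int i = 0; i < N; ++i) {
      widgets_[i].~Widget();  // union members are never destroyed implicitly
    }
  }
private:
  static const int N = 2;
  union {
    Widget widgets_[N];  // union suppresses implicit construction/destruction
  };
};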

View File

@@ -17,16 +17,17 @@
using namespace oceanbase;
using namespace lib;
SubObjectMgr::SubObjectMgr(const bool for_logger, const int64_t tenant_id, const int64_t ctx_id)
SubObjectMgr::SubObjectMgr(const bool for_logger, const int64_t tenant_id, const int64_t ctx_id,
const uint32_t ablock_size, IBlockMgr *blk_mgr)
: IBlockMgr(tenant_id, ctx_id), mutex_(common::ObLatchIds::ALLOC_OBJECT_LOCK),
normal_locker_(mutex_), logger_locker_(mutex_),
locker_(!for_logger ? static_cast<ISetLocker&>(normal_locker_) :
static_cast<ISetLocker&>(logger_locker_)),
bs_(), os_()
bs_(), os_(NULL, ablock_size)
{
bs_.set_locker(&locker_);
os_.set_locker(&locker_);
os_.set_block_mgr(this);
NULL == blk_mgr ? os_.set_block_mgr(this) : os_.set_block_mgr(blk_mgr);
#ifndef ENABLE_SANITY
mutex_.enable_record_stat(false);
#endif
@@ -59,10 +60,11 @@ void SubObjectMgr::free_block(ABlock *block)
bs_.free_block(block);
}
ObjectMgr::ObjectMgr(ObTenantCtxAllocator &allocator, uint64_t tenant_id, uint64_t ctx_id)
ObjectMgr::ObjectMgr(ObTenantCtxAllocator &allocator, uint64_t tenant_id, uint64_t ctx_id,
uint32_t ablock_size, int parallel, IBlockMgr *blk_mgr)
: IBlockMgr(tenant_id, ctx_id), ta_(allocator),
sub_cnt_(1),
root_mgr_(common::ObCtxIds::LOGGER_CTX_ID == ctx_id, tenant_id, ctx_id),
ablock_size_(ablock_size), parallel_(parallel), blk_mgr_(blk_mgr), sub_cnt_(1),
root_mgr_(common::ObCtxIds::LOGGER_CTX_ID == ctx_id, tenant_id, ctx_id, ablock_size_, blk_mgr_),
last_wash_ts_(0), last_washed_size_(0)
{
root_mgr_.set_tenant_ctx_allocator(allocator);
@@ -101,9 +103,8 @@ AObject *ObjectMgr::alloc_object(uint64_t size, const ObMemAttr &attr)
}
}
if (OB_ISNULL(obj)) {
const int limit = common::ObCtxParallel::instance().parallel_of_ctx(ctx_id_);
auto cnt = ATOMIC_LOAD(&sub_cnt_);
if (cnt < limit) {
if (cnt < parallel_) {
if (OB_NOT_NULL(sub_mgr = create_sub_mgr())) {
if (ATOMIC_BCAS(&sub_mgrs_[cnt], nullptr, sub_mgr)) {
obj = sub_mgr->alloc_object(size, attr);
@@ -179,9 +180,8 @@ ABlock *ObjectMgr::alloc_block(uint64_t size, const ObMemAttr &attr)
}
}
if (OB_ISNULL(block)) {
const int limit = common::ObCtxParallel::instance().parallel_of_ctx(ctx_id_);
auto cnt = ATOMIC_LOAD(&sub_cnt_);
if (cnt < limit) {
if (cnt < parallel_) {
if (OB_NOT_NULL(sub_mgr = create_sub_mgr())) {
if (ATOMIC_BCAS(&sub_mgrs_[cnt], nullptr, sub_mgr)) {
block = sub_mgr->alloc_block(size, attr);
@@ -229,8 +229,8 @@ SubObjectMgr *ObjectMgr::create_sub_mgr()
root_mgr.unlock();
if (OB_NOT_NULL(obj)) {
SANITY_UNPOISON(obj->data_, obj->alloc_bytes_);
sub_mgr = new (obj->data_) SubObjectMgr(common::ObCtxIds::LOGGER_CTX_ID == ctx_id_,
tenant_id_, ctx_id_);
sub_mgr = new (obj->data_) SubObjectMgr(common::ObCtxIds::LOGGER_CTX_ID == ctx_id_, tenant_id_, ctx_id_,
ablock_size_, blk_mgr_);
sub_mgr->set_tenant_ctx_allocator(ta_);
}
return sub_mgr;
@@ -296,6 +296,22 @@ ObjectMgr::Stat ObjectMgr::get_stat()
};
}
bool ObjectMgr::check_has_unfree()
{
bool has_unfree = false;
for (uint64_t idx = 0; idx < ATOMIC_LOAD(&sub_cnt_) && !has_unfree; idx++) {
auto sub_mgr = ATOMIC_LOAD(&sub_mgrs_[idx]);
if (OB_ISNULL(sub_mgr)) {
// do nothing
} else {
sub_mgr->lock();
DEFER(sub_mgr->unlock());
has_unfree = sub_mgr->check_has_unfree();
}
}
return has_unfree;
}
bool ObjectMgr::check_has_unfree(char *first_label)
{
bool has_unfree = false;
@@ -310,4 +326,4 @@ bool ObjectMgr::check_has_unfree(char *first_label)
}
}
return has_unfree;
}
}

View File

@@ -36,7 +36,8 @@ class SubObjectMgr : public IBlockMgr
{
friend class ObTenantCtxAllocator;
public:
SubObjectMgr(const bool for_logger, const int64_t tenant_id, const int64_t ctx_id);
SubObjectMgr(const bool for_logger, const int64_t tenant_id, const int64_t ctx_id,
const uint32_t ablock_size, IBlockMgr *blk_mgr);
virtual ~SubObjectMgr() {}
OB_INLINE void set_tenant_ctx_allocator(ObTenantCtxAllocator &allocator)
{
@@ -59,13 +60,13 @@ public:
OB_INLINE int64_t get_hold() { return bs_.get_total_hold(); }
OB_INLINE int64_t get_payload() { return bs_.get_total_payload(); }
OB_INLINE int64_t get_used() { return bs_.get_total_used(); }
OB_INLINE bool check_has_unfree()
{
return bs_.check_has_unfree();
}
OB_INLINE bool check_has_unfree(char *first_label)
{
const bool has_unfree = bs_.check_has_unfree();
if (has_unfree) {
os_.check_has_unfree(first_label);
}
return has_unfree;
return os_.check_has_unfree(first_label);
}
private:
#ifndef ENABLE_SANITY
@@ -80,7 +81,7 @@ private:
ObjectSet os_;
};
class ObjectMgr : public IBlockMgr
class ObjectMgr final : public IBlockMgr
{
static const int N = 32;
public:
@@ -93,7 +94,8 @@ public:
int64_t last_wash_ts_;
};
public:
ObjectMgr(ObTenantCtxAllocator &allocator, uint64_t tenant_id, uint64_t ctx_id);
ObjectMgr(ObTenantCtxAllocator &allocator, uint64_t tenant_id, uint64_t ctx_id,
uint32_t ablock_size, int parallel, IBlockMgr *blk_mgr);
~ObjectMgr();
void reset();
@@ -108,6 +110,7 @@ public:
void print_usage() const;
int64_t sync_wash(int64_t wash_size) override;
Stat get_stat();
bool check_has_unfree();
bool check_has_unfree(char *first_label);
private:
SubObjectMgr *create_sub_mgr();
@@ -115,6 +118,9 @@ private:
public:
ObTenantCtxAllocator &ta_;
uint32_t ablock_size_;
int parallel_;
IBlockMgr *blk_mgr_;
int sub_cnt_;
SubObjectMgr root_mgr_;
SubObjectMgr *sub_mgrs_[N];

View File

@@ -38,6 +38,12 @@ CTX_ITEM_DEF(UNEXPECTED_IN_500)
CTX_ITEM_DEF(MAX_CTX_ID)
#endif
#ifdef SUB_CTX_ITEM_DEF
SUB_CTX_ITEM_DEF(TEST1)
SUB_CTX_ITEM_DEF(TEST2)
SUB_CTX_ITEM_DEF(MAX_SUB_CTX_ID)
#endif
// Labels do not need to be defined here; a char * can be passed directly to alloc.
// They are kept only for compatibility with existing upper-layer code.
#ifdef LABEL_ITEM_DEF
@@ -659,6 +665,16 @@ private:
const char *ctx_names_[CTX_COUNT_LIMIT];
};
struct ObSubCtxIds
{
enum ObSubCtxIdEnum
{
#define SUB_CTX_ITEM_DEF(name) name,
#include "lib/allocator/ob_mod_define.h"
#undef SUB_CTX_ITEM_DEF
};
};
struct ObModIds
{
#define LABEL_ITEM_DEF(name, real_name) static constexpr const char name[] = #real_name;
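
For reference, with the SUB_CTX_ITEM_DEF items added above, the X-macro include inside ObSubCtxIds expands to roughly the following; MAX_SUB_CTX_ID is defined last, so it doubles as the count of real sub ctx ids and as the "no sub ctx" default in ObMemAttr:

struct ObSubCtxIds
{
  enum ObSubCtxIdEnum
  {
    TEST1,           // 0
    TEST2,           // 1
    MAX_SUB_CTX_ID   // 2: number of real sub ctx ids; also the sentinel default
  };
};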

View File

@@ -238,7 +238,7 @@ TEST_F(TestObjectMgr, TestSubObjectMgr)
abort_unless(ptr != MAP_FAILED);
int64_t tenant_id = OB_SERVER_TENANT_ID;
int64_t ctx_id = ObCtxIds::DEFAULT_CTX_ID;
SubObjectMgr som(false, tenant_id, ctx_id);
SubObjectMgr som(false, tenant_id, ctx_id, INTACT_NORMAL_AOBJECT_SIZE, NULL);
ObMemAttr attr;
som.set_tenant_ctx_allocator(*ObMallocAllocator::get_instance()->get_tenant_ctx_allocator(
tenant_id, ctx_id).ref_allocator());

View File

@@ -13,6 +13,7 @@
#include <gtest/gtest.h>
#define private public
#include "lib/alloc/ob_tenant_ctx_allocator.h"
#include "lib/alloc/object_mgr.h"
#undef private
#include "lib/alloc/alloc_func.h"
#include "lib/resource/ob_resource_mgr.h"
@@ -325,6 +326,71 @@ TEST(TestTenantAllocator, chunk_free_list_push_pop_concurrency)
ASSERT_EQ(0, chunk_num);
}
TEST(TestTenantAllocator, sub_ctx_id)
{
uint64_t tenant_id = 1010;
uint64_t ctx_id = 0;
ObMemAttr mem_attr(tenant_id, "TestSubCtx", ctx_id);
auto malloc_allocator = ObMallocAllocator::get_instance();
ASSERT_EQ(OB_SUCCESS, malloc_allocator->create_and_add_tenant_allocator(tenant_id));
ObTenantCtxAllocator *ta = NULL;
{
auto guard = malloc_allocator->get_tenant_ctx_allocator(tenant_id, ctx_id);
ta = guard.ref_allocator();
}
ObjectMgr &obj_mgr = ta->obj_mgr_;
ObjectMgr *obj_mgrs = ta->obj_mgrs_;
int size = 1<<20;
void *ptrs[ObSubCtxIds::MAX_SUB_CTX_ID];
memset(&ptrs, 0, sizeof(ptrs));
for (int i = 0; i < ObSubCtxIds::MAX_SUB_CTX_ID; ++i) {
mem_attr.sub_ctx_id_ = i;
ptrs[i] = ob_malloc(size, mem_attr);
ASSERT_EQ(true, NULL != ptrs[i]);
if (NULL != ptrs[i]) {
AObject *obj = reinterpret_cast<AObject*>((char*)ptrs[i] - AOBJECT_HEADER_SIZE);
AChunk *chunk = AChunk::ptr2chunk(obj);
ASSERT_EQ(&obj_mgr.root_mgr_.bs_, chunk->block_set_);
ABlock *block = chunk->ptr2blk(obj);
ASSERT_EQ(&obj_mgrs[i].root_mgr_.os_, block->obj_set_);
}
}
if (ObSubCtxIds::MAX_SUB_CTX_ID >= 2 &&
NULL != ptrs[0] && NULL != ptrs[1]) {
AObject *obj_0 = reinterpret_cast<AObject*>((char*)ptrs[0] - AOBJECT_HEADER_SIZE);
AObject *obj_1 = reinterpret_cast<AObject*>((char*)ptrs[1] - AOBJECT_HEADER_SIZE);
ASSERT_NE(AChunk::ptr2chunk(obj_0)->ptr2blk(obj_0)->obj_set_,
AChunk::ptr2chunk(obj_1)->ptr2blk(obj_1)->obj_set_);
}
mem_attr.sub_ctx_id_ = ObSubCtxIds::MAX_SUB_CTX_ID + 1;
void *ptr = ob_malloc(size, mem_attr);
ASSERT_EQ(true, NULL == ptr);
int64_t wash_size = ta->sync_wash(INT64_MAX);
ASSERT_NE(0, wash_size);
ptr = ob_malloc(size, ObMemAttr(tenant_id, "TestSubCtx", ctx_id));
ASSERT_EQ(true, NULL != ptr);
ASSERT_EQ(wash_size, ta->sync_wash(INT64_MAX) * ObSubCtxIds::MAX_SUB_CTX_ID);
ob_free(ptr);
for (int i = 0; i < ObSubCtxIds::MAX_SUB_CTX_ID; ++i) {
if (NULL != ptrs[i]) {
ob_free(ptrs[i]);
}
}
ASSERT_EQ(OB_SUCCESS, malloc_allocator->recycle_tenant_allocator(tenant_id));
uint64_t tenant_id_1 = 1011;
ObMemAttr mem_attr_1(tenant_id_1, "TestSubCtx", ctx_id);
mem_attr_1.sub_ctx_id_ = 0;
ASSERT_EQ(OB_SUCCESS, malloc_allocator->create_and_add_tenant_allocator(tenant_id_1));
ASSERT_EQ(true, NULL != ob_malloc(size, mem_attr_1));
ASSERT_NE(OB_SUCCESS, malloc_allocator->recycle_tenant_allocator(tenant_id_1));
}
int main(int argc, char *argv[])
{
signal(49, SIG_IGN);

View File

@@ -3191,7 +3191,7 @@ int ObSql::generate_plan(ParseResult &parse_result,
ObExplainDisplayOpt option;
option.with_tree_line_ = false;
ObSqlPlan sql_plan(logical_plan->get_allocator());
ObSEArray<common::ObString, 64> plan_strs;
ObSEArray<common::ObString, 32> plan_strs;
if (OB_TMP_FAIL(sql_plan.print_sql_plan(logical_plan,
EXPLAIN_EXTENDED,
option,

View File

@@ -604,19 +604,20 @@ int ObShardingInfo::check_if_match_partition_wise(const EqualSets &equal_sets,
bool is_equal = false;
PwjTable l_table;
PwjTable r_table;
ObStrictPwjComparer pwj_comparer;
if (OB_FAIL(l_table.init(*first_left_sharding))) {
LOG_WARN("failed to init pwj table with sharding info", K(ret));
} else if (OB_FAIL(r_table.init(*first_right_sharding))) {
LOG_WARN("failed to init pwj table with sharding info", K(ret));
} else if (OB_FAIL(pwj_comparer.add_table(l_table, is_equal))) {
LOG_WARN("failed to add table", K(ret));
} else if (!is_equal) {
// do nothing
} else if (OB_FAIL(pwj_comparer.add_table(r_table, is_equal))) {
LOG_WARN("failed to add table", K(ret));
} else if (is_equal) {
is_partition_wise = true;
SMART_VAR(ObStrictPwjComparer, pwj_comparer) {
if (OB_FAIL(l_table.init(*first_left_sharding))) {
LOG_WARN("failed to init pwj table with sharding info", K(ret));
} else if (OB_FAIL(r_table.init(*first_right_sharding))) {
LOG_WARN("failed to init pwj table with sharding info", K(ret));
} else if (OB_FAIL(pwj_comparer.add_table(l_table, is_equal))) {
LOG_WARN("failed to add table", K(ret));
} else if (!is_equal) {
// do nothing
} else if (OB_FAIL(pwj_comparer.add_table(r_table, is_equal))) {
LOG_WARN("failed to add table", K(ret));
} else if (is_equal) {
is_partition_wise = true;
}
}
}
LOG_TRACE("succeed check if match partition wise",
@@ -1003,18 +1004,19 @@ int ObShardingInfo::is_sharding_equal(const ObShardingInfo *left_sharding,
} else {
PwjTable l_table;
PwjTable r_table;
ObStrictPwjComparer pwj_comparer;
if (OB_FAIL(l_table.init(*left_sharding))) {
LOG_WARN("failed to init pwj table with sharding info", K(ret));
} else if (OB_FAIL(r_table.init(*right_sharding))) {
LOG_WARN("failed to init pwj table with sharding info", K(ret));
} else if (OB_FAIL(pwj_comparer.add_table(l_table, is_equal))) {
LOG_WARN("failed to add table", K(ret));
} else if (!is_equal) {
// do nothing
} else if (OB_FAIL(pwj_comparer.add_table(r_table, is_equal))) {
LOG_WARN("failed to add table", K(ret));
} else { /*do nothing*/ }
SMART_VAR(ObStrictPwjComparer, pwj_comparer) {
if (OB_FAIL(l_table.init(*left_sharding))) {
LOG_WARN("failed to init pwj table with sharding info", K(ret));
} else if (OB_FAIL(r_table.init(*right_sharding))) {
LOG_WARN("failed to init pwj table with sharding info", K(ret));
} else if (OB_FAIL(pwj_comparer.add_table(l_table, is_equal))) {
LOG_WARN("failed to add table", K(ret));
} else if (!is_equal) {
// do nothing
} else if (OB_FAIL(pwj_comparer.add_table(r_table, is_equal))) {
LOG_WARN("failed to add table", K(ret));
} else { /*do nothing*/ }
}
}
if (OB_SUCC(ret)) {
LOG_TRACE("succeed to check whether sharding info is equal", K(is_equal));
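
A note on the change above: SMART_VAR (assumed to come from lib/utility/ob_smart_var.h) declares a local that is heap-backed when the type is large, so bulky objects such as ObStrictPwjComparer no longer consume stack; the variable is visible only inside the attached block. A sketch of the pattern, with the include paths as assumptions:

#include "lib/utility/ob_smart_var.h"       // SMART_VAR (assumed path)
#include "sql/optimizer/ob_pwj_comparer.h"  // ObStrictPwjComparer (assumed path)

int demo_smart_var()
{
  int ret = OB_SUCCESS;  // error-code local, matching the surrounding functions
  SMART_VAR(ObStrictPwjComparer, pwj_comparer) {
    // pwj_comparer is usable only within this block; large instances are
    // allocated from the heap instead of the stack frame.
  }  // destroyed (and freed, if heap-allocated) at block exit
  return ret;
}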

View File

@@ -51,7 +51,7 @@ int ObFullTabletCreator::init(const uint64_t tenant_id)
} else {
ContextParam param;
param.set_mem_attr(tenant_id, "MSTXCTX", common::ObCtxIds::DEFAULT_CTX_ID)
.set_ablock_size(64L << 10)
.set_ablock_size(lib::INTACT_MIDDLE_AOBJECT_SIZE)
.set_properties(ALLOC_THREAD_SAFE);
if (OB_FAIL(ROOT_CONTEXT->CREATE_CONTEXT(mstx_mem_ctx_, param))) {
LOG_WARN("fail to create entity", K(ret));