recover Reduce zstd memory overhead and fix the bug of vslice_alloc

This commit is contained in:
tushicheng 2024-07-12 09:54:10 +00:00 committed by ob-robot
parent f3a53b154c
commit 2083125b4c
3 changed files with 55 additions and 34 deletions

View File

@ -27,6 +27,7 @@ class ObBlockVSlicer
{
friend class ObVSliceAlloc;
public:
enum { K = INT64_MAX };
static const uint32_t ITEM_MAGIC_CODE = 0XCCEEDDF1;
static const uint32_t ITEM_MAGIC_CODE_MASK = 0XFFFFFFF0;
typedef ObBlockVSlicer Host;
@ -39,32 +40,43 @@ public:
int64_t size_;
} __attribute__((aligned (16)));;
ObBlockVSlicer(ObVSliceAlloc* vslice_alloc, int64_t blk_size)
: vslice_alloc_(vslice_alloc), blk_size_(blk_size), ref_(0), pos_(0) {}
: vslice_alloc_(vslice_alloc), arena_(nullptr), blk_size_(blk_size), ref_(K), pos_(0) {}
~ObBlockVSlicer() {}
int64_t get_blk_size() const { return blk_size_; }
int64_t get_using_size() { return ATOMIC_LOAD(&ref_); }
int64_t get_using_size()
{
int64_t v = ATOMIC_LOAD(&ref_);
return v > K ? (v - K) : v;
}
int64_t get_limit() const { return blk_size_ - sizeof(*this); }
Item* alloc_item(int64_t size, int64_t &leak_pos) {
Item* p = NULL;
int64_t alloc_size = size + sizeof(*p);
int64_t pos = ATOMIC_FAA(&pos_, alloc_size);
int64_t limit = blk_size_ - sizeof(*this);
if (pos + alloc_size <= limit) {
if (pos + alloc_size <= get_limit()) {
p = (Item*)(base_ + pos);
new(p)Item(this, alloc_size);
} else if (pos <= limit) {
ATOMIC_FAA(&ref_, alloc_size);
} else if (pos <= get_limit()) {
leak_pos = pos;
}
return p;
}
bool free_item(Item* p) { return 0 == ATOMIC_AAF(&ref_, -p->size_); }
bool freeze(int64_t &old_pos) {
old_pos = ATOMIC_FAA(&pos_, blk_size_ + 1);
return (old_pos < blk_size_);
bool free_item(Item* p, bool &can_purge)
{
int64_t nv = ATOMIC_AAF(&ref_, -p->size_);
can_purge = (K == nv);
return 0 == nv;
}
bool retire(int64_t val) { return 0 == ATOMIC_AAF(&ref_, val); }
bool freeze() {
int64_t old_pos = ATOMIC_FAA(&pos_, blk_size_ + 1);
return (old_pos <= get_limit());
}
bool retire() { return 0 == ATOMIC_AAF(&ref_, -K); }
ObVSliceAlloc* get_vslice_alloc() { return vslice_alloc_; }
private:
ObVSliceAlloc* vslice_alloc_;
void *arena_;
int64_t blk_size_ CACHE_ALIGNED;
int64_t ref_ CACHE_ALIGNED;
int64_t pos_ CACHE_ALIGNED;
@ -140,9 +152,8 @@ public:
blk = new(buf)Block(this, direct_size);
int64_t leak_pos = -1;
ret = blk->alloc_item(aligned_size, leak_pos);
int64_t old_pos = INT64_MAX;
if (blk->freeze(old_pos)) {
if (blk->retire(old_pos)) {
if (blk->freeze()) {
if (blk->retire()) {
destroy_block(blk);
}
}
@ -170,6 +181,7 @@ public:
// alloc block fail, end
tmp_ret = OB_ALLOCATE_MEMORY_FAILED;
} else {
nb->arena_ = &arena;
if (!arena.cas(NULL, nb)) {
// no need wait sync
destroy_block(nb);
@ -178,7 +190,7 @@ public:
}
if (blk2destroy) {
arena.sync();
if (blk2destroy->retire(leak_pos)) {
if (blk2destroy->retire()) {
destroy_block(blk2destroy);
}
}
@ -196,6 +208,7 @@ public:
Block::Item* item = (Block::Item*)p - 1;
abort_unless(Block::ITEM_MAGIC_CODE == item->MAGIC_CODE_);
Block* blk = item->host_;
Arena *arena = (Arena*)blk->arena_;
#ifndef NDEBUG
abort_unless(blk->get_vslice_alloc() == this);
abort_unless(bsize_ != 0);
@ -206,35 +219,40 @@ public:
}
#endif
item->MAGIC_CODE_ = item->MAGIC_CODE_ & Block::ITEM_MAGIC_CODE_MASK;
if (blk->free_item(item)) {
bool can_purge = false;
if (blk->free_item(item, can_purge)) {
destroy_block(blk);
} else if (can_purge) {
purge_extra_cached_block(*arena);
}
}
#endif
}
void purge_extra_cached_block(int keep, bool need_check = false) {
for(int i = MAX_ARENA_NUM - 1; i >= keep; i--) {
Arena& arena = arena_[i];
arena.ref(1);
Block* old_blk = arena.clear();
if (NULL != old_blk) {
int64_t old_pos = INT64_MAX;
if (old_blk->freeze(old_pos)) {
arena.ref(-1);
arena.sync();
if (old_blk->retire(old_pos)) {
destroy_block(old_blk);
} else if (need_check) {
// can not monitor all leak !!!
LIB_LOG_RET(ERROR, common::OB_ERR_UNEXPECTED, "there was memory leak", K(old_blk->ref_));
}
} else {
arena.ref(-1);
void purge_extra_cached_block(Arena &arena, bool need_check = false) {
arena.ref(1);
Block* old_blk = arena.clear();
if (NULL != old_blk) {
if (old_blk->freeze()) {
arena.ref(-1);
arena.sync();
if (old_blk->retire()) {
destroy_block(old_blk);
} else if (need_check) {
// can not monitor all leak !!!
LIB_LOG_RET(ERROR, common::OB_ERR_UNEXPECTED, "there was memory leak", K(old_blk->ref_));
}
} else {
arena.ref(-1);
}
} else {
arena.ref(-1);
}
}
void purge_extra_cached_block(int keep, bool need_check = false) {
for(int i = MAX_ARENA_NUM - 1; i >= keep; i--) {
purge_extra_cached_block(arena_[i], need_check);
}
}
double get_block_using_ratio(void* p) {

View File

@ -17,7 +17,7 @@ namespace oceanbase
namespace common
{
ObCompressorPool::ObCompressorPool()
:allocator_(SET_USE_500(ObMemAttr(OB_SERVER_TENANT_ID, "Compressor")), OB_MALLOC_BIG_BLOCK_SIZE),
:allocator_(ObMemAttr(OB_SERVER_TENANT_ID, "Compressor"), OB_MALLOC_BIG_BLOCK_SIZE),
none_compressor(),
lz4_compressor(),
lz4_compressor_1_9_1(),

View File

@ -62,6 +62,7 @@ TEST(TestConcurrentFIFOAllocator, single_thread)
allocator.free(ptr_buffer[i]);
ptr_buffer[i] = NULL;
}
ASSERT_EQ(allocator.allocated(), 0);
allocator.destroy();
}
@ -87,6 +88,7 @@ TEST(TestConcurrentFIFOAllocator, single_thread2)
ptr_buffer[i] = NULL;
}
}
ASSERT_EQ(allocator.allocated(), 0);
allocator.destroy();
}
@ -129,6 +131,7 @@ TEST(TestConcurrentFIFOAllocator, multipe_threads_direct_alloc)
ASSERT_EQ(0, pthread_join(work_thread[i], NULL));
}
ASSERT_EQ(0, pthread_barrier_destroy(&barrier1));
ASSERT_EQ(allocator1.allocated(), 0);
allocator1.destroy();
}