diff --git a/deps/oblib/src/lib/allocator/ob_vslice_alloc.h b/deps/oblib/src/lib/allocator/ob_vslice_alloc.h index cb36dc7e1..69b4aab00 100644 --- a/deps/oblib/src/lib/allocator/ob_vslice_alloc.h +++ b/deps/oblib/src/lib/allocator/ob_vslice_alloc.h @@ -27,6 +27,7 @@ class ObBlockVSlicer { friend class ObVSliceAlloc; public: + enum { K = INT64_MAX }; static const uint32_t ITEM_MAGIC_CODE = 0XCCEEDDF1; static const uint32_t ITEM_MAGIC_CODE_MASK = 0XFFFFFFF0; typedef ObBlockVSlicer Host; @@ -39,32 +40,43 @@ public: int64_t size_; } __attribute__((aligned (16)));; ObBlockVSlicer(ObVSliceAlloc* vslice_alloc, int64_t blk_size) - : vslice_alloc_(vslice_alloc), blk_size_(blk_size), ref_(0), pos_(0) {} + : vslice_alloc_(vslice_alloc), arena_(nullptr), blk_size_(blk_size), ref_(K), pos_(0) {} ~ObBlockVSlicer() {} int64_t get_blk_size() const { return blk_size_; } - int64_t get_using_size() { return ATOMIC_LOAD(&ref_); } + int64_t get_using_size() + { + int64_t v = ATOMIC_LOAD(&ref_); + return v > K ? (v - K) : v; + } + int64_t get_limit() const { return blk_size_ - sizeof(*this); } Item* alloc_item(int64_t size, int64_t &leak_pos) { Item* p = NULL; int64_t alloc_size = size + sizeof(*p); int64_t pos = ATOMIC_FAA(&pos_, alloc_size); - int64_t limit = blk_size_ - sizeof(*this); - if (pos + alloc_size <= limit) { + if (pos + alloc_size <= get_limit()) { p = (Item*)(base_ + pos); new(p)Item(this, alloc_size); - } else if (pos <= limit) { + ATOMIC_FAA(&ref_, alloc_size); + } else if (pos <= get_limit()) { leak_pos = pos; } return p; } - bool free_item(Item* p) { return 0 == ATOMIC_AAF(&ref_, -p->size_); } - bool freeze(int64_t &old_pos) { - old_pos = ATOMIC_FAA(&pos_, blk_size_ + 1); - return (old_pos < blk_size_); + bool free_item(Item* p, bool &can_purge) + { + int64_t nv = ATOMIC_AAF(&ref_, -p->size_); + can_purge = (K == nv); + return 0 == nv; } - bool retire(int64_t val) { return 0 == ATOMIC_AAF(&ref_, val); } + bool freeze() { + int64_t old_pos = ATOMIC_FAA(&pos_, blk_size_ + 1); + return (old_pos <= get_limit()); + } + bool retire() { return 0 == ATOMIC_AAF(&ref_, -K); } ObVSliceAlloc* get_vslice_alloc() { return vslice_alloc_; } private: ObVSliceAlloc* vslice_alloc_; + void *arena_; int64_t blk_size_ CACHE_ALIGNED; int64_t ref_ CACHE_ALIGNED; int64_t pos_ CACHE_ALIGNED; @@ -140,9 +152,8 @@ public: blk = new(buf)Block(this, direct_size); int64_t leak_pos = -1; ret = blk->alloc_item(aligned_size, leak_pos); - int64_t old_pos = INT64_MAX; - if (blk->freeze(old_pos)) { - if (blk->retire(old_pos)) { + if (blk->freeze()) { + if (blk->retire()) { destroy_block(blk); } } @@ -170,6 +181,7 @@ public: // alloc block fail, end tmp_ret = OB_ALLOCATE_MEMORY_FAILED; } else { + nb->arena_ = &arena; if (!arena.cas(NULL, nb)) { // no need wait sync destroy_block(nb); @@ -178,7 +190,7 @@ public: } if (blk2destroy) { arena.sync(); - if (blk2destroy->retire(leak_pos)) { + if (blk2destroy->retire()) { destroy_block(blk2destroy); } } @@ -196,6 +208,7 @@ public: Block::Item* item = (Block::Item*)p - 1; abort_unless(Block::ITEM_MAGIC_CODE == item->MAGIC_CODE_); Block* blk = item->host_; + Arena *arena = (Arena*)blk->arena_; #ifndef NDEBUG abort_unless(blk->get_vslice_alloc() == this); abort_unless(bsize_ != 0); @@ -206,35 +219,40 @@ public: } #endif item->MAGIC_CODE_ = item->MAGIC_CODE_ & Block::ITEM_MAGIC_CODE_MASK; - if (blk->free_item(item)) { + bool can_purge = false; + if (blk->free_item(item, can_purge)) { destroy_block(blk); + } else if (can_purge) { + purge_extra_cached_block(*arena); } } #endif } - void purge_extra_cached_block(int keep, bool need_check = false) { - for(int i = MAX_ARENA_NUM - 1; i >= keep; i--) { - Arena& arena = arena_[i]; - arena.ref(1); - Block* old_blk = arena.clear(); - if (NULL != old_blk) { - int64_t old_pos = INT64_MAX; - if (old_blk->freeze(old_pos)) { - arena.ref(-1); - arena.sync(); - if (old_blk->retire(old_pos)) { - destroy_block(old_blk); - } else if (need_check) { - // can not monitor all leak !!! - LIB_LOG_RET(ERROR, common::OB_ERR_UNEXPECTED, "there was memory leak", K(old_blk->ref_)); - } - } else { - arena.ref(-1); + + void purge_extra_cached_block(Arena &arena, bool need_check = false) { + arena.ref(1); + Block* old_blk = arena.clear(); + if (NULL != old_blk) { + if (old_blk->freeze()) { + arena.ref(-1); + arena.sync(); + if (old_blk->retire()) { + destroy_block(old_blk); + } else if (need_check) { + // can not monitor all leak !!! + LIB_LOG_RET(ERROR, common::OB_ERR_UNEXPECTED, "there was memory leak", K(old_blk->ref_)); } } else { arena.ref(-1); } + } else { + arena.ref(-1); + } + } + void purge_extra_cached_block(int keep, bool need_check = false) { + for(int i = MAX_ARENA_NUM - 1; i >= keep; i--) { + purge_extra_cached_block(arena_[i], need_check); } } double get_block_using_ratio(void* p) { diff --git a/deps/oblib/src/lib/compress/ob_compressor_pool.cpp b/deps/oblib/src/lib/compress/ob_compressor_pool.cpp index 5f8da7afd..5675a9752 100644 --- a/deps/oblib/src/lib/compress/ob_compressor_pool.cpp +++ b/deps/oblib/src/lib/compress/ob_compressor_pool.cpp @@ -17,7 +17,7 @@ namespace oceanbase namespace common { ObCompressorPool::ObCompressorPool() - :allocator_(SET_USE_500(ObMemAttr(OB_SERVER_TENANT_ID, "Compressor")), OB_MALLOC_BIG_BLOCK_SIZE), + :allocator_(ObMemAttr(OB_SERVER_TENANT_ID, "Compressor"), OB_MALLOC_BIG_BLOCK_SIZE), none_compressor(), lz4_compressor(), lz4_compressor_1_9_1(), diff --git a/deps/oblib/unittest/lib/allocator/test_concurrent_fifo_allocator.cpp b/deps/oblib/unittest/lib/allocator/test_concurrent_fifo_allocator.cpp index 346949338..996cc6fcf 100644 --- a/deps/oblib/unittest/lib/allocator/test_concurrent_fifo_allocator.cpp +++ b/deps/oblib/unittest/lib/allocator/test_concurrent_fifo_allocator.cpp @@ -62,6 +62,7 @@ TEST(TestConcurrentFIFOAllocator, single_thread) allocator.free(ptr_buffer[i]); ptr_buffer[i] = NULL; } + ASSERT_EQ(allocator.allocated(), 0); allocator.destroy(); } @@ -87,6 +88,7 @@ TEST(TestConcurrentFIFOAllocator, single_thread2) ptr_buffer[i] = NULL; } } + ASSERT_EQ(allocator.allocated(), 0); allocator.destroy(); } @@ -129,6 +131,7 @@ TEST(TestConcurrentFIFOAllocator, multipe_threads_direct_alloc) ASSERT_EQ(0, pthread_join(work_thread[i], NULL)); } ASSERT_EQ(0, pthread_barrier_destroy(&barrier1)); + ASSERT_EQ(allocator1.allocated(), 0); allocator1.destroy(); }