diff --git a/src/storage/blocksstable/ob_imicro_block_writer.cpp b/src/storage/blocksstable/ob_imicro_block_writer.cpp index 83032e4cc..52771d72e 100644 --- a/src/storage/blocksstable/ob_imicro_block_writer.cpp +++ b/src/storage/blocksstable/ob_imicro_block_writer.cpp @@ -90,14 +90,20 @@ int ObMicroBufferWriter::init(const int64_t capacity, const int64_t reserve_size void ObMicroBufferWriter::reset() { - old_buf_ = nullptr; + if (old_buf_ != nullptr) { + allocator_.free(old_buf_); + old_buf_ = nullptr; + } + if (data_ != nullptr) { + allocator_.free(data_); + data_ = nullptr; + } old_size_ = 0; lazy_move_ = false; has_expand_ = false; memory_reclaim_cnt_ = 0; reset_memory_threshold_ = 0; default_reserve_ = 0; - data_ = nullptr; len_ = 0; buffer_size_ = 0; capacity_ = 0; @@ -120,7 +126,7 @@ void ObMicroBufferWriter::reuse() void *buf = nullptr; if (OB_ISNULL(buf = allocator_.alloc(default_reserve_))) { int ret = OB_ALLOCATE_MEMORY_FAILED; - STORAGE_LOG(WARN, "failed to reclaim memory", K(ret), K(default_reserve_), K(allocator_)); + STORAGE_LOG(WARN, "failed to reclaim memory", K(ret), K(default_reserve_)); } else { allocator_.free(data_); buffer_size_ = default_reserve_; diff --git a/src/storage/blocksstable/ob_imicro_block_writer.h b/src/storage/blocksstable/ob_imicro_block_writer.h index 268f02454..b180114e8 100644 --- a/src/storage/blocksstable/ob_imicro_block_writer.h +++ b/src/storage/blocksstable/ob_imicro_block_writer.h @@ -91,7 +91,7 @@ class ObMicroBufferWriter final { public: ObMicroBufferWriter(const int64_t page_size = DEFAULT_MIDDLE_BLOCK_SIZE) - : allocator_("MicroBuffer"), + : allocator_(MTL_ID(), "MicroBuffer"), is_inited_(false), capacity_(0), buffer_size_(0), @@ -167,7 +167,7 @@ public: private: int expand(const int64_t size); private: - compaction::ObLocalArena allocator_; + compaction::ObLocalAllocator allocator_; bool is_inited_; int64_t capacity_; int64_t buffer_size_; //curr buffer size @@ -219,7 +219,7 @@ public: virtual int64_t get_column_count() const = 0; virtual void reset() { - reuse(); + ObIMicroBlockWriter::reuse(); checksum_helper_.reset(); } virtual void dump_diagnose_info() const { STORAGE_LOG(INFO, "IMicroBlockWriter", K(checksum_helper_)); } diff --git a/src/storage/compaction/ob_compaction_memory_context.cpp b/src/storage/compaction/ob_compaction_memory_context.cpp index 5cddfa1b9..677738224 100644 --- a/src/storage/compaction/ob_compaction_memory_context.cpp +++ b/src/storage/compaction/ob_compaction_memory_context.cpp @@ -32,95 +32,6 @@ namespace compaction { -/* - * ================================================= ObLocalArena ================================================= - */ -ObLocalArena::ObLocalArena( - const lib::ObLabel &label, - const int64_t page_size) - : ref_mem_ctx_(nullptr), - arena_(label, page_size, MTL_ID(), ObCtxIds::DEFAULT_CTX_ID), - hist_mem_hold_(0) -{ - ObCompactionMemoryContext *mem_ctx = CURRENT_MEM_CTX(); - if (nullptr != mem_ctx) { - bind_mem_ctx(*mem_ctx); - } -} - -ObLocalArena::ObLocalArena( - ObCompactionMemoryContext &mem_ctx, - const lib::ObLabel &label, - const int64_t page_size) - : ref_mem_ctx_(nullptr), - arena_(label, page_size, MTL_ID(), ObCtxIds::DEFAULT_CTX_ID), - hist_mem_hold_(0) -{ - bind_mem_ctx(mem_ctx); -} - -void ObLocalArena::bind_mem_ctx(ObCompactionMemoryContext &mem_ctx) -{ - if (NULL == ref_mem_ctx_) { - ref_mem_ctx_ = &mem_ctx; - } - arena_.set_ctx_id(ref_mem_ctx_->get_ctx_id()); -} - -ObLocalArena::~ObLocalArena() -{ - reset(); - ref_mem_ctx_ = nullptr; -} - -void* ObLocalArena::alloc(const int64_t size) -{ - void *buf = arena_.alloc(size); - if (OB_NOT_NULL(buf)) { - update_mem_monitor(); - } - return buf; -} - -void* ObLocalArena::alloc(const int64_t size, const ObMemAttr &attr) -{ - void *buf = arena_.alloc(size, attr); - if (OB_NOT_NULL(buf)) { - update_mem_monitor(); - } - return buf; -} - -void ObLocalArena::reset() -{ - arena_.reset(); - update_mem_monitor(); -} - -void ObLocalArena::clear() -{ - arena_.clear(); - update_mem_monitor(); -} - -void ObLocalArena::update_mem_monitor() -{ - ref_mem_ctx_ = nullptr == ref_mem_ctx_ - ? CURRENT_MEM_CTX() - : ref_mem_ctx_; - - int64_t cur_mem_hold = arena_.total(); - if (nullptr != ref_mem_ctx_ && hist_mem_hold_ != cur_mem_hold) { - if (cur_mem_hold > hist_mem_hold_) { - ref_mem_ctx_->inc_local_hold_mem(cur_mem_hold - hist_mem_hold_); - } else { - ref_mem_ctx_->inc_local_free_mem(hist_mem_hold_ - cur_mem_hold); - } - hist_mem_hold_ = cur_mem_hold; - } -} - - /* * ================================================= Mem Monitor Part ================================================= */ diff --git a/src/storage/compaction/ob_compaction_memory_context.h b/src/storage/compaction/ob_compaction_memory_context.h index 3fda1cea8..2da823e45 100644 --- a/src/storage/compaction/ob_compaction_memory_context.h +++ b/src/storage/compaction/ob_compaction_memory_context.h @@ -18,6 +18,9 @@ #include "lib/allocator/ob_allocator.h" #include "lib/allocator/page_arena.h" #include "lib/allocator/ob_fifo_allocator.h" +#include "share/rc/ob_tenant_base.h" +#include "share/scheduler/ob_tenant_dag_scheduler.h" +#include "lib/utility/ob_template_utils.h" #include "lib/lock/ob_spin_lock.h" #include "lib/list/ob_dlist.h" #include "share/ob_delegate.h" @@ -51,76 +54,6 @@ struct ObTabletMergeDagParam; class ObCompactionMemoryContext; -class ObLocalArena : public common::ObIAllocator -{ -public: - ObLocalArena( - const lib::ObLabel &label, - const int64_t page_size = OB_MALLOC_NORMAL_BLOCK_SIZE); - ObLocalArena( - ObCompactionMemoryContext &mem_ctx, - const lib::ObLabel &label, - const int64_t page_size = OB_MALLOC_NORMAL_BLOCK_SIZE); - virtual ~ObLocalArena(); - void bind_mem_ctx(ObCompactionMemoryContext &mem_ctx); - void unbind_mem_ctx() { ref_mem_ctx_ = nullptr; } - common::ObArenaAllocator &get_arena_allocator() { return arena_; } - - virtual void* alloc(const int64_t size) override; - virtual void* alloc(const int64_t size, const ObMemAttr &attr) override; - virtual void free(void *ptr) override { arena_.free(ptr); } - virtual void reset() override; - virtual void clear(); - - DELEGATE_WITH_RET(arena_, alloc_aligned, void*); - DELEGATE_WITH_RET(arena_, realloc, void*); - DELEGATE_WITHOUT_RET(arena_, reset_remain_one_page); - DELEGATE_WITHOUT_RET(arena_, reuse); - DELEGATE_WITHOUT_RET(arena_, set_label); - DELEGATE_WITHOUT_RET(arena_, set_tenant_id); - DELEGATE_WITH_RET(arena_, set_tracer, bool); - DELEGATE_WITH_RET(arena_, revert_tracer, bool); - DELEGATE_WITHOUT_RET(arena_, set_ctx_id); - DELEGATE_WITHOUT_RET(arena_, set_attr); - DELEGATE_WITH_RET(arena_, get_arena, ModuleArena&); - DELEGATE_WITH_RET(arena_, mprotect_arena_allocator, int); - CONST_DELEGATE_WITH_RET(arena_, used, int64_t); - CONST_DELEGATE_WITH_RET(arena_, total, int64_t); - CONST_DELEGATE_WITH_RET(arena_, to_string, int64_t); - -private: - void update_mem_monitor(); -protected: - ObCompactionMemoryContext *ref_mem_ctx_; - common::ObArenaAllocator arena_; - int64_t hist_mem_hold_; -}; - - -class ObLocalSafeArena final : public ObLocalArena -{ -public: - ObLocalSafeArena(const lib::ObLabel &label, const int64_t page_size = OB_MALLOC_NORMAL_BLOCK_SIZE) - : ObLocalArena(label, page_size), lock_() {} - virtual ~ObLocalSafeArena() {} - virtual void *alloc(const int64_t sz) override - { - ObSpinLockGuard guard(lock_); - return arena_.alloc(sz); - } - virtual void* alloc(const int64_t size, const ObMemAttr &attr) override - { - ObSpinLockGuard guard(lock_); - return arena_.alloc(size, attr); - } - DELEGATE_WITH_SPIN_LOCK(arena_, lock_, clear, void); - DELEGATE_WITH_SPIN_LOCK(arena_, lock_, reuse, void); - DELEGATE_WITH_SPIN_LOCK(arena_, lock_, reset, void); -private: - common::ObSpinLock lock_; -}; - - struct ObCompactionMemMonitor { public: @@ -180,6 +113,156 @@ private: DISABLE_COPY_ASSIGN(ObCompactionMemoryContext); }; +template +class ObLocalAllocator : public common::ObIAllocator +{ +public: + ObLocalAllocator( + const lib::ObLabel &label, + const int64_t page_size = OB_MALLOC_NORMAL_BLOCK_SIZE) + : ref_mem_ctx_(nullptr), + allocator_(label, page_size, MTL_ID(), ObCtxIds::DEFAULT_CTX_ID), + hist_mem_hold_(0) + { + ObCompactionMemoryContext *mem_ctx = CURRENT_MEM_CTX(); + if (nullptr != mem_ctx) { + bind_mem_ctx(*mem_ctx); + } + } + + ObLocalAllocator( + const uint64_t tenant_id, + const lib::ObLabel &label) + : ref_mem_ctx_(nullptr), + allocator_(label, tenant_id), + hist_mem_hold_(0) + { + static_assert(std::is_same::value, "error allocator type"); + ObCompactionMemoryContext *mem_ctx = CURRENT_MEM_CTX(); + if (nullptr != mem_ctx) { + bind_mem_ctx(*mem_ctx); + } + } + ObLocalAllocator( + ObCompactionMemoryContext &mem_ctx, + const lib::ObLabel &label, + const int64_t page_size = OB_MALLOC_NORMAL_BLOCK_SIZE) + : ref_mem_ctx_(nullptr), + allocator_(label, page_size, MTL_ID(), ObCtxIds::DEFAULT_CTX_ID), + hist_mem_hold_(0) + { + bind_mem_ctx(mem_ctx); + } + + ~ObLocalAllocator() + { + reset(); + ref_mem_ctx_ = nullptr; + } + + void bind_mem_ctx(ObCompactionMemoryContext &mem_ctx) + { + if (NULL == ref_mem_ctx_) { + ref_mem_ctx_ = &mem_ctx; + } + allocator_.set_ctx_id(ref_mem_ctx_->get_ctx_id()); + } + void unbind_mem_ctx() { ref_mem_ctx_ = nullptr; } + common::ObArenaAllocator &get_arena_allocator() { return allocator_; } + + virtual void* alloc(const int64_t size) override + { + void *buf = allocator_.alloc(size); + if (OB_NOT_NULL(buf)) { + update_mem_monitor(); + } + return buf; + } + virtual void* alloc(const int64_t size, const ObMemAttr &attr) override + { + void *buf = allocator_.alloc(size, attr); + if (OB_NOT_NULL(buf)) { + update_mem_monitor(); + } + return buf; + } + virtual void free(void *ptr) override { allocator_.free(ptr); } + virtual void reset() override + { + allocator_.reset(); + update_mem_monitor(); + } + + template + void clear() + { + allocator_.clear(); + update_mem_monitor(); + } + + DELEGATE_WITH_RET(allocator_, alloc_aligned, void*); + DELEGATE_WITH_RET(allocator_, realloc, void*); + DELEGATE_WITHOUT_RET(allocator_, reset_remain_one_page); + DELEGATE_WITHOUT_RET(allocator_, reuse); + DELEGATE_WITHOUT_RET(allocator_, set_label); + DELEGATE_WITHOUT_RET(allocator_, set_tenant_id); + DELEGATE_WITH_RET(allocator_, set_tracer, bool); + DELEGATE_WITH_RET(allocator_, revert_tracer, bool); + DELEGATE_WITHOUT_RET(allocator_, set_ctx_id); + DELEGATE_WITHOUT_RET(allocator_, set_attr); + DELEGATE_WITH_RET(allocator_, get_arena, ModuleArena&); + DELEGATE_WITH_RET(allocator_, mprotect_arena_allocator, int); + CONST_DELEGATE_WITH_RET(allocator_, used, int64_t); + CONST_DELEGATE_WITH_RET(allocator_, total, int64_t); + CONST_DELEGATE_WITH_RET(allocator_, to_string, int64_t); + +private: + void update_mem_monitor() + { + ref_mem_ctx_ = nullptr == ref_mem_ctx_ + ? CURRENT_MEM_CTX() + : ref_mem_ctx_; + + int64_t cur_mem_hold = allocator_.total(); + if (nullptr != ref_mem_ctx_ && hist_mem_hold_ != cur_mem_hold) { + if (cur_mem_hold > hist_mem_hold_) { + ref_mem_ctx_->inc_local_hold_mem(cur_mem_hold - hist_mem_hold_); + } else { + ref_mem_ctx_->inc_local_free_mem(hist_mem_hold_ - cur_mem_hold); + } + hist_mem_hold_ = cur_mem_hold; + } + } +protected: + ObCompactionMemoryContext *ref_mem_ctx_; + T allocator_; + int64_t hist_mem_hold_; +}; + +using ObLocalArena=ObLocalAllocator; + +class ObLocalSafeArena final : public ObLocalArena +{ +public: + ObLocalSafeArena(const lib::ObLabel &label, const int64_t page_size = OB_MALLOC_NORMAL_BLOCK_SIZE) + : ObLocalArena(label, page_size), lock_() {} + virtual ~ObLocalSafeArena() {} + virtual void *alloc(const int64_t sz) override + { + ObSpinLockGuard guard(lock_); + return allocator_.alloc(sz); + } + virtual void* alloc(const int64_t size, const ObMemAttr &attr) override + { + ObSpinLockGuard guard(lock_); + return allocator_.alloc(size, attr); + } + DELEGATE_WITH_SPIN_LOCK(allocator_, lock_, clear, void); + DELEGATE_WITH_SPIN_LOCK(allocator_, lock_, reuse, void); + DELEGATE_WITH_SPIN_LOCK(allocator_, lock_, reset, void); +private: + common::ObSpinLock lock_; +}; } // compaction } // oceanbase diff --git a/unittest/storage/blocksstable/cs_encoding/test_decoder_filter_perf.h b/unittest/storage/blocksstable/cs_encoding/test_decoder_filter_perf.h index 0ea9f711a..01d91dba6 100644 --- a/unittest/storage/blocksstable/cs_encoding/test_decoder_filter_perf.h +++ b/unittest/storage/blocksstable/cs_encoding/test_decoder_filter_perf.h @@ -312,6 +312,8 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) } else if (datum_rows_.empty()) { ret = OB_INNER_STAT_ERROR; LOG_WARN("empty micro block", K(ret)); + } else if (OB_FAIL(set_datum_rows_ptr())) { + STORAGE_LOG(WARN, "fail to set datum rows ptr", K(ret)); } else if (OB_FAIL(pivot())) { LOG_WARN("pivot rows to columns failed", K(ret)); } else if (OB_FAIL(row_indexs_.reserve(datum_rows_.count()))) { diff --git a/unittest/storage/blocksstable/encoding/test_raw_decoder.cpp b/unittest/storage/blocksstable/encoding/test_raw_decoder.cpp index 744ac2e1b..d63e9b695 100644 --- a/unittest/storage/blocksstable/encoding/test_raw_decoder.cpp +++ b/unittest/storage/blocksstable/encoding/test_raw_decoder.cpp @@ -60,6 +60,8 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) } else if (datum_rows_.empty()) { ret = OB_INNER_STAT_ERROR; LOG_WARN("empty micro block", K(ret)); + } else if (OB_FAIL(set_datum_rows_ptr())) { + STORAGE_LOG(WARN, "fail to set datum rows ptr", K(ret)); } else if (OB_FAIL(pivot())) { LOG_WARN("pivot rows to columns failed", K(ret)); } else if (OB_FAIL(row_indexs_.reserve(datum_rows_.count()))) {