diff --git a/src/observer/table_load/ob_table_load_table_compactor.cpp b/src/observer/table_load/ob_table_load_table_compactor.cpp index 96c8bd27c7..ffcaf501ea 100644 --- a/src/observer/table_load/ob_table_load_table_compactor.cpp +++ b/src/observer/table_load/ob_table_load_table_compactor.cpp @@ -110,17 +110,13 @@ void ObTableLoadTableCompactResult::release_all_table_data() */ ObTableLoadTableCompactCtx::ObTableLoadTableCompactCtx() - : allocator_("TLD_TCCtx"), store_ctx_(nullptr), merger_(nullptr), compactor_(nullptr) + : store_ctx_(nullptr), merger_(nullptr), compactor_(nullptr) { } ObTableLoadTableCompactCtx::~ObTableLoadTableCompactCtx() { - if (nullptr != compactor_) { - compactor_->~ObTableLoadTableCompactor(); - allocator_.free(compactor_); - compactor_ = nullptr; - } + release_compactor(); } int ObTableLoadTableCompactCtx::init(ObTableLoadStoreCtx *store_ctx, ObTableLoadMerger &merger) @@ -133,7 +129,6 @@ int ObTableLoadTableCompactCtx::init(ObTableLoadStoreCtx *store_ctx, ObTableLoad if (OB_FAIL(result_.init())) { LOG_WARN("fail to init result", KR(ret)); } else { - allocator_.set_tenant_id(MTL_ID()); store_ctx_ = store_ctx; merger_ = &merger; } @@ -146,37 +141,61 @@ bool ObTableLoadTableCompactCtx::is_valid() const return nullptr != store_ctx_ && nullptr != merger_; } -ObTableLoadTableCompactor *ObTableLoadTableCompactCtx::new_compactor() +int ObTableLoadTableCompactCtx::new_compactor() { - ObTableLoadTableCompactor *ret = nullptr; - if (store_ctx_->is_multiple_mode_) { - if (store_ctx_->table_data_desc_.is_heap_table_) { - ret = OB_NEWx(ObTableLoadMultipleHeapTableCompactor, (&allocator_)); - } else { - ret = OB_NEWx(ObTableLoadMemCompactor, (&allocator_)); - } + int ret = OB_SUCCESS; + obsys::ObWLockGuard guard(rwlock_); + if (OB_NOT_NULL(compactor_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected not null compactor", KR(ret), KP(compactor_)); } else { - ret = OB_NEWx(ObTableLoadGeneralTableCompactor, (&allocator_)); + ObMemAttr attr(MTL_ID(), "TLD_Compactor"); + if (store_ctx_->is_multiple_mode_) { + if (store_ctx_->table_data_desc_.is_heap_table_) { + compactor_ = OB_NEW(ObTableLoadMultipleHeapTableCompactor, attr); + } else { + compactor_ = OB_NEW(ObTableLoadMemCompactor, attr); + } + } else { + compactor_ = OB_NEW(ObTableLoadGeneralTableCompactor, attr); + } + if (OB_ISNULL(compactor_)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to new ObTableLoadTableCompactor", KR(ret)); + } } return ret; } +void ObTableLoadTableCompactCtx::release_compactor() +{ + obsys::ObWLockGuard guard(rwlock_); + if (nullptr != compactor_) { + ObMemAttr attr(MTL_ID(), "TLD_Compactor"); + OB_DELETE(ObTableLoadTableCompactor, attr, compactor_); + compactor_ = nullptr; + } +} + int ObTableLoadTableCompactCtx::start() { int ret = OB_SUCCESS; - if (OB_ISNULL(compactor_ = new_compactor())) { - ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("fail to new ObTableLoadGeneralTableCompactor", KR(ret)); - } else if (OB_FAIL(compactor_->init(this))) { - LOG_WARN("fail to init compactor", KR(ret)); - } else if (OB_FAIL(compactor_->start())) { - LOG_WARN("fail to start compactor", KR(ret)); + if (OB_FAIL(new_compactor())) { + LOG_WARN("fail to new compactor", KR(ret)); + } else { + obsys::ObRLockGuard guard(rwlock_); + if (OB_FAIL(compactor_->init(this))) { + LOG_WARN("fail to init compactor", KR(ret)); + } else if (OB_FAIL(compactor_->start())) { + LOG_WARN("fail to start compactor", KR(ret)); + } } return ret; } void ObTableLoadTableCompactCtx::stop() { + obsys::ObRLockGuard guard(rwlock_); if (OB_NOT_NULL(compactor_)) { compactor_->stop(); } @@ -185,9 +204,7 @@ void ObTableLoadTableCompactCtx::stop() int ObTableLoadTableCompactCtx::handle_table_compact_success() { // release compactor - compactor_->~ObTableLoadTableCompactor(); - allocator_.free(compactor_); - compactor_ = nullptr; + release_compactor(); // notify merger return merger_->handle_table_compact_success(); } diff --git a/src/observer/table_load/ob_table_load_table_compactor.h b/src/observer/table_load/ob_table_load_table_compactor.h index 805aa1e328..bc3ac4bb47 100644 --- a/src/observer/table_load/ob_table_load_table_compactor.h +++ b/src/observer/table_load/ob_table_load_table_compactor.h @@ -60,12 +60,13 @@ public: int handle_table_compact_success(); TO_STRING_KV(KP_(store_ctx), KP_(merger), KP_(compactor)); private: - ObTableLoadTableCompactor *new_compactor(); + int new_compactor(); + void release_compactor(); public: - common::ObArenaAllocator allocator_; ObTableLoadStoreCtx *store_ctx_; ObTableLoadMerger *merger_; + mutable obsys::ObRWLock rwlock_; ObTableLoadTableCompactor *compactor_; ObTableLoadTableCompactResult result_; };