Fix direct load core at stop table compactor

This commit is contained in:
obdev
2024-02-08 17:58:42 +00:00
committed by ob-robot
parent 277099bcce
commit 8db66b6444
2 changed files with 46 additions and 28 deletions

View File

@ -110,17 +110,13 @@ void ObTableLoadTableCompactResult::release_all_table_data()
*/ */
ObTableLoadTableCompactCtx::ObTableLoadTableCompactCtx() ObTableLoadTableCompactCtx::ObTableLoadTableCompactCtx()
: allocator_("TLD_TCCtx"), store_ctx_(nullptr), merger_(nullptr), compactor_(nullptr) : store_ctx_(nullptr), merger_(nullptr), compactor_(nullptr)
{ {
} }
ObTableLoadTableCompactCtx::~ObTableLoadTableCompactCtx() ObTableLoadTableCompactCtx::~ObTableLoadTableCompactCtx()
{ {
if (nullptr != compactor_) { release_compactor();
compactor_->~ObTableLoadTableCompactor();
allocator_.free(compactor_);
compactor_ = nullptr;
}
} }
int ObTableLoadTableCompactCtx::init(ObTableLoadStoreCtx *store_ctx, ObTableLoadMerger &merger) int ObTableLoadTableCompactCtx::init(ObTableLoadStoreCtx *store_ctx, ObTableLoadMerger &merger)
@ -133,7 +129,6 @@ int ObTableLoadTableCompactCtx::init(ObTableLoadStoreCtx *store_ctx, ObTableLoad
if (OB_FAIL(result_.init())) { if (OB_FAIL(result_.init())) {
LOG_WARN("fail to init result", KR(ret)); LOG_WARN("fail to init result", KR(ret));
} else { } else {
allocator_.set_tenant_id(MTL_ID());
store_ctx_ = store_ctx; store_ctx_ = store_ctx;
merger_ = &merger; merger_ = &merger;
} }
@ -146,37 +141,61 @@ bool ObTableLoadTableCompactCtx::is_valid() const
return nullptr != store_ctx_ && nullptr != merger_; return nullptr != store_ctx_ && nullptr != merger_;
} }
ObTableLoadTableCompactor *ObTableLoadTableCompactCtx::new_compactor() int ObTableLoadTableCompactCtx::new_compactor()
{ {
ObTableLoadTableCompactor *ret = nullptr; int ret = OB_SUCCESS;
if (store_ctx_->is_multiple_mode_) { obsys::ObWLockGuard guard(rwlock_);
if (store_ctx_->table_data_desc_.is_heap_table_) { if (OB_NOT_NULL(compactor_)) {
ret = OB_NEWx(ObTableLoadMultipleHeapTableCompactor, (&allocator_)); ret = OB_ERR_UNEXPECTED;
} else { LOG_WARN("unexpected not null compactor", KR(ret), KP(compactor_));
ret = OB_NEWx(ObTableLoadMemCompactor, (&allocator_));
}
} else { } else {
ret = OB_NEWx(ObTableLoadGeneralTableCompactor, (&allocator_)); ObMemAttr attr(MTL_ID(), "TLD_Compactor");
if (store_ctx_->is_multiple_mode_) {
if (store_ctx_->table_data_desc_.is_heap_table_) {
compactor_ = OB_NEW(ObTableLoadMultipleHeapTableCompactor, attr);
} else {
compactor_ = OB_NEW(ObTableLoadMemCompactor, attr);
}
} else {
compactor_ = OB_NEW(ObTableLoadGeneralTableCompactor, attr);
}
if (OB_ISNULL(compactor_)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadTableCompactor", KR(ret));
}
} }
return ret; return ret;
} }
void ObTableLoadTableCompactCtx::release_compactor()
{
obsys::ObWLockGuard guard(rwlock_);
if (nullptr != compactor_) {
ObMemAttr attr(MTL_ID(), "TLD_Compactor");
OB_DELETE(ObTableLoadTableCompactor, attr, compactor_);
compactor_ = nullptr;
}
}
int ObTableLoadTableCompactCtx::start() int ObTableLoadTableCompactCtx::start()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_ISNULL(compactor_ = new_compactor())) { if (OB_FAIL(new_compactor())) {
ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new compactor", KR(ret));
LOG_WARN("fail to new ObTableLoadGeneralTableCompactor", KR(ret)); } else {
} else if (OB_FAIL(compactor_->init(this))) { obsys::ObRLockGuard guard(rwlock_);
LOG_WARN("fail to init compactor", KR(ret)); if (OB_FAIL(compactor_->init(this))) {
} else if (OB_FAIL(compactor_->start())) { LOG_WARN("fail to init compactor", KR(ret));
LOG_WARN("fail to start compactor", KR(ret)); } else if (OB_FAIL(compactor_->start())) {
LOG_WARN("fail to start compactor", KR(ret));
}
} }
return ret; return ret;
} }
void ObTableLoadTableCompactCtx::stop() void ObTableLoadTableCompactCtx::stop()
{ {
obsys::ObRLockGuard guard(rwlock_);
if (OB_NOT_NULL(compactor_)) { if (OB_NOT_NULL(compactor_)) {
compactor_->stop(); compactor_->stop();
} }
@ -185,9 +204,7 @@ void ObTableLoadTableCompactCtx::stop()
int ObTableLoadTableCompactCtx::handle_table_compact_success() int ObTableLoadTableCompactCtx::handle_table_compact_success()
{ {
// release compactor // release compactor
compactor_->~ObTableLoadTableCompactor(); release_compactor();
allocator_.free(compactor_);
compactor_ = nullptr;
// notify merger // notify merger
return merger_->handle_table_compact_success(); return merger_->handle_table_compact_success();
} }

View File

@ -60,12 +60,13 @@ public:
int handle_table_compact_success(); int handle_table_compact_success();
TO_STRING_KV(KP_(store_ctx), KP_(merger), KP_(compactor)); TO_STRING_KV(KP_(store_ctx), KP_(merger), KP_(compactor));
private: private:
ObTableLoadTableCompactor *new_compactor(); int new_compactor();
void release_compactor();
public: public:
common::ObArenaAllocator allocator_;
ObTableLoadStoreCtx *store_ctx_; ObTableLoadStoreCtx *store_ctx_;
ObTableLoadMerger *merger_; ObTableLoadMerger *merger_;
mutable obsys::ObRWLock rwlock_;
ObTableLoadTableCompactor *compactor_; ObTableLoadTableCompactor *compactor_;
ObTableLoadTableCompactResult result_; ObTableLoadTableCompactResult result_;
}; };