Fix direct load core at stop table compactor
This commit is contained in:
@ -110,17 +110,13 @@ void ObTableLoadTableCompactResult::release_all_table_data()
|
||||
*/
|
||||
|
||||
ObTableLoadTableCompactCtx::ObTableLoadTableCompactCtx()
|
||||
: allocator_("TLD_TCCtx"), store_ctx_(nullptr), merger_(nullptr), compactor_(nullptr)
|
||||
: store_ctx_(nullptr), merger_(nullptr), compactor_(nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
ObTableLoadTableCompactCtx::~ObTableLoadTableCompactCtx()
|
||||
{
|
||||
if (nullptr != compactor_) {
|
||||
compactor_->~ObTableLoadTableCompactor();
|
||||
allocator_.free(compactor_);
|
||||
compactor_ = nullptr;
|
||||
}
|
||||
release_compactor();
|
||||
}
|
||||
|
||||
int ObTableLoadTableCompactCtx::init(ObTableLoadStoreCtx *store_ctx, ObTableLoadMerger &merger)
|
||||
@ -133,7 +129,6 @@ int ObTableLoadTableCompactCtx::init(ObTableLoadStoreCtx *store_ctx, ObTableLoad
|
||||
if (OB_FAIL(result_.init())) {
|
||||
LOG_WARN("fail to init result", KR(ret));
|
||||
} else {
|
||||
allocator_.set_tenant_id(MTL_ID());
|
||||
store_ctx_ = store_ctx;
|
||||
merger_ = &merger;
|
||||
}
|
||||
@ -146,37 +141,61 @@ bool ObTableLoadTableCompactCtx::is_valid() const
|
||||
return nullptr != store_ctx_ && nullptr != merger_;
|
||||
}
|
||||
|
||||
ObTableLoadTableCompactor *ObTableLoadTableCompactCtx::new_compactor()
|
||||
int ObTableLoadTableCompactCtx::new_compactor()
|
||||
{
|
||||
ObTableLoadTableCompactor *ret = nullptr;
|
||||
if (store_ctx_->is_multiple_mode_) {
|
||||
if (store_ctx_->table_data_desc_.is_heap_table_) {
|
||||
ret = OB_NEWx(ObTableLoadMultipleHeapTableCompactor, (&allocator_));
|
||||
} else {
|
||||
ret = OB_NEWx(ObTableLoadMemCompactor, (&allocator_));
|
||||
}
|
||||
int ret = OB_SUCCESS;
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
if (OB_NOT_NULL(compactor_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected not null compactor", KR(ret), KP(compactor_));
|
||||
} else {
|
||||
ret = OB_NEWx(ObTableLoadGeneralTableCompactor, (&allocator_));
|
||||
ObMemAttr attr(MTL_ID(), "TLD_Compactor");
|
||||
if (store_ctx_->is_multiple_mode_) {
|
||||
if (store_ctx_->table_data_desc_.is_heap_table_) {
|
||||
compactor_ = OB_NEW(ObTableLoadMultipleHeapTableCompactor, attr);
|
||||
} else {
|
||||
compactor_ = OB_NEW(ObTableLoadMemCompactor, attr);
|
||||
}
|
||||
} else {
|
||||
compactor_ = OB_NEW(ObTableLoadGeneralTableCompactor, attr);
|
||||
}
|
||||
if (OB_ISNULL(compactor_)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to new ObTableLoadTableCompactor", KR(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTableLoadTableCompactCtx::release_compactor()
|
||||
{
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
if (nullptr != compactor_) {
|
||||
ObMemAttr attr(MTL_ID(), "TLD_Compactor");
|
||||
OB_DELETE(ObTableLoadTableCompactor, attr, compactor_);
|
||||
compactor_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
int ObTableLoadTableCompactCtx::start()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(compactor_ = new_compactor())) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to new ObTableLoadGeneralTableCompactor", KR(ret));
|
||||
} else if (OB_FAIL(compactor_->init(this))) {
|
||||
LOG_WARN("fail to init compactor", KR(ret));
|
||||
} else if (OB_FAIL(compactor_->start())) {
|
||||
LOG_WARN("fail to start compactor", KR(ret));
|
||||
if (OB_FAIL(new_compactor())) {
|
||||
LOG_WARN("fail to new compactor", KR(ret));
|
||||
} else {
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
if (OB_FAIL(compactor_->init(this))) {
|
||||
LOG_WARN("fail to init compactor", KR(ret));
|
||||
} else if (OB_FAIL(compactor_->start())) {
|
||||
LOG_WARN("fail to start compactor", KR(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTableLoadTableCompactCtx::stop()
|
||||
{
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
if (OB_NOT_NULL(compactor_)) {
|
||||
compactor_->stop();
|
||||
}
|
||||
@ -185,9 +204,7 @@ void ObTableLoadTableCompactCtx::stop()
|
||||
int ObTableLoadTableCompactCtx::handle_table_compact_success()
|
||||
{
|
||||
// release compactor
|
||||
compactor_->~ObTableLoadTableCompactor();
|
||||
allocator_.free(compactor_);
|
||||
compactor_ = nullptr;
|
||||
release_compactor();
|
||||
// notify merger
|
||||
return merger_->handle_table_compact_success();
|
||||
}
|
||||
|
@ -60,12 +60,13 @@ public:
|
||||
int handle_table_compact_success();
|
||||
TO_STRING_KV(KP_(store_ctx), KP_(merger), KP_(compactor));
|
||||
private:
|
||||
ObTableLoadTableCompactor *new_compactor();
|
||||
int new_compactor();
|
||||
void release_compactor();
|
||||
|
||||
public:
|
||||
common::ObArenaAllocator allocator_;
|
||||
ObTableLoadStoreCtx *store_ctx_;
|
||||
ObTableLoadMerger *merger_;
|
||||
mutable obsys::ObRWLock rwlock_;
|
||||
ObTableLoadTableCompactor *compactor_;
|
||||
ObTableLoadTableCompactResult result_;
|
||||
};
|
||||
|
Reference in New Issue
Block a user