fix direct load compact hang

This commit is contained in:
suz-yang
2024-06-18 04:21:39 +00:00
committed by ob-robot
parent 7d9ca152f5
commit e2661c1548
2 changed files with 149 additions and 41 deletions

View File

@ -112,13 +112,12 @@ void ObTableLoadTableCompactResult::release_all_table_data()
*/
ObTableLoadTableCompactCtx::ObTableLoadTableCompactCtx()
: store_ctx_(nullptr), merger_(nullptr), compactor_(nullptr)
: store_ctx_(nullptr), merger_(nullptr)
{
}
ObTableLoadTableCompactCtx::~ObTableLoadTableCompactCtx()
{
release_compactor();
}
int ObTableLoadTableCompactCtx::init(ObTableLoadStoreCtx *store_ctx, ObTableLoadMerger &merger)
@ -143,28 +142,40 @@ bool ObTableLoadTableCompactCtx::is_valid() const
return nullptr != store_ctx_ && nullptr != merger_;
}
int ObTableLoadTableCompactCtx::new_compactor()
int ObTableLoadTableCompactCtx::new_compactor(ObTableLoadTableCompactorHandle &compactor_handle)
{
int ret = OB_SUCCESS;
compactor_handle.reset();
ObTableLoadTableCompactor *compactor = nullptr;
obsys::ObWLockGuard guard(rwlock_);
if (OB_NOT_NULL(compactor_)) {
if (OB_UNLIKELY(compactor_handle_.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected not null compactor", KR(ret), KP(compactor_));
LOG_WARN("unexpected not null compactor", KR(ret), K(compactor_handle_));
} else {
ObMemAttr attr(MTL_ID(), "TLD_Compactor");
if (store_ctx_->is_multiple_mode_) {
if (store_ctx_->table_data_desc_.is_heap_table_) {
compactor_ = OB_NEW(ObTableLoadMultipleHeapTableCompactor, attr);
compactor = OB_NEW(ObTableLoadMultipleHeapTableCompactor, attr);
} else {
compactor_ = OB_NEW(ObTableLoadMemCompactor, attr);
compactor = OB_NEW(ObTableLoadMemCompactor, attr);
}
} else {
// 有主键表不排序
compactor_ = OB_NEW(ObTableLoadParallelMergeTableCompactor, attr);
compactor = OB_NEW(ObTableLoadParallelMergeTableCompactor, attr);
}
if (OB_ISNULL(compactor_)) {
if (OB_ISNULL(compactor)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadTableCompactor", KR(ret));
} else if (OB_FAIL(compactor_handle_.set_compactor(compactor))) {
LOG_WARN("fail to set compactor", KR(ret));
} else {
compactor_handle = compactor_handle_;
}
if (OB_FAIL(ret)) {
if (nullptr != compactor) {
OB_DELETE(ObTableLoadTableCompactor, attr, compactor);
compactor = nullptr;
}
}
}
return ret;
@ -172,24 +183,36 @@ int ObTableLoadTableCompactCtx::new_compactor()
void ObTableLoadTableCompactCtx::release_compactor()
{
obsys::ObWLockGuard guard(rwlock_);
if (nullptr != compactor_) {
ObMemAttr attr(MTL_ID(), "TLD_Compactor");
OB_DELETE(ObTableLoadTableCompactor, attr, compactor_);
compactor_ = nullptr;
ObTableLoadTableCompactorHandle compactor_handle;
{
obsys::ObWLockGuard guard(rwlock_);
compactor_handle = compactor_handle_;
compactor_handle_.reset();
}
}
int ObTableLoadTableCompactCtx::get_compactor(ObTableLoadTableCompactorHandle &compactor_handle)
{
int ret = OB_SUCCESS;
obsys::ObRLockGuard guard(rwlock_);
compactor_handle = compactor_handle_;
return ret;
}
int ObTableLoadTableCompactCtx::start()
{
int ret = OB_SUCCESS;
if (OB_FAIL(new_compactor())) {
ObTableLoadTableCompactorHandle compactor_handle;
if (OB_FAIL(new_compactor(compactor_handle))) {
LOG_WARN("fail to new compactor", KR(ret));
} else if (OB_UNLIKELY(!compactor_handle.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected invalid compactor handle", KR(ret), K(compactor_handle));
} else {
obsys::ObRLockGuard guard(rwlock_);
if (OB_FAIL(compactor_->init(this))) {
ObTableLoadTableCompactor *compactor = compactor_handle.get_compactor();
if (OB_FAIL(compactor->init(this))) {
LOG_WARN("fail to init compactor", KR(ret));
} else if (OB_FAIL(compactor_->start())) {
} else if (OB_FAIL(compactor->start())) {
LOG_WARN("fail to start compactor", KR(ret));
}
}
@ -198,9 +221,13 @@ int ObTableLoadTableCompactCtx::start()
void ObTableLoadTableCompactCtx::stop()
{
obsys::ObRLockGuard guard(rwlock_);
if (OB_NOT_NULL(compactor_)) {
compactor_->stop();
int ret = OB_SUCCESS;
ObTableLoadTableCompactorHandle compactor_handle;
if (OB_FAIL(get_compactor(compactor_handle))) {
LOG_WARN("fail to get compactor", KR(ret));
} else if (compactor_handle.is_valid()) {
ObTableLoadTableCompactor *compactor = compactor_handle.get_compactor();
compactor->stop();
}
}
@ -217,7 +244,7 @@ int ObTableLoadTableCompactCtx::handle_table_compact_success()
*/
ObTableLoadTableCompactor::ObTableLoadTableCompactor()
: compact_ctx_(nullptr), is_inited_(false)
: compact_ctx_(nullptr), ref_cnt_(0), is_inited_(false)
{
}
@ -245,5 +272,52 @@ int ObTableLoadTableCompactor::init(ObTableLoadTableCompactCtx *compact_ctx)
return ret;
}
/**
* ObTableLoadTableCompactorHandle
*/
ObTableLoadTableCompactorHandle &ObTableLoadTableCompactorHandle::operator =(const ObTableLoadTableCompactorHandle &other)
{
if (this != &other) {
reset();
if (OB_NOT_NULL(other.compactor_)) {
compactor_ = other.compactor_;
compactor_->inc_ref();
}
}
return *this;
}
void ObTableLoadTableCompactorHandle::reset()
{
if (nullptr != compactor_) {
const int64_t ref_cnt = compactor_->dec_ref();
if (0 == ref_cnt) {
ObMemAttr attr(MTL_ID(), "TLD_Compactor");
OB_DELETE(ObTableLoadTableCompactor, attr, compactor_);
}
compactor_ = nullptr;
}
}
bool ObTableLoadTableCompactorHandle::is_valid() const
{
return nullptr != compactor_;
}
int ObTableLoadTableCompactorHandle::set_compactor(ObTableLoadTableCompactor *compactor)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(compactor)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KP(compactor));
} else {
reset();
compactor_ = compactor;
compactor_->inc_ref();
}
return ret;
}
} // namespace observer
} // namespace oceanbase

View File

@ -23,7 +23,7 @@ namespace observer
{
class ObTableLoadStoreCtx;
class ObTableLoadMerger;
class ObTableLoadTableCompactor;
class ObTableLoadTableCompactCtx;
struct ObTableLoadTableCompactTabletResult : public common::LinkHashValue<common::ObTabletID>
{
@ -51,6 +51,54 @@ public:
TabletResultMap tablet_result_map_;
};
class ObTableLoadTableCompactor
{
public:
ObTableLoadTableCompactor();
virtual ~ObTableLoadTableCompactor();
int init(ObTableLoadTableCompactCtx *compact_ctx);
virtual int start() = 0;
virtual void stop() = 0;
OB_INLINE int64_t inc_ref()
{
const int64_t cnt = ATOMIC_AAF(&ref_cnt_, 1);
return cnt;
}
OB_INLINE int64_t dec_ref()
{
const int64_t cnt = ATOMIC_SAF(&ref_cnt_, 1 /* just sub 1 */);
return cnt;
}
OB_INLINE int64_t get_ref() const { return ATOMIC_LOAD(&ref_cnt_); }
protected:
virtual int inner_init() = 0;
protected:
ObTableLoadTableCompactCtx *compact_ctx_;
int64_t ref_cnt_;
bool is_inited_;
};
class ObTableLoadTableCompactorHandle
{
public:
ObTableLoadTableCompactorHandle() : compactor_(nullptr) {}
ObTableLoadTableCompactorHandle(const ObTableLoadTableCompactorHandle &other)
: compactor_(nullptr)
{
*this = other;
}
ObTableLoadTableCompactorHandle &operator=(const ObTableLoadTableCompactorHandle &other);
~ObTableLoadTableCompactorHandle() { reset(); }
void reset();
bool is_valid() const;
int set_compactor(ObTableLoadTableCompactor *compactor);
ObTableLoadTableCompactor *get_compactor() const { return compactor_; }
TO_STRING_KV(KP_(compactor));
private:
ObTableLoadTableCompactor *compactor_;
};
class ObTableLoadTableCompactCtx
{
public:
@ -61,33 +109,19 @@ public:
int start();
void stop();
int handle_table_compact_success();
TO_STRING_KV(KP_(store_ctx), KP_(merger), KP_(compactor));
TO_STRING_KV(KP_(store_ctx), KP_(merger), K_(compactor_handle));
private:
int new_compactor();
int new_compactor(ObTableLoadTableCompactorHandle &compactor_handle);
void release_compactor();
int get_compactor(ObTableLoadTableCompactorHandle &compactor_handle);
public:
ObTableLoadStoreCtx *store_ctx_;
ObTableLoadMerger *merger_;
mutable obsys::ObRWLock rwlock_;
ObTableLoadTableCompactor *compactor_;
ObTableLoadTableCompactorHandle compactor_handle_;
ObTableLoadTableCompactResult result_;
};
class ObTableLoadTableCompactor
{
public:
ObTableLoadTableCompactor();
virtual ~ObTableLoadTableCompactor();
int init(ObTableLoadTableCompactCtx *compact_ctx);
virtual int start() = 0;
virtual void stop() = 0;
protected:
virtual int inner_init() = 0;
protected:
ObTableLoadTableCompactCtx *compact_ctx_;
bool is_inited_;
};
} // namespace observer
} // namespace oceanbase