Fix direct load manager get_all_table_ctx
This commit is contained in:
@ -36,8 +36,8 @@ int ObTableLoadManager::init()
|
||||
if (OB_FAIL(
|
||||
table_ctx_map_.create(bucket_num, "TLD_TableCtxMgr", "TLD_TableCtxMgr", MTL_ID()))) {
|
||||
LOG_WARN("fail to create hashmap", KR(ret), K(bucket_num));
|
||||
} else if (OB_FAIL(table_handle_map_.create(bucket_num, "TLD_TableCtxMgr", "TLD_TableCtxMgr",
|
||||
MTL_ID()))) {
|
||||
} else if (OB_FAIL(table_ctx_index_map_.create(bucket_num, "TLD_TblCtxIMgr", "TLD_TblCtxIMgr",
|
||||
MTL_ID()))) {
|
||||
LOG_WARN("fail to create hashmap", KR(ret), K(bucket_num));
|
||||
} else {
|
||||
is_inited_ = true;
|
||||
@ -61,9 +61,6 @@ int ObTableLoadManager::add_table_ctx(const ObTableLoadUniqueKey &key,
|
||||
LOG_WARN("unexpected dirty table ctx", KR(ret), KP(table_ctx));
|
||||
} else {
|
||||
const uint64_t table_id = key.table_id_;
|
||||
TableHandle table_handle;
|
||||
table_handle.key_ = key;
|
||||
table_handle.table_ctx_ = table_ctx;
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
if (OB_FAIL(table_ctx_map_.set_refactored(key, table_ctx))) {
|
||||
if (OB_UNLIKELY(OB_HASH_EXIST != ret)) {
|
||||
@ -72,8 +69,8 @@ int ObTableLoadManager::add_table_ctx(const ObTableLoadUniqueKey &key,
|
||||
ret = OB_ENTRY_EXIST;
|
||||
}
|
||||
}
|
||||
// force update table index
|
||||
else if (OB_FAIL(table_handle_map_.set_refactored(table_id, table_handle, 1))) {
|
||||
// force update table ctx index
|
||||
else if (OB_FAIL(table_ctx_index_map_.set_refactored(table_id, table_ctx, 1))) {
|
||||
LOG_WARN("fail to set refactored", KR(ret), K(table_id));
|
||||
// erase from table ctx map, avoid wild pointer is been use
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
@ -87,38 +84,39 @@ int ObTableLoadManager::add_table_ctx(const ObTableLoadUniqueKey &key,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadManager::remove_table_ctx(const ObTableLoadUniqueKey &key)
|
||||
int ObTableLoadManager::remove_table_ctx(const ObTableLoadUniqueKey &key,
|
||||
ObTableLoadTableCtx *table_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadManager not init", KR(ret), KP(this));
|
||||
} else if (OB_UNLIKELY(!key.is_valid())) {
|
||||
} else if (OB_UNLIKELY(!key.is_valid() || nullptr == table_ctx)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(key));
|
||||
LOG_WARN("invalid args", KR(ret), K(key), KP(table_ctx));
|
||||
} else {
|
||||
const uint64_t table_id = key.table_id_;
|
||||
ObTableLoadTableCtx *table_ctx = nullptr;
|
||||
TableHandle table_handle;
|
||||
{
|
||||
const uint64_t table_id = key.table_id_;
|
||||
HashMapEraseIfEqual erase_if_equal(table_ctx);
|
||||
bool is_erased = false;
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
if (OB_FAIL(table_ctx_map_.erase_refactored(key, &table_ctx))) {
|
||||
if (OB_FAIL(table_ctx_map_.erase_if(key, erase_if_equal, is_erased))) {
|
||||
if (OB_UNLIKELY(OB_HASH_NOT_EXIST == ret)) {
|
||||
LOG_WARN("fail to erase refactored", KR(ret), K(key));
|
||||
} else {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
}
|
||||
} else if (OB_UNLIKELY(!is_erased)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpected table ctx", KR(ret), KPC(table_ctx));
|
||||
}
|
||||
// remove table index if key match
|
||||
else if (OB_FAIL(table_handle_map_.get_refactored(table_id, table_handle))) {
|
||||
// try remove table ctx index
|
||||
else if (OB_FAIL(table_ctx_index_map_.erase_if(table_id, erase_if_equal, is_erased))) {
|
||||
if (OB_UNLIKELY(OB_HASH_NOT_EXIST == ret)) {
|
||||
LOG_WARN("fail to get refactored", KR(ret), K(table_id));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
}
|
||||
} else if (table_handle.key_ == key &&
|
||||
OB_FAIL(table_handle_map_.erase_refactored(table_id))) {
|
||||
LOG_WARN("fail to erase refactored", KR(ret), K(table_id), K(table_handle));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -138,14 +136,12 @@ int ObTableLoadManager::get_all_table_ctx(ObIArray<ObTableLoadTableCtx *> &table
|
||||
LOG_WARN("ObTableLoadManager not init", KR(ret), KP(this));
|
||||
} else {
|
||||
table_ctx_array.reset();
|
||||
obsys::ObWLockGuard guard(rwlock_);
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
for (TableCtxMap::const_iterator iter = table_ctx_map_.begin();
|
||||
OB_SUCC(ret) && iter != table_ctx_map_.end(); ++iter) {
|
||||
const ObTableLoadUniqueKey &key = iter->first;
|
||||
ObTableLoadTableCtx *table_ctx = iter->second;
|
||||
if (OB_FAIL(add_dirty_list(table_ctx))) {
|
||||
LOG_WARN("fail to add dirty list", KR(ret), K(key), KP(table_ctx));
|
||||
} else if (OB_FAIL(table_ctx_array.push_back(table_ctx))) {
|
||||
if (OB_FAIL(table_ctx_array.push_back(table_ctx))) {
|
||||
LOG_WARN("fail to push back", KR(ret), K(key));
|
||||
} else {
|
||||
table_ctx->inc_ref_count();
|
||||
@ -194,16 +190,14 @@ int ObTableLoadManager::get_table_ctx_by_table_id(uint64_t table_id,
|
||||
LOG_WARN("ObTableLoadManager not init", KR(ret), KP(this));
|
||||
} else {
|
||||
table_ctx = nullptr;
|
||||
TableHandle table_handle;
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
if (OB_FAIL(table_handle_map_.get_refactored(table_id, table_handle))) {
|
||||
if (OB_FAIL(table_ctx_index_map_.get_refactored(table_id, table_ctx))) {
|
||||
if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) {
|
||||
LOG_WARN("fail to get refactored", KR(ret), K(table_id));
|
||||
} else {
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
}
|
||||
} else {
|
||||
table_ctx = table_handle.table_ctx_;
|
||||
table_ctx->inc_ref_count();
|
||||
}
|
||||
}
|
||||
@ -256,6 +250,7 @@ void ObTableLoadManager::put_table_ctx(ObTableLoadTableCtx *table_ctx)
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), KP(table_ctx));
|
||||
} else {
|
||||
// not release here
|
||||
OB_ASSERT(table_ctx->dec_ref_count() >= 0);
|
||||
}
|
||||
}
|
||||
@ -266,24 +261,6 @@ int64_t ObTableLoadManager::get_table_ctx_count() const
|
||||
return table_ctx_map_.size();
|
||||
}
|
||||
|
||||
int64_t ObTableLoadManager::get_dirty_list_count() const
|
||||
{
|
||||
ObMutexGuard guard(mutex_);
|
||||
return dirty_list_.get_size();
|
||||
}
|
||||
|
||||
bool ObTableLoadManager::is_table_ctx_empty() const
|
||||
{
|
||||
obsys::ObRLockGuard guard(rwlock_);
|
||||
return table_ctx_map_.empty();
|
||||
}
|
||||
|
||||
bool ObTableLoadManager::is_dirty_list_empty() const
|
||||
{
|
||||
ObMutexGuard guard(mutex_);
|
||||
return dirty_list_.is_empty();
|
||||
}
|
||||
|
||||
int ObTableLoadManager::add_dirty_list(ObTableLoadTableCtx *table_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -326,5 +303,11 @@ int ObTableLoadManager::get_releasable_table_ctx_list(
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t ObTableLoadManager::get_dirty_list_count() const
|
||||
{
|
||||
ObMutexGuard guard(mutex_);
|
||||
return dirty_list_.get_size();
|
||||
}
|
||||
|
||||
} // namespace observer
|
||||
} // namespace oceanbase
|
||||
|
@ -23,7 +23,7 @@ public:
|
||||
int init();
|
||||
// table ctx holds a reference count
|
||||
int add_table_ctx(const ObTableLoadUniqueKey &key, ObTableLoadTableCtx *table_ctx);
|
||||
int remove_table_ctx(const ObTableLoadUniqueKey &key);
|
||||
int remove_table_ctx(const ObTableLoadUniqueKey &key, ObTableLoadTableCtx *table_ctx);
|
||||
// table ctx holds a reference count
|
||||
int get_all_table_ctx(common::ObIArray<ObTableLoadTableCtx *> &table_ctx_array);
|
||||
// table ctx holds a reference count
|
||||
@ -34,33 +34,42 @@ public:
|
||||
int get_inactive_table_ctx_list(common::ObIArray<ObTableLoadTableCtx *> &table_ctx_array);
|
||||
void put_table_ctx(ObTableLoadTableCtx *table_ctx);
|
||||
int64_t get_table_ctx_count() const;
|
||||
int64_t get_dirty_list_count() const;
|
||||
bool is_table_ctx_empty() const;
|
||||
bool is_dirty_list_empty() const;
|
||||
// table ctx no reference counting
|
||||
int get_releasable_table_ctx_list(common::ObIArray<ObTableLoadTableCtx *> &table_ctx_array);
|
||||
public:
|
||||
int64_t get_dirty_list_count() const;
|
||||
private:
|
||||
int add_dirty_list(ObTableLoadTableCtx *table_ctx);
|
||||
private:
|
||||
struct TableHandle
|
||||
{
|
||||
public:
|
||||
TableHandle() : table_ctx_(nullptr) {}
|
||||
TO_STRING_KV(K_(key), KP_(table_ctx));
|
||||
public:
|
||||
ObTableLoadUniqueKey key_;
|
||||
ObTableLoadTableCtx *table_ctx_;
|
||||
};
|
||||
// key => table_ctx
|
||||
typedef common::hash::ObHashMap<ObTableLoadUniqueKey, ObTableLoadTableCtx *,
|
||||
common::hash::NoPthreadDefendMode>
|
||||
TableCtxMap;
|
||||
// table_id => table_handle
|
||||
typedef common::hash::ObHashMap<uint64_t, TableHandle, common::hash::NoPthreadDefendMode>
|
||||
TableHandleMap;
|
||||
// table_id => table_ctx
|
||||
typedef common::hash::ObHashMap<uint64_t, ObTableLoadTableCtx *,
|
||||
common::hash::NoPthreadDefendMode>
|
||||
TableCtxIndexMap;
|
||||
|
||||
class HashMapEraseIfEqual
|
||||
{
|
||||
public:
|
||||
HashMapEraseIfEqual(ObTableLoadTableCtx *table_ctx) : table_ctx_(table_ctx) {}
|
||||
bool operator()(
|
||||
common::hash::HashMapPair<ObTableLoadUniqueKey, ObTableLoadTableCtx *> &entry) const
|
||||
{
|
||||
return table_ctx_ == entry.second;
|
||||
}
|
||||
bool operator()(common::hash::HashMapPair<uint64_t, ObTableLoadTableCtx *> &entry) const
|
||||
{
|
||||
return table_ctx_ == entry.second;
|
||||
}
|
||||
public:
|
||||
ObTableLoadTableCtx *table_ctx_;
|
||||
};
|
||||
private:
|
||||
mutable obsys::ObRWLock rwlock_;
|
||||
TableCtxMap table_ctx_map_;
|
||||
TableHandleMap table_handle_map_; // index of the latest task
|
||||
TableCtxIndexMap table_ctx_index_map_; // index of the latest task
|
||||
// for release table ctx in background
|
||||
mutable lib::ObMutex mutex_;
|
||||
common::ObDList<ObTableLoadTableCtx> dirty_list_;
|
||||
bool is_inited_;
|
||||
|
@ -282,7 +282,7 @@ int ObTableLoadService::remove_ctx(ObTableLoadTableCtx *table_ctx)
|
||||
LOG_WARN("null table load service", KR(ret));
|
||||
} else {
|
||||
ObTableLoadUniqueKey key(table_ctx->param_.table_id_, table_ctx->ddl_param_.task_id_);
|
||||
ret = service->get_manager().remove_table_ctx(key);
|
||||
ret = service->get_manager().remove_table_ctx(key, table_ctx);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -361,14 +361,14 @@ int ObTableLoadService::start()
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("ObTableLoadService not init", KR(ret), KP(this));
|
||||
} else {
|
||||
gc_timer_.set_run_wrapper(MTL_CTX());
|
||||
if (OB_FAIL(gc_timer_.init("TLD_GC", ObMemAttr(MTL_ID(), "GC_TIMER")))) {
|
||||
timer_.set_run_wrapper(MTL_CTX());
|
||||
if (OB_FAIL(timer_.init("TLD_Timer", ObMemAttr(MTL_ID(), "TLD_TIMER")))) {
|
||||
LOG_WARN("fail to init gc timer", KR(ret));
|
||||
} else if (OB_FAIL(gc_timer_.schedule(check_tenant_task_, CHECK_TENANT_INTERVAL, true))) {
|
||||
} else if (OB_FAIL(timer_.schedule(check_tenant_task_, CHECK_TENANT_INTERVAL, true))) {
|
||||
LOG_WARN("fail to schedule check tenant task", KR(ret));
|
||||
} else if (OB_FAIL(gc_timer_.schedule(gc_task_, GC_INTERVAL, true))) {
|
||||
} else if (OB_FAIL(timer_.schedule(gc_task_, GC_INTERVAL, true))) {
|
||||
LOG_WARN("fail to schedule gc task", KR(ret));
|
||||
} else if (OB_FAIL(gc_timer_.schedule(release_task_, RELEASE_INTERVAL, true))) {
|
||||
} else if (OB_FAIL(timer_.schedule(release_task_, RELEASE_INTERVAL, true))) {
|
||||
LOG_WARN("fail to schedule release task", KR(ret));
|
||||
}
|
||||
}
|
||||
@ -379,20 +379,20 @@ int ObTableLoadService::stop()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_stop_ = true;
|
||||
gc_timer_.stop();
|
||||
timer_.stop();
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObTableLoadService::wait()
|
||||
{
|
||||
gc_timer_.wait();
|
||||
timer_.wait();
|
||||
release_all_ctx();
|
||||
}
|
||||
|
||||
void ObTableLoadService::destroy()
|
||||
{
|
||||
is_inited_ = false;
|
||||
gc_timer_.destroy();
|
||||
timer_.destroy();
|
||||
}
|
||||
|
||||
void ObTableLoadService::fail_all_ctx(int error_code)
|
||||
@ -452,7 +452,7 @@ void ObTableLoadService::release_all_ctx()
|
||||
manager_.put_table_ctx(table_ctx);
|
||||
}
|
||||
}
|
||||
if (manager_.is_table_ctx_empty()) {
|
||||
if (0 == manager_.get_table_ctx_count()) {
|
||||
break;
|
||||
} else {
|
||||
ob_usleep(1 * 1000 * 1000);
|
||||
@ -474,7 +474,7 @@ void ObTableLoadService::release_all_ctx()
|
||||
LOG_INFO("free table ctx", K(tenant_id), K(table_id), K(hidden_table_id), KP(table_ctx));
|
||||
ObTableLoadService::free_ctx(table_ctx);
|
||||
}
|
||||
if (manager_.is_dirty_list_empty()) {
|
||||
if (0 == manager_.get_dirty_list_count()) {
|
||||
break;
|
||||
} else {
|
||||
ob_usleep(1 * 1000 * 1000);
|
||||
|
@ -85,7 +85,7 @@ private:
|
||||
};
|
||||
private:
|
||||
ObTableLoadManager manager_;
|
||||
common::ObTimer gc_timer_;
|
||||
common::ObTimer timer_;
|
||||
ObCheckTenantTask check_tenant_task_;
|
||||
ObGCTask gc_task_;
|
||||
ObReleaseTask release_task_;
|
||||
|
Reference in New Issue
Block a user