diff --git a/src/observer/table_load/ob_table_load_manager.cpp b/src/observer/table_load/ob_table_load_manager.cpp index 8eec82bc9e..85ce64c883 100644 --- a/src/observer/table_load/ob_table_load_manager.cpp +++ b/src/observer/table_load/ob_table_load_manager.cpp @@ -36,8 +36,8 @@ int ObTableLoadManager::init() if (OB_FAIL( table_ctx_map_.create(bucket_num, "TLD_TableCtxMgr", "TLD_TableCtxMgr", MTL_ID()))) { LOG_WARN("fail to create hashmap", KR(ret), K(bucket_num)); - } else if (OB_FAIL(table_handle_map_.create(bucket_num, "TLD_TableCtxMgr", "TLD_TableCtxMgr", - MTL_ID()))) { + } else if (OB_FAIL(table_ctx_index_map_.create(bucket_num, "TLD_TblCtxIMgr", "TLD_TblCtxIMgr", + MTL_ID()))) { LOG_WARN("fail to create hashmap", KR(ret), K(bucket_num)); } else { is_inited_ = true; @@ -61,9 +61,6 @@ int ObTableLoadManager::add_table_ctx(const ObTableLoadUniqueKey &key, LOG_WARN("unexpected dirty table ctx", KR(ret), KP(table_ctx)); } else { const uint64_t table_id = key.table_id_; - TableHandle table_handle; - table_handle.key_ = key; - table_handle.table_ctx_ = table_ctx; obsys::ObWLockGuard guard(rwlock_); if (OB_FAIL(table_ctx_map_.set_refactored(key, table_ctx))) { if (OB_UNLIKELY(OB_HASH_EXIST != ret)) { @@ -72,8 +69,8 @@ int ObTableLoadManager::add_table_ctx(const ObTableLoadUniqueKey &key, ret = OB_ENTRY_EXIST; } } - // force update table index - else if (OB_FAIL(table_handle_map_.set_refactored(table_id, table_handle, 1))) { + // force update table ctx index + else if (OB_FAIL(table_ctx_index_map_.set_refactored(table_id, table_ctx, 1))) { LOG_WARN("fail to set refactored", KR(ret), K(table_id)); // erase from table ctx map, avoid wild pointer is been use int tmp_ret = OB_SUCCESS; @@ -87,38 +84,39 @@ int ObTableLoadManager::add_table_ctx(const ObTableLoadUniqueKey &key, return ret; } -int ObTableLoadManager::remove_table_ctx(const ObTableLoadUniqueKey &key) +int ObTableLoadManager::remove_table_ctx(const ObTableLoadUniqueKey &key, + ObTableLoadTableCtx *table_ctx) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObTableLoadManager not init", KR(ret), KP(this)); - } else if (OB_UNLIKELY(!key.is_valid())) { + } else if (OB_UNLIKELY(!key.is_valid() || nullptr == table_ctx)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), K(key)); + LOG_WARN("invalid args", KR(ret), K(key), KP(table_ctx)); } else { - const uint64_t table_id = key.table_id_; - ObTableLoadTableCtx *table_ctx = nullptr; - TableHandle table_handle; { + const uint64_t table_id = key.table_id_; + HashMapEraseIfEqual erase_if_equal(table_ctx); + bool is_erased = false; obsys::ObWLockGuard guard(rwlock_); - if (OB_FAIL(table_ctx_map_.erase_refactored(key, &table_ctx))) { + if (OB_FAIL(table_ctx_map_.erase_if(key, erase_if_equal, is_erased))) { if (OB_UNLIKELY(OB_HASH_NOT_EXIST == ret)) { LOG_WARN("fail to erase refactored", KR(ret), K(key)); } else { ret = OB_ENTRY_NOT_EXIST; } + } else if (OB_UNLIKELY(!is_erased)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected table ctx", KR(ret), KPC(table_ctx)); } - // remove table index if key match - else if (OB_FAIL(table_handle_map_.get_refactored(table_id, table_handle))) { + // try remove table ctx index + else if (OB_FAIL(table_ctx_index_map_.erase_if(table_id, erase_if_equal, is_erased))) { if (OB_UNLIKELY(OB_HASH_NOT_EXIST == ret)) { LOG_WARN("fail to get refactored", KR(ret), K(table_id)); } else { ret = OB_SUCCESS; } - } else if (table_handle.key_ == key && - OB_FAIL(table_handle_map_.erase_refactored(table_id))) { - LOG_WARN("fail to erase refactored", KR(ret), K(table_id), K(table_handle)); } } if (OB_SUCC(ret)) { @@ -138,14 +136,12 @@ int ObTableLoadManager::get_all_table_ctx(ObIArray &table LOG_WARN("ObTableLoadManager not init", KR(ret), KP(this)); } else { table_ctx_array.reset(); - obsys::ObWLockGuard guard(rwlock_); + obsys::ObRLockGuard guard(rwlock_); for (TableCtxMap::const_iterator iter = table_ctx_map_.begin(); OB_SUCC(ret) && iter != table_ctx_map_.end(); ++iter) { const ObTableLoadUniqueKey &key = iter->first; ObTableLoadTableCtx *table_ctx = iter->second; - if (OB_FAIL(add_dirty_list(table_ctx))) { - LOG_WARN("fail to add dirty list", KR(ret), K(key), KP(table_ctx)); - } else if (OB_FAIL(table_ctx_array.push_back(table_ctx))) { + if (OB_FAIL(table_ctx_array.push_back(table_ctx))) { LOG_WARN("fail to push back", KR(ret), K(key)); } else { table_ctx->inc_ref_count(); @@ -194,16 +190,14 @@ int ObTableLoadManager::get_table_ctx_by_table_id(uint64_t table_id, LOG_WARN("ObTableLoadManager not init", KR(ret), KP(this)); } else { table_ctx = nullptr; - TableHandle table_handle; obsys::ObRLockGuard guard(rwlock_); - if (OB_FAIL(table_handle_map_.get_refactored(table_id, table_handle))) { + if (OB_FAIL(table_ctx_index_map_.get_refactored(table_id, table_ctx))) { if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) { LOG_WARN("fail to get refactored", KR(ret), K(table_id)); } else { ret = OB_ENTRY_NOT_EXIST; } } else { - table_ctx = table_handle.table_ctx_; table_ctx->inc_ref_count(); } } @@ -256,6 +250,7 @@ void ObTableLoadManager::put_table_ctx(ObTableLoadTableCtx *table_ctx) ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), KP(table_ctx)); } else { + // not release here OB_ASSERT(table_ctx->dec_ref_count() >= 0); } } @@ -266,24 +261,6 @@ int64_t ObTableLoadManager::get_table_ctx_count() const return table_ctx_map_.size(); } -int64_t ObTableLoadManager::get_dirty_list_count() const -{ - ObMutexGuard guard(mutex_); - return dirty_list_.get_size(); -} - -bool ObTableLoadManager::is_table_ctx_empty() const -{ - obsys::ObRLockGuard guard(rwlock_); - return table_ctx_map_.empty(); -} - -bool ObTableLoadManager::is_dirty_list_empty() const -{ - ObMutexGuard guard(mutex_); - return dirty_list_.is_empty(); -} - int ObTableLoadManager::add_dirty_list(ObTableLoadTableCtx *table_ctx) { int ret = OB_SUCCESS; @@ -326,5 +303,11 @@ int ObTableLoadManager::get_releasable_table_ctx_list( return ret; } +int64_t ObTableLoadManager::get_dirty_list_count() const +{ + ObMutexGuard guard(mutex_); + return dirty_list_.get_size(); +} + } // namespace observer } // namespace oceanbase diff --git a/src/observer/table_load/ob_table_load_manager.h b/src/observer/table_load/ob_table_load_manager.h index 31dacd7dc9..c03cfe3936 100644 --- a/src/observer/table_load/ob_table_load_manager.h +++ b/src/observer/table_load/ob_table_load_manager.h @@ -23,7 +23,7 @@ public: int init(); // table ctx holds a reference count int add_table_ctx(const ObTableLoadUniqueKey &key, ObTableLoadTableCtx *table_ctx); - int remove_table_ctx(const ObTableLoadUniqueKey &key); + int remove_table_ctx(const ObTableLoadUniqueKey &key, ObTableLoadTableCtx *table_ctx); // table ctx holds a reference count int get_all_table_ctx(common::ObIArray &table_ctx_array); // table ctx holds a reference count @@ -34,33 +34,42 @@ public: int get_inactive_table_ctx_list(common::ObIArray &table_ctx_array); void put_table_ctx(ObTableLoadTableCtx *table_ctx); int64_t get_table_ctx_count() const; - int64_t get_dirty_list_count() const; - bool is_table_ctx_empty() const; - bool is_dirty_list_empty() const; // table ctx no reference counting int get_releasable_table_ctx_list(common::ObIArray &table_ctx_array); -public: + int64_t get_dirty_list_count() const; +private: int add_dirty_list(ObTableLoadTableCtx *table_ctx); private: - struct TableHandle - { - public: - TableHandle() : table_ctx_(nullptr) {} - TO_STRING_KV(K_(key), KP_(table_ctx)); - public: - ObTableLoadUniqueKey key_; - ObTableLoadTableCtx *table_ctx_; - }; // key => table_ctx typedef common::hash::ObHashMap TableCtxMap; - // table_id => table_handle - typedef common::hash::ObHashMap - TableHandleMap; + // table_id => table_ctx + typedef common::hash::ObHashMap + TableCtxIndexMap; + + class HashMapEraseIfEqual + { + public: + HashMapEraseIfEqual(ObTableLoadTableCtx *table_ctx) : table_ctx_(table_ctx) {} + bool operator()( + common::hash::HashMapPair &entry) const + { + return table_ctx_ == entry.second; + } + bool operator()(common::hash::HashMapPair &entry) const + { + return table_ctx_ == entry.second; + } + public: + ObTableLoadTableCtx *table_ctx_; + }; +private: mutable obsys::ObRWLock rwlock_; TableCtxMap table_ctx_map_; - TableHandleMap table_handle_map_; // index of the latest task + TableCtxIndexMap table_ctx_index_map_; // index of the latest task + // for release table ctx in background mutable lib::ObMutex mutex_; common::ObDList dirty_list_; bool is_inited_; diff --git a/src/observer/table_load/ob_table_load_service.cpp b/src/observer/table_load/ob_table_load_service.cpp index bc0d805994..9d788c4db0 100644 --- a/src/observer/table_load/ob_table_load_service.cpp +++ b/src/observer/table_load/ob_table_load_service.cpp @@ -282,7 +282,7 @@ int ObTableLoadService::remove_ctx(ObTableLoadTableCtx *table_ctx) LOG_WARN("null table load service", KR(ret)); } else { ObTableLoadUniqueKey key(table_ctx->param_.table_id_, table_ctx->ddl_param_.task_id_); - ret = service->get_manager().remove_table_ctx(key); + ret = service->get_manager().remove_table_ctx(key, table_ctx); } return ret; } @@ -361,14 +361,14 @@ int ObTableLoadService::start() ret = OB_NOT_INIT; LOG_WARN("ObTableLoadService not init", KR(ret), KP(this)); } else { - gc_timer_.set_run_wrapper(MTL_CTX()); - if (OB_FAIL(gc_timer_.init("TLD_GC", ObMemAttr(MTL_ID(), "GC_TIMER")))) { + timer_.set_run_wrapper(MTL_CTX()); + if (OB_FAIL(timer_.init("TLD_Timer", ObMemAttr(MTL_ID(), "TLD_TIMER")))) { LOG_WARN("fail to init gc timer", KR(ret)); - } else if (OB_FAIL(gc_timer_.schedule(check_tenant_task_, CHECK_TENANT_INTERVAL, true))) { + } else if (OB_FAIL(timer_.schedule(check_tenant_task_, CHECK_TENANT_INTERVAL, true))) { LOG_WARN("fail to schedule check tenant task", KR(ret)); - } else if (OB_FAIL(gc_timer_.schedule(gc_task_, GC_INTERVAL, true))) { + } else if (OB_FAIL(timer_.schedule(gc_task_, GC_INTERVAL, true))) { LOG_WARN("fail to schedule gc task", KR(ret)); - } else if (OB_FAIL(gc_timer_.schedule(release_task_, RELEASE_INTERVAL, true))) { + } else if (OB_FAIL(timer_.schedule(release_task_, RELEASE_INTERVAL, true))) { LOG_WARN("fail to schedule release task", KR(ret)); } } @@ -379,20 +379,20 @@ int ObTableLoadService::stop() { int ret = OB_SUCCESS; is_stop_ = true; - gc_timer_.stop(); + timer_.stop(); return ret; } void ObTableLoadService::wait() { - gc_timer_.wait(); + timer_.wait(); release_all_ctx(); } void ObTableLoadService::destroy() { is_inited_ = false; - gc_timer_.destroy(); + timer_.destroy(); } void ObTableLoadService::fail_all_ctx(int error_code) @@ -452,7 +452,7 @@ void ObTableLoadService::release_all_ctx() manager_.put_table_ctx(table_ctx); } } - if (manager_.is_table_ctx_empty()) { + if (0 == manager_.get_table_ctx_count()) { break; } else { ob_usleep(1 * 1000 * 1000); @@ -474,7 +474,7 @@ void ObTableLoadService::release_all_ctx() LOG_INFO("free table ctx", K(tenant_id), K(table_id), K(hidden_table_id), KP(table_ctx)); ObTableLoadService::free_ctx(table_ctx); } - if (manager_.is_dirty_list_empty()) { + if (0 == manager_.get_dirty_list_count()) { break; } else { ob_usleep(1 * 1000 * 1000); diff --git a/src/observer/table_load/ob_table_load_service.h b/src/observer/table_load/ob_table_load_service.h index 6c4030b96f..7d9478596b 100644 --- a/src/observer/table_load/ob_table_load_service.h +++ b/src/observer/table_load/ob_table_load_service.h @@ -85,7 +85,7 @@ private: }; private: ObTableLoadManager manager_; - common::ObTimer gc_timer_; + common::ObTimer timer_; ObCheckTenantTask check_tenant_task_; ObGCTask gc_task_; ObReleaseTask release_task_;