diff --git a/mittest/mtlenv/mock_tenant_module_env.h b/mittest/mtlenv/mock_tenant_module_env.h index 0b36ecb0bc..2bed09d6ab 100644 --- a/mittest/mtlenv/mock_tenant_module_env.h +++ b/mittest/mtlenv/mock_tenant_module_env.h @@ -85,6 +85,7 @@ #include "logservice/palf/log_define.h" #include "storage/high_availability/ob_rebuild_service.h" #include "observer/table/ob_htable_lock_mgr.h" +#include "observer/table/ob_table_session_pool.h" namespace oceanbase { @@ -703,6 +704,7 @@ int MockTenantModuleEnv::init() MTL_BIND2(mtl_new_default, ObRebuildService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND(table::ObHTableLockMgr::mtl_init, table::ObHTableLockMgr::mtl_destroy); MTL_BIND2(mtl_new_default, omt::ObTenantSrs::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); + MTL_BIND2(mtl_new_default, table::ObTableApiSessPoolMgr::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); } if (OB_FAIL(ret)) { diff --git a/src/observer/omt/ob_multi_tenant.cpp b/src/observer/omt/ob_multi_tenant.cpp index 4fe9b58a82..d25e33d625 100644 --- a/src/observer/omt/ob_multi_tenant.cpp +++ b/src/observer/omt/ob_multi_tenant.cpp @@ -137,6 +137,7 @@ #include "share/errsim_module/ob_tenant_errsim_event_mgr.h" #endif #include "observer/table/ob_htable_lock_mgr.h" +#include "observer/table/ob_table_session_pool.h" using namespace oceanbase; using namespace oceanbase::lib; @@ -531,6 +532,7 @@ int ObMultiTenant::init(ObAddr myaddr, MTL_BIND2(mtl_new_default, ObSharedTimer::mtl_init, ObSharedTimer::mtl_start, ObSharedTimer::mtl_stop, ObSharedTimer::mtl_wait, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObOptStatMonitorManager::mtl_init, ObOptStatMonitorManager::mtl_start, ObOptStatMonitorManager::mtl_stop, ObOptStatMonitorManager::mtl_wait, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObTenantSrs::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); + MTL_BIND2(mtl_new_default, table::ObTableApiSessPoolMgr::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); } if (OB_SUCC(ret)) { diff --git a/src/observer/table/ob_table_context.cpp b/src/observer/table/ob_table_context.cpp index b42757e0f1..dbfc6d6d8a 100644 --- a/src/observer/table/ob_table_context.cpp +++ b/src/observer/table/ob_table_context.cpp @@ -65,7 +65,7 @@ int ObTableCtx::init_sess_info(ObTableApiCredential &credential) int ret = OB_SUCCESS; // try get session from session pool - if (OB_FAIL(GCTX.table_service_->get_sess_mgr().get_sess_info(credential, sess_guard_))) { + if (OB_FAIL(TABLEAPI_SESS_POOL_MGR->get_sess_info(credential, sess_guard_))) { LOG_WARN("fail to get session info", K(ret), K(credential)); } else if (OB_ISNULL(sess_guard_.get_sess_node_val())) { ret = OB_ERR_UNEXPECTED; diff --git a/src/observer/table/ob_table_rpc_processor.cpp b/src/observer/table/ob_table_rpc_processor.cpp index f59d889281..88ad1a3304 100644 --- a/src/observer/table/ob_table_rpc_processor.cpp +++ b/src/observer/table/ob_table_rpc_processor.cpp @@ -83,9 +83,12 @@ int ObTableLoginP::process() } else if (OB_FAIL(generate_credential(result_.tenant_id_, result_.user_id_, result_.database_id_, login.ttl_us_, user_token, result_.credential_))) { LOG_WARN("failed to generate credential", K(ret), K(login)); - } else if (OB_FAIL(GCTX.table_service_->get_sess_mgr().update_sess(credential_))) { - LOG_WARN("failed to update session pool", K(ret), K_(credential)); } else { + MTL_SWITCH(credential_.tenant_id_) { + if (OB_FAIL(TABLEAPI_SESS_POOL_MGR->update_sess(credential_))) { + LOG_WARN("failed to update session pool", K(ret), K_(credential)); + } + } result_.reserved1_ = 0; result_.reserved2_ = 0; result_.server_version_ = ObString::make_string(PACKAGE_STRING); @@ -258,7 +261,7 @@ int ObTableApiProcessorBase::check_user_access(const ObString &credential_str) const ObTableApiCredential *sess_credetial = nullptr; if (OB_FAIL(serialization::decode(credential_str.ptr(), credential_str.length(), pos, credential_))) { LOG_WARN("failed to serialize credential", K(ret), K(pos)); - } else if (OB_FAIL(gctx_.table_service_->get_sess_mgr().get_sess_info(credential_, guard))) { + } else if (OB_FAIL(TABLEAPI_SESS_POOL_MGR->get_sess_info(credential_, guard))) { LOG_WARN("fail to get session info", K(ret), K_(credential)); } else if (OB_FAIL(guard.get_credential(sess_credetial))) { LOG_WARN("fail to get credential", K(ret)); diff --git a/src/observer/table/ob_table_service.cpp b/src/observer/table/ob_table_service.cpp index 3e710d8f69..1d7391f01b 100644 --- a/src/observer/table/ob_table_service.cpp +++ b/src/observer/table/ob_table_service.cpp @@ -18,24 +18,17 @@ using namespace oceanbase::table; int ObTableService::init() { - int ret = OB_SUCCESS; - if (OB_FAIL(sess_pool_mgr_.init())) { - LOG_WARN("fail to init tableapi session pool manager", K(ret)); - } - return ret; + return OB_SUCCESS; } void ObTableService::stop() { - sess_pool_mgr_.stop(); } void ObTableService::wait() { - sess_pool_mgr_.wait(); } void ObTableService::destroy() { - sess_pool_mgr_.destroy(); } diff --git a/src/observer/table/ob_table_service.h b/src/observer/table/ob_table_service.h index 082eed5880..e7b340c228 100644 --- a/src/observer/table/ob_table_service.h +++ b/src/observer/table/ob_table_service.h @@ -28,9 +28,6 @@ public: void stop(); void wait(); void destroy(); - table::ObTableApiSessPoolMgr& get_sess_mgr() { return sess_pool_mgr_; } -private: - table::ObTableApiSessPoolMgr sess_pool_mgr_; private: // disallow copy DISALLOW_COPY_AND_ASSIGN(ObTableService); diff --git a/src/observer/table/ob_table_session_pool.cpp b/src/observer/table/ob_table_session_pool.cpp index fd1ed0b6ec..a2430e432b 100644 --- a/src/observer/table/ob_table_session_pool.cpp +++ b/src/observer/table/ob_table_session_pool.cpp @@ -12,101 +12,148 @@ #define USING_LOG_PREFIX SERVER #include "ob_table_session_pool.h" +#include "observer/omt/ob_multi_tenant.h" + +using namespace oceanbase::share; +using namespace oceanbase::common; + namespace oceanbase { namespace table { +/* + init session pool manager when create tenant + - we just obly init the metadata when mtl_init. +*/ +int ObTableApiSessPoolMgr::mtl_init(ObTableApiSessPoolMgr *&mgr) +{ + return mgr->init(); +} + +/* + start tableapi retired session task + - 60 second interval + - repeated +*/ +int ObTableApiSessPoolMgr::start() +{ + int ret = OB_SUCCESS; + + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("table api session pool mgr isn't inited", K(ret)); + } else if (OB_FAIL(TG_SCHEDULE(MTL(omt::ObSharedTimer*)->get_tg_id(), + elimination_task_, + ELIMINATE_SESSION_DELAY/* 60s */, + true/* repeat */))) { + LOG_WARN("failed to schedule tableapi retired session task", K(ret)); + } else { + elimination_task_.is_inited_ = true; + } + + return ret; +} + +// stop tableapi retired session task +void ObTableApiSessPoolMgr::stop() +{ + if (OB_LIKELY(elimination_task_.is_inited_)) { + TG_CANCEL_TASK(MTL(omt::ObSharedTimer*)->get_tg_id(), elimination_task_); + } +} + +// tableapi retired session task wait +void ObTableApiSessPoolMgr::wait() +{ + if (OB_LIKELY(elimination_task_.is_inited_)) { + TG_WAIT_TASK(MTL(omt::ObSharedTimer*)->get_tg_id(), elimination_task_); + } +} + +/* + destroy session pool manager. + - cancel timer task. + - destroy session pool. +*/ +void ObTableApiSessPoolMgr::destroy() +{ + int ret = OB_SUCCESS; + if (is_inited_) { + // 1. cancel timer task + if (elimination_task_.is_inited_) { + bool is_exist = true; + if (OB_SUCC(TG_TASK_EXIST(MTL(omt::ObSharedTimer*)->get_tg_id(), elimination_task_, is_exist))) { + if (is_exist) { + TG_CANCEL_TASK(MTL(omt::ObSharedTimer*)->get_tg_id(), elimination_task_); + TG_WAIT_TASK(MTL(omt::ObSharedTimer*)->get_tg_id(), elimination_task_); + elimination_task_.is_inited_ = false; + } + } + } + + // 2. destroy session pool + if (OB_NOT_NULL(pool_)) { + pool_->destroy(); + pool_ = nullptr; + } + allocator_.reset(); + is_inited_ = false; + } +} + +// init session pool manager. int ObTableApiSessPoolMgr::init() { int ret = OB_SUCCESS; if (!is_inited_) { - ObMemAttr bucket_attr, node_attr; - bucket_attr.label_ = "HashBucApiSessM"; - node_attr.label_ = "HasNodApiSessP"; - if (OB_FAIL(sess_pool_map_.create(hash::cal_next_prime(16), - bucket_attr, - node_attr))) { - LOG_WARN("failed to init tableapi session pool", K(ret)); - } else { - elimination_task_.sess_pool_mgr_ = this; - timer_.set_run_wrapper(MTL_CTX()); - if (OB_FAIL(timer_.init())) { - LOG_WARN("fail to init timer", K(ret)); - } else if (OB_FAIL(timer_.schedule(elimination_task_, ELIMINATE_SESSION_DELAY, true))) { - LOG_WARN("fail to schedule session pool elimination task. ", K(ret)); - } else if (OB_FAIL(timer_.start())) { - LOG_WARN("fail to start session pool elimination task timer.", K(ret)); - } else { - is_inited_ = true; - } - } + elimination_task_.sess_pool_mgr_ = this; + is_inited_ = true; } return ret; } -void ObTableApiSessPoolMgr::stop() -{ - timer_.stop(); -} -void ObTableApiSessPoolMgr::wait() -{ - timer_.wait(); -} - -void ObTableApiSessPoolMgr::destroy() -{ - if (is_inited_) { - is_inited_ = false; - timer_.destroy(); - for (SessPoolMap::iterator it = sess_pool_map_.begin(); - it != sess_pool_map_.end(); - it++) { - if (OB_ISNULL(it->second)) { - BACKTRACE_RET(ERROR, OB_ERR_UNEXPECTED, true, "session pool is null"); - } else { - it->second->~ObTableApiSessPool(); - ob_free(it->second); - } - } - } -} - -int ObTableApiSessPoolMgr::get_sess_info(ObTableApiCredential &credential, - ObTableApiSessGuard &guard) +/* + get a session or create a new one if it doesn't exist + - 1. the user should access the current tenant, so we check tenant id. + - 2. ObTableApiSessGuard holds the reference count of session. + - 3. pool_ have been created when login normally, + but some inner operation did not login, such as ttl operation, so we create a new pool for ttl. +*/ +int ObTableApiSessPoolMgr::get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard) { int ret = OB_SUCCESS; - ObTableApiSessPoolGuard &pool_guard = guard.get_sess_pool_guard(); - if (OB_FAIL(get_or_create_sess_pool(credential, pool_guard))) { - LOG_WARN("fail to get session or create pool", K(ret)); - } else if (OB_FAIL(pool_guard.get_sess_pool()->get_sess_info(credential, guard))) { - LOG_WARN("fail to get sess info", K(ret), K(credential)); + + if (credential.tenant_id_ != MTL_ID()) { + ret = OB_NOT_SUPPORTED; + LOG_WARN("access wrong tenant", K(ret), K(credential.tenant_id_), K(MTL_ID())); + } else if (OB_UNLIKELY(OB_ISNULL(pool_)) && OB_FAIL(create_session_pool_safe())) { + LOG_WARN("fail to create session pool", K(ret), K(credential)); + } else if (OB_ISNULL(pool_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session pool is null", K(ret), K(credential)); + } else if (OB_FAIL(pool_->get_sess_info(credential, guard))) { + LOG_WARN("fail to get session info", K(ret), K(credential)); } return ret; } -int ObTableApiSessPoolMgr::get_or_create_sess_pool(ObTableApiCredential &credential, - ObTableApiSessPoolGuard &guard) +/* + create session pool safely. + - lock for allocator concurrency. +*/ +int ObTableApiSessPoolMgr::create_session_pool_safe() { int ret = OB_SUCCESS; - const uint64_t tenant_id = credential.tenant_id_; - if (OB_FAIL(get_session_pool(tenant_id, guard))) { - if (OB_HASH_NOT_EXIST != ret) { - LOG_WARN("fail to get session pool", K(ret), K(credential)); - } else { // ret = OB_HASH_NOT_EXIST - ObLockGuard lock_guard(lock_); // lock first - if (OB_FAIL(get_session_pool(tenant_id, guard))) { // double check - if (OB_HASH_NOT_EXIST != ret) { - LOG_WARN("fail to get session pool", K(ret), K(credential)); - } else { // ret = OB_HASH_NOT_EXIST - if (OB_FAIL(extend_sess_pool(tenant_id, guard))) { - LOG_WARN("fail to extend sess pool", K(ret), K(tenant_id)); - } - } + if (OB_ISNULL(pool_)) { + ObLockGuard guard(lock_); + if (OB_ISNULL(pool_)) { // double check + if (OB_FAIL(create_session_pool_unsafe())) { + LOG_WARN("fail to create session pool", K(ret)); } } } @@ -114,161 +161,90 @@ int ObTableApiSessPoolMgr::get_or_create_sess_pool(ObTableApiCredential &credent return ret; } -int ObTableApiSessPoolMgr::update_sess(ObTableApiCredential &credential) -{ - int ret = OB_SUCCESS; - ObTableApiSessPoolGuard guard; - const uint64_t tenant_id = credential.tenant_id_; - if (OB_FAIL(get_session_pool(tenant_id, guard))) { - if (OB_HASH_NOT_EXIST == ret) { - if (OB_FAIL(extend_sess_pool(tenant_id, guard))) { - LOG_WARN("fail to extend sess pool", K(ret), K(tenant_id)); - } - } else { - LOG_WARN("fait to get session pool", K(ret), K(tenant_id)); - } - } - - if (OB_FAIL(ret)) { - // do nothing - } else if (OB_FAIL(guard.get_sess_pool()->update_sess(credential))) { - LOG_WARN("fail to update sess pool", K(ret), K(tenant_id), K(credential)); - } - - return ret; -} - -int ObTableApiSessPoolMgr::create_pool_if_not_exists(int64_t tenant_id) -{ - int ret = OB_SUCCESS; - ObTableApiSessPoolGuard guard; - if (OB_FAIL(get_session_pool(tenant_id, guard))) { - if (OB_HASH_NOT_EXIST == ret) { - if (OB_FAIL(extend_sess_pool(tenant_id, guard))) { - LOG_WARN("fail to extend sess pool", K(ret), K(tenant_id)); - } else { - LOG_INFO("success to extend sess pool", K(ret), K(tenant_id)); - } - } else { - LOG_WARN("fait to get session pool", K(ret), K(tenant_id)); - } - } - - return ret; -} - -int ObTableApiSessPoolMgr::extend_sess_pool(uint64_t tenant_id, - ObTableApiSessPoolGuard &guard) +int ObTableApiSessPoolMgr::create_session_pool_unsafe() { int ret = OB_SUCCESS; void *buf = nullptr; ObTableApiSessPool *tmp_pool = nullptr; - if (OB_ISNULL(buf = ob_malloc(sizeof(ObTableApiSessPool), "ApiSessPool"))) { + if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObTableApiSessPool)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc mem for ObTableApiSessPool", K(ret)); - } else if (FALSE_IT(tmp_pool = new (buf) ObTableApiSessPool(tenant_id))) { + } else if (FALSE_IT(tmp_pool = new (buf) ObTableApiSessPool())) { + } else if (OB_FAIL(tmp_pool->init())) { + LOG_WARN("fail to init sess pool", K(ret)); + allocator_.free(tmp_pool); + tmp_pool = nullptr; } else { - if (OB_FAIL(tmp_pool->init())) { - LOG_WARN("fail to init sess pool", K(ret), K(tenant_id)); - } else if (OB_FAIL(sess_pool_map_.set_refactored(tenant_id, tmp_pool))) { - if (OB_HASH_EXIST != ret) { - LOG_WARN("fail to add sess pool to hash map", K(ret), K(*tmp_pool)); - } else { // this pool has been set by other thread, free it - tmp_pool->~ObTableApiSessPool(); - ob_free(tmp_pool); - tmp_pool = nullptr; - // get sess pool - ObTableApiSessPoolMgrAtomic op; - if (OB_FAIL(sess_pool_map_.read_atomic(tenant_id, op))) { - LOG_WARN("fail to get sess pool", K(ret), K(tenant_id)); - } else { - ObTableApiSessPool *pool = op.get_session_pool(); - pool->inc_ref_count(); - guard.set_sess_pool(pool); - } - } - } else { - tmp_pool->inc_ref_count(); - guard.set_sess_pool(tmp_pool); - } - - if (OB_FAIL(ret) && OB_NOT_NULL(tmp_pool)) { - tmp_pool->~ObTableApiSessPool(); - ob_free(tmp_pool); - tmp_pool = nullptr; - } + pool_ = tmp_pool; } return ret; } -int ObTableApiSessPoolMgr::get_session_pool(uint64_t tenant_id, ObTableApiSessPoolGuard &guard) +/* + update session when login. + - 1. because tableapi is not aware of changes to system variables, + users need to log in again to get the latest system variables. + - 2. we will create a new session node which has the latest system variables + to replace the old session node. + - 3. login is handled by sys tenant. + - 4. login has concurrency, many thread will login together. +*/ +int ObTableApiSessPoolMgr::update_sess(ObTableApiCredential &credential) { int ret = OB_SUCCESS; - if (is_inited_) { - ObTableApiSessPoolMgrAtomic op; - ret = sess_pool_map_.read_atomic(tenant_id, op); - if (OB_SUCC(ret)) { // exist - ObTableApiSessPool *pool = op.get_session_pool(); - pool->inc_ref_count(); - guard.set_sess_pool(pool); - } else if (OB_HASH_NOT_EXIST == ret) { - // do nothing - } else { - LOG_WARN("fait to atomic get session pool", K(ret), K(tenant_id)); - } + if (OB_UNLIKELY(OB_ISNULL(pool_)) && OB_FAIL(create_session_pool_safe())) { + LOG_WARN("fail to create session pool", K(ret), K(credential)); + } else if (OB_FAIL(pool_->update_sess(credential))) { + LOG_WARN("fail to update sess pool", K(ret), K(credential)); } return ret; } -// 1. 淘汰长期未被使用的session -// 2. 回收淘汰的session + +/* + The background timer tasks to delete session node. + - retire session node that have not been accessed for more than 3 minutes. + - recycle session node in retired node list. +*/ void ObTableApiSessPoolMgr::ObTableApiSessEliminationTask::runTimerTask() { int ret = OB_SUCCESS; - if (OB_ISNULL(sess_pool_mgr_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("sess_pool_mgr_ not inited", K(ret)); - } else if (OB_FAIL(run_retire_sess_task())) { + if (OB_FAIL(run_retire_sess_task())) { LOG_WARN("fail to run retire sess task", K(ret)); } else if (OB_FAIL(run_recycle_retired_sess_task())) { LOG_WARN("fail to run recycle retired sess task", K(ret)); } } +/* + retire session node that have not been accessed for more than 3 minutes. + - move session node which have not been accessed for more than 3 minutes to retired node list. +*/ int ObTableApiSessPoolMgr::ObTableApiSessEliminationTask::run_retire_sess_task() { int ret = OB_SUCCESS; if (OB_ISNULL(sess_pool_mgr_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("sess_pool_mgr_ not inited", K(ret)); - } else { - ObTableApiSessPoolForeachOp op; - if (OB_FAIL(sess_pool_mgr_->sess_pool_map_.foreach_refactored(op))) { - LOG_WARN("fail to foreach sess pool hash map", K(ret)); - } else { - const ObTableApiSessPoolForeachOp::TelantIdArray &arr = op.get_telant_id_array(); - const int64_t N = arr.count(); - for (int64_t i = 0; i < N && OB_SUCC(ret); i++) { - uint64_t tenant_id = arr.at(i); - ObTableApiSessPoolGuard pool_guard; - if (OB_FAIL(sess_pool_mgr_->get_session_pool(tenant_id, pool_guard))) { - LOG_WARN("fail to get sess pool", K(ret), K(tenant_id)); - } else if (OB_FAIL(pool_guard.get_sess_pool()->move_sess_to_retired_list())) { - LOG_WARN("fail to move retired session", K(ret)); - } - } - } + LOG_WARN("sess_pool_mgr_ is null", K(ret)); + } else if (OB_ISNULL(sess_pool_mgr_->pool_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session pool is null", K(ret)); + } else if (sess_pool_mgr_->pool_->retire_session_node()) { + LOG_WARN("fail to retire session node", K(ret)); } return ret; } +/* + evict retired session node from retired node list. +*/ int ObTableApiSessPoolMgr::ObTableApiSessEliminationTask::run_recycle_retired_sess_task() { int ret = OB_SUCCESS; @@ -276,54 +252,20 @@ int ObTableApiSessPoolMgr::ObTableApiSessEliminationTask::run_recycle_retired_se if (OB_ISNULL(sess_pool_mgr_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("sess_pool_mgr_ not inited", K(ret)); - } else { - ObTableApiSessPoolForeachOp op; - if (OB_FAIL(sess_pool_mgr_->sess_pool_map_.foreach_refactored(op))) { - LOG_WARN("fail to foreach sess pool hash map", K(ret)); - } else { - const ObTableApiSessPoolForeachOp::TelantIdArray &arr = op.get_telant_id_array(); - const int64_t N = arr.count(); - for (int64_t i = 0; i < N && OB_SUCC(ret); i++) { - uint64_t tenant_id = arr.at(i); - ObTableApiSessPoolGuard pool_guard; - ObTableApiSessPool *pool = nullptr; - if (OB_FAIL(sess_pool_mgr_->get_session_pool(tenant_id, pool_guard))) { - LOG_WARN("fail to get sess pool", K(ret), K(tenant_id)); - } else if (FALSE_IT(pool = pool_guard.get_sess_pool())) { - } else if (OB_FAIL(pool->evict_retired_sess())) { - LOG_WARN("fail to evict retired sess", K(ret)); - } else if (pool->is_empty()) { - // 1. 标记delete,由最后一个持有者释放内存 - // 2. 从hash表中摘出,避免新的请求获取到这个pool - pool->set_deleted(); - ObTableApiSessPool *del_pool = nullptr; - if (OB_FAIL(sess_pool_mgr_->sess_pool_map_.erase_refactored(tenant_id, &del_pool))) { - if (OB_HASH_NOT_EXIST != ret) { - LOG_WARN("fail to erase sess pool from sess pool hash map", K(ret), K(tenant_id)); - } - } - } - } - } - } - - return ret; -} - -int ObTableApiSessPoolMgrAtomic::operator() (MapKV &entry) -{ - int ret = common::OB_SUCCESS; - - if (OB_ISNULL(entry.second)) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret)); - } else { - sess_pool_ = entry.second; + } else if (OB_ISNULL(sess_pool_mgr_->pool_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session pool is null", K(ret)); + } else if (OB_FAIL(sess_pool_mgr_->pool_->evict_retired_sess())) { + LOG_WARN("fail to evict retired sess", K(ret)); } return ret; } +/* + init session pool + - init key_node_map_ which is a hashmap, key is user_id, value is ObTableApiSessNode* +*/ int ObTableApiSessPool::init(int64_t hash_bucket/* = SESS_POOL_DEFAULT_BUCKET_NUM */) { int ret = OB_SUCCESS; @@ -332,8 +274,8 @@ int ObTableApiSessPool::init(int64_t hash_bucket/* = SESS_POOL_DEFAULT_BUCKET_NU if (OB_FAIL(key_node_map_.create(hash::cal_next_prime(hash_bucket), "HashBucApiSessP", "HasNodApiSess", - tenant_id_))) { - LOG_WARN("fail to init sess pool", K(ret), K(hash_bucket), K_(tenant_id)); + MTL_ID()))) { + LOG_WARN("fail to init sess pool", K(ret), K(hash_bucket), K(MTL_ID())); } else { is_inited_ = true; } @@ -342,18 +284,53 @@ int ObTableApiSessPool::init(int64_t hash_bucket/* = SESS_POOL_DEFAULT_BUCKET_NU return ret; } +/* + destroy session pool. + - free all session. +*/ void ObTableApiSessPool::destroy() { - if (is_inited_) { - if (OB_SUCCESS != evict_all_session()) { - LOG_WARN_RET(OB_ERR_UNEXPECTED, "fail to evict all seesion"); + int ret = OB_SUCCESS; + ObTableApiSessForeachOp op; + + // clear map + if (OB_FAIL(key_node_map_.foreach_refactored(op))) { + LOG_WARN("fail to foreach sess key node map", K(ret)); + } else { + const ObTableApiSessForeachOp::SessKvArray &arr = op.get_key_value_array(); + const int64_t N = arr.count(); + for (int64_t i = 0; OB_SUCC(ret) && i < N; ++i) { + const ObTableApiSessForeachOp::ObTableApiSessKV &kv = arr.at(i); + ObTableApiSessNode *del_node = nullptr; + if (OB_FAIL(key_node_map_.erase_refactored(kv.key_, &del_node))) { + if (OB_HASH_NOT_EXIST != ret) { + LOG_WARN("fail to erase sess from sess hash map", K(ret), K(kv)); + } + } else if (OB_NOT_NULL(del_node)) { + del_node->destroy(); + } } - is_inited_ = false; } + + // clear retired_nodes_ + DLIST_FOREACH(node, retired_nodes_) { + if (OB_NOT_NULL(node)) { + node->destroy(); + } + } + + retired_nodes_.clear(); + key_node_map_.destroy(); + allocator_.reset(); + is_inited_ = false; } -// 将过期的node从hash map中摘掉,加入retired_nodes_链表中 -int ObTableApiSessPool::move_sess_to_retired_list() +/* + loop all session node to retire. + - nodes which have not been visited for more than 5 minutes will be retired. + - move retired node to retired list. +*/ +int ObTableApiSessPool::retire_session_node() { int ret = OB_SUCCESS; int64_t cur_time = ObTimeUtility::current_time(); @@ -367,8 +344,13 @@ int ObTableApiSessPool::move_sess_to_retired_list() for (int64_t i = 0; OB_SUCC(ret) && i < N; ++i) { const ObTableApiSessForeachOp::ObTableApiSessKV &kv = arr.at(i); if (cur_time - kv.node_->get_last_active_ts() >= SESS_RETIRE_TIME) { - if (OB_FAIL(move_sess_to_retired_list(kv.key_))) { - LOG_WARN("fail to move retired session", K(ret), K(kv.key_)); + ObTableApiSessNode *del_node = nullptr; + if (OB_FAIL(key_node_map_.erase_refactored(kv.key_, &del_node))) { + if (OB_HASH_NOT_EXIST != ret) { + LOG_WARN("fail to erase sess from sess hash map", K(ret), K(kv.key_)); + } + } else if (OB_FAIL(move_node_to_retired_list(del_node))) { + LOG_WARN("fail to move session node to retired list", K(ret)); } } } @@ -377,27 +359,7 @@ int ObTableApiSessPool::move_sess_to_retired_list() return ret; } -int ObTableApiSessPool::move_sess_to_retired_list(uint64_t key) -{ - int ret = OB_SUCCESS; - ObTableApiSessNode *del_node = nullptr; - - if (OB_FAIL(key_node_map_.erase_refactored(key, &del_node))) { - if (OB_HASH_NOT_EXIST != ret) { - LOG_WARN("fail to erase sess from sess hash map", K(ret), K(key)); - } - } else { - ObLockGuard guard(lock_); - if (false == (retired_nodes_.add_last(del_node))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("fail to add retired sess node to retired list", K(ret), K(*del_node)); - } - } - - return ret; -} - -int ObTableApiSessPool::move_sess_to_retired_list(ObTableApiSessNode *node) +int ObTableApiSessPool::move_node_to_retired_list(ObTableApiSessNode *node) { int ret = OB_SUCCESS; @@ -413,7 +375,12 @@ int ObTableApiSessPool::move_sess_to_retired_list(ObTableApiSessNode *node) return ret; } -// 清理retired_nodes_中没有引用的node +/* + evit retired session. + 1. remove session val in free_list. + 2. remove session node from retired_nodes_ when node is empty. + 3. free node memory. +*/ int ObTableApiSessPool::evict_retired_sess() { int ret = OB_SUCCESS; @@ -437,32 +404,6 @@ int ObTableApiSessPool::evict_retired_sess() return ret; } -int ObTableApiSessPool::evict_all_session() -{ - int ret = OB_SUCCESS; - ObTableApiSessForeachOp op; - if (OB_FAIL(key_node_map_.foreach_refactored(op))) { - LOG_WARN("fail to foreach sess key node map", K(ret)); - } else { - const ObTableApiSessForeachOp::SessKvArray &arr = op.get_key_value_array(); - const int64_t N = arr.count(); - for (int64_t i = 0; OB_SUCC(ret) && i < N; ++i) { - const ObTableApiSessForeachOp::ObTableApiSessKV &kv = arr.at(i); - ObTableApiSessNode *del_node = nullptr; - if (OB_FAIL(key_node_map_.erase_refactored(kv.key_, &del_node))) { - if (OB_HASH_NOT_EXIST != ret) { - LOG_WARN("fail to erase sess from sess hash map", K(ret), K(kv)); - } - } else { - ObLockGuard guard(lock_); - del_node->~ObTableApiSessNode(); - allocator_.free(del_node); - } - } - } - return ret; -} - int ObTableApiSessPool::get_sess_node(uint64_t key, ObTableApiSessNode *&node) { @@ -489,27 +430,45 @@ int ObTableApiSessPool::get_sess_node(uint64_t key, return ret; } +/* + get session + 1. get session node + 2. create new one if not exist + 3. get session node value + 3.1 if there is no session node val in node list, extend it. + + struct pool { + map: [user_id0:node1][user_id2:node:2] + } + + struct node { + list: node_val0 - node_val1 - node_val2 - ... - node_valn + } +*/ int ObTableApiSessPool::get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard) { int ret = OB_SUCCESS; ObTableApiSessNode *sess_node = nullptr; - ObTableApiSessNodeVal *sess_val = nullptr; bool need_extend = false; - if (OB_FAIL(get_sess_node(credential.user_id_, sess_node))) { - LOG_WARN("fail to get sess node", K(ret), K(credential)); - } - - if (OB_HASH_NOT_EXIST == ret) { - if (OB_FAIL(create_and_add_node(credential))) { - LOG_WARN("fail to create and add session node to session pool", K(ret), K(credential)); - } else if (OB_FAIL(get_sess_node(credential.user_id_, sess_node))) { // get again + if (OB_FAIL(get_sess_node(credential.user_id_, sess_node))) { // first get + if (OB_HASH_NOT_EXIST != ret) { LOG_WARN("fail to get sess node", K(ret), K(credential)); } } - if (OB_SUCC(ret)) { - if (OB_FAIL(sess_node->get_sess_node_val(sess_val))) { // exist + if (OB_FAIL(ret) && OB_HASH_NOT_EXIST != ret) { + // do nothing + } else if (OB_UNLIKELY(OB_HASH_NOT_EXIST == ret) && OB_FAIL(create_and_add_node_safe(credential))) { // not exist, create + LOG_WARN("fail to create and add session node", K(ret), K(credential)); + } else if (OB_UNLIKELY(OB_ISNULL(sess_node)) && OB_FAIL(get_sess_node(credential.user_id_, sess_node))) { // get again + LOG_WARN("fail to get sess node", K(ret), K(credential)); + } else if (OB_UNLIKELY(OB_ISNULL(sess_node))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session is null", K(ret)); + } else { + ObTableApiSessNodeVal *sess_val = nullptr; + if (OB_FAIL(sess_node->get_sess_node_val(sess_val))) { LOG_WARN("fail to get sess node value", K(ret), K(*sess_node)); } else if (OB_ISNULL(sess_val)) { need_extend = true; @@ -519,8 +478,8 @@ int ObTableApiSessPool::get_sess_info(ObTableApiCredential &credential, ObTableA } if (need_extend) { - if (OB_FAIL(sess_node->extend_sess_val(guard))) { - LOG_WARN("fail to extend sess val", K(ret), K(*sess_node), K(credential)); + if (OB_FAIL(sess_node->extend_and_get_sess_val(guard))) { + LOG_WARN("fail to extend and get sess val", K(ret), K(*sess_node), K(credential)); } } @@ -532,7 +491,7 @@ int ObTableApiSessPool::get_sess_info(ObTableApiCredential &credential, ObTableA return ret; } -int ObTableApiSessPool::create_node(ObTableApiCredential &credential, ObTableApiSessNode *&node) +int ObTableApiSessPool::create_node_safe(ObTableApiCredential &credential, ObTableApiSessNode *&node) { int ret = OB_SUCCESS; ObLockGuard guard(lock_); @@ -551,30 +510,32 @@ int ObTableApiSessPool::create_node(ObTableApiCredential &credential, ObTableApi return ret; } -int ObTableApiSessPool::create_and_add_node(ObTableApiCredential &credential) +int ObTableApiSessPool::create_and_add_node_safe(ObTableApiCredential &credential) { int ret = OB_SUCCESS; ObTableApiSessNode *node = nullptr; - if (OB_FAIL(create_node(credential, node))) { + if (OB_FAIL(create_node_safe(credential, node))) { LOG_WARN("fail to create node", K(ret), K(credential)); } else if (OB_FAIL(key_node_map_.set_refactored(credential.user_id_, node))) { if (OB_HASH_EXIST != ret) { LOG_WARN("fail to add sess node to hash map", K(ret), K(credential.user_id_), K(*node)); - } else { // this node has been set by other thread, free it - ObLockGuard guard(lock_); - node->~ObTableApiSessNode(); - allocator_.free(node); - node = nullptr; } + // this node has been set by other thread, free it + ObLockGuard guard(lock_); + node->~ObTableApiSessNode(); + allocator_.free(node); + node = nullptr; } return ret; } -// 1. login时调用 -// 2. 不存在,创建; 存在,旧的移动到淘汰链表, 添加新的node -int ObTableApiSessPool::update_sess(ObTableApiCredential &credential, bool replace_old_node) +/* + 1. only call in login + 2. move old to retired list when node exist, create new node otherwise. +*/ +int ObTableApiSessPool::update_sess(ObTableApiCredential &credential) { int ret = OB_SUCCESS; @@ -582,22 +543,21 @@ int ObTableApiSessPool::update_sess(ObTableApiCredential &credential, bool repla const uint64_t key = credential.user_id_; if (OB_FAIL(get_sess_node(key, node))) { if (OB_HASH_NOT_EXIST == ret) { // not exist, create - if (OB_FAIL(create_and_add_node(credential))) { + if (OB_FAIL(create_and_add_node_safe(credential))) { LOG_WARN("fail to create and add node", K(ret), K(credential)); } } else { LOG_WARN("fail to get session node", K(ret), K(key)); } - } else if (replace_old_node) { // exist, 替换node,old node移动到淘汰链表等待淘汰 - if (OB_FAIL(replace_sess(credential))) { - LOG_WARN("fail to replace session node", K(ret), K(credential)); - } + } else if (OB_FAIL(replace_sess_node_safe(credential))) { // exist, create and replace old node + LOG_WARN("fail to replace session node", K(ret), K(credential)); } return ret; } -int ObTableApiSessPool::replace_sess(ObTableApiCredential &credential) +// create and replace old node in callback function +int ObTableApiSessPool::replace_sess_node_safe(ObTableApiCredential &credential) { int ret = OB_SUCCESS; @@ -609,18 +569,12 @@ int ObTableApiSessPool::replace_sess(ObTableApiCredential &credential) return ret; } -int64_t ObTableApiSessPool::inc_ref_count() +void ObTableApiSessNodeVal::destroy() { - return ATOMIC_AAF((uint64_t *)&ref_count_, 1); -} - -void ObTableApiSessPool::dec_ref_count() -{ - (void)ATOMIC_SAF((uint64_t *)&ref_count_, 1); - if (is_deleted() && 0 == ref_count_) { - this->~ObTableApiSessPool(); - ob_free(this); - } + sess_info_.destroy(); + allocator_.reset(); + is_inited_ = false; + owner_node_ = nullptr; } int ObTableApiSessNodeVal::init_sess_info() @@ -630,19 +584,19 @@ int ObTableApiSessNodeVal::init_sess_info() if (!is_inited_) { share::schema::ObSchemaGetterGuard schema_guard; const ObTenantSchema *tenant_schema = nullptr; - if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id_, schema_guard))) { - LOG_WARN("fail to get schema guard", K(ret), K_(tenant_id)); - } else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id_, tenant_schema))) { - LOG_WARN("fail to get tenant schema", K(ret), K_(tenant_id)); + if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard))) { + LOG_WARN("fail to get schema guard", K(ret), K(MTL_ID())); + } else if (OB_FAIL(schema_guard.get_tenant_info(MTL_ID(), tenant_schema))) { + LOG_WARN("fail to get tenant schema", K(ret), K(MTL_ID())); } else if (OB_ISNULL(tenant_schema)) { ret = OB_SCHEMA_ERROR; LOG_WARN("tenant schema is null", K(ret)); - } else if (OB_FAIL(ObTableApiSessUtil::init_sess_info(tenant_id_, + } else if (OB_FAIL(ObTableApiSessUtil::init_sess_info(MTL_ID(), tenant_schema->get_tenant_name_str(), &allocator_, schema_guard, sess_info_))) { - LOG_WARN("fail to init sess info", K(ret), K(tenant_id_)); + LOG_WARN("fail to init sess info", K(ret), K(MTL_ID())); } else { is_inited_ = true; } @@ -651,6 +605,11 @@ int ObTableApiSessNodeVal::init_sess_info() return ret; } +/* + move node val to free list from used list. + - remove from used list. + - add to free list. +*/ void ObTableApiSessNodeVal::give_back_to_free_list() { if (OB_NOT_NULL(owner_node_)) { @@ -675,17 +634,16 @@ void ObTableApiSessNode::destroy() DLIST_FOREACH_REMOVESAFE(sess, free_list) { ObTableApiSessNodeVal *rm_sess = free_list.remove(sess); if (OB_NOT_NULL(rm_sess)) { - rm_sess->~ObTableApiSessNodeVal(); - allocator_.free(rm_sess); + rm_sess->destroy(); } } DLIST_FOREACH_REMOVESAFE(sess, used_list) { ObTableApiSessNodeVal *rm_sess = used_list.remove(sess); if (OB_NOT_NULL(rm_sess)) { - rm_sess->~ObTableApiSessNodeVal(); - allocator_.free(rm_sess); + rm_sess->destroy(); } } + allocator_.reset(); } int ObTableApiSessNode::remove_unused_sess() @@ -710,7 +668,11 @@ int ObTableApiSessNode::remove_unused_sess() return ret; } -// 从free list中取出,添加到used list中 +/* + get session node val + - remove and get from free list. + - add to used list. +*/ int ObTableApiSessNode::get_sess_node_val(ObTableApiSessNodeVal *&val) { int ret = OB_SUCCESS; @@ -733,7 +695,13 @@ int ObTableApiSessNode::get_sess_node_val(ObTableApiSessNodeVal *&val) return ret; } -int ObTableApiSessNode::extend_sess_val(ObTableApiSessGuard &guard) +/* + extend a session node val and put it to guard + - alloc new session node val. + - add to use list. + - put to guard. +*/ +int ObTableApiSessNode::extend_and_get_sess_val(ObTableApiSessGuard &guard) { int ret = OB_SUCCESS; @@ -743,7 +711,7 @@ int ObTableApiSessNode::extend_sess_val(ObTableApiSessGuard &guard) ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc mem for ObTableApiSessNodeVal", K(ret), K(sizeof(ObTableApiSessNodeVal))); } else { - ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(tenant_id_, this); + ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(this); if (OB_FAIL(val->init_sess_info())) { LOG_WARN("fail to init sess info", K(ret), K(*val)); } else { @@ -757,6 +725,11 @@ int ObTableApiSessNode::extend_sess_val(ObTableApiSessGuard &guard) } } + if (OB_FAIL(ret) && OB_NOT_NULL(buf)) { + allocator_.free(buf); + buf = nullptr; + } + return ret; } @@ -775,6 +748,12 @@ int ObTableApiSessNodeAtomicOp::get_value(ObTableApiSessNode *&node) return ret; } +/* + replace session node operation + 1. create new node. + 2. replace them. + 3. move old node to retired list. +*/ int ObTableApiSessNodeReplaceOp::operator()(MapKV &entry) { int ret = OB_SUCCESS; @@ -783,16 +762,16 @@ int ObTableApiSessNodeReplaceOp::operator()(MapKV &entry) ret = OB_ERR_UNEXPECTED; LOG_WARN("null session node", K(ret), K(entry.first)); } else { - // 1. 创建新的session node + // 1. create new session node ObTableApiSessNode *new_node = nullptr; - if (OB_FAIL(pool_.create_node(credential_, new_node))) { + if (OB_FAIL(pool_.create_node_safe(credential_, new_node))) { LOG_WARN("fail to create node", K(ret), K_(credential)); } else { - // 2. 替换 + // 2. replace ObTableApiSessNode *old_node = entry.second; entry.second = new_node; - // 3. old node移动到淘汰链表 - pool_.move_sess_to_retired_list(old_node); // 添加到链表末尾,不会出错,故不判断返回值 + // 3. move old node to retired list + pool_.move_node_to_retired_list(old_node); // 添加到链表末尾,不会出错,故不判断返回值 } } @@ -810,17 +789,6 @@ int ObTableApiSessForeachOp::operator()(MapKV &entry) return ret; } -int ObTableApiSessPoolForeachOp::operator()(MapKV &entry) -{ - int ret = common::OB_SUCCESS; - - if (OB_FAIL(telant_ids_.push_back(entry.first))) { - LOG_WARN("fail to push back key", K(ret), K(entry.first)); - } - - return ret; -} - int ObTableApiSessUtil::init_sess_info(uint64_t tenant_id, const common::ObString &tenant_name, ObIAllocator *allocator, diff --git a/src/observer/table/ob_table_session_pool.h b/src/observer/table/ob_table_session_pool.h index 72d9ad18d1..6dd26fbc83 100644 --- a/src/observer/table/ob_table_session_pool.h +++ b/src/observer/table/ob_table_session_pool.h @@ -25,16 +25,14 @@ class ObTableApiSessNode; class ObTableApiSessGuard; class ObTableApiSessNodeVal; class ObTableApiSessNodeAtomicOp; -class ObTableApiSessPoolGuard; class ObTableApiSessPoolMgr final { -public: - // key is tenant_id - typedef common::hash::ObHashMap SessPoolMap; public: ObTableApiSessPoolMgr() - : is_inited_(false) + : is_inited_(false), + allocator_("TbSessPoolMgr", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), + pool_(nullptr) {} virtual ~ObTableApiSessPoolMgr() { destroy(); } TO_STRING_KV(K_(is_inited), @@ -44,11 +42,11 @@ public: { public: ObTableApiSessEliminationTask() - : sess_pool_mgr_(nullptr), - run_task_counter_(0) + : is_inited_(false), + sess_pool_mgr_(nullptr) { } - TO_STRING_KV(K_(run_task_counter)); + TO_STRING_KV(K_(is_inited), KPC_(sess_pool_mgr)); void runTimerTask(void); private: // 回收已经淘汰的session @@ -56,47 +54,33 @@ public: // 淘汰长期未被使用的session int run_retire_sess_task(); public: + bool is_inited_; ObTableApiSessPoolMgr *sess_pool_mgr_; - int64_t run_task_counter_; }; public: - int init(); + static int mtl_init(ObTableApiSessPoolMgr *&mgr); + int start(); void stop(); void wait(); void destroy(); - int get_session_pool(uint64_t tenant_id, ObTableApiSessPoolGuard &guard); + int init(); int get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard); int update_sess(ObTableApiCredential &credential); - int create_pool_if_not_exists(int64_t tenant_id); private: - int extend_sess_pool(uint64_t tenant_id, ObTableApiSessPoolGuard &guard); - int get_or_create_sess_pool(ObTableApiCredential &credential, ObTableApiSessPoolGuard &guard); + int create_session_pool_safe(); + int create_session_pool_unsafe(); private: static const int64_t ELIMINATE_SESSION_DELAY = 60 * 1000 * 1000; // 60s bool is_inited_; - SessPoolMap sess_pool_map_; + common::ObArenaAllocator allocator_; + ObTableApiSessPool *pool_; ObTableApiSessEliminationTask elimination_task_; - common::ObTimer timer_; ObSpinLock lock_; // for get_or_create_sess_pool private: DISALLOW_COPY_AND_ASSIGN(ObTableApiSessPoolMgr); }; -class ObTableApiSessPoolMgrAtomic -{ -public: - typedef common::hash::HashMapPair MapKV; -public: - ObTableApiSessPoolMgrAtomic() - : sess_pool_(nullptr) - {} - int operator() (MapKV &entry); - ObTableApiSessPool *get_session_pool() { return sess_pool_; } -private: - ObTableApiSessPool *sess_pool_; -private: - DISALLOW_COPY_AND_ASSIGN(ObTableApiSessPoolMgrAtomic); -}; +#define TABLEAPI_SESS_POOL_MGR (MTL(ObTableApiSessPoolMgr*)) class ObTableApiSessPool final { @@ -106,45 +90,29 @@ public: static const int64_t SESS_POOL_DEFAULT_BUCKET_NUM = 10; // 取决于客户端登录的用户数量 static const int64_t SESS_RETIRE_TIME = 300 * 1000000; // 超过300s未被访问的session会被标记淘汰 public: - explicit ObTableApiSessPool(uint64_t tenant_id) - : allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id), - is_inited_(false), - tenant_id_(tenant_id), - ref_count_(0), - is_deleted_(false) + explicit ObTableApiSessPool() + : allocator_("TbSessPool", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), + is_inited_(false) {} ~ObTableApiSessPool() { destroy(); }; TO_STRING_KV(K_(is_inited), - K_(tenant_id), - K_(ref_count), - K_(is_deleted)); + K_(retired_nodes)); int init(int64_t hash_bucket = SESS_POOL_DEFAULT_BUCKET_NUM); void destroy(); - int64_t inc_ref_count(); - void dec_ref_count(); - void set_deleted() { ATOMIC_SET(&is_deleted_, true); } - bool is_deleted() { return ATOMIC_LOAD(&is_deleted_); } - bool is_empty() const { return key_node_map_.empty(); } int get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard); - int update_sess(ObTableApiCredential &credential, bool replace_old_node = true); - // 将过期的node移动到retired_nodes_ - int move_sess_to_retired_list(); + int update_sess(ObTableApiCredential &credential); + int retire_session_node(); int evict_retired_sess(); - int create_node(ObTableApiCredential &credential, ObTableApiSessNode *&node); - int move_sess_to_retired_list(ObTableApiSessNode *node); + int create_node_safe(ObTableApiCredential &credential, ObTableApiSessNode *&node); + int move_node_to_retired_list(ObTableApiSessNode *node); private: - int replace_sess(ObTableApiCredential &credential); - int create_and_add_node(ObTableApiCredential &credential); + int replace_sess_node_safe(ObTableApiCredential &credential); + int create_and_add_node_safe(ObTableApiCredential &credential); int get_sess_node(uint64_t key, ObTableApiSessNode *&node); - int evict_all_session(); - int move_sess_to_retired_list(uint64_t key); private: common::ObArenaAllocator allocator_; bool is_inited_; - uint64_t tenant_id_; CacheKeyNodeMap key_node_map_; - volatile int64_t ref_count_; - volatile bool is_deleted_; // 已经淘汰的node,等待被后台删除 // 前台login时、后台淘汰时都会操作retired_nodes_,因此需要加锁 common::ObDList retired_nodes_; @@ -153,44 +121,21 @@ private: DISALLOW_COPY_AND_ASSIGN(ObTableApiSessPool); }; -class ObTableApiSessPoolGuard final -{ -public: - ObTableApiSessPoolGuard() - : pool_(nullptr) - {} - ~ObTableApiSessPoolGuard() - { - if (OB_NOT_NULL(pool_)) { - pool_->dec_ref_count(); - } - } - ObTableApiSessPool *get_sess_pool() - { - return pool_; - } - void set_sess_pool(ObTableApiSessPool *pool) { pool_ = pool; } -private: - ObTableApiSessPool *pool_; -}; - class ObTableApiSessNodeVal : public common::ObDLinkBase { friend class ObTableApiSessNode; friend class ObTableApiSessGuard; public: - explicit ObTableApiSessNodeVal(uint64_t tenant_id, ObTableApiSessNode *owner) - : allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id), + explicit ObTableApiSessNodeVal(ObTableApiSessNode *owner) + : allocator_("TbSessNodeVal", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), is_inited_(false), - tenant_id_(tenant_id), owner_node_(owner) {} TO_STRING_KV(K_(is_inited), - K_(tenant_id), K_(sess_info)); public: + void destroy(); sql::ObSQLSessionInfo& get_sess_info() { return sess_info_; } - const sql::ObSQLSessionInfo& get_sess_info() const { return sess_info_; } int init_sess_info(); void reset_tx_desc() { // 防止异步提交场景在 session 析构的时候 rollback 事务 sql::ObSQLSessionInfo::LockGuard guard(sess_info_.get_thread_data_lock()); @@ -200,7 +145,6 @@ public: private: common::ObArenaAllocator allocator_; bool is_inited_; - uint64_t tenant_id_; sql::ObSQLSessionInfo sess_info_; ObTableApiSessNode *owner_node_; private: @@ -213,16 +157,14 @@ friend class ObTableApiSessPool; friend class ObTableApiSessNodeVal; public: explicit ObTableApiSessNode(ObTableApiCredential &credential) - : allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, credential.tenant_id_), - tenant_id_(credential.tenant_id_), + : allocator_("TbSessNode", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), sess_lists_(), last_active_ts_(0), credential_(credential) { } ~ObTableApiSessNode() { destroy(); } - TO_STRING_KV(K_(tenant_id), - K_(sess_lists), + TO_STRING_KV(K_(sess_lists), K_(last_active_ts), K_(credential)); class SessList @@ -247,10 +189,9 @@ public: OB_INLINE int64_t get_last_active_ts() const { return last_active_ts_; } int remove_unused_sess(); private: - int extend_sess_val(ObTableApiSessGuard &guard); + int extend_and_get_sess_val(ObTableApiSessGuard &guard); private: common::ObArenaAllocator allocator_; - uint64_t tenant_id_; SessList sess_lists_; int64_t last_active_ts_; ObTableApiCredential credential_; @@ -282,7 +223,6 @@ public: ObTableApiSessNodeVal* get_sess_node_val() const { return sess_node_val_; } sql::ObSQLSessionInfo& get_sess_info() { return sess_node_val_->get_sess_info(); } const sql::ObSQLSessionInfo& get_sess_info() const { return sess_node_val_->get_sess_info(); } - ObTableApiSessPoolGuard &get_sess_pool_guard() { return pool_guard_; } int get_credential(const ObTableApiCredential *&credential) const { int ret = OB_SUCCESS; @@ -297,7 +237,6 @@ public: } private: ObTableApiSessNodeVal *sess_node_val_; - ObTableApiSessPoolGuard pool_guard_; }; class ObTableApiSessNodeAtomicOp @@ -367,22 +306,6 @@ private: DISALLOW_COPY_AND_ASSIGN(ObTableApiSessForeachOp); }; -class ObTableApiSessPoolForeachOp -{ -public: - typedef common::hash::HashMapPair MapKV; - typedef common::ObSEArray TelantIdArray; - ObTableApiSessPoolForeachOp() - {} - int operator()(MapKV &entry); - const TelantIdArray &get_telant_id_array() const { return telant_ids_; } - void reset() { telant_ids_.reset(); } -private: - TelantIdArray telant_ids_; -private: - DISALLOW_COPY_AND_ASSIGN(ObTableApiSessPoolForeachOp); -}; - class ObTableApiSessUtil final { public: diff --git a/src/observer/table/ttl/ob_table_ttl_task.cpp b/src/observer/table/ttl/ob_table_ttl_task.cpp index 3c312ffb17..c7c91f3f01 100644 --- a/src/observer/table/ttl/ob_table_ttl_task.cpp +++ b/src/observer/table/ttl/ob_table_ttl_task.cpp @@ -73,8 +73,6 @@ int ObTableTTLDeleteTask::init(ObTenantTabletTTLMgr *ttl_tablet_mgr, if (OB_SUCC(ret)) { if (OB_FAIL(init_credential(ttl_para))) { LOG_WARN("fail to init credential", KR(ret)); - } else if (OB_FAIL(create_session_pool(ttl_para.tenant_id_))) { - LOG_WARN("fail to update session pool"); } else { param_ = ttl_para; info_ = &ttl_info; @@ -85,20 +83,6 @@ int ObTableTTLDeleteTask::init(ObTenantTabletTTLMgr *ttl_tablet_mgr, return ret; } -int ObTableTTLDeleteTask::create_session_pool(int64_t tenant_id) -{ - int ret = OB_SUCCESS; - if (OB_ISNULL(GCTX.table_service_)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("table service is null", KR(ret)); - } else if (OB_FAIL(GCTX.table_service_->get_sess_mgr().create_pool_if_not_exists(tenant_id))) { - LOG_WARN("fait to get session pool", K(ret), K(tenant_id)); - } - - return ret; -} - - int ObTableTTLDeleteTask::init_credential(const ObTTLTaskParam &ttl_param) { int ret = OB_SUCCESS; diff --git a/src/observer/table/ttl/ob_table_ttl_task.h b/src/observer/table/ttl/ob_table_ttl_task.h index 4ef60fe417..87066982e0 100644 --- a/src/observer/table/ttl/ob_table_ttl_task.h +++ b/src/observer/table/ttl/ob_table_ttl_task.h @@ -103,7 +103,6 @@ public: } return tablet_id; } - int create_session_pool(int64_t tenant_id); uint64_t get_table_id() const { return param_.table_id_; } int64_t get_timeout_ts() { return ONE_TASK_TIMEOUT + ObTimeUtility::current_time(); } common::ObRowkey &get_start_rowkey() { return rowkey_; } diff --git a/src/share/rc/ob_tenant_base.h b/src/share/rc/ob_tenant_base.h index 6cd377a82b..9b2037b455 100755 --- a/src/share/rc/ob_tenant_base.h +++ b/src/share/rc/ob_tenant_base.h @@ -107,7 +107,9 @@ namespace concurrency_control { } namespace table { + class ObTTLService; class ObHTableLockMgr; + class ObTableApiSessPoolMgr; } namespace logservice { @@ -180,11 +182,6 @@ namespace storage { class MockTenantModuleEnv; } -namespace table -{ - class ObTTLService; -} - namespace share { class ObCgroupCtrl; @@ -318,7 +315,8 @@ using ObTableScanIteratorObjPool = common::ObServerObjectPool #define private public // 获取私有成员 #include "observer/table/ob_table_session_pool.h" +#include "lib/utility/ob_test_util.h" +#include "share/ob_thread_pool.h" +#include "mtlenv/mock_tenant_module_env.h" +#include "share/rc/ob_tenant_base.h" +using namespace oceanbase; using namespace oceanbase::common; using namespace oceanbase::table; using namespace oceanbase::sql; +using namespace oceanbase::share; +using namespace oceanbase::storage; class TestTableSessPool: public ::testing::Test { -public: - const int64_t TENANT_CNT = 10; - const int64_t USER_CNT = 100; - const int64_t NODE_CNT = 100; - const int64_t SESS_CNT = 10; public: TestTableSessPool() {} virtual ~TestTableSessPool() {} - void prepare_sess_pool(ObTableApiSessPoolMgr &mgr); + static void SetUpTestCase() + { + EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init()); + } + static void TearDownTestCase() + { + MockTenantModuleEnv::get_instance().destroy(); + } + void SetUp() + { + ASSERT_TRUE(MockTenantModuleEnv::get_instance().is_inited()); + TABLEAPI_SESS_POOL_MGR->init(); + create_credential(1, mock_cred_); + } + void TearDown() + { + TABLEAPI_SESS_POOL_MGR->destroy(); + } private: + void create_credential(uint64_t user_id, ObTableApiCredential *&cred); +private: + ObArenaAllocator allocator_; + ObTableApiCredential *mock_cred_; // disallow copy DISALLOW_COPY_AND_ASSIGN(TestTableSessPool); }; -// 10租户,100用户,每个用户下面挂10个session -void TestTableSessPool::prepare_sess_pool(ObTableApiSessPoolMgr &mgr) +void TestTableSessPool::create_credential(uint64_t user_id, ObTableApiCredential *&cred) { - ASSERT_EQ(OB_SUCCESS, mgr.init()); - uint64_t tenant_ids[TENANT_CNT]; - uint64_t user_ids[USER_CNT]; - ObTableApiSessPoolGuard pool_guards[TENANT_CNT]; - for (uint64_t i = 0; i < TENANT_CNT; i++) { - tenant_ids[i] = i + 1; - ASSERT_EQ(OB_SUCCESS, mgr.extend_sess_pool(tenant_ids[i], pool_guards[i])); - ObTableApiSessPool *tmp_pool = pool_guards[i].get_sess_pool(); - ASSERT_NE(nullptr, tmp_pool); - for (uint64_t j = 0; j < USER_CNT; j++) { - user_ids[j] = j; - ASSERT_EQ(OB_SUCCESS, mgr.get_session_pool(tenant_ids[i], pool_guards[i])); - ObTableApiSessPool *pool = pool_guards[i].get_sess_pool(); - ASSERT_NE(nullptr, pool); - ObTableApiCredential credential; - credential.tenant_id_ = tenant_ids[i]; - credential.user_id_ = user_ids[j]; - ASSERT_EQ(OB_SUCCESS, pool->update_sess(credential)); - ObTableApiSessNode *node = nullptr; - ASSERT_EQ(OB_SUCCESS, tmp_pool->get_sess_node(user_ids[j], node)); - ASSERT_NE(nullptr, node); - for (int64_t k = 0; k < SESS_CNT; k++) { - void *buf = node->allocator_.alloc(sizeof(ObTableApiSessNodeVal)); - ASSERT_NE(nullptr, buf); - ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(tenant_ids[i], node); - val->is_inited_ = true; - ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(val)); - } - } - } + void *buf = nullptr; + buf = allocator_.alloc(sizeof(ObTableApiCredential)); + cred = new (buf) ObTableApiCredential(); + cred->cluster_id_ = 0; + cred->tenant_id_ = 1; + cred->user_id_ = user_id; + cred->database_id_ = 1; + cred->expire_ts_ = 0; + cred->hash(cred->hash_val_); } -TEST_F(TestTableSessPool, get_session) +TEST_F(TestTableSessPool, test_mgr_init) { - uint64_t tenant_id = 1; - uint64_t user_id = 0; - uint64_t key = user_id; ObTableApiSessPoolMgr mgr; - ASSERT_EQ(OB_SUCCESS, mgr.init()); - ObTableApiSessPoolGuard pool_guard; - ASSERT_EQ(OB_HASH_NOT_EXIST, mgr.get_session_pool(tenant_id, pool_guard)); - ASSERT_EQ(nullptr, pool_guard.get_sess_pool()); - ASSERT_EQ(OB_SUCCESS, mgr.extend_sess_pool(tenant_id, pool_guard)); - ObTableApiSessPool *pool = pool_guard.get_sess_pool(); - ASSERT_TRUE(nullptr != pool); - ASSERT_TRUE(pool->is_inited_); - ASSERT_EQ(1, pool->ref_count_); - ASSERT_EQ(false, pool->is_deleted_); - ASSERT_EQ(tenant_id, pool->tenant_id_); - ObTableApiSessGuard sess_guard; - ObTableApiCredential credential; - credential.tenant_id_ = tenant_id; - credential.user_id_ = user_id; - ASSERT_EQ(OB_SUCCESS, pool->update_sess(credential)); - ObTableApiSessNode *node = nullptr; - ASSERT_EQ(OB_SUCCESS, pool->get_sess_node(key, node)); - ASSERT_TRUE(nullptr != node); - ASSERT_TRUE(node->is_empty()); - void *buf = node->allocator_.alloc(sizeof(ObTableApiSessNodeVal)); - ASSERT_NE(nullptr, buf); - ObTableApiSessNodeVal *new_val = new (buf) ObTableApiSessNodeVal(tenant_id, node); - new_val->is_inited_ = true; - ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(new_val)); + ASSERT_FALSE(mgr.is_inited_); + ASSERT_EQ(nullptr, mgr.pool_); + ASSERT_EQ(nullptr, mgr.elimination_task_.sess_pool_mgr_); + + ASSERT_EQ(OB_SYS_TENANT_ID, MTL_ID()); + ASSERT_TRUE(TABLEAPI_SESS_POOL_MGR->is_inited_); + ASSERT_EQ(nullptr, TABLEAPI_SESS_POOL_MGR->pool_); + ASSERT_EQ(TABLEAPI_SESS_POOL_MGR, TABLEAPI_SESS_POOL_MGR->elimination_task_.sess_pool_mgr_); } -TEST_F(TestTableSessPool, remove_session) +TEST_F(TestTableSessPool, test_pool_init) { - int64_t tenant_id = 1; - uint64_t user_id = 0; - ObTableApiCredential credential; - credential.tenant_id_ = tenant_id; - credential.user_id_ = user_id; - ObTableApiSessNode node(credential); - for (int64_t i = 0; i < SESS_CNT; i++) { - void *buf = node.allocator_.alloc(sizeof(ObTableApiSessNodeVal)); - ASSERT_NE(nullptr, buf); - ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(tenant_id, &node); - val->is_inited_ = true; - ASSERT_EQ(true, node.sess_lists_.free_list_.add_last(val)); - } - ASSERT_EQ(false, node.is_empty()); - ASSERT_EQ(SESS_CNT, node.sess_lists_.free_list_.get_size()); - node.remove_unused_sess(); - ASSERT_EQ(0, node.sess_lists_.free_list_.get_size()); + ObTableApiSessPool pool; + ASSERT_FALSE(pool.is_inited_); + ASSERT_TRUE(pool.key_node_map_.empty()); + ASSERT_TRUE(pool.retired_nodes_.is_empty()); } -TEST_F(TestTableSessPool, retire_session) +TEST_F(TestTableSessPool, test_node_init) { - int ret = 0; - ObTableApiSessPoolMgr mgr; - prepare_sess_pool(mgr); - ObTableApiSessPoolForeachOp op; - ASSERT_EQ(OB_SUCCESS, mgr.sess_pool_map_.foreach_refactored(op)); - const ObTableApiSessPoolForeachOp::TelantIdArray &tenant_ids = op.get_telant_id_array(); - ASSERT_EQ(TENANT_CNT, tenant_ids.count()); - const int64_t N = tenant_ids.count(); - // 1. 标记淘汰 - for (int64_t i = 0; i < N; i++) { - uint64_t tenant_id = tenant_ids.at(i); - ObTableApiSessPoolGuard pool_guard; - ASSERT_EQ(OB_SUCCESS, mgr.get_session_pool(tenant_id, pool_guard)); - ObTableApiSessPool *pool = pool_guard.get_sess_pool(); - ASSERT_NE(nullptr, pool); - ObTableApiSessForeachOp op; - ASSERT_EQ(OB_SUCCESS, pool->key_node_map_.foreach_refactored(op)); - const ObTableApiSessForeachOp::SessKvArray &kvs = op.get_key_value_array(); - ASSERT_EQ(NODE_CNT, kvs.count()); - for (int64_t j = 0; j < kvs.count(); j++) { - if (j % 2 == 0) { - const ObTableApiSessForeachOp::ObTableApiSessKV &kv = kvs.at(j); - ASSERT_EQ(OB_SUCCESS, pool->move_sess_to_retired_list(kv.key_)); - } - } - } - // 2. 触发淘汰 - ASSERT_EQ(OB_SUCCESS, mgr.elimination_task_.run_recycle_retired_sess_task()); - // 3. 检查 - for (int64_t i = 0; i < N; i++) { - uint64_t tenant_id = tenant_ids.at(i); - ObTableApiSessPoolGuard pool_guard; - ASSERT_EQ(OB_SUCCESS, mgr.get_session_pool(tenant_id, pool_guard)); - ObTableApiSessPool *pool = pool_guard.get_sess_pool(); - ASSERT_NE(nullptr, pool); - ObTableApiSessForeachOp op; - ASSERT_EQ(OB_SUCCESS, pool->key_node_map_.foreach_refactored(op)); - const ObTableApiSessForeachOp::SessKvArray &kvs = op.get_key_value_array(); - ASSERT_EQ(NODE_CNT/2, kvs.count()); - for (int64_t j = 0; j < kvs.count(); j++) { - const ObTableApiSessForeachOp::ObTableApiSessKV &kv = kvs.at(j); - ASSERT_EQ(SESS_CNT, kv.node_->sess_lists_.free_list_.get_size()); - } - } + ObTableApiSessNode node(*mock_cred_); + ASSERT_TRUE(node.sess_lists_.free_list_.is_empty()); + ASSERT_TRUE(node.sess_lists_.used_list_.is_empty()); + ASSERT_EQ(0, node.last_active_ts_); } -TEST_F(TestTableSessPool, reference_session) +TEST_F(TestTableSessPool, test_node_val_init) { - // prepare - ObTableApiSessPoolMgr mgr; - ASSERT_EQ(OB_SUCCESS, mgr.init()); - uint64_t tenant_id = 1; - uint64_t user_id = 0; - ObTableApiSessPoolGuard pool_guard; - ASSERT_EQ(OB_SUCCESS, mgr.extend_sess_pool(tenant_id, pool_guard)); - ObTableApiSessPool *pool = pool_guard.get_sess_pool(); - ASSERT_NE(nullptr, pool); - ObTableApiCredential credential; - credential.tenant_id_ = tenant_id; - credential.user_id_ = user_id; - ASSERT_EQ(OB_SUCCESS, pool->update_sess(credential)); - ObTableApiSessNode *node = nullptr; - ASSERT_EQ(OB_SUCCESS, pool->get_sess_node(user_id, node)); + ObTableApiSessNode node(*mock_cred_); + ObTableApiSessNodeVal val(&node); + ASSERT_FALSE(val.is_inited_); + ASSERT_EQ(&node, val.owner_node_); +} + +TEST_F(TestTableSessPool, test_sess_guard_init) +{ + ObTableApiSessGuard guard; + ASSERT_EQ(nullptr, guard.sess_node_val_); +} + +TEST_F(TestTableSessPool, mgr_get_session) +{ + ASSERT_EQ(OB_SYS_TENANT_ID, MTL_ID()); + ObTableApiSessPoolMgr *mgr = TABLEAPI_SESS_POOL_MGR; + + // first time will create a new node + ASSERT_EQ(OB_SUCCESS, mgr->update_sess(*mock_cred_)); + ASSERT_NE(nullptr, mgr->pool_); + ASSERT_TRUE(mgr->pool_->is_inited_); + ASSERT_EQ(1, mgr->pool_->key_node_map_.size()); + ASSERT_EQ(0, mgr->pool_->retired_nodes_.size_); + ObTableApiSessNode *node; + ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(mock_cred_->user_id_, node)); ASSERT_NE(nullptr, node); - void *buf = node->allocator_.alloc(sizeof(ObTableApiSessNodeVal)); - ASSERT_NE(nullptr, buf); - ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(tenant_id, node); - val->is_inited_ = true; - ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(val)); - // get and retire - { - // get - ObTableApiSessGuard guard; - ASSERT_EQ(OB_SUCCESS, pool->get_sess_info(credential, guard)); - ASSERT_NE(nullptr, guard.get_sess_node_val()); - // mark retire - ObTableApiSessForeachOp op; - ASSERT_EQ(OB_SUCCESS, pool->key_node_map_.foreach_refactored(op)); - const ObTableApiSessForeachOp::SessKvArray &kvs = op.get_key_value_array(); - ASSERT_EQ(1, kvs.count()); - const ObTableApiSessForeachOp::ObTableApiSessKV &kv = kvs.at(0); - // run retire task - ASSERT_EQ(OB_SUCCESS, mgr.elimination_task_.run_recycle_retired_sess_task()); - // check - op.reset(); - ASSERT_EQ(OB_SUCCESS, pool->key_node_map_.foreach_refactored(op)); - const ObTableApiSessForeachOp::SessKvArray &new_kvs = op.get_key_value_array(); - ASSERT_EQ(1, new_kvs.count()); - } - // retire after def ref - ASSERT_EQ(OB_SUCCESS, mgr.elimination_task_.run_recycle_retired_sess_task()); - // check - ObTableApiSessPoolForeachOp op; - ASSERT_EQ(OB_SUCCESS, mgr.sess_pool_map_.foreach_refactored(op)); - const ObTableApiSessPoolForeachOp::TelantIdArray &arr = op.get_telant_id_array(); - ASSERT_EQ(1, arr.count()); + ASSERT_TRUE(node->sess_lists_.free_list_.is_empty()); + ASSERT_TRUE(node->sess_lists_.used_list_.is_empty()); + ASSERT_NE(0, node->last_active_ts_); + + // add mock val to node + ObTableApiSessNodeVal val(node); + val.is_inited_ = true; + ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(&val)); + + ObTableApiSessGuard guard; + ASSERT_EQ(OB_SUCCESS, mgr->get_sess_info(*mock_cred_, guard)); + ASSERT_NE(nullptr, guard.sess_node_val_); + ASSERT_NE(nullptr, guard.get_sess_node_val()); + const ObTableApiCredential *cred = nullptr; + ASSERT_EQ(OB_SUCCESS, guard.get_credential(cred)); + ASSERT_NE(nullptr, cred); } -TEST_F(TestTableSessPool, retire_session_then_get_session) +TEST_F(TestTableSessPool, mgr_update_session) { - uint64_t tenant_id = 1; - uint64_t user_id = 0; - ObTableApiSessPoolMgr mgr; - ASSERT_EQ(OB_SUCCESS, mgr.init()); - ObTableApiSessPoolGuard pool_guard; - ASSERT_EQ(OB_SUCCESS, mgr.extend_sess_pool(tenant_id, pool_guard)); - ObTableApiCredential credential; - credential.tenant_id_ = tenant_id; - credential.user_id_ = user_id; - ASSERT_EQ(OB_SUCCESS, mgr.update_sess(credential)); + ASSERT_EQ(OB_SYS_TENANT_ID, MTL_ID()); + ObTableApiSessPoolMgr *mgr = TABLEAPI_SESS_POOL_MGR; + // first time will create a new node + ASSERT_EQ(OB_SUCCESS, mgr->update_sess(*mock_cred_)); + ASSERT_NE(nullptr, mgr->pool_); + ASSERT_TRUE(mgr->pool_->is_inited_); + ASSERT_EQ(1, mgr->pool_->key_node_map_.size()); + ASSERT_EQ(0, mgr->pool_->retired_nodes_.size_); + ObTableApiSessNode *node; + ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(mock_cred_->user_id_, node)); + ASSERT_NE(nullptr, node); + ASSERT_TRUE(node->sess_lists_.free_list_.is_empty()); + ASSERT_TRUE(node->sess_lists_.used_list_.is_empty()); + ASSERT_NE(0, node->last_active_ts_); + // second time will do replace + ObTableApiCredential *new_cred = nullptr; + create_credential(1, new_cred); + ASSERT_NE(nullptr, new_cred); + ASSERT_EQ(OB_SUCCESS, mgr->update_sess(*new_cred)); + ASSERT_NE(nullptr, mgr->pool_); + ASSERT_TRUE(mgr->pool_->is_inited_); + ASSERT_EQ(1, mgr->pool_->key_node_map_.size()); + ASSERT_EQ(1, mgr->pool_->retired_nodes_.size_); + ASSERT_EQ(node, mgr->pool_->retired_nodes_.get_last()); + ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(new_cred->user_id_, node)); + ASSERT_NE(nullptr, node); + ASSERT_TRUE(node->sess_lists_.free_list_.is_empty()); + ASSERT_TRUE(node->sess_lists_.used_list_.is_empty()); + ASSERT_NE(0, node->last_active_ts_); + // update another key is 2 node. + create_credential(2, new_cred); + ASSERT_NE(nullptr, new_cred); + ASSERT_EQ(OB_SUCCESS, mgr->update_sess(*new_cred)); + ASSERT_NE(nullptr, mgr->pool_); + ASSERT_TRUE(mgr->pool_->is_inited_); + ASSERT_EQ(2, mgr->pool_->key_node_map_.size()); + ASSERT_EQ(1, mgr->pool_->retired_nodes_.size_); + ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(new_cred->user_id_, node)); + ASSERT_NE(nullptr, node); + ASSERT_TRUE(node->sess_lists_.free_list_.is_empty()); + ASSERT_TRUE(node->sess_lists_.used_list_.is_empty()); + ASSERT_NE(0, node->last_active_ts_); +} - // 塞一个ObTableApiSessNodeVal - ObTableApiSessPool *pool = pool_guard.get_sess_pool(); - ObTableApiSessNode *node = nullptr; - ASSERT_EQ(OB_SUCCESS, pool->get_sess_node(user_id, node)); - void *buf = node->allocator_.alloc(sizeof(ObTableApiSessNodeVal)); - ASSERT_NE(nullptr, buf); - ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(tenant_id, node); - val->is_inited_ = true; - ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(val)); +TEST_F(TestTableSessPool, mgr_destroy) +{ + ASSERT_EQ(OB_SYS_TENANT_ID, MTL_ID()); + ObTableApiSessPoolMgr *mgr = TABLEAPI_SESS_POOL_MGR; + ASSERT_EQ(OB_SUCCESS, mgr->update_sess(*mock_cred_)); + ASSERT_NE(nullptr, mgr->pool_); + ObTableApiSessNode *node; + ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(mock_cred_->user_id_, node)); + mgr->destroy(); + ASSERT_FALSE(mgr->is_inited_); + ASSERT_EQ(nullptr, mgr->pool_); + ASSERT_EQ(0, mgr->allocator_.total()); + ASSERT_EQ(0, mgr->allocator_.used()); +} - // 第一次获取了session - ObTableApiSessGuard sess_guard; - ASSERT_EQ(OB_SUCCESS, mgr.get_sess_info(credential, sess_guard)); - sess_guard.~ObTableApiSessGuard(); // 模仿访问结束,析构 +TEST_F(TestTableSessPool, mgr_sess_recycle) +{ + ASSERT_EQ(OB_SYS_TENANT_ID, MTL_ID()); + ObTableApiSessPoolMgr *mgr = TABLEAPI_SESS_POOL_MGR; + ASSERT_EQ(OB_SUCCESS, mgr->update_sess(*mock_cred_)); + ASSERT_NE(nullptr, mgr->pool_); - // 长时间没有访问ob,session被放到淘汰链表,后台定时回收 - ASSERT_EQ(OB_SUCCESS, pool->get_sess_node(user_id, node)); - ASSERT_EQ(OB_SUCCESS, pool->move_sess_to_retired_list(user_id)); - ASSERT_EQ(1, pool->retired_nodes_.size_); - ASSERT_EQ(OB_SUCCESS, mgr.elimination_task_.run_recycle_retired_sess_task()); - ASSERT_EQ(0, pool->retired_nodes_.size_); + // add mock val to node + ObTableApiSessNode *node; + ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(mock_cred_->user_id_, node)); + ObTableApiSessNodeVal val(node); + val.is_inited_ = true; + ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(&val)); - // 连接隔了很长时间,突然又访问db - ASSERT_EQ(OB_HASH_NOT_EXIST, pool->get_sess_node(user_id, node)); + ObTableApiSessGuard guard; + ASSERT_EQ(OB_SUCCESS, mgr->get_sess_info(*mock_cred_, guard)); + mgr->elimination_task_.runTimerTask(); + ASSERT_EQ(1, mgr->pool_->key_node_map_.size()); + ASSERT_EQ(0, mgr->pool_->retired_nodes_.size_); + guard.~ObTableApiSessGuard(); - // 新的访问会创建新的池子 - ASSERT_EQ(OB_SUCCESS, mgr.get_or_create_sess_pool(credential, pool_guard)); + // 3min not access + ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(mock_cred_->user_id_, node)); + node->last_active_ts_ = node->last_active_ts_ - ObTableApiSessPool::SESS_RETIRE_TIME; + mgr->elimination_task_.run_retire_sess_task(); + ASSERT_EQ(0, mgr->pool_->key_node_map_.size()); + ASSERT_EQ(1, mgr->pool_->retired_nodes_.size_); + mgr->elimination_task_.run_recycle_retired_sess_task(); + ASSERT_EQ(0, mgr->pool_->key_node_map_.size()); + ASSERT_EQ(0, mgr->pool_->retired_nodes_.size_); } int main(int argc, char **argv)