[Bugfix]tableapi session pool split to tenant

This commit is contained in:
WeiXinChan
2023-09-13 06:19:37 +00:00
committed by ob-robot
parent 1238115c82
commit 68c5f0ce3e
13 changed files with 555 additions and 713 deletions

View File

@ -85,6 +85,7 @@
#include "logservice/palf/log_define.h" #include "logservice/palf/log_define.h"
#include "storage/high_availability/ob_rebuild_service.h" #include "storage/high_availability/ob_rebuild_service.h"
#include "observer/table/ob_htable_lock_mgr.h" #include "observer/table/ob_htable_lock_mgr.h"
#include "observer/table/ob_table_session_pool.h"
namespace oceanbase namespace oceanbase
{ {
@ -703,6 +704,7 @@ int MockTenantModuleEnv::init()
MTL_BIND2(mtl_new_default, ObRebuildService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObRebuildService::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
MTL_BIND(table::ObHTableLockMgr::mtl_init, table::ObHTableLockMgr::mtl_destroy); MTL_BIND(table::ObHTableLockMgr::mtl_init, table::ObHTableLockMgr::mtl_destroy);
MTL_BIND2(mtl_new_default, omt::ObTenantSrs::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, omt::ObTenantSrs::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
MTL_BIND2(mtl_new_default, table::ObTableApiSessPoolMgr::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {

View File

@ -137,6 +137,7 @@
#include "share/errsim_module/ob_tenant_errsim_event_mgr.h" #include "share/errsim_module/ob_tenant_errsim_event_mgr.h"
#endif #endif
#include "observer/table/ob_htable_lock_mgr.h" #include "observer/table/ob_htable_lock_mgr.h"
#include "observer/table/ob_table_session_pool.h"
using namespace oceanbase; using namespace oceanbase;
using namespace oceanbase::lib; using namespace oceanbase::lib;
@ -531,6 +532,7 @@ int ObMultiTenant::init(ObAddr myaddr,
MTL_BIND2(mtl_new_default, ObSharedTimer::mtl_init, ObSharedTimer::mtl_start, ObSharedTimer::mtl_stop, ObSharedTimer::mtl_wait, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObSharedTimer::mtl_init, ObSharedTimer::mtl_start, ObSharedTimer::mtl_stop, ObSharedTimer::mtl_wait, mtl_destroy_default);
MTL_BIND2(mtl_new_default, ObOptStatMonitorManager::mtl_init, ObOptStatMonitorManager::mtl_start, ObOptStatMonitorManager::mtl_stop, ObOptStatMonitorManager::mtl_wait, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObOptStatMonitorManager::mtl_init, ObOptStatMonitorManager::mtl_start, ObOptStatMonitorManager::mtl_stop, ObOptStatMonitorManager::mtl_wait, mtl_destroy_default);
MTL_BIND2(mtl_new_default, ObTenantSrs::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default); MTL_BIND2(mtl_new_default, ObTenantSrs::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
MTL_BIND2(mtl_new_default, table::ObTableApiSessPoolMgr::mtl_init, mtl_start_default, mtl_stop_default, mtl_wait_default, mtl_destroy_default);
} }
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {

View File

@ -65,7 +65,7 @@ int ObTableCtx::init_sess_info(ObTableApiCredential &credential)
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
// try get session from session pool // try get session from session pool
if (OB_FAIL(GCTX.table_service_->get_sess_mgr().get_sess_info(credential, sess_guard_))) { if (OB_FAIL(TABLEAPI_SESS_POOL_MGR->get_sess_info(credential, sess_guard_))) {
LOG_WARN("fail to get session info", K(ret), K(credential)); LOG_WARN("fail to get session info", K(ret), K(credential));
} else if (OB_ISNULL(sess_guard_.get_sess_node_val())) { } else if (OB_ISNULL(sess_guard_.get_sess_node_val())) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;

View File

@ -83,9 +83,12 @@ int ObTableLoginP::process()
} else if (OB_FAIL(generate_credential(result_.tenant_id_, result_.user_id_, result_.database_id_, } else if (OB_FAIL(generate_credential(result_.tenant_id_, result_.user_id_, result_.database_id_,
login.ttl_us_, user_token, result_.credential_))) { login.ttl_us_, user_token, result_.credential_))) {
LOG_WARN("failed to generate credential", K(ret), K(login)); LOG_WARN("failed to generate credential", K(ret), K(login));
} else if (OB_FAIL(GCTX.table_service_->get_sess_mgr().update_sess(credential_))) {
LOG_WARN("failed to update session pool", K(ret), K_(credential));
} else { } else {
MTL_SWITCH(credential_.tenant_id_) {
if (OB_FAIL(TABLEAPI_SESS_POOL_MGR->update_sess(credential_))) {
LOG_WARN("failed to update session pool", K(ret), K_(credential));
}
}
result_.reserved1_ = 0; result_.reserved1_ = 0;
result_.reserved2_ = 0; result_.reserved2_ = 0;
result_.server_version_ = ObString::make_string(PACKAGE_STRING); result_.server_version_ = ObString::make_string(PACKAGE_STRING);
@ -258,7 +261,7 @@ int ObTableApiProcessorBase::check_user_access(const ObString &credential_str)
const ObTableApiCredential *sess_credetial = nullptr; const ObTableApiCredential *sess_credetial = nullptr;
if (OB_FAIL(serialization::decode(credential_str.ptr(), credential_str.length(), pos, credential_))) { if (OB_FAIL(serialization::decode(credential_str.ptr(), credential_str.length(), pos, credential_))) {
LOG_WARN("failed to serialize credential", K(ret), K(pos)); LOG_WARN("failed to serialize credential", K(ret), K(pos));
} else if (OB_FAIL(gctx_.table_service_->get_sess_mgr().get_sess_info(credential_, guard))) { } else if (OB_FAIL(TABLEAPI_SESS_POOL_MGR->get_sess_info(credential_, guard))) {
LOG_WARN("fail to get session info", K(ret), K_(credential)); LOG_WARN("fail to get session info", K(ret), K_(credential));
} else if (OB_FAIL(guard.get_credential(sess_credetial))) { } else if (OB_FAIL(guard.get_credential(sess_credetial))) {
LOG_WARN("fail to get credential", K(ret)); LOG_WARN("fail to get credential", K(ret));

View File

@ -18,24 +18,17 @@ using namespace oceanbase::table;
int ObTableService::init() int ObTableService::init()
{ {
int ret = OB_SUCCESS; return OB_SUCCESS;
if (OB_FAIL(sess_pool_mgr_.init())) {
LOG_WARN("fail to init tableapi session pool manager", K(ret));
}
return ret;
} }
void ObTableService::stop() void ObTableService::stop()
{ {
sess_pool_mgr_.stop();
} }
void ObTableService::wait() void ObTableService::wait()
{ {
sess_pool_mgr_.wait();
} }
void ObTableService::destroy() void ObTableService::destroy()
{ {
sess_pool_mgr_.destroy();
} }

View File

@ -28,9 +28,6 @@ public:
void stop(); void stop();
void wait(); void wait();
void destroy(); void destroy();
table::ObTableApiSessPoolMgr& get_sess_mgr() { return sess_pool_mgr_; }
private:
table::ObTableApiSessPoolMgr sess_pool_mgr_;
private: private:
// disallow copy // disallow copy
DISALLOW_COPY_AND_ASSIGN(ObTableService); DISALLOW_COPY_AND_ASSIGN(ObTableService);

View File

@ -12,101 +12,148 @@
#define USING_LOG_PREFIX SERVER #define USING_LOG_PREFIX SERVER
#include "ob_table_session_pool.h" #include "ob_table_session_pool.h"
#include "observer/omt/ob_multi_tenant.h"
using namespace oceanbase::share;
using namespace oceanbase::common;
namespace oceanbase namespace oceanbase
{ {
namespace table namespace table
{ {
/*
init session pool manager when create tenant
- we just obly init the metadata when mtl_init.
*/
int ObTableApiSessPoolMgr::mtl_init(ObTableApiSessPoolMgr *&mgr)
{
return mgr->init();
}
/*
start tableapi retired session task
- 60 second interval
- repeated
*/
int ObTableApiSessPoolMgr::start()
{
int ret = OB_SUCCESS;
if (!is_inited_) {
ret = OB_NOT_INIT;
LOG_WARN("table api session pool mgr isn't inited", K(ret));
} else if (OB_FAIL(TG_SCHEDULE(MTL(omt::ObSharedTimer*)->get_tg_id(),
elimination_task_,
ELIMINATE_SESSION_DELAY/* 60s */,
true/* repeat */))) {
LOG_WARN("failed to schedule tableapi retired session task", K(ret));
} else {
elimination_task_.is_inited_ = true;
}
return ret;
}
// stop tableapi retired session task
void ObTableApiSessPoolMgr::stop()
{
if (OB_LIKELY(elimination_task_.is_inited_)) {
TG_CANCEL_TASK(MTL(omt::ObSharedTimer*)->get_tg_id(), elimination_task_);
}
}
// tableapi retired session task wait
void ObTableApiSessPoolMgr::wait()
{
if (OB_LIKELY(elimination_task_.is_inited_)) {
TG_WAIT_TASK(MTL(omt::ObSharedTimer*)->get_tg_id(), elimination_task_);
}
}
/*
destroy session pool manager.
- cancel timer task.
- destroy session pool.
*/
void ObTableApiSessPoolMgr::destroy()
{
int ret = OB_SUCCESS;
if (is_inited_) {
// 1. cancel timer task
if (elimination_task_.is_inited_) {
bool is_exist = true;
if (OB_SUCC(TG_TASK_EXIST(MTL(omt::ObSharedTimer*)->get_tg_id(), elimination_task_, is_exist))) {
if (is_exist) {
TG_CANCEL_TASK(MTL(omt::ObSharedTimer*)->get_tg_id(), elimination_task_);
TG_WAIT_TASK(MTL(omt::ObSharedTimer*)->get_tg_id(), elimination_task_);
elimination_task_.is_inited_ = false;
}
}
}
// 2. destroy session pool
if (OB_NOT_NULL(pool_)) {
pool_->destroy();
pool_ = nullptr;
}
allocator_.reset();
is_inited_ = false;
}
}
// init session pool manager.
int ObTableApiSessPoolMgr::init() int ObTableApiSessPoolMgr::init()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (!is_inited_) { if (!is_inited_) {
ObMemAttr bucket_attr, node_attr; elimination_task_.sess_pool_mgr_ = this;
bucket_attr.label_ = "HashBucApiSessM"; is_inited_ = true;
node_attr.label_ = "HasNodApiSessP";
if (OB_FAIL(sess_pool_map_.create(hash::cal_next_prime(16),
bucket_attr,
node_attr))) {
LOG_WARN("failed to init tableapi session pool", K(ret));
} else {
elimination_task_.sess_pool_mgr_ = this;
timer_.set_run_wrapper(MTL_CTX());
if (OB_FAIL(timer_.init())) {
LOG_WARN("fail to init timer", K(ret));
} else if (OB_FAIL(timer_.schedule(elimination_task_, ELIMINATE_SESSION_DELAY, true))) {
LOG_WARN("fail to schedule session pool elimination task. ", K(ret));
} else if (OB_FAIL(timer_.start())) {
LOG_WARN("fail to start session pool elimination task timer.", K(ret));
} else {
is_inited_ = true;
}
}
} }
return ret; return ret;
} }
void ObTableApiSessPoolMgr::stop() /*
{ get a session or create a new one if it doesn't exist
timer_.stop(); - 1. the user should access the current tenant, so we check tenant id.
} - 2. ObTableApiSessGuard holds the reference count of session.
void ObTableApiSessPoolMgr::wait() - 3. pool_ have been created when login normally,
{ but some inner operation did not login, such as ttl operation, so we create a new pool for ttl.
timer_.wait(); */
} int ObTableApiSessPoolMgr::get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard)
void ObTableApiSessPoolMgr::destroy()
{
if (is_inited_) {
is_inited_ = false;
timer_.destroy();
for (SessPoolMap::iterator it = sess_pool_map_.begin();
it != sess_pool_map_.end();
it++) {
if (OB_ISNULL(it->second)) {
BACKTRACE_RET(ERROR, OB_ERR_UNEXPECTED, true, "session pool is null");
} else {
it->second->~ObTableApiSessPool();
ob_free(it->second);
}
}
}
}
int ObTableApiSessPoolMgr::get_sess_info(ObTableApiCredential &credential,
ObTableApiSessGuard &guard)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObTableApiSessPoolGuard &pool_guard = guard.get_sess_pool_guard();
if (OB_FAIL(get_or_create_sess_pool(credential, pool_guard))) { if (credential.tenant_id_ != MTL_ID()) {
LOG_WARN("fail to get session or create pool", K(ret)); ret = OB_NOT_SUPPORTED;
} else if (OB_FAIL(pool_guard.get_sess_pool()->get_sess_info(credential, guard))) { LOG_WARN("access wrong tenant", K(ret), K(credential.tenant_id_), K(MTL_ID()));
LOG_WARN("fail to get sess info", K(ret), K(credential)); } else if (OB_UNLIKELY(OB_ISNULL(pool_)) && OB_FAIL(create_session_pool_safe())) {
LOG_WARN("fail to create session pool", K(ret), K(credential));
} else if (OB_ISNULL(pool_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session pool is null", K(ret), K(credential));
} else if (OB_FAIL(pool_->get_sess_info(credential, guard))) {
LOG_WARN("fail to get session info", K(ret), K(credential));
} }
return ret; return ret;
} }
int ObTableApiSessPoolMgr::get_or_create_sess_pool(ObTableApiCredential &credential, /*
ObTableApiSessPoolGuard &guard) create session pool safely.
- lock for allocator concurrency.
*/
int ObTableApiSessPoolMgr::create_session_pool_safe()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
const uint64_t tenant_id = credential.tenant_id_;
if (OB_FAIL(get_session_pool(tenant_id, guard))) { if (OB_ISNULL(pool_)) {
if (OB_HASH_NOT_EXIST != ret) { ObLockGuard<ObSpinLock> guard(lock_);
LOG_WARN("fail to get session pool", K(ret), K(credential)); if (OB_ISNULL(pool_)) { // double check
} else { // ret = OB_HASH_NOT_EXIST if (OB_FAIL(create_session_pool_unsafe())) {
ObLockGuard<ObSpinLock> lock_guard(lock_); // lock first LOG_WARN("fail to create session pool", K(ret));
if (OB_FAIL(get_session_pool(tenant_id, guard))) { // double check
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("fail to get session pool", K(ret), K(credential));
} else { // ret = OB_HASH_NOT_EXIST
if (OB_FAIL(extend_sess_pool(tenant_id, guard))) {
LOG_WARN("fail to extend sess pool", K(ret), K(tenant_id));
}
}
} }
} }
} }
@ -114,161 +161,90 @@ int ObTableApiSessPoolMgr::get_or_create_sess_pool(ObTableApiCredential &credent
return ret; return ret;
} }
int ObTableApiSessPoolMgr::update_sess(ObTableApiCredential &credential) int ObTableApiSessPoolMgr::create_session_pool_unsafe()
{
int ret = OB_SUCCESS;
ObTableApiSessPoolGuard guard;
const uint64_t tenant_id = credential.tenant_id_;
if (OB_FAIL(get_session_pool(tenant_id, guard))) {
if (OB_HASH_NOT_EXIST == ret) {
if (OB_FAIL(extend_sess_pool(tenant_id, guard))) {
LOG_WARN("fail to extend sess pool", K(ret), K(tenant_id));
}
} else {
LOG_WARN("fait to get session pool", K(ret), K(tenant_id));
}
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(guard.get_sess_pool()->update_sess(credential))) {
LOG_WARN("fail to update sess pool", K(ret), K(tenant_id), K(credential));
}
return ret;
}
int ObTableApiSessPoolMgr::create_pool_if_not_exists(int64_t tenant_id)
{
int ret = OB_SUCCESS;
ObTableApiSessPoolGuard guard;
if (OB_FAIL(get_session_pool(tenant_id, guard))) {
if (OB_HASH_NOT_EXIST == ret) {
if (OB_FAIL(extend_sess_pool(tenant_id, guard))) {
LOG_WARN("fail to extend sess pool", K(ret), K(tenant_id));
} else {
LOG_INFO("success to extend sess pool", K(ret), K(tenant_id));
}
} else {
LOG_WARN("fait to get session pool", K(ret), K(tenant_id));
}
}
return ret;
}
int ObTableApiSessPoolMgr::extend_sess_pool(uint64_t tenant_id,
ObTableApiSessPoolGuard &guard)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
void *buf = nullptr; void *buf = nullptr;
ObTableApiSessPool *tmp_pool = nullptr; ObTableApiSessPool *tmp_pool = nullptr;
if (OB_ISNULL(buf = ob_malloc(sizeof(ObTableApiSessPool), "ApiSessPool"))) { if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObTableApiSessPool)))) {
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc mem for ObTableApiSessPool", K(ret)); LOG_WARN("fail to alloc mem for ObTableApiSessPool", K(ret));
} else if (FALSE_IT(tmp_pool = new (buf) ObTableApiSessPool(tenant_id))) { } else if (FALSE_IT(tmp_pool = new (buf) ObTableApiSessPool())) {
} else if (OB_FAIL(tmp_pool->init())) {
LOG_WARN("fail to init sess pool", K(ret));
allocator_.free(tmp_pool);
tmp_pool = nullptr;
} else { } else {
if (OB_FAIL(tmp_pool->init())) { pool_ = tmp_pool;
LOG_WARN("fail to init sess pool", K(ret), K(tenant_id));
} else if (OB_FAIL(sess_pool_map_.set_refactored(tenant_id, tmp_pool))) {
if (OB_HASH_EXIST != ret) {
LOG_WARN("fail to add sess pool to hash map", K(ret), K(*tmp_pool));
} else { // this pool has been set by other thread, free it
tmp_pool->~ObTableApiSessPool();
ob_free(tmp_pool);
tmp_pool = nullptr;
// get sess pool
ObTableApiSessPoolMgrAtomic op;
if (OB_FAIL(sess_pool_map_.read_atomic(tenant_id, op))) {
LOG_WARN("fail to get sess pool", K(ret), K(tenant_id));
} else {
ObTableApiSessPool *pool = op.get_session_pool();
pool->inc_ref_count();
guard.set_sess_pool(pool);
}
}
} else {
tmp_pool->inc_ref_count();
guard.set_sess_pool(tmp_pool);
}
if (OB_FAIL(ret) && OB_NOT_NULL(tmp_pool)) {
tmp_pool->~ObTableApiSessPool();
ob_free(tmp_pool);
tmp_pool = nullptr;
}
} }
return ret; return ret;
} }
int ObTableApiSessPoolMgr::get_session_pool(uint64_t tenant_id, ObTableApiSessPoolGuard &guard) /*
update session when login.
- 1. because tableapi is not aware of changes to system variables,
users need to log in again to get the latest system variables.
- 2. we will create a new session node which has the latest system variables
to replace the old session node.
- 3. login is handled by sys tenant.
- 4. login has concurrency, many thread will login together.
*/
int ObTableApiSessPoolMgr::update_sess(ObTableApiCredential &credential)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (is_inited_) { if (OB_UNLIKELY(OB_ISNULL(pool_)) && OB_FAIL(create_session_pool_safe())) {
ObTableApiSessPoolMgrAtomic op; LOG_WARN("fail to create session pool", K(ret), K(credential));
ret = sess_pool_map_.read_atomic(tenant_id, op); } else if (OB_FAIL(pool_->update_sess(credential))) {
if (OB_SUCC(ret)) { // exist LOG_WARN("fail to update sess pool", K(ret), K(credential));
ObTableApiSessPool *pool = op.get_session_pool();
pool->inc_ref_count();
guard.set_sess_pool(pool);
} else if (OB_HASH_NOT_EXIST == ret) {
// do nothing
} else {
LOG_WARN("fait to atomic get session pool", K(ret), K(tenant_id));
}
} }
return ret; return ret;
} }
// 1. 淘汰长期未被使用的session
// 2. 回收淘汰的session /*
The background timer tasks to delete session node.
- retire session node that have not been accessed for more than 3 minutes.
- recycle session node in retired node list.
*/
void ObTableApiSessPoolMgr::ObTableApiSessEliminationTask::runTimerTask() void ObTableApiSessPoolMgr::ObTableApiSessEliminationTask::runTimerTask()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_ISNULL(sess_pool_mgr_)) { if (OB_FAIL(run_retire_sess_task())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("sess_pool_mgr_ not inited", K(ret));
} else if (OB_FAIL(run_retire_sess_task())) {
LOG_WARN("fail to run retire sess task", K(ret)); LOG_WARN("fail to run retire sess task", K(ret));
} else if (OB_FAIL(run_recycle_retired_sess_task())) { } else if (OB_FAIL(run_recycle_retired_sess_task())) {
LOG_WARN("fail to run recycle retired sess task", K(ret)); LOG_WARN("fail to run recycle retired sess task", K(ret));
} }
} }
/*
retire session node that have not been accessed for more than 3 minutes.
- move session node which have not been accessed for more than 3 minutes to retired node list.
*/
int ObTableApiSessPoolMgr::ObTableApiSessEliminationTask::run_retire_sess_task() int ObTableApiSessPoolMgr::ObTableApiSessEliminationTask::run_retire_sess_task()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
if (OB_ISNULL(sess_pool_mgr_)) { if (OB_ISNULL(sess_pool_mgr_)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("sess_pool_mgr_ not inited", K(ret)); LOG_WARN("sess_pool_mgr_ is null", K(ret));
} else { } else if (OB_ISNULL(sess_pool_mgr_->pool_)) {
ObTableApiSessPoolForeachOp op; ret = OB_ERR_UNEXPECTED;
if (OB_FAIL(sess_pool_mgr_->sess_pool_map_.foreach_refactored(op))) { LOG_WARN("session pool is null", K(ret));
LOG_WARN("fail to foreach sess pool hash map", K(ret)); } else if (sess_pool_mgr_->pool_->retire_session_node()) {
} else { LOG_WARN("fail to retire session node", K(ret));
const ObTableApiSessPoolForeachOp::TelantIdArray &arr = op.get_telant_id_array();
const int64_t N = arr.count();
for (int64_t i = 0; i < N && OB_SUCC(ret); i++) {
uint64_t tenant_id = arr.at(i);
ObTableApiSessPoolGuard pool_guard;
if (OB_FAIL(sess_pool_mgr_->get_session_pool(tenant_id, pool_guard))) {
LOG_WARN("fail to get sess pool", K(ret), K(tenant_id));
} else if (OB_FAIL(pool_guard.get_sess_pool()->move_sess_to_retired_list())) {
LOG_WARN("fail to move retired session", K(ret));
}
}
}
} }
return ret; return ret;
} }
/*
evict retired session node from retired node list.
*/
int ObTableApiSessPoolMgr::ObTableApiSessEliminationTask::run_recycle_retired_sess_task() int ObTableApiSessPoolMgr::ObTableApiSessEliminationTask::run_recycle_retired_sess_task()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -276,54 +252,20 @@ int ObTableApiSessPoolMgr::ObTableApiSessEliminationTask::run_recycle_retired_se
if (OB_ISNULL(sess_pool_mgr_)) { if (OB_ISNULL(sess_pool_mgr_)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("sess_pool_mgr_ not inited", K(ret)); LOG_WARN("sess_pool_mgr_ not inited", K(ret));
} else { } else if (OB_ISNULL(sess_pool_mgr_->pool_)) {
ObTableApiSessPoolForeachOp op; ret = OB_ERR_UNEXPECTED;
if (OB_FAIL(sess_pool_mgr_->sess_pool_map_.foreach_refactored(op))) { LOG_WARN("session pool is null", K(ret));
LOG_WARN("fail to foreach sess pool hash map", K(ret)); } else if (OB_FAIL(sess_pool_mgr_->pool_->evict_retired_sess())) {
} else { LOG_WARN("fail to evict retired sess", K(ret));
const ObTableApiSessPoolForeachOp::TelantIdArray &arr = op.get_telant_id_array();
const int64_t N = arr.count();
for (int64_t i = 0; i < N && OB_SUCC(ret); i++) {
uint64_t tenant_id = arr.at(i);
ObTableApiSessPoolGuard pool_guard;
ObTableApiSessPool *pool = nullptr;
if (OB_FAIL(sess_pool_mgr_->get_session_pool(tenant_id, pool_guard))) {
LOG_WARN("fail to get sess pool", K(ret), K(tenant_id));
} else if (FALSE_IT(pool = pool_guard.get_sess_pool())) {
} else if (OB_FAIL(pool->evict_retired_sess())) {
LOG_WARN("fail to evict retired sess", K(ret));
} else if (pool->is_empty()) {
// 1. 标记delete,由最后一个持有者释放内存
// 2. 从hash表中摘出,避免新的请求获取到这个pool
pool->set_deleted();
ObTableApiSessPool *del_pool = nullptr;
if (OB_FAIL(sess_pool_mgr_->sess_pool_map_.erase_refactored(tenant_id, &del_pool))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("fail to erase sess pool from sess pool hash map", K(ret), K(tenant_id));
}
}
}
}
}
}
return ret;
}
int ObTableApiSessPoolMgrAtomic::operator() (MapKV &entry)
{
int ret = common::OB_SUCCESS;
if (OB_ISNULL(entry.second)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret));
} else {
sess_pool_ = entry.second;
} }
return ret; return ret;
} }
/*
init session pool
- init key_node_map_ which is a hashmap, key is user_id, value is ObTableApiSessNode*
*/
int ObTableApiSessPool::init(int64_t hash_bucket/* = SESS_POOL_DEFAULT_BUCKET_NUM */) int ObTableApiSessPool::init(int64_t hash_bucket/* = SESS_POOL_DEFAULT_BUCKET_NUM */)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -332,8 +274,8 @@ int ObTableApiSessPool::init(int64_t hash_bucket/* = SESS_POOL_DEFAULT_BUCKET_NU
if (OB_FAIL(key_node_map_.create(hash::cal_next_prime(hash_bucket), if (OB_FAIL(key_node_map_.create(hash::cal_next_prime(hash_bucket),
"HashBucApiSessP", "HashBucApiSessP",
"HasNodApiSess", "HasNodApiSess",
tenant_id_))) { MTL_ID()))) {
LOG_WARN("fail to init sess pool", K(ret), K(hash_bucket), K_(tenant_id)); LOG_WARN("fail to init sess pool", K(ret), K(hash_bucket), K(MTL_ID()));
} else { } else {
is_inited_ = true; is_inited_ = true;
} }
@ -342,18 +284,53 @@ int ObTableApiSessPool::init(int64_t hash_bucket/* = SESS_POOL_DEFAULT_BUCKET_NU
return ret; return ret;
} }
/*
destroy session pool.
- free all session.
*/
void ObTableApiSessPool::destroy() void ObTableApiSessPool::destroy()
{ {
if (is_inited_) { int ret = OB_SUCCESS;
if (OB_SUCCESS != evict_all_session()) { ObTableApiSessForeachOp op;
LOG_WARN_RET(OB_ERR_UNEXPECTED, "fail to evict all seesion");
// clear map
if (OB_FAIL(key_node_map_.foreach_refactored(op))) {
LOG_WARN("fail to foreach sess key node map", K(ret));
} else {
const ObTableApiSessForeachOp::SessKvArray &arr = op.get_key_value_array();
const int64_t N = arr.count();
for (int64_t i = 0; OB_SUCC(ret) && i < N; ++i) {
const ObTableApiSessForeachOp::ObTableApiSessKV &kv = arr.at(i);
ObTableApiSessNode *del_node = nullptr;
if (OB_FAIL(key_node_map_.erase_refactored(kv.key_, &del_node))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("fail to erase sess from sess hash map", K(ret), K(kv));
}
} else if (OB_NOT_NULL(del_node)) {
del_node->destroy();
}
} }
is_inited_ = false;
} }
// clear retired_nodes_
DLIST_FOREACH(node, retired_nodes_) {
if (OB_NOT_NULL(node)) {
node->destroy();
}
}
retired_nodes_.clear();
key_node_map_.destroy();
allocator_.reset();
is_inited_ = false;
} }
// 将过期的node从hash map中摘掉,加入retired_nodes_链表中 /*
int ObTableApiSessPool::move_sess_to_retired_list() loop all session node to retire.
- nodes which have not been visited for more than 5 minutes will be retired.
- move retired node to retired list.
*/
int ObTableApiSessPool::retire_session_node()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
int64_t cur_time = ObTimeUtility::current_time(); int64_t cur_time = ObTimeUtility::current_time();
@ -367,8 +344,13 @@ int ObTableApiSessPool::move_sess_to_retired_list()
for (int64_t i = 0; OB_SUCC(ret) && i < N; ++i) { for (int64_t i = 0; OB_SUCC(ret) && i < N; ++i) {
const ObTableApiSessForeachOp::ObTableApiSessKV &kv = arr.at(i); const ObTableApiSessForeachOp::ObTableApiSessKV &kv = arr.at(i);
if (cur_time - kv.node_->get_last_active_ts() >= SESS_RETIRE_TIME) { if (cur_time - kv.node_->get_last_active_ts() >= SESS_RETIRE_TIME) {
if (OB_FAIL(move_sess_to_retired_list(kv.key_))) { ObTableApiSessNode *del_node = nullptr;
LOG_WARN("fail to move retired session", K(ret), K(kv.key_)); if (OB_FAIL(key_node_map_.erase_refactored(kv.key_, &del_node))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("fail to erase sess from sess hash map", K(ret), K(kv.key_));
}
} else if (OB_FAIL(move_node_to_retired_list(del_node))) {
LOG_WARN("fail to move session node to retired list", K(ret));
} }
} }
} }
@ -377,27 +359,7 @@ int ObTableApiSessPool::move_sess_to_retired_list()
return ret; return ret;
} }
int ObTableApiSessPool::move_sess_to_retired_list(uint64_t key) int ObTableApiSessPool::move_node_to_retired_list(ObTableApiSessNode *node)
{
int ret = OB_SUCCESS;
ObTableApiSessNode *del_node = nullptr;
if (OB_FAIL(key_node_map_.erase_refactored(key, &del_node))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("fail to erase sess from sess hash map", K(ret), K(key));
}
} else {
ObLockGuard<ObSpinLock> guard(lock_);
if (false == (retired_nodes_.add_last(del_node))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to add retired sess node to retired list", K(ret), K(*del_node));
}
}
return ret;
}
int ObTableApiSessPool::move_sess_to_retired_list(ObTableApiSessNode *node)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -413,7 +375,12 @@ int ObTableApiSessPool::move_sess_to_retired_list(ObTableApiSessNode *node)
return ret; return ret;
} }
// 清理retired_nodes_中没有引用的node /*
evit retired session.
1. remove session val in free_list.
2. remove session node from retired_nodes_ when node is empty.
3. free node memory.
*/
int ObTableApiSessPool::evict_retired_sess() int ObTableApiSessPool::evict_retired_sess()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -437,32 +404,6 @@ int ObTableApiSessPool::evict_retired_sess()
return ret; return ret;
} }
int ObTableApiSessPool::evict_all_session()
{
int ret = OB_SUCCESS;
ObTableApiSessForeachOp op;
if (OB_FAIL(key_node_map_.foreach_refactored(op))) {
LOG_WARN("fail to foreach sess key node map", K(ret));
} else {
const ObTableApiSessForeachOp::SessKvArray &arr = op.get_key_value_array();
const int64_t N = arr.count();
for (int64_t i = 0; OB_SUCC(ret) && i < N; ++i) {
const ObTableApiSessForeachOp::ObTableApiSessKV &kv = arr.at(i);
ObTableApiSessNode *del_node = nullptr;
if (OB_FAIL(key_node_map_.erase_refactored(kv.key_, &del_node))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("fail to erase sess from sess hash map", K(ret), K(kv));
}
} else {
ObLockGuard<ObSpinLock> guard(lock_);
del_node->~ObTableApiSessNode();
allocator_.free(del_node);
}
}
}
return ret;
}
int ObTableApiSessPool::get_sess_node(uint64_t key, int ObTableApiSessPool::get_sess_node(uint64_t key,
ObTableApiSessNode *&node) ObTableApiSessNode *&node)
{ {
@ -489,27 +430,45 @@ int ObTableApiSessPool::get_sess_node(uint64_t key,
return ret; return ret;
} }
/*
get session
1. get session node
2. create new one if not exist
3. get session node value
3.1 if there is no session node val in node list, extend it.
struct pool {
map: [user_id0:node1][user_id2:node:2]
}
struct node {
list: node_val0 - node_val1 - node_val2 - ... - node_valn
}
*/
int ObTableApiSessPool::get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard) int ObTableApiSessPool::get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObTableApiSessNode *sess_node = nullptr; ObTableApiSessNode *sess_node = nullptr;
ObTableApiSessNodeVal *sess_val = nullptr;
bool need_extend = false; bool need_extend = false;
if (OB_FAIL(get_sess_node(credential.user_id_, sess_node))) { if (OB_FAIL(get_sess_node(credential.user_id_, sess_node))) { // first get
LOG_WARN("fail to get sess node", K(ret), K(credential)); if (OB_HASH_NOT_EXIST != ret) {
}
if (OB_HASH_NOT_EXIST == ret) {
if (OB_FAIL(create_and_add_node(credential))) {
LOG_WARN("fail to create and add session node to session pool", K(ret), K(credential));
} else if (OB_FAIL(get_sess_node(credential.user_id_, sess_node))) { // get again
LOG_WARN("fail to get sess node", K(ret), K(credential)); LOG_WARN("fail to get sess node", K(ret), K(credential));
} }
} }
if (OB_SUCC(ret)) { if (OB_FAIL(ret) && OB_HASH_NOT_EXIST != ret) {
if (OB_FAIL(sess_node->get_sess_node_val(sess_val))) { // exist // do nothing
} else if (OB_UNLIKELY(OB_HASH_NOT_EXIST == ret) && OB_FAIL(create_and_add_node_safe(credential))) { // not exist, create
LOG_WARN("fail to create and add session node", K(ret), K(credential));
} else if (OB_UNLIKELY(OB_ISNULL(sess_node)) && OB_FAIL(get_sess_node(credential.user_id_, sess_node))) { // get again
LOG_WARN("fail to get sess node", K(ret), K(credential));
} else if (OB_UNLIKELY(OB_ISNULL(sess_node))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session is null", K(ret));
} else {
ObTableApiSessNodeVal *sess_val = nullptr;
if (OB_FAIL(sess_node->get_sess_node_val(sess_val))) {
LOG_WARN("fail to get sess node value", K(ret), K(*sess_node)); LOG_WARN("fail to get sess node value", K(ret), K(*sess_node));
} else if (OB_ISNULL(sess_val)) { } else if (OB_ISNULL(sess_val)) {
need_extend = true; need_extend = true;
@ -519,8 +478,8 @@ int ObTableApiSessPool::get_sess_info(ObTableApiCredential &credential, ObTableA
} }
if (need_extend) { if (need_extend) {
if (OB_FAIL(sess_node->extend_sess_val(guard))) { if (OB_FAIL(sess_node->extend_and_get_sess_val(guard))) {
LOG_WARN("fail to extend sess val", K(ret), K(*sess_node), K(credential)); LOG_WARN("fail to extend and get sess val", K(ret), K(*sess_node), K(credential));
} }
} }
@ -532,7 +491,7 @@ int ObTableApiSessPool::get_sess_info(ObTableApiCredential &credential, ObTableA
return ret; return ret;
} }
int ObTableApiSessPool::create_node(ObTableApiCredential &credential, ObTableApiSessNode *&node) int ObTableApiSessPool::create_node_safe(ObTableApiCredential &credential, ObTableApiSessNode *&node)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObLockGuard<ObSpinLock> guard(lock_); ObLockGuard<ObSpinLock> guard(lock_);
@ -551,30 +510,32 @@ int ObTableApiSessPool::create_node(ObTableApiCredential &credential, ObTableApi
return ret; return ret;
} }
int ObTableApiSessPool::create_and_add_node(ObTableApiCredential &credential) int ObTableApiSessPool::create_and_add_node_safe(ObTableApiCredential &credential)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObTableApiSessNode *node = nullptr; ObTableApiSessNode *node = nullptr;
if (OB_FAIL(create_node(credential, node))) { if (OB_FAIL(create_node_safe(credential, node))) {
LOG_WARN("fail to create node", K(ret), K(credential)); LOG_WARN("fail to create node", K(ret), K(credential));
} else if (OB_FAIL(key_node_map_.set_refactored(credential.user_id_, node))) { } else if (OB_FAIL(key_node_map_.set_refactored(credential.user_id_, node))) {
if (OB_HASH_EXIST != ret) { if (OB_HASH_EXIST != ret) {
LOG_WARN("fail to add sess node to hash map", K(ret), K(credential.user_id_), K(*node)); LOG_WARN("fail to add sess node to hash map", K(ret), K(credential.user_id_), K(*node));
} else { // this node has been set by other thread, free it
ObLockGuard<ObSpinLock> guard(lock_);
node->~ObTableApiSessNode();
allocator_.free(node);
node = nullptr;
} }
// this node has been set by other thread, free it
ObLockGuard<ObSpinLock> guard(lock_);
node->~ObTableApiSessNode();
allocator_.free(node);
node = nullptr;
} }
return ret; return ret;
} }
// 1. login时调用 /*
// 2. 不存在,创建; 存在,旧的移动到淘汰链表, 添加新的node 1. only call in login
int ObTableApiSessPool::update_sess(ObTableApiCredential &credential, bool replace_old_node) 2. move old to retired list when node exist, create new node otherwise.
*/
int ObTableApiSessPool::update_sess(ObTableApiCredential &credential)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -582,22 +543,21 @@ int ObTableApiSessPool::update_sess(ObTableApiCredential &credential, bool repla
const uint64_t key = credential.user_id_; const uint64_t key = credential.user_id_;
if (OB_FAIL(get_sess_node(key, node))) { if (OB_FAIL(get_sess_node(key, node))) {
if (OB_HASH_NOT_EXIST == ret) { // not exist, create if (OB_HASH_NOT_EXIST == ret) { // not exist, create
if (OB_FAIL(create_and_add_node(credential))) { if (OB_FAIL(create_and_add_node_safe(credential))) {
LOG_WARN("fail to create and add node", K(ret), K(credential)); LOG_WARN("fail to create and add node", K(ret), K(credential));
} }
} else { } else {
LOG_WARN("fail to get session node", K(ret), K(key)); LOG_WARN("fail to get session node", K(ret), K(key));
} }
} else if (replace_old_node) { // exist, 替换node,old node移动到淘汰链表等待淘汰 } else if (OB_FAIL(replace_sess_node_safe(credential))) { // exist, create and replace old node
if (OB_FAIL(replace_sess(credential))) { LOG_WARN("fail to replace session node", K(ret), K(credential));
LOG_WARN("fail to replace session node", K(ret), K(credential));
}
} }
return ret; return ret;
} }
int ObTableApiSessPool::replace_sess(ObTableApiCredential &credential) // create and replace old node in callback function
int ObTableApiSessPool::replace_sess_node_safe(ObTableApiCredential &credential)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -609,18 +569,12 @@ int ObTableApiSessPool::replace_sess(ObTableApiCredential &credential)
return ret; return ret;
} }
int64_t ObTableApiSessPool::inc_ref_count() void ObTableApiSessNodeVal::destroy()
{ {
return ATOMIC_AAF((uint64_t *)&ref_count_, 1); sess_info_.destroy();
} allocator_.reset();
is_inited_ = false;
void ObTableApiSessPool::dec_ref_count() owner_node_ = nullptr;
{
(void)ATOMIC_SAF((uint64_t *)&ref_count_, 1);
if (is_deleted() && 0 == ref_count_) {
this->~ObTableApiSessPool();
ob_free(this);
}
} }
int ObTableApiSessNodeVal::init_sess_info() int ObTableApiSessNodeVal::init_sess_info()
@ -630,19 +584,19 @@ int ObTableApiSessNodeVal::init_sess_info()
if (!is_inited_) { if (!is_inited_) {
share::schema::ObSchemaGetterGuard schema_guard; share::schema::ObSchemaGetterGuard schema_guard;
const ObTenantSchema *tenant_schema = nullptr; const ObTenantSchema *tenant_schema = nullptr;
if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id_, schema_guard))) { if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(MTL_ID(), schema_guard))) {
LOG_WARN("fail to get schema guard", K(ret), K_(tenant_id)); LOG_WARN("fail to get schema guard", K(ret), K(MTL_ID()));
} else if (OB_FAIL(schema_guard.get_tenant_info(tenant_id_, tenant_schema))) { } else if (OB_FAIL(schema_guard.get_tenant_info(MTL_ID(), tenant_schema))) {
LOG_WARN("fail to get tenant schema", K(ret), K_(tenant_id)); LOG_WARN("fail to get tenant schema", K(ret), K(MTL_ID()));
} else if (OB_ISNULL(tenant_schema)) { } else if (OB_ISNULL(tenant_schema)) {
ret = OB_SCHEMA_ERROR; ret = OB_SCHEMA_ERROR;
LOG_WARN("tenant schema is null", K(ret)); LOG_WARN("tenant schema is null", K(ret));
} else if (OB_FAIL(ObTableApiSessUtil::init_sess_info(tenant_id_, } else if (OB_FAIL(ObTableApiSessUtil::init_sess_info(MTL_ID(),
tenant_schema->get_tenant_name_str(), tenant_schema->get_tenant_name_str(),
&allocator_, &allocator_,
schema_guard, schema_guard,
sess_info_))) { sess_info_))) {
LOG_WARN("fail to init sess info", K(ret), K(tenant_id_)); LOG_WARN("fail to init sess info", K(ret), K(MTL_ID()));
} else { } else {
is_inited_ = true; is_inited_ = true;
} }
@ -651,6 +605,11 @@ int ObTableApiSessNodeVal::init_sess_info()
return ret; return ret;
} }
/*
move node val to free list from used list.
- remove from used list.
- add to free list.
*/
void ObTableApiSessNodeVal::give_back_to_free_list() void ObTableApiSessNodeVal::give_back_to_free_list()
{ {
if (OB_NOT_NULL(owner_node_)) { if (OB_NOT_NULL(owner_node_)) {
@ -675,17 +634,16 @@ void ObTableApiSessNode::destroy()
DLIST_FOREACH_REMOVESAFE(sess, free_list) { DLIST_FOREACH_REMOVESAFE(sess, free_list) {
ObTableApiSessNodeVal *rm_sess = free_list.remove(sess); ObTableApiSessNodeVal *rm_sess = free_list.remove(sess);
if (OB_NOT_NULL(rm_sess)) { if (OB_NOT_NULL(rm_sess)) {
rm_sess->~ObTableApiSessNodeVal(); rm_sess->destroy();
allocator_.free(rm_sess);
} }
} }
DLIST_FOREACH_REMOVESAFE(sess, used_list) { DLIST_FOREACH_REMOVESAFE(sess, used_list) {
ObTableApiSessNodeVal *rm_sess = used_list.remove(sess); ObTableApiSessNodeVal *rm_sess = used_list.remove(sess);
if (OB_NOT_NULL(rm_sess)) { if (OB_NOT_NULL(rm_sess)) {
rm_sess->~ObTableApiSessNodeVal(); rm_sess->destroy();
allocator_.free(rm_sess);
} }
} }
allocator_.reset();
} }
int ObTableApiSessNode::remove_unused_sess() int ObTableApiSessNode::remove_unused_sess()
@ -710,7 +668,11 @@ int ObTableApiSessNode::remove_unused_sess()
return ret; return ret;
} }
// 从free list中取出,添加到used list中 /*
get session node val
- remove and get from free list.
- add to used list.
*/
int ObTableApiSessNode::get_sess_node_val(ObTableApiSessNodeVal *&val) int ObTableApiSessNode::get_sess_node_val(ObTableApiSessNodeVal *&val)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -733,7 +695,13 @@ int ObTableApiSessNode::get_sess_node_val(ObTableApiSessNodeVal *&val)
return ret; return ret;
} }
int ObTableApiSessNode::extend_sess_val(ObTableApiSessGuard &guard) /*
extend a session node val and put it to guard
- alloc new session node val.
- add to use list.
- put to guard.
*/
int ObTableApiSessNode::extend_and_get_sess_val(ObTableApiSessGuard &guard)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -743,7 +711,7 @@ int ObTableApiSessNode::extend_sess_val(ObTableApiSessGuard &guard)
ret = OB_ALLOCATE_MEMORY_FAILED; ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc mem for ObTableApiSessNodeVal", K(ret), K(sizeof(ObTableApiSessNodeVal))); LOG_WARN("fail to alloc mem for ObTableApiSessNodeVal", K(ret), K(sizeof(ObTableApiSessNodeVal)));
} else { } else {
ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(tenant_id_, this); ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(this);
if (OB_FAIL(val->init_sess_info())) { if (OB_FAIL(val->init_sess_info())) {
LOG_WARN("fail to init sess info", K(ret), K(*val)); LOG_WARN("fail to init sess info", K(ret), K(*val));
} else { } else {
@ -757,6 +725,11 @@ int ObTableApiSessNode::extend_sess_val(ObTableApiSessGuard &guard)
} }
} }
if (OB_FAIL(ret) && OB_NOT_NULL(buf)) {
allocator_.free(buf);
buf = nullptr;
}
return ret; return ret;
} }
@ -775,6 +748,12 @@ int ObTableApiSessNodeAtomicOp::get_value(ObTableApiSessNode *&node)
return ret; return ret;
} }
/*
replace session node operation
1. create new node.
2. replace them.
3. move old node to retired list.
*/
int ObTableApiSessNodeReplaceOp::operator()(MapKV &entry) int ObTableApiSessNodeReplaceOp::operator()(MapKV &entry)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -783,16 +762,16 @@ int ObTableApiSessNodeReplaceOp::operator()(MapKV &entry)
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
LOG_WARN("null session node", K(ret), K(entry.first)); LOG_WARN("null session node", K(ret), K(entry.first));
} else { } else {
// 1. 创建新的session node // 1. create new session node
ObTableApiSessNode *new_node = nullptr; ObTableApiSessNode *new_node = nullptr;
if (OB_FAIL(pool_.create_node(credential_, new_node))) { if (OB_FAIL(pool_.create_node_safe(credential_, new_node))) {
LOG_WARN("fail to create node", K(ret), K_(credential)); LOG_WARN("fail to create node", K(ret), K_(credential));
} else { } else {
// 2. 替换 // 2. replace
ObTableApiSessNode *old_node = entry.second; ObTableApiSessNode *old_node = entry.second;
entry.second = new_node; entry.second = new_node;
// 3. old node移动到淘汰链表 // 3. move old node to retired list
pool_.move_sess_to_retired_list(old_node); // 添加到链表末尾,不会出错,故不判断返回值 pool_.move_node_to_retired_list(old_node); // 添加到链表末尾,不会出错,故不判断返回值
} }
} }
@ -810,17 +789,6 @@ int ObTableApiSessForeachOp::operator()(MapKV &entry)
return ret; return ret;
} }
int ObTableApiSessPoolForeachOp::operator()(MapKV &entry)
{
int ret = common::OB_SUCCESS;
if (OB_FAIL(telant_ids_.push_back(entry.first))) {
LOG_WARN("fail to push back key", K(ret), K(entry.first));
}
return ret;
}
int ObTableApiSessUtil::init_sess_info(uint64_t tenant_id, int ObTableApiSessUtil::init_sess_info(uint64_t tenant_id,
const common::ObString &tenant_name, const common::ObString &tenant_name,
ObIAllocator *allocator, ObIAllocator *allocator,

View File

@ -25,16 +25,14 @@ class ObTableApiSessNode;
class ObTableApiSessGuard; class ObTableApiSessGuard;
class ObTableApiSessNodeVal; class ObTableApiSessNodeVal;
class ObTableApiSessNodeAtomicOp; class ObTableApiSessNodeAtomicOp;
class ObTableApiSessPoolGuard;
class ObTableApiSessPoolMgr final class ObTableApiSessPoolMgr final
{ {
public:
// key is tenant_id
typedef common::hash::ObHashMap<uint64_t, ObTableApiSessPool*> SessPoolMap;
public: public:
ObTableApiSessPoolMgr() ObTableApiSessPoolMgr()
: is_inited_(false) : is_inited_(false),
allocator_("TbSessPoolMgr", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
pool_(nullptr)
{} {}
virtual ~ObTableApiSessPoolMgr() { destroy(); } virtual ~ObTableApiSessPoolMgr() { destroy(); }
TO_STRING_KV(K_(is_inited), TO_STRING_KV(K_(is_inited),
@ -44,11 +42,11 @@ public:
{ {
public: public:
ObTableApiSessEliminationTask() ObTableApiSessEliminationTask()
: sess_pool_mgr_(nullptr), : is_inited_(false),
run_task_counter_(0) sess_pool_mgr_(nullptr)
{ {
} }
TO_STRING_KV(K_(run_task_counter)); TO_STRING_KV(K_(is_inited), KPC_(sess_pool_mgr));
void runTimerTask(void); void runTimerTask(void);
private: private:
// 回收已经淘汰的session // 回收已经淘汰的session
@ -56,47 +54,33 @@ public:
// 淘汰长期未被使用的session // 淘汰长期未被使用的session
int run_retire_sess_task(); int run_retire_sess_task();
public: public:
bool is_inited_;
ObTableApiSessPoolMgr *sess_pool_mgr_; ObTableApiSessPoolMgr *sess_pool_mgr_;
int64_t run_task_counter_;
}; };
public: public:
int init(); static int mtl_init(ObTableApiSessPoolMgr *&mgr);
int start();
void stop(); void stop();
void wait(); void wait();
void destroy(); void destroy();
int get_session_pool(uint64_t tenant_id, ObTableApiSessPoolGuard &guard); int init();
int get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard); int get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard);
int update_sess(ObTableApiCredential &credential); int update_sess(ObTableApiCredential &credential);
int create_pool_if_not_exists(int64_t tenant_id);
private: private:
int extend_sess_pool(uint64_t tenant_id, ObTableApiSessPoolGuard &guard); int create_session_pool_safe();
int get_or_create_sess_pool(ObTableApiCredential &credential, ObTableApiSessPoolGuard &guard); int create_session_pool_unsafe();
private: private:
static const int64_t ELIMINATE_SESSION_DELAY = 60 * 1000 * 1000; // 60s static const int64_t ELIMINATE_SESSION_DELAY = 60 * 1000 * 1000; // 60s
bool is_inited_; bool is_inited_;
SessPoolMap sess_pool_map_; common::ObArenaAllocator allocator_;
ObTableApiSessPool *pool_;
ObTableApiSessEliminationTask elimination_task_; ObTableApiSessEliminationTask elimination_task_;
common::ObTimer timer_;
ObSpinLock lock_; // for get_or_create_sess_pool ObSpinLock lock_; // for get_or_create_sess_pool
private: private:
DISALLOW_COPY_AND_ASSIGN(ObTableApiSessPoolMgr); DISALLOW_COPY_AND_ASSIGN(ObTableApiSessPoolMgr);
}; };
class ObTableApiSessPoolMgrAtomic #define TABLEAPI_SESS_POOL_MGR (MTL(ObTableApiSessPoolMgr*))
{
public:
typedef common::hash::HashMapPair<uint64_t, ObTableApiSessPool *> MapKV;
public:
ObTableApiSessPoolMgrAtomic()
: sess_pool_(nullptr)
{}
int operator() (MapKV &entry);
ObTableApiSessPool *get_session_pool() { return sess_pool_; }
private:
ObTableApiSessPool *sess_pool_;
private:
DISALLOW_COPY_AND_ASSIGN(ObTableApiSessPoolMgrAtomic);
};
class ObTableApiSessPool final class ObTableApiSessPool final
{ {
@ -106,45 +90,29 @@ public:
static const int64_t SESS_POOL_DEFAULT_BUCKET_NUM = 10; // 取决于客户端登录的用户数量 static const int64_t SESS_POOL_DEFAULT_BUCKET_NUM = 10; // 取决于客户端登录的用户数量
static const int64_t SESS_RETIRE_TIME = 300 * 1000000; // 超过300s未被访问的session会被标记淘汰 static const int64_t SESS_RETIRE_TIME = 300 * 1000000; // 超过300s未被访问的session会被标记淘汰
public: public:
explicit ObTableApiSessPool(uint64_t tenant_id) explicit ObTableApiSessPool()
: allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id), : allocator_("TbSessPool", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
is_inited_(false), is_inited_(false)
tenant_id_(tenant_id),
ref_count_(0),
is_deleted_(false)
{} {}
~ObTableApiSessPool() { destroy(); }; ~ObTableApiSessPool() { destroy(); };
TO_STRING_KV(K_(is_inited), TO_STRING_KV(K_(is_inited),
K_(tenant_id), K_(retired_nodes));
K_(ref_count),
K_(is_deleted));
int init(int64_t hash_bucket = SESS_POOL_DEFAULT_BUCKET_NUM); int init(int64_t hash_bucket = SESS_POOL_DEFAULT_BUCKET_NUM);
void destroy(); void destroy();
int64_t inc_ref_count();
void dec_ref_count();
void set_deleted() { ATOMIC_SET(&is_deleted_, true); }
bool is_deleted() { return ATOMIC_LOAD(&is_deleted_); }
bool is_empty() const { return key_node_map_.empty(); }
int get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard); int get_sess_info(ObTableApiCredential &credential, ObTableApiSessGuard &guard);
int update_sess(ObTableApiCredential &credential, bool replace_old_node = true); int update_sess(ObTableApiCredential &credential);
// 将过期的node移动到retired_nodes_ int retire_session_node();
int move_sess_to_retired_list();
int evict_retired_sess(); int evict_retired_sess();
int create_node(ObTableApiCredential &credential, ObTableApiSessNode *&node); int create_node_safe(ObTableApiCredential &credential, ObTableApiSessNode *&node);
int move_sess_to_retired_list(ObTableApiSessNode *node); int move_node_to_retired_list(ObTableApiSessNode *node);
private: private:
int replace_sess(ObTableApiCredential &credential); int replace_sess_node_safe(ObTableApiCredential &credential);
int create_and_add_node(ObTableApiCredential &credential); int create_and_add_node_safe(ObTableApiCredential &credential);
int get_sess_node(uint64_t key, ObTableApiSessNode *&node); int get_sess_node(uint64_t key, ObTableApiSessNode *&node);
int evict_all_session();
int move_sess_to_retired_list(uint64_t key);
private: private:
common::ObArenaAllocator allocator_; common::ObArenaAllocator allocator_;
bool is_inited_; bool is_inited_;
uint64_t tenant_id_;
CacheKeyNodeMap key_node_map_; CacheKeyNodeMap key_node_map_;
volatile int64_t ref_count_;
volatile bool is_deleted_;
// 已经淘汰的node,等待被后台删除 // 已经淘汰的node,等待被后台删除
// 前台login时、后台淘汰时都会操作retired_nodes_,因此需要加锁 // 前台login时、后台淘汰时都会操作retired_nodes_,因此需要加锁
common::ObDList<ObTableApiSessNode> retired_nodes_; common::ObDList<ObTableApiSessNode> retired_nodes_;
@ -153,44 +121,21 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObTableApiSessPool); DISALLOW_COPY_AND_ASSIGN(ObTableApiSessPool);
}; };
class ObTableApiSessPoolGuard final
{
public:
ObTableApiSessPoolGuard()
: pool_(nullptr)
{}
~ObTableApiSessPoolGuard()
{
if (OB_NOT_NULL(pool_)) {
pool_->dec_ref_count();
}
}
ObTableApiSessPool *get_sess_pool()
{
return pool_;
}
void set_sess_pool(ObTableApiSessPool *pool) { pool_ = pool; }
private:
ObTableApiSessPool *pool_;
};
class ObTableApiSessNodeVal : public common::ObDLinkBase<ObTableApiSessNodeVal> class ObTableApiSessNodeVal : public common::ObDLinkBase<ObTableApiSessNodeVal>
{ {
friend class ObTableApiSessNode; friend class ObTableApiSessNode;
friend class ObTableApiSessGuard; friend class ObTableApiSessGuard;
public: public:
explicit ObTableApiSessNodeVal(uint64_t tenant_id, ObTableApiSessNode *owner) explicit ObTableApiSessNodeVal(ObTableApiSessNode *owner)
: allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id), : allocator_("TbSessNodeVal", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
is_inited_(false), is_inited_(false),
tenant_id_(tenant_id),
owner_node_(owner) owner_node_(owner)
{} {}
TO_STRING_KV(K_(is_inited), TO_STRING_KV(K_(is_inited),
K_(tenant_id),
K_(sess_info)); K_(sess_info));
public: public:
void destroy();
sql::ObSQLSessionInfo& get_sess_info() { return sess_info_; } sql::ObSQLSessionInfo& get_sess_info() { return sess_info_; }
const sql::ObSQLSessionInfo& get_sess_info() const { return sess_info_; }
int init_sess_info(); int init_sess_info();
void reset_tx_desc() { // 防止异步提交场景在 session 析构的时候 rollback 事务 void reset_tx_desc() { // 防止异步提交场景在 session 析构的时候 rollback 事务
sql::ObSQLSessionInfo::LockGuard guard(sess_info_.get_thread_data_lock()); sql::ObSQLSessionInfo::LockGuard guard(sess_info_.get_thread_data_lock());
@ -200,7 +145,6 @@ public:
private: private:
common::ObArenaAllocator allocator_; common::ObArenaAllocator allocator_;
bool is_inited_; bool is_inited_;
uint64_t tenant_id_;
sql::ObSQLSessionInfo sess_info_; sql::ObSQLSessionInfo sess_info_;
ObTableApiSessNode *owner_node_; ObTableApiSessNode *owner_node_;
private: private:
@ -213,16 +157,14 @@ friend class ObTableApiSessPool;
friend class ObTableApiSessNodeVal; friend class ObTableApiSessNodeVal;
public: public:
explicit ObTableApiSessNode(ObTableApiCredential &credential) explicit ObTableApiSessNode(ObTableApiCredential &credential)
: allocator_(ObModIds::TABLE_PROC, OB_MALLOC_NORMAL_BLOCK_SIZE, credential.tenant_id_), : allocator_("TbSessNode", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
tenant_id_(credential.tenant_id_),
sess_lists_(), sess_lists_(),
last_active_ts_(0), last_active_ts_(0),
credential_(credential) credential_(credential)
{ {
} }
~ObTableApiSessNode() { destroy(); } ~ObTableApiSessNode() { destroy(); }
TO_STRING_KV(K_(tenant_id), TO_STRING_KV(K_(sess_lists),
K_(sess_lists),
K_(last_active_ts), K_(last_active_ts),
K_(credential)); K_(credential));
class SessList class SessList
@ -247,10 +189,9 @@ public:
OB_INLINE int64_t get_last_active_ts() const { return last_active_ts_; } OB_INLINE int64_t get_last_active_ts() const { return last_active_ts_; }
int remove_unused_sess(); int remove_unused_sess();
private: private:
int extend_sess_val(ObTableApiSessGuard &guard); int extend_and_get_sess_val(ObTableApiSessGuard &guard);
private: private:
common::ObArenaAllocator allocator_; common::ObArenaAllocator allocator_;
uint64_t tenant_id_;
SessList sess_lists_; SessList sess_lists_;
int64_t last_active_ts_; int64_t last_active_ts_;
ObTableApiCredential credential_; ObTableApiCredential credential_;
@ -282,7 +223,6 @@ public:
ObTableApiSessNodeVal* get_sess_node_val() const { return sess_node_val_; } ObTableApiSessNodeVal* get_sess_node_val() const { return sess_node_val_; }
sql::ObSQLSessionInfo& get_sess_info() { return sess_node_val_->get_sess_info(); } sql::ObSQLSessionInfo& get_sess_info() { return sess_node_val_->get_sess_info(); }
const sql::ObSQLSessionInfo& get_sess_info() const { return sess_node_val_->get_sess_info(); } const sql::ObSQLSessionInfo& get_sess_info() const { return sess_node_val_->get_sess_info(); }
ObTableApiSessPoolGuard &get_sess_pool_guard() { return pool_guard_; }
int get_credential(const ObTableApiCredential *&credential) const int get_credential(const ObTableApiCredential *&credential) const
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
@ -297,7 +237,6 @@ public:
} }
private: private:
ObTableApiSessNodeVal *sess_node_val_; ObTableApiSessNodeVal *sess_node_val_;
ObTableApiSessPoolGuard pool_guard_;
}; };
class ObTableApiSessNodeAtomicOp class ObTableApiSessNodeAtomicOp
@ -367,22 +306,6 @@ private:
DISALLOW_COPY_AND_ASSIGN(ObTableApiSessForeachOp); DISALLOW_COPY_AND_ASSIGN(ObTableApiSessForeachOp);
}; };
class ObTableApiSessPoolForeachOp
{
public:
typedef common::hash::HashMapPair<uint64_t, ObTableApiSessPool*> MapKV;
typedef common::ObSEArray<uint64_t, 16> TelantIdArray;
ObTableApiSessPoolForeachOp()
{}
int operator()(MapKV &entry);
const TelantIdArray &get_telant_id_array() const { return telant_ids_; }
void reset() { telant_ids_.reset(); }
private:
TelantIdArray telant_ids_;
private:
DISALLOW_COPY_AND_ASSIGN(ObTableApiSessPoolForeachOp);
};
class ObTableApiSessUtil final class ObTableApiSessUtil final
{ {
public: public:

View File

@ -73,8 +73,6 @@ int ObTableTTLDeleteTask::init(ObTenantTabletTTLMgr *ttl_tablet_mgr,
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
if (OB_FAIL(init_credential(ttl_para))) { if (OB_FAIL(init_credential(ttl_para))) {
LOG_WARN("fail to init credential", KR(ret)); LOG_WARN("fail to init credential", KR(ret));
} else if (OB_FAIL(create_session_pool(ttl_para.tenant_id_))) {
LOG_WARN("fail to update session pool");
} else { } else {
param_ = ttl_para; param_ = ttl_para;
info_ = &ttl_info; info_ = &ttl_info;
@ -85,20 +83,6 @@ int ObTableTTLDeleteTask::init(ObTenantTabletTTLMgr *ttl_tablet_mgr,
return ret; return ret;
} }
int ObTableTTLDeleteTask::create_session_pool(int64_t tenant_id)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(GCTX.table_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("table service is null", KR(ret));
} else if (OB_FAIL(GCTX.table_service_->get_sess_mgr().create_pool_if_not_exists(tenant_id))) {
LOG_WARN("fait to get session pool", K(ret), K(tenant_id));
}
return ret;
}
int ObTableTTLDeleteTask::init_credential(const ObTTLTaskParam &ttl_param) int ObTableTTLDeleteTask::init_credential(const ObTTLTaskParam &ttl_param)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;

View File

@ -103,7 +103,6 @@ public:
} }
return tablet_id; return tablet_id;
} }
int create_session_pool(int64_t tenant_id);
uint64_t get_table_id() const { return param_.table_id_; } uint64_t get_table_id() const { return param_.table_id_; }
int64_t get_timeout_ts() { return ONE_TASK_TIMEOUT + ObTimeUtility::current_time(); } int64_t get_timeout_ts() { return ONE_TASK_TIMEOUT + ObTimeUtility::current_time(); }
common::ObRowkey &get_start_rowkey() { return rowkey_; } common::ObRowkey &get_start_rowkey() { return rowkey_; }

View File

@ -107,7 +107,9 @@ namespace concurrency_control {
} }
namespace table namespace table
{ {
class ObTTLService;
class ObHTableLockMgr; class ObHTableLockMgr;
class ObTableApiSessPoolMgr;
} }
namespace logservice namespace logservice
{ {
@ -180,11 +182,6 @@ namespace storage {
class MockTenantModuleEnv; class MockTenantModuleEnv;
} }
namespace table
{
class ObTTLService;
}
namespace share namespace share
{ {
class ObCgroupCtrl; class ObCgroupCtrl;
@ -318,7 +315,8 @@ using ObTableScanIteratorObjPool = common::ObServerObjectPool<oceanbase::storage
oceanbase::common::ObOptStatMonitorManager*, \ oceanbase::common::ObOptStatMonitorManager*, \
omt::ObTenantSrs*, \ omt::ObTenantSrs*, \
table::ObHTableLockMgr*, \ table::ObHTableLockMgr*, \
table::ObTTLService* \ table::ObTTLService*, \
table::ObTableApiSessPoolMgr* \
) )

View File

@ -152,7 +152,7 @@ void TestCreateExecutor::TearDown()
{ {
} }
ObTableApiSessNodeVal g_sess_node_val(1, NULL); ObTableApiSessNodeVal g_sess_node_val(NULL);
void TestCreateExecutor::fake_ctx_init_common(ObTableCtx &fake_ctx, ObTableSchema *table_schema) void TestCreateExecutor::fake_ctx_init_common(ObTableCtx &fake_ctx, ObTableSchema *table_schema)
{ {
fake_ctx.table_schema_ = table_schema; fake_ctx.table_schema_ = table_schema;

View File

@ -1,256 +1,229 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#define private public // 获取私有成员 #define private public // 获取私有成员
#include "observer/table/ob_table_session_pool.h" #include "observer/table/ob_table_session_pool.h"
#include "lib/utility/ob_test_util.h"
#include "share/ob_thread_pool.h"
#include "mtlenv/mock_tenant_module_env.h"
#include "share/rc/ob_tenant_base.h"
using namespace oceanbase;
using namespace oceanbase::common; using namespace oceanbase::common;
using namespace oceanbase::table; using namespace oceanbase::table;
using namespace oceanbase::sql; using namespace oceanbase::sql;
using namespace oceanbase::share;
using namespace oceanbase::storage;
class TestTableSessPool: public ::testing::Test class TestTableSessPool: public ::testing::Test
{ {
public:
const int64_t TENANT_CNT = 10;
const int64_t USER_CNT = 100;
const int64_t NODE_CNT = 100;
const int64_t SESS_CNT = 10;
public: public:
TestTableSessPool() {} TestTableSessPool() {}
virtual ~TestTableSessPool() {} virtual ~TestTableSessPool() {}
void prepare_sess_pool(ObTableApiSessPoolMgr &mgr); static void SetUpTestCase()
{
EXPECT_EQ(OB_SUCCESS, MockTenantModuleEnv::get_instance().init());
}
static void TearDownTestCase()
{
MockTenantModuleEnv::get_instance().destroy();
}
void SetUp()
{
ASSERT_TRUE(MockTenantModuleEnv::get_instance().is_inited());
TABLEAPI_SESS_POOL_MGR->init();
create_credential(1, mock_cred_);
}
void TearDown()
{
TABLEAPI_SESS_POOL_MGR->destroy();
}
private: private:
void create_credential(uint64_t user_id, ObTableApiCredential *&cred);
private:
ObArenaAllocator allocator_;
ObTableApiCredential *mock_cred_;
// disallow copy // disallow copy
DISALLOW_COPY_AND_ASSIGN(TestTableSessPool); DISALLOW_COPY_AND_ASSIGN(TestTableSessPool);
}; };
// 10租户,100用户,每个用户下面挂10个session void TestTableSessPool::create_credential(uint64_t user_id, ObTableApiCredential *&cred)
void TestTableSessPool::prepare_sess_pool(ObTableApiSessPoolMgr &mgr)
{ {
ASSERT_EQ(OB_SUCCESS, mgr.init()); void *buf = nullptr;
uint64_t tenant_ids[TENANT_CNT]; buf = allocator_.alloc(sizeof(ObTableApiCredential));
uint64_t user_ids[USER_CNT]; cred = new (buf) ObTableApiCredential();
ObTableApiSessPoolGuard pool_guards[TENANT_CNT]; cred->cluster_id_ = 0;
for (uint64_t i = 0; i < TENANT_CNT; i++) { cred->tenant_id_ = 1;
tenant_ids[i] = i + 1; cred->user_id_ = user_id;
ASSERT_EQ(OB_SUCCESS, mgr.extend_sess_pool(tenant_ids[i], pool_guards[i])); cred->database_id_ = 1;
ObTableApiSessPool *tmp_pool = pool_guards[i].get_sess_pool(); cred->expire_ts_ = 0;
ASSERT_NE(nullptr, tmp_pool); cred->hash(cred->hash_val_);
for (uint64_t j = 0; j < USER_CNT; j++) {
user_ids[j] = j;
ASSERT_EQ(OB_SUCCESS, mgr.get_session_pool(tenant_ids[i], pool_guards[i]));
ObTableApiSessPool *pool = pool_guards[i].get_sess_pool();
ASSERT_NE(nullptr, pool);
ObTableApiCredential credential;
credential.tenant_id_ = tenant_ids[i];
credential.user_id_ = user_ids[j];
ASSERT_EQ(OB_SUCCESS, pool->update_sess(credential));
ObTableApiSessNode *node = nullptr;
ASSERT_EQ(OB_SUCCESS, tmp_pool->get_sess_node(user_ids[j], node));
ASSERT_NE(nullptr, node);
for (int64_t k = 0; k < SESS_CNT; k++) {
void *buf = node->allocator_.alloc(sizeof(ObTableApiSessNodeVal));
ASSERT_NE(nullptr, buf);
ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(tenant_ids[i], node);
val->is_inited_ = true;
ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(val));
}
}
}
} }
TEST_F(TestTableSessPool, get_session) TEST_F(TestTableSessPool, test_mgr_init)
{ {
uint64_t tenant_id = 1;
uint64_t user_id = 0;
uint64_t key = user_id;
ObTableApiSessPoolMgr mgr; ObTableApiSessPoolMgr mgr;
ASSERT_EQ(OB_SUCCESS, mgr.init()); ASSERT_FALSE(mgr.is_inited_);
ObTableApiSessPoolGuard pool_guard; ASSERT_EQ(nullptr, mgr.pool_);
ASSERT_EQ(OB_HASH_NOT_EXIST, mgr.get_session_pool(tenant_id, pool_guard)); ASSERT_EQ(nullptr, mgr.elimination_task_.sess_pool_mgr_);
ASSERT_EQ(nullptr, pool_guard.get_sess_pool());
ASSERT_EQ(OB_SUCCESS, mgr.extend_sess_pool(tenant_id, pool_guard)); ASSERT_EQ(OB_SYS_TENANT_ID, MTL_ID());
ObTableApiSessPool *pool = pool_guard.get_sess_pool(); ASSERT_TRUE(TABLEAPI_SESS_POOL_MGR->is_inited_);
ASSERT_TRUE(nullptr != pool); ASSERT_EQ(nullptr, TABLEAPI_SESS_POOL_MGR->pool_);
ASSERT_TRUE(pool->is_inited_); ASSERT_EQ(TABLEAPI_SESS_POOL_MGR, TABLEAPI_SESS_POOL_MGR->elimination_task_.sess_pool_mgr_);
ASSERT_EQ(1, pool->ref_count_);
ASSERT_EQ(false, pool->is_deleted_);
ASSERT_EQ(tenant_id, pool->tenant_id_);
ObTableApiSessGuard sess_guard;
ObTableApiCredential credential;
credential.tenant_id_ = tenant_id;
credential.user_id_ = user_id;
ASSERT_EQ(OB_SUCCESS, pool->update_sess(credential));
ObTableApiSessNode *node = nullptr;
ASSERT_EQ(OB_SUCCESS, pool->get_sess_node(key, node));
ASSERT_TRUE(nullptr != node);
ASSERT_TRUE(node->is_empty());
void *buf = node->allocator_.alloc(sizeof(ObTableApiSessNodeVal));
ASSERT_NE(nullptr, buf);
ObTableApiSessNodeVal *new_val = new (buf) ObTableApiSessNodeVal(tenant_id, node);
new_val->is_inited_ = true;
ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(new_val));
} }
TEST_F(TestTableSessPool, remove_session) TEST_F(TestTableSessPool, test_pool_init)
{ {
int64_t tenant_id = 1; ObTableApiSessPool pool;
uint64_t user_id = 0; ASSERT_FALSE(pool.is_inited_);
ObTableApiCredential credential; ASSERT_TRUE(pool.key_node_map_.empty());
credential.tenant_id_ = tenant_id; ASSERT_TRUE(pool.retired_nodes_.is_empty());
credential.user_id_ = user_id;
ObTableApiSessNode node(credential);
for (int64_t i = 0; i < SESS_CNT; i++) {
void *buf = node.allocator_.alloc(sizeof(ObTableApiSessNodeVal));
ASSERT_NE(nullptr, buf);
ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(tenant_id, &node);
val->is_inited_ = true;
ASSERT_EQ(true, node.sess_lists_.free_list_.add_last(val));
}
ASSERT_EQ(false, node.is_empty());
ASSERT_EQ(SESS_CNT, node.sess_lists_.free_list_.get_size());
node.remove_unused_sess();
ASSERT_EQ(0, node.sess_lists_.free_list_.get_size());
} }
TEST_F(TestTableSessPool, retire_session) TEST_F(TestTableSessPool, test_node_init)
{ {
int ret = 0; ObTableApiSessNode node(*mock_cred_);
ObTableApiSessPoolMgr mgr; ASSERT_TRUE(node.sess_lists_.free_list_.is_empty());
prepare_sess_pool(mgr); ASSERT_TRUE(node.sess_lists_.used_list_.is_empty());
ObTableApiSessPoolForeachOp op; ASSERT_EQ(0, node.last_active_ts_);
ASSERT_EQ(OB_SUCCESS, mgr.sess_pool_map_.foreach_refactored(op));
const ObTableApiSessPoolForeachOp::TelantIdArray &tenant_ids = op.get_telant_id_array();
ASSERT_EQ(TENANT_CNT, tenant_ids.count());
const int64_t N = tenant_ids.count();
// 1. 标记淘汰
for (int64_t i = 0; i < N; i++) {
uint64_t tenant_id = tenant_ids.at(i);
ObTableApiSessPoolGuard pool_guard;
ASSERT_EQ(OB_SUCCESS, mgr.get_session_pool(tenant_id, pool_guard));
ObTableApiSessPool *pool = pool_guard.get_sess_pool();
ASSERT_NE(nullptr, pool);
ObTableApiSessForeachOp op;
ASSERT_EQ(OB_SUCCESS, pool->key_node_map_.foreach_refactored(op));
const ObTableApiSessForeachOp::SessKvArray &kvs = op.get_key_value_array();
ASSERT_EQ(NODE_CNT, kvs.count());
for (int64_t j = 0; j < kvs.count(); j++) {
if (j % 2 == 0) {
const ObTableApiSessForeachOp::ObTableApiSessKV &kv = kvs.at(j);
ASSERT_EQ(OB_SUCCESS, pool->move_sess_to_retired_list(kv.key_));
}
}
}
// 2. 触发淘汰
ASSERT_EQ(OB_SUCCESS, mgr.elimination_task_.run_recycle_retired_sess_task());
// 3. 检查
for (int64_t i = 0; i < N; i++) {
uint64_t tenant_id = tenant_ids.at(i);
ObTableApiSessPoolGuard pool_guard;
ASSERT_EQ(OB_SUCCESS, mgr.get_session_pool(tenant_id, pool_guard));
ObTableApiSessPool *pool = pool_guard.get_sess_pool();
ASSERT_NE(nullptr, pool);
ObTableApiSessForeachOp op;
ASSERT_EQ(OB_SUCCESS, pool->key_node_map_.foreach_refactored(op));
const ObTableApiSessForeachOp::SessKvArray &kvs = op.get_key_value_array();
ASSERT_EQ(NODE_CNT/2, kvs.count());
for (int64_t j = 0; j < kvs.count(); j++) {
const ObTableApiSessForeachOp::ObTableApiSessKV &kv = kvs.at(j);
ASSERT_EQ(SESS_CNT, kv.node_->sess_lists_.free_list_.get_size());
}
}
} }
TEST_F(TestTableSessPool, reference_session) TEST_F(TestTableSessPool, test_node_val_init)
{ {
// prepare ObTableApiSessNode node(*mock_cred_);
ObTableApiSessPoolMgr mgr; ObTableApiSessNodeVal val(&node);
ASSERT_EQ(OB_SUCCESS, mgr.init()); ASSERT_FALSE(val.is_inited_);
uint64_t tenant_id = 1; ASSERT_EQ(&node, val.owner_node_);
uint64_t user_id = 0; }
ObTableApiSessPoolGuard pool_guard;
ASSERT_EQ(OB_SUCCESS, mgr.extend_sess_pool(tenant_id, pool_guard)); TEST_F(TestTableSessPool, test_sess_guard_init)
ObTableApiSessPool *pool = pool_guard.get_sess_pool(); {
ASSERT_NE(nullptr, pool); ObTableApiSessGuard guard;
ObTableApiCredential credential; ASSERT_EQ(nullptr, guard.sess_node_val_);
credential.tenant_id_ = tenant_id; }
credential.user_id_ = user_id;
ASSERT_EQ(OB_SUCCESS, pool->update_sess(credential)); TEST_F(TestTableSessPool, mgr_get_session)
ObTableApiSessNode *node = nullptr; {
ASSERT_EQ(OB_SUCCESS, pool->get_sess_node(user_id, node)); ASSERT_EQ(OB_SYS_TENANT_ID, MTL_ID());
ObTableApiSessPoolMgr *mgr = TABLEAPI_SESS_POOL_MGR;
// first time will create a new node
ASSERT_EQ(OB_SUCCESS, mgr->update_sess(*mock_cred_));
ASSERT_NE(nullptr, mgr->pool_);
ASSERT_TRUE(mgr->pool_->is_inited_);
ASSERT_EQ(1, mgr->pool_->key_node_map_.size());
ASSERT_EQ(0, mgr->pool_->retired_nodes_.size_);
ObTableApiSessNode *node;
ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(mock_cred_->user_id_, node));
ASSERT_NE(nullptr, node); ASSERT_NE(nullptr, node);
void *buf = node->allocator_.alloc(sizeof(ObTableApiSessNodeVal)); ASSERT_TRUE(node->sess_lists_.free_list_.is_empty());
ASSERT_NE(nullptr, buf); ASSERT_TRUE(node->sess_lists_.used_list_.is_empty());
ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(tenant_id, node); ASSERT_NE(0, node->last_active_ts_);
val->is_inited_ = true;
ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(val)); // add mock val to node
// get and retire ObTableApiSessNodeVal val(node);
{ val.is_inited_ = true;
// get ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(&val));
ObTableApiSessGuard guard;
ASSERT_EQ(OB_SUCCESS, pool->get_sess_info(credential, guard)); ObTableApiSessGuard guard;
ASSERT_NE(nullptr, guard.get_sess_node_val()); ASSERT_EQ(OB_SUCCESS, mgr->get_sess_info(*mock_cred_, guard));
// mark retire ASSERT_NE(nullptr, guard.sess_node_val_);
ObTableApiSessForeachOp op; ASSERT_NE(nullptr, guard.get_sess_node_val());
ASSERT_EQ(OB_SUCCESS, pool->key_node_map_.foreach_refactored(op)); const ObTableApiCredential *cred = nullptr;
const ObTableApiSessForeachOp::SessKvArray &kvs = op.get_key_value_array(); ASSERT_EQ(OB_SUCCESS, guard.get_credential(cred));
ASSERT_EQ(1, kvs.count()); ASSERT_NE(nullptr, cred);
const ObTableApiSessForeachOp::ObTableApiSessKV &kv = kvs.at(0);
// run retire task
ASSERT_EQ(OB_SUCCESS, mgr.elimination_task_.run_recycle_retired_sess_task());
// check
op.reset();
ASSERT_EQ(OB_SUCCESS, pool->key_node_map_.foreach_refactored(op));
const ObTableApiSessForeachOp::SessKvArray &new_kvs = op.get_key_value_array();
ASSERT_EQ(1, new_kvs.count());
}
// retire after def ref
ASSERT_EQ(OB_SUCCESS, mgr.elimination_task_.run_recycle_retired_sess_task());
// check
ObTableApiSessPoolForeachOp op;
ASSERT_EQ(OB_SUCCESS, mgr.sess_pool_map_.foreach_refactored(op));
const ObTableApiSessPoolForeachOp::TelantIdArray &arr = op.get_telant_id_array();
ASSERT_EQ(1, arr.count());
} }
TEST_F(TestTableSessPool, retire_session_then_get_session) TEST_F(TestTableSessPool, mgr_update_session)
{ {
uint64_t tenant_id = 1; ASSERT_EQ(OB_SYS_TENANT_ID, MTL_ID());
uint64_t user_id = 0; ObTableApiSessPoolMgr *mgr = TABLEAPI_SESS_POOL_MGR;
ObTableApiSessPoolMgr mgr; // first time will create a new node
ASSERT_EQ(OB_SUCCESS, mgr.init()); ASSERT_EQ(OB_SUCCESS, mgr->update_sess(*mock_cred_));
ObTableApiSessPoolGuard pool_guard; ASSERT_NE(nullptr, mgr->pool_);
ASSERT_EQ(OB_SUCCESS, mgr.extend_sess_pool(tenant_id, pool_guard)); ASSERT_TRUE(mgr->pool_->is_inited_);
ObTableApiCredential credential; ASSERT_EQ(1, mgr->pool_->key_node_map_.size());
credential.tenant_id_ = tenant_id; ASSERT_EQ(0, mgr->pool_->retired_nodes_.size_);
credential.user_id_ = user_id; ObTableApiSessNode *node;
ASSERT_EQ(OB_SUCCESS, mgr.update_sess(credential)); ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(mock_cred_->user_id_, node));
ASSERT_NE(nullptr, node);
ASSERT_TRUE(node->sess_lists_.free_list_.is_empty());
ASSERT_TRUE(node->sess_lists_.used_list_.is_empty());
ASSERT_NE(0, node->last_active_ts_);
// second time will do replace
ObTableApiCredential *new_cred = nullptr;
create_credential(1, new_cred);
ASSERT_NE(nullptr, new_cred);
ASSERT_EQ(OB_SUCCESS, mgr->update_sess(*new_cred));
ASSERT_NE(nullptr, mgr->pool_);
ASSERT_TRUE(mgr->pool_->is_inited_);
ASSERT_EQ(1, mgr->pool_->key_node_map_.size());
ASSERT_EQ(1, mgr->pool_->retired_nodes_.size_);
ASSERT_EQ(node, mgr->pool_->retired_nodes_.get_last());
ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(new_cred->user_id_, node));
ASSERT_NE(nullptr, node);
ASSERT_TRUE(node->sess_lists_.free_list_.is_empty());
ASSERT_TRUE(node->sess_lists_.used_list_.is_empty());
ASSERT_NE(0, node->last_active_ts_);
// update another key is 2 node.
create_credential(2, new_cred);
ASSERT_NE(nullptr, new_cred);
ASSERT_EQ(OB_SUCCESS, mgr->update_sess(*new_cred));
ASSERT_NE(nullptr, mgr->pool_);
ASSERT_TRUE(mgr->pool_->is_inited_);
ASSERT_EQ(2, mgr->pool_->key_node_map_.size());
ASSERT_EQ(1, mgr->pool_->retired_nodes_.size_);
ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(new_cred->user_id_, node));
ASSERT_NE(nullptr, node);
ASSERT_TRUE(node->sess_lists_.free_list_.is_empty());
ASSERT_TRUE(node->sess_lists_.used_list_.is_empty());
ASSERT_NE(0, node->last_active_ts_);
}
// 塞一个ObTableApiSessNodeVal TEST_F(TestTableSessPool, mgr_destroy)
ObTableApiSessPool *pool = pool_guard.get_sess_pool(); {
ObTableApiSessNode *node = nullptr; ASSERT_EQ(OB_SYS_TENANT_ID, MTL_ID());
ASSERT_EQ(OB_SUCCESS, pool->get_sess_node(user_id, node)); ObTableApiSessPoolMgr *mgr = TABLEAPI_SESS_POOL_MGR;
void *buf = node->allocator_.alloc(sizeof(ObTableApiSessNodeVal)); ASSERT_EQ(OB_SUCCESS, mgr->update_sess(*mock_cred_));
ASSERT_NE(nullptr, buf); ASSERT_NE(nullptr, mgr->pool_);
ObTableApiSessNodeVal *val = new (buf) ObTableApiSessNodeVal(tenant_id, node); ObTableApiSessNode *node;
val->is_inited_ = true; ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(mock_cred_->user_id_, node));
ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(val)); mgr->destroy();
ASSERT_FALSE(mgr->is_inited_);
ASSERT_EQ(nullptr, mgr->pool_);
ASSERT_EQ(0, mgr->allocator_.total());
ASSERT_EQ(0, mgr->allocator_.used());
}
// 第一次获取了session TEST_F(TestTableSessPool, mgr_sess_recycle)
ObTableApiSessGuard sess_guard; {
ASSERT_EQ(OB_SUCCESS, mgr.get_sess_info(credential, sess_guard)); ASSERT_EQ(OB_SYS_TENANT_ID, MTL_ID());
sess_guard.~ObTableApiSessGuard(); // 模仿访问结束,析构 ObTableApiSessPoolMgr *mgr = TABLEAPI_SESS_POOL_MGR;
ASSERT_EQ(OB_SUCCESS, mgr->update_sess(*mock_cred_));
ASSERT_NE(nullptr, mgr->pool_);
// 长时间没有访问ob,session被放到淘汰链表,后台定时回收 // add mock val to node
ASSERT_EQ(OB_SUCCESS, pool->get_sess_node(user_id, node)); ObTableApiSessNode *node;
ASSERT_EQ(OB_SUCCESS, pool->move_sess_to_retired_list(user_id)); ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(mock_cred_->user_id_, node));
ASSERT_EQ(1, pool->retired_nodes_.size_); ObTableApiSessNodeVal val(node);
ASSERT_EQ(OB_SUCCESS, mgr.elimination_task_.run_recycle_retired_sess_task()); val.is_inited_ = true;
ASSERT_EQ(0, pool->retired_nodes_.size_); ASSERT_EQ(true, node->sess_lists_.free_list_.add_last(&val));
// 连接隔了很长时间,突然又访问db ObTableApiSessGuard guard;
ASSERT_EQ(OB_HASH_NOT_EXIST, pool->get_sess_node(user_id, node)); ASSERT_EQ(OB_SUCCESS, mgr->get_sess_info(*mock_cred_, guard));
mgr->elimination_task_.runTimerTask();
ASSERT_EQ(1, mgr->pool_->key_node_map_.size());
ASSERT_EQ(0, mgr->pool_->retired_nodes_.size_);
guard.~ObTableApiSessGuard();
// 新的访问会创建新的池子 // 3min not access
ASSERT_EQ(OB_SUCCESS, mgr.get_or_create_sess_pool(credential, pool_guard)); ASSERT_EQ(OB_SUCCESS, mgr->pool_->get_sess_node(mock_cred_->user_id_, node));
node->last_active_ts_ = node->last_active_ts_ - ObTableApiSessPool::SESS_RETIRE_TIME;
mgr->elimination_task_.run_retire_sess_task();
ASSERT_EQ(0, mgr->pool_->key_node_map_.size());
ASSERT_EQ(1, mgr->pool_->retired_nodes_.size_);
mgr->elimination_task_.run_recycle_retired_sess_task();
ASSERT_EQ(0, mgr->pool_->key_node_map_.size());
ASSERT_EQ(0, mgr->pool_->retired_nodes_.size_);
} }
int main(int argc, char **argv) int main(int argc, char **argv)