[CP] add MTL_WAIT for tenant_session_mgr
This commit is contained in:
@ -552,7 +552,7 @@ int ObMultiTenant::init(ObAddr myaddr,
|
|||||||
MTL_BIND2(server_obj_pool_mtl_new<ObTableScanIterator>, nullptr, nullptr, nullptr, nullptr, server_obj_pool_mtl_destroy<ObTableScanIterator>);
|
MTL_BIND2(server_obj_pool_mtl_new<ObTableScanIterator>, nullptr, nullptr, nullptr, nullptr, server_obj_pool_mtl_destroy<ObTableScanIterator>);
|
||||||
MTL_BIND2(mtl_new_default, ObTenantDirectLoadMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
|
MTL_BIND2(mtl_new_default, ObTenantDirectLoadMgr::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
|
||||||
MTL_BIND2(ObDetectManager::mtl_new, ObDetectManager::mtl_init, nullptr, nullptr, nullptr, ObDetectManager::mtl_destroy);
|
MTL_BIND2(ObDetectManager::mtl_new, ObDetectManager::mtl_init, nullptr, nullptr, nullptr, ObDetectManager::mtl_destroy);
|
||||||
MTL_BIND2(ObTenantSQLSessionMgr::mtl_new, ObTenantSQLSessionMgr::mtl_init, nullptr, nullptr, nullptr, ObTenantSQLSessionMgr::mtl_destroy);
|
MTL_BIND2(ObTenantSQLSessionMgr::mtl_new, ObTenantSQLSessionMgr::mtl_init, nullptr, nullptr, ObTenantSQLSessionMgr::mtl_wait, ObTenantSQLSessionMgr::mtl_destroy);
|
||||||
MTL_BIND2(mtl_new_default, ObDTLIntermResultManager::mtl_init, ObDTLIntermResultManager::mtl_start,
|
MTL_BIND2(mtl_new_default, ObDTLIntermResultManager::mtl_init, ObDTLIntermResultManager::mtl_start,
|
||||||
ObDTLIntermResultManager::mtl_stop, ObDTLIntermResultManager::mtl_wait, ObDTLIntermResultManager::mtl_destroy);
|
ObDTLIntermResultManager::mtl_stop, ObDTLIntermResultManager::mtl_wait, ObDTLIntermResultManager::mtl_destroy);
|
||||||
if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread) {
|
if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread) {
|
||||||
|
|||||||
@ -96,7 +96,7 @@ int64_t ObTenantSQLSessionMgr::SessionPool::count() const
|
|||||||
}
|
}
|
||||||
|
|
||||||
ObTenantSQLSessionMgr::ObTenantSQLSessionMgr(const int64_t tenant_id)
|
ObTenantSQLSessionMgr::ObTenantSQLSessionMgr(const int64_t tenant_id)
|
||||||
: tenant_id_(tenant_id),
|
: tenant_id_(tenant_id), count_(0),
|
||||||
session_allocator_(lib::ObMemAttr(tenant_id, "SQLSessionInfo"), MTL_CPU_COUNT(), 4)
|
session_allocator_(lib::ObMemAttr(tenant_id, "SQLSessionInfo"), MTL_CPU_COUNT(), 4)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
@ -137,6 +137,16 @@ int ObTenantSQLSessionMgr::mtl_init(ObTenantSQLSessionMgr *&t_session_mgr)
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ObTenantSQLSessionMgr::mtl_wait(ObTenantSQLSessionMgr *&t_session_mgr)
|
||||||
|
{
|
||||||
|
while (t_session_mgr->count() != 0) {
|
||||||
|
LOG_WARN_RET(OB_NEED_RETRY, "tenant session mgr should be empty",
|
||||||
|
K(t_session_mgr->count()));
|
||||||
|
usleep(1000 * 1000);
|
||||||
|
}
|
||||||
|
LOG_INFO("success to wait tenant session mgr");
|
||||||
|
}
|
||||||
|
|
||||||
void ObTenantSQLSessionMgr::mtl_destroy(ObTenantSQLSessionMgr *&t_session_mgr)
|
void ObTenantSQLSessionMgr::mtl_destroy(ObTenantSQLSessionMgr *&t_session_mgr)
|
||||||
{
|
{
|
||||||
if (nullptr != t_session_mgr) {
|
if (nullptr != t_session_mgr) {
|
||||||
@ -155,6 +165,7 @@ ObSQLSessionInfo *ObTenantSQLSessionMgr::alloc_session()
|
|||||||
OX (session = op_instance_alloc_args(&session_allocator_,
|
OX (session = op_instance_alloc_args(&session_allocator_,
|
||||||
ObSQLSessionInfo,
|
ObSQLSessionInfo,
|
||||||
tenant_id_));
|
tenant_id_));
|
||||||
|
OX (ATOMIC_FAA(&count_, 1));
|
||||||
}
|
}
|
||||||
OV (OB_NOT_NULL(session));
|
OV (OB_NOT_NULL(session));
|
||||||
OX (session->set_tenant_session_mgr(this));
|
OX (session->set_tenant_session_mgr(this));
|
||||||
@ -183,6 +194,7 @@ void ObTenantSQLSessionMgr::free_session(ObSQLSessionInfo *session)
|
|||||||
}
|
}
|
||||||
if (OB_NOT_NULL(session)) {
|
if (OB_NOT_NULL(session)) {
|
||||||
OX (op_free(session));
|
OX (op_free(session));
|
||||||
|
OX (ATOMIC_FAA(&count_, -1));
|
||||||
OX (session = NULL);
|
OX (session = NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -201,6 +213,7 @@ void ObTenantSQLSessionMgr::clean_session_pool()
|
|||||||
OX (session_pool_.pop_session(session));
|
OX (session_pool_.pop_session(session));
|
||||||
if (OB_NOT_NULL(session)) {
|
if (OB_NOT_NULL(session)) {
|
||||||
OX (op_free(session));
|
OX (op_free(session));
|
||||||
|
OX (ATOMIC_FAA(&count_, -1));
|
||||||
OX (session = NULL);
|
OX (session = NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -378,10 +378,12 @@ public:
|
|||||||
void destroy();
|
void destroy();
|
||||||
static int mtl_new(ObTenantSQLSessionMgr *&tenant_session_mgr);
|
static int mtl_new(ObTenantSQLSessionMgr *&tenant_session_mgr);
|
||||||
static int mtl_init(ObTenantSQLSessionMgr *&tenant_session_mgr);
|
static int mtl_init(ObTenantSQLSessionMgr *&tenant_session_mgr);
|
||||||
|
static void mtl_wait(ObTenantSQLSessionMgr *&tenant_session_mgr);
|
||||||
static void mtl_destroy(ObTenantSQLSessionMgr *&tenant_session_mgr);
|
static void mtl_destroy(ObTenantSQLSessionMgr *&tenant_session_mgr);
|
||||||
ObSQLSessionInfo *alloc_session();
|
ObSQLSessionInfo *alloc_session();
|
||||||
void free_session(ObSQLSessionInfo *session);
|
void free_session(ObSQLSessionInfo *session);
|
||||||
void clean_session_pool();
|
void clean_session_pool();
|
||||||
|
int64_t count() const { return ATOMIC_LOAD(&count_); }
|
||||||
private:
|
private:
|
||||||
class SessionPool
|
class SessionPool
|
||||||
{
|
{
|
||||||
@ -404,6 +406,7 @@ private:
|
|||||||
private:
|
private:
|
||||||
const int64_t tenant_id_;
|
const int64_t tenant_id_;
|
||||||
SessionPool session_pool_;
|
SessionPool session_pool_;
|
||||||
|
int64_t count_;
|
||||||
ObFixedClassAllocator<ObSQLSessionInfo> session_allocator_;
|
ObFixedClassAllocator<ObSQLSessionInfo> session_allocator_;
|
||||||
DISALLOW_COPY_AND_ASSIGN(ObTenantSQLSessionMgr);
|
DISALLOW_COPY_AND_ASSIGN(ObTenantSQLSessionMgr);
|
||||||
}; // end of class ObSQLSessionMgr
|
}; // end of class ObSQLSessionMgr
|
||||||
|
|||||||
Reference in New Issue
Block a user