diff --git a/src/observer/omt/ob_multi_tenant.cpp b/src/observer/omt/ob_multi_tenant.cpp index 5b2a4f69c7..241e3a9ddc 100644 --- a/src/observer/omt/ob_multi_tenant.cpp +++ b/src/observer/omt/ob_multi_tenant.cpp @@ -1810,6 +1810,12 @@ int ObMultiTenant::remove_tenant(const uint64_t tenant_id, bool &remove_tenant_s LOG_WARN("failed to clean dblink connection", K(ret), K(tenant_id)); } } + if (OB_SUCC(ret)) { + if (OB_NOT_NULL(GCTX.conn_res_mgr_) + && OB_FAIL(GCTX.conn_res_mgr_->erase_tenant_conn_res_map(tenant_id))) { + LOG_WARN("erase tenant conn res map failed", K(ret)); + } + } return ret; } diff --git a/src/sql/session/ob_user_resource_mgr.cpp b/src/sql/session/ob_user_resource_mgr.cpp index 3498480e93..86789f014e 100644 --- a/src/sql/session/ob_user_resource_mgr.cpp +++ b/src/sql/session/ob_user_resource_mgr.cpp @@ -44,9 +44,8 @@ void ObConnectResAlloc::free_value(ObConnectResource* tz_info) ObConnectResHashNode* ObConnectResAlloc::alloc_node(ObConnectResource* value) { - UNUSED(value); - ObMemAttr attr(OB_SERVER_TENANT_ID, MEMORY_LABEL); - SET_USE_500(attr); + int64_t tenant_id = OB_ISNULL(value) ? OB_SERVER_TENANT_ID : value->tenant_id_; + ObMemAttr attr(tenant_id, MEMORY_LABEL); return OB_NEW(ObConnectResHashNode, attr); } @@ -96,37 +95,36 @@ int ObConnectResourceMgr::apply_for_tenant_conn_resource(const uint64_t tenant_i int ret = OB_SUCCESS; ObConnectResource *tenant_res = NULL; ObTenantUserKey tenant_key(tenant_id, 0); - bool has_insert = false; if (OB_FAIL(tenant_res_map_.get(tenant_key, tenant_res))) { if (OB_ENTRY_NOT_EXIST == ret) { ret = OB_SUCCESS; // not exist, alloc and insert - ObMemAttr attr(OB_SERVER_TENANT_ID, MEMORY_LABEL); - SET_USE_500(attr); + ObMemAttr attr(tenant_id, MEMORY_LABEL); if (OB_ISNULL(tenant_res = OB_NEW(ObConnectResource, attr))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate tenant resource failed", K(ret)); } else { - tenant_res->cur_connections_ = 1; + tenant_res->cur_connections_ = 0; + tenant_res->tenant_id_ = tenant_id; } if (OB_FAIL(ret)) { } else if (OB_FAIL(tenant_res_map_.insert_and_get(tenant_key, tenant_res))) { + LOG_WARN("insert and get failed", K(ret)); OB_DELETE(ObConnectResource, MEMORY_LABEL, tenant_res); - // tenant resouce already exist because of concurrent insert, just get it. - if (OB_FAIL(tenant_res_map_.get(tenant_key, tenant_res))) { + tenant_res = NULL; + // 1. tenant resouce already exist because of concurrent insert, just get it. + // 2. may also fail because of oom. + if (OB_ENTRY_EXIST == ret && OB_FAIL(tenant_res_map_.get(tenant_key, tenant_res))) { // may happen with very very little probability: insert failed and then tenant is dropped // and value in the map is deleted by periodly task. - LOG_WARN("tenant not exists", K(ret)); + LOG_WARN("get tenant conn res map failed", K(ret), K(tenant_id)); } - } else { - has_insert = true; - tenant_res_map_.revert(tenant_res); } } else { LOG_WARN("get tenant resource failed", K(ret)); } } - if (OB_SUCC(ret) && !has_insert) { + if (OB_SUCC(ret)) { if (OB_ISNULL(tenant_res)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("tenant resource is null", K(ret)); @@ -143,9 +141,11 @@ int ObConnectResourceMgr::apply_for_tenant_conn_resource(const uint64_t tenant_i LOG_WARN("too many connections", K(ret), K(tenant_res->cur_connections_), K(max_tenant_connections)); } - tenant_res_map_.revert(tenant_res); } } + if (OB_NOT_NULL(tenant_res)) { + tenant_res_map_.revert(tenant_res); + } return ret; } @@ -164,6 +164,7 @@ void ObConnectResourceMgr::release_tenant_conn_resource(const uint64_t tenant_id } else { tenant_res->cur_connections_--; } + tenant_res_map_.revert(tenant_res); } } @@ -202,38 +203,37 @@ int ObConnectResourceMgr::get_or_insert_user_resource(const uint64_t tenant_id, const uint64_t user_id, const uint64_t max_user_connections, const uint64_t max_connections_per_hour, - ObConnectResource *&user_res, bool &has_insert, bool &user_conn_increased) + ObConnectResource *&user_res) { int ret = OB_SUCCESS; user_res = NULL; ObTenantUserKey user_key(tenant_id, user_id); - has_insert = false; if (OB_FAIL(user_res_map_.get(user_key, user_res))) { if (OB_ENTRY_NOT_EXIST == ret) { ret = OB_SUCCESS; - ObMemAttr attr(OB_SERVER_TENANT_ID, MEMORY_LABEL); - SET_USE_500(attr); // not exist, alloc and insert + ObMemAttr attr(tenant_id, MEMORY_LABEL); if (OB_ISNULL(user_res = OB_NEW(ObConnectResource, attr))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate user resource failed", K(ret)); } else { - user_res->cur_connections_ = 0 == max_user_connections ? 0 : 1; - user_res->history_connections_ = 0 == max_connections_per_hour ? 0 : 1; - user_res->start_time_ = 0 == max_connections_per_hour ? 0 : ObTimeUtility::current_time(); + user_res->cur_connections_ = 0; + user_res->history_connections_ = 0; + user_res->start_time_ = 0; + user_res->tenant_id_ = tenant_id; } if (OB_FAIL(ret)) { } else if (OB_FAIL(user_res_map_.insert_and_get(user_key, user_res))) { - // user resouce already exist because of concurrent insert, just get it. - if (OB_FAIL(user_res_map_.get(user_key, user_res))) { + LOG_WARN("insert and get failed", K(ret)); + OB_DELETE(ObConnectResource, MEMORY_LABEL, user_res); + user_res = NULL; + // 1. user resouce already exist because of concurrent insert, just get it. + // 2. may also fail because of oom. + if (OB_ENTRY_EXIST == ret && OB_FAIL(user_res_map_.get(user_key, user_res))) { // may happen with very very little probability: insert failed and then user is dropped // and value in the map is deleted by periodly task. LOG_WARN("user not exists", K(ret)); } - } else { - has_insert = true; - user_conn_increased = max_user_connections != 0; - user_res_map_.revert(user_res); } } else { LOG_WARN("get user resource failed", K(ret)); @@ -283,9 +283,6 @@ int ObConnectResourceMgr::increase_user_connections_count( user_conn_increased = 0 != max_user_connections; } } - if (OB_SUCC(ret)) { - user_res_map_.revert(user_res); - } return ret; } @@ -325,23 +322,22 @@ int ObConnectResourceMgr::on_user_connect( // only increase cur_connections_ if max_user_connections is not zero // only record history_connections_ if max_connections_per_hour is not zero. ObConnectResource *user_res = NULL; - bool has_insert = false; bool user_conn_increased = false; if (OB_FAIL(get_or_insert_user_resource(tenant_id, user_id, max_user_connections, - max_connections_per_hour, user_res, - has_insert, user_conn_increased))) { + max_connections_per_hour, user_res))) { LOG_WARN("get or insert user resource failed", K(ret)); - } else if (!has_insert) { - // if user resource already exists in the hash map, increase its connections count. - if (OB_FAIL(increase_user_connections_count(max_user_connections, max_connections_per_hour, + } else if (OB_FAIL(increase_user_connections_count(max_user_connections, max_connections_per_hour, user_name, user_res, user_conn_increased))) { - LOG_WARN("increase user connection count failed", K(ret)); - } + LOG_WARN("increase user connection count failed", K(ret)); } if (user_conn_increased) { session.set_got_user_conn_res(true); session.set_conn_res_user_id(user_id); } + if (OB_NOT_NULL(user_res)) { + user_res_map_.revert(user_res); + user_res = NULL; + } } } else { if (!session.has_got_tenant_conn_res()) { @@ -399,6 +395,26 @@ int ObConnectResourceMgr::on_user_disconnect(ObSQLSessionInfo &session) return ret; } +int ObConnectResourceMgr::erase_tenant_conn_res_map(int64_t tenant_id) +{ + int ret = OB_SUCCESS; + EraseTenantMapFunc func(tenant_id); + int64_t erase_user_cnt = 0; + int64_t erase_tenant_cnt = 0; + if (OB_FAIL(user_res_map_.remove_if(func))) { + LOG_WARN("remove_if failed", K(ret), K(tenant_id)); + } else if (FALSE_IT(erase_user_cnt = func.erase_cnt_)) { + } else if (OB_FAIL(tenant_res_map_.remove_if(func))) { + LOG_WARN("remove_if failed", K(ret), K(tenant_id)); + } else { + erase_tenant_cnt = func.erase_cnt_ - erase_user_cnt; + user_res_map_.purge(); + tenant_res_map_.purge(); + } + LOG_INFO("erase tenant conn result map", K(tenant_id), K(erase_tenant_cnt), K(erase_user_cnt)); + return ret; +} + bool ObConnectResourceMgr::CleanUpConnResourceFunc::operator() ( ObTenantUserKey key, ObConnectResource *conn_res) { @@ -406,7 +422,7 @@ bool ObConnectResourceMgr::CleanUpConnResourceFunc::operator() ( if (OB_ISNULL(conn_res)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("user res is NULL", K(ret), K(conn_res)); - } else if (is_user_) { + } else { const ObUserInfo *user_info = NULL; if (OB_FAIL(schema_guard_.get_user_info(key.tenant_id_, key.user_id_, user_info))) { if (OB_TENANT_NOT_EXIST != ret) { @@ -418,13 +434,6 @@ bool ObConnectResourceMgr::CleanUpConnResourceFunc::operator() ( } else if (OB_ISNULL(user_info)) { conn_res_map_.del(key); } - } else { - const ObTenantSchema *tenant_schema = NULL; - if (OB_FAIL(schema_guard_.get_tenant_info(key.tenant_id_, tenant_schema))) { - LOG_ERROR("get tenant info failed", K(ret), K(key.tenant_id_)); - } else if (OB_ISNULL(tenant_schema)) { - conn_res_map_.del(key); - } } return OB_SUCCESS == ret; } @@ -444,12 +453,9 @@ void ObConnectResourceMgr::ConnResourceCleanUpTask::runTimerTask() } else { LOG_INFO("clean up connection resource", K(schema_guard.get_tenant_id()), K(conn_res_mgr_.user_res_map_.size()), K(conn_res_mgr_.tenant_res_map_.size())); - CleanUpConnResourceFunc user_func(schema_guard, conn_res_mgr_.user_res_map_, true); - CleanUpConnResourceFunc tenant_func(schema_guard, conn_res_mgr_.tenant_res_map_, false); + CleanUpConnResourceFunc user_func(schema_guard, conn_res_mgr_.user_res_map_); if (OB_FAIL(conn_res_mgr_.user_res_map_.for_each(user_func))) { LOG_WARN("cleanup dropped user failed", K(ret)); - } else if (OB_FAIL(conn_res_mgr_.tenant_res_map_.for_each(tenant_func))) { - LOG_WARN("cleanup dropped tenant failed", K(ret)); } } const int64_t delay = SLEEP_USECONDS; diff --git a/src/sql/session/ob_user_resource_mgr.h b/src/sql/session/ob_user_resource_mgr.h index b3fe411c02..9e34139a2a 100644 --- a/src/sql/session/ob_user_resource_mgr.h +++ b/src/sql/session/ob_user_resource_mgr.h @@ -72,13 +72,16 @@ typedef common::LinkHashValue ObConnectResHashValue; class ObConnectResource : public ObConnectResHashValue { public: ObConnectResource() - : rwlock_(), cur_connections_(0), history_connections_(0), start_time_(0) + : rwlock_(), cur_connections_(0), history_connections_(0), start_time_(0), + tenant_id_(OB_SERVER_TENANT_ID) { } - ObConnectResource(uint64_t cur_connections, uint64_t history_connections, int64_t cur_time) + ObConnectResource(uint64_t cur_connections, uint64_t history_connections, int64_t cur_time, + int64_t tenant_id) : rwlock_(), cur_connections_(cur_connections), history_connections_(history_connections), - start_time_(cur_time) + start_time_(cur_time), + tenant_id_(tenant_id) { } virtual ~ObConnectResource() @@ -96,6 +99,7 @@ public: // number of connections from this time, and don't have to record 1:10 or 1:20. int64_t start_time_; // TODO: count of update and query in one hour. + int64_t tenant_id_; }; class ObConnectResAlloc { @@ -130,7 +134,7 @@ public: const uint64_t user_id, const uint64_t max_user_connections, const uint64_t max_connections_per_hour, - ObConnectResource *&user_res, bool &has_insert, bool &user_conn_increased); + ObConnectResource *&user_res); int increase_user_connections_count( const uint64_t max_user_connections, const uint64_t max_connections_per_hour, @@ -146,19 +150,32 @@ public: const uint64_t max_global_connections, ObSQLSessionInfo& session); int on_user_disconnect(ObSQLSessionInfo &session); + int erase_tenant_conn_res_map(int64_t tenant_id); private: + struct EraseTenantMapFunc + { + EraseTenantMapFunc(int64_t tenant_id) + : tenant_id_(tenant_id), erase_cnt_(0) {} + ~EraseTenantMapFunc() {} + bool operator()(const ObTenantUserKey &key, const ObConnectResource *value) { + bool res = key.tenant_id_ == tenant_id_; + erase_cnt_ += res ? 1 : 0; + return res; + } + int64_t tenant_id_; + int64_t erase_cnt_; + }; class CleanUpConnResourceFunc { public: CleanUpConnResourceFunc(share::schema::ObSchemaGetterGuard &schema_guard, - ObConnResMap &conn_res_map, const bool is_user) - : schema_guard_(schema_guard), conn_res_map_(conn_res_map), is_user_(is_user) + ObConnResMap &conn_res_map) + : schema_guard_(schema_guard), conn_res_map_(conn_res_map) {} bool operator() (ObTenantUserKey key, ObConnectResource *user_res); private: share::schema::ObSchemaGetterGuard &schema_guard_; ObConnResMap &conn_res_map_; - const bool is_user_; }; class ConnResourceCleanUpTask : public common::ObTimerTask {