split memory of tenant connections resource and fix ref count leak

This commit is contained in:
sdc
2024-03-21 09:56:19 +00:00
committed by ob-robot
parent 880ee60e2a
commit 2365f7b369
3 changed files with 87 additions and 58 deletions

View File

@ -1810,6 +1810,12 @@ int ObMultiTenant::remove_tenant(const uint64_t tenant_id, bool &remove_tenant_s
LOG_WARN("failed to clean dblink connection", K(ret), K(tenant_id));
}
}
if (OB_SUCC(ret)) {
if (OB_NOT_NULL(GCTX.conn_res_mgr_)
&& OB_FAIL(GCTX.conn_res_mgr_->erase_tenant_conn_res_map(tenant_id))) {
LOG_WARN("erase tenant conn res map failed", K(ret));
}
}
return ret;
}

View File

@ -44,9 +44,8 @@ void ObConnectResAlloc::free_value(ObConnectResource* tz_info)
// Allocates a hash node for a connection-resource map entry.
// The node's memory is attributed to the tenant recorded in `value`
// (value->tenant_id_); when `value` is NULL it is attributed to
// OB_SERVER_TENANT_ID instead.
// Returns the node produced by OB_NEW; callers should expect NULL when the
// allocation fails.
ObConnectResHashNode* ObConnectResAlloc::alloc_node(ObConnectResource* value)
{
  // `value` is read here, so it must not be marked UNUSED, and `attr` must be
  // declared exactly once, with the owning tenant rather than the server tenant.
  const int64_t tenant_id = OB_ISNULL(value) ? OB_SERVER_TENANT_ID : value->tenant_id_;
  ObMemAttr attr(tenant_id, MEMORY_LABEL);
  return OB_NEW(ObConnectResHashNode, attr);
}
@ -96,37 +95,36 @@ int ObConnectResourceMgr::apply_for_tenant_conn_resource(const uint64_t tenant_i
int ret = OB_SUCCESS;
ObConnectResource *tenant_res = NULL;
ObTenantUserKey tenant_key(tenant_id, 0);
bool has_insert = false;
if (OB_FAIL(tenant_res_map_.get(tenant_key, tenant_res))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
// not exist, alloc and insert
ObMemAttr attr(OB_SERVER_TENANT_ID, MEMORY_LABEL);
SET_USE_500(attr);
ObMemAttr attr(tenant_id, MEMORY_LABEL);
if (OB_ISNULL(tenant_res = OB_NEW(ObConnectResource, attr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate tenant resource failed", K(ret));
} else {
tenant_res->cur_connections_ = 1;
tenant_res->cur_connections_ = 0;
tenant_res->tenant_id_ = tenant_id;
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(tenant_res_map_.insert_and_get(tenant_key, tenant_res))) {
LOG_WARN("insert and get failed", K(ret));
OB_DELETE(ObConnectResource, MEMORY_LABEL, tenant_res);
// tenant resouce already exist because of concurrent insert, just get it.
if (OB_FAIL(tenant_res_map_.get(tenant_key, tenant_res))) {
tenant_res = NULL;
// 1. the tenant resource already exists because of a concurrent insert: just get it.
// 2. the insert may also fail because of OOM.
if (OB_ENTRY_EXIST == ret && OB_FAIL(tenant_res_map_.get(tenant_key, tenant_res))) {
// may happen with very low probability: the insert failed, then the tenant was dropped
// and its value in the map was deleted by the periodic task.
LOG_WARN("tenant not exists", K(ret));
LOG_WARN("get tenant conn res map failed", K(ret), K(tenant_id));
}
} else {
has_insert = true;
tenant_res_map_.revert(tenant_res);
}
} else {
LOG_WARN("get tenant resource failed", K(ret));
}
}
if (OB_SUCC(ret) && !has_insert) {
if (OB_SUCC(ret)) {
if (OB_ISNULL(tenant_res)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tenant resource is null", K(ret));
@ -143,9 +141,11 @@ int ObConnectResourceMgr::apply_for_tenant_conn_resource(const uint64_t tenant_i
LOG_WARN("too many connections", K(ret), K(tenant_res->cur_connections_),
K(max_tenant_connections));
}
tenant_res_map_.revert(tenant_res);
}
}
if (OB_NOT_NULL(tenant_res)) {
tenant_res_map_.revert(tenant_res);
}
return ret;
}
@ -164,6 +164,7 @@ void ObConnectResourceMgr::release_tenant_conn_resource(const uint64_t tenant_id
} else {
tenant_res->cur_connections_--;
}
tenant_res_map_.revert(tenant_res);
}
}
@ -202,38 +203,37 @@ int ObConnectResourceMgr::get_or_insert_user_resource(const uint64_t tenant_id,
const uint64_t user_id,
const uint64_t max_user_connections,
const uint64_t max_connections_per_hour,
ObConnectResource *&user_res, bool &has_insert, bool &user_conn_increased)
ObConnectResource *&user_res)
{
int ret = OB_SUCCESS;
user_res = NULL;
ObTenantUserKey user_key(tenant_id, user_id);
has_insert = false;
if (OB_FAIL(user_res_map_.get(user_key, user_res))) {
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_SUCCESS;
ObMemAttr attr(OB_SERVER_TENANT_ID, MEMORY_LABEL);
SET_USE_500(attr);
// not exist, alloc and insert
ObMemAttr attr(tenant_id, MEMORY_LABEL);
if (OB_ISNULL(user_res = OB_NEW(ObConnectResource, attr))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate user resource failed", K(ret));
} else {
user_res->cur_connections_ = 0 == max_user_connections ? 0 : 1;
user_res->history_connections_ = 0 == max_connections_per_hour ? 0 : 1;
user_res->start_time_ = 0 == max_connections_per_hour ? 0 : ObTimeUtility::current_time();
user_res->cur_connections_ = 0;
user_res->history_connections_ = 0;
user_res->start_time_ = 0;
user_res->tenant_id_ = tenant_id;
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(user_res_map_.insert_and_get(user_key, user_res))) {
// user resouce already exist because of concurrent insert, just get it.
if (OB_FAIL(user_res_map_.get(user_key, user_res))) {
LOG_WARN("insert and get failed", K(ret));
OB_DELETE(ObConnectResource, MEMORY_LABEL, user_res);
user_res = NULL;
// 1. the user resource already exists because of a concurrent insert: just get it.
// 2. the insert may also fail because of OOM.
if (OB_ENTRY_EXIST == ret && OB_FAIL(user_res_map_.get(user_key, user_res))) {
// may happen with very low probability: the insert failed, then the user was dropped
// and its value in the map was deleted by the periodic task.
LOG_WARN("user not exists", K(ret));
}
} else {
has_insert = true;
user_conn_increased = max_user_connections != 0;
user_res_map_.revert(user_res);
}
} else {
LOG_WARN("get user resource failed", K(ret));
@ -283,9 +283,6 @@ int ObConnectResourceMgr::increase_user_connections_count(
user_conn_increased = 0 != max_user_connections;
}
}
if (OB_SUCC(ret)) {
user_res_map_.revert(user_res);
}
return ret;
}
@ -325,23 +322,22 @@ int ObConnectResourceMgr::on_user_connect(
// only increase cur_connections_ if max_user_connections is not zero
// only record history_connections_ if max_connections_per_hour is not zero.
ObConnectResource *user_res = NULL;
bool has_insert = false;
bool user_conn_increased = false;
if (OB_FAIL(get_or_insert_user_resource(tenant_id, user_id, max_user_connections,
max_connections_per_hour, user_res,
has_insert, user_conn_increased))) {
max_connections_per_hour, user_res))) {
LOG_WARN("get or insert user resource failed", K(ret));
} else if (!has_insert) {
// if user resource already exists in the hash map, increase its connections count.
if (OB_FAIL(increase_user_connections_count(max_user_connections, max_connections_per_hour,
} else if (OB_FAIL(increase_user_connections_count(max_user_connections, max_connections_per_hour,
user_name, user_res, user_conn_increased))) {
LOG_WARN("increase user connection count failed", K(ret));
}
}
if (user_conn_increased) {
session.set_got_user_conn_res(true);
session.set_conn_res_user_id(user_id);
}
if (OB_NOT_NULL(user_res)) {
user_res_map_.revert(user_res);
user_res = NULL;
}
}
} else {
if (!session.has_got_tenant_conn_res()) {
@ -399,6 +395,26 @@ int ObConnectResourceMgr::on_user_disconnect(ObSQLSessionInfo &session)
return ret;
}
// Drops every connection-resource entry that belongs to `tenant_id` from both
// user_res_map_ and tenant_res_map_.
//
// The user map is processed first; the tenant map is only touched when that
// succeeded, and both maps are purged only when both removals succeeded.
// The number of entries selected in each map is logged in all cases.
//
// @param tenant_id  tenant whose entries are erased
// @return OB_SUCCESS, or the error code of the first remove_if that failed
int ObConnectResourceMgr::erase_tenant_conn_res_map(int64_t tenant_id)
{
  int ret = OB_SUCCESS;
  int64_t erase_user_cnt = 0;
  int64_t erase_tenant_cnt = 0;
  // A single functor is used for both maps, so its counter is cumulative.
  EraseTenantMapFunc func(tenant_id);
  if (OB_FAIL(user_res_map_.remove_if(func))) {
    LOG_WARN("remove_if failed", K(ret), K(tenant_id));
  } else {
    erase_user_cnt = func.erase_cnt_;
    if (OB_FAIL(tenant_res_map_.remove_if(func))) {
      LOG_WARN("remove_if failed", K(ret), K(tenant_id));
    } else {
      // Subtract the user-map share to get the tenant-map count.
      erase_tenant_cnt = func.erase_cnt_ - erase_user_cnt;
      user_res_map_.purge();
      tenant_res_map_.purge();
    }
  }
  LOG_INFO("erase tenant conn result map", K(tenant_id), K(erase_tenant_cnt), K(erase_user_cnt));
  return ret;
}
bool ObConnectResourceMgr::CleanUpConnResourceFunc::operator() (
ObTenantUserKey key, ObConnectResource *conn_res)
{
@ -406,7 +422,7 @@ bool ObConnectResourceMgr::CleanUpConnResourceFunc::operator() (
if (OB_ISNULL(conn_res)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("user res is NULL", K(ret), K(conn_res));
} else if (is_user_) {
} else {
const ObUserInfo *user_info = NULL;
if (OB_FAIL(schema_guard_.get_user_info(key.tenant_id_, key.user_id_, user_info))) {
if (OB_TENANT_NOT_EXIST != ret) {
@ -418,13 +434,6 @@ bool ObConnectResourceMgr::CleanUpConnResourceFunc::operator() (
} else if (OB_ISNULL(user_info)) {
conn_res_map_.del(key);
}
} else {
const ObTenantSchema *tenant_schema = NULL;
if (OB_FAIL(schema_guard_.get_tenant_info(key.tenant_id_, tenant_schema))) {
LOG_ERROR("get tenant info failed", K(ret), K(key.tenant_id_));
} else if (OB_ISNULL(tenant_schema)) {
conn_res_map_.del(key);
}
}
return OB_SUCCESS == ret;
}
@ -444,12 +453,9 @@ void ObConnectResourceMgr::ConnResourceCleanUpTask::runTimerTask()
} else {
LOG_INFO("clean up connection resource", K(schema_guard.get_tenant_id()),
K(conn_res_mgr_.user_res_map_.size()), K(conn_res_mgr_.tenant_res_map_.size()));
CleanUpConnResourceFunc user_func(schema_guard, conn_res_mgr_.user_res_map_, true);
CleanUpConnResourceFunc tenant_func(schema_guard, conn_res_mgr_.tenant_res_map_, false);
CleanUpConnResourceFunc user_func(schema_guard, conn_res_mgr_.user_res_map_);
if (OB_FAIL(conn_res_mgr_.user_res_map_.for_each(user_func))) {
LOG_WARN("cleanup dropped user failed", K(ret));
} else if (OB_FAIL(conn_res_mgr_.tenant_res_map_.for_each(tenant_func))) {
LOG_WARN("cleanup dropped tenant failed", K(ret));
}
}
const int64_t delay = SLEEP_USECONDS;

View File

@ -72,13 +72,16 @@ typedef common::LinkHashValue<ObTenantUserKey> ObConnectResHashValue;
class ObConnectResource : public ObConnectResHashValue {
public:
ObConnectResource()
: rwlock_(), cur_connections_(0), history_connections_(0), start_time_(0)
: rwlock_(), cur_connections_(0), history_connections_(0), start_time_(0),
tenant_id_(OB_SERVER_TENANT_ID)
{
}
ObConnectResource(uint64_t cur_connections, uint64_t history_connections, int64_t cur_time)
ObConnectResource(uint64_t cur_connections, uint64_t history_connections, int64_t cur_time,
int64_t tenant_id)
: rwlock_(), cur_connections_(cur_connections),
history_connections_(history_connections),
start_time_(cur_time)
start_time_(cur_time),
tenant_id_(tenant_id)
{
}
virtual ~ObConnectResource()
@ -96,6 +99,7 @@ public:
// number of connections from this time, and don't have to record 1:10 or 1:20.
int64_t start_time_;
// TODO: count of update and query in one hour.
int64_t tenant_id_;
};
class ObConnectResAlloc {
@ -130,7 +134,7 @@ public:
const uint64_t user_id,
const uint64_t max_user_connections,
const uint64_t max_connections_per_hour,
ObConnectResource *&user_res, bool &has_insert, bool &user_conn_increased);
ObConnectResource *&user_res);
int increase_user_connections_count(
const uint64_t max_user_connections,
const uint64_t max_connections_per_hour,
@ -146,19 +150,32 @@ public:
const uint64_t max_global_connections,
ObSQLSessionInfo& session);
int on_user_disconnect(ObSQLSessionInfo &session);
int erase_tenant_conn_res_map(int64_t tenant_id);
private:
// Predicate handed to remove_if on the connection-resource maps: it selects
// every entry whose key carries tenant_id_ and counts how many it selected.
struct EraseTenantMapFunc
{
  // explicit: a bare integer must never silently convert into a predicate.
  explicit EraseTenantMapFunc(int64_t tenant_id)
    : tenant_id_(tenant_id), erase_cnt_(0) {}
  ~EraseTenantMapFunc() {}
  // Returns true when `key` belongs to tenant_id_. The mapped value is not
  // inspected, so the parameter is left unnamed to avoid an unused warning.
  bool operator()(const ObTenantUserKey &key, const ObConnectResource * /*value*/) {
    const bool matched = (key.tenant_id_ == tenant_id_);
    if (matched) {
      ++erase_cnt_;
    }
    return matched;
  }
  // Tenant whose entries are selected.
  int64_t tenant_id_;
  // Running total of selected entries; it is never reset, so reusing one
  // instance on several maps yields a cumulative count.
  int64_t erase_cnt_;
};
// Functor applied to the entries of a connection-resource map (for_each).
// It keeps references to the schema guard used for lookups and to the map it
// may delete entries from; both must outlive the functor.
class CleanUpConnResourceFunc
{
public:
  // Single constructor with one parameter list and one initializer list; the
  // user/tenant switch is gone, so no is_user flag is taken or stored.
  CleanUpConnResourceFunc(share::schema::ObSchemaGetterGuard &schema_guard,
                          ObConnResMap &conn_res_map)
    : schema_guard_(schema_guard), conn_res_map_(conn_res_map)
  {}
  // Defined out of line. NOTE(review): the visible part of the definition
  // looks up the user of `key` via the schema guard and deletes the entry when
  // no such user is found, returning true when no error occurred — confirm
  // against the full definition.
  bool operator() (ObTenantUserKey key, ObConnectResource *user_res);
private:
  share::schema::ObSchemaGetterGuard &schema_guard_;
  ObConnResMap &conn_res_map_;
};
class ConnResourceCleanUpTask : public common::ObTimerTask
{