[CP] fix mysqlqueue leak

This commit is contained in:
zhjc1124
2023-04-27 12:41:59 +00:00
committed by ob-robot
parent f745b47cbc
commit 0b2493a5cf
4 changed files with 112 additions and 63 deletions

View File

@ -33,6 +33,7 @@ using namespace oceanbase::lib;
ObReqQueue::ObReqQueue(int queue_capacity)
: wait_finish_(true),
push_worker_count_(0),
queue_(),
qhandler_(NULL),
host_()
@ -73,6 +74,17 @@ bool ObReqQueue::push(ObRequest *req, int max_queue_len, bool block)
return bret;
}
oceanbase::rpc::ObRequest *ObReqQueue::pop()
{
void *task = NULL;
int64_t timeout = 0;
ObRequest *req = NULL;
if (queue_.size() > 0 && OB_LIKELY(OB_SUCCESS == queue_.pop(task, timeout)) && OB_NOT_NULL(task)) {
req = reinterpret_cast<ObRequest *>(task);
}
return req;
}
void ObReqQueue::set_host(const ObAddr &host)
{
host_ = host;
@ -165,6 +177,7 @@ void ObReqQueue::loop()
if (!wait_finish_) {
LOG_INFO("exiting queue thread without wait finish", K(queue_.size()));
} else {
while(get_push_worker_count() != 0); // wait to push finish
LOG_INFO("exiting queue thread and wait remain finish", K(queue_.size()));
// Process remains if we should wait until all task has been
// processed before exiting this thread. Previous return code

View File

@ -41,7 +41,7 @@ public:
void set_qhandler(ObiReqQHandler *handler);
bool push(ObRequest *req, int max_queue_len, bool block = true);
ObRequest * pop(); // only for dispatch_req of mysql queue. pop req when tenant is stopped.
void set_host(const common::ObAddr &host);
void loop();
@ -50,13 +50,28 @@ public:
return queue_.size();
}
void inc_push_worker_count()
{
ATOMIC_INC(&push_worker_count_);
}
void dec_push_worker_count()
{
ATOMIC_DEC(&push_worker_count_);
}
bool get_push_worker_count()
{
return ATOMIC_LOAD(&push_worker_count_);
}
private:
int process_task(void *task);
DISALLOW_COPY_AND_ASSIGN(ObReqQueue);
protected:
bool wait_finish_;
int push_worker_count_;
common::ObLightyQueue queue_;
ObiReqQHandler *qhandler_;

View File

@ -107,31 +107,42 @@ int extract_tenant_id(ObRequest &req, uint64_t &tenant_id)
return ret;
}
int dispatch_req(ObRequest& req)
int dispatch_req(ObRequest &req, QueueThread *global_mysql_queue)
{
int ret = OB_SUCCESS;
static const int64_t MAX_QUEUE_LEN = 10000;
uint64_t tenant_id = OB_INVALID_ID;
if (OB_FAIL(extract_tenant_id(req, tenant_id))) {
LOG_WARN("extract tenant_id fail", K(ret), K(tenant_id), K(req));
// handle all error by OB_TENANT_NOT_IN_SERVER
ret = OB_TENANT_NOT_IN_SERVER;
} else if (is_meta_tenant(tenant_id)) {
// cannot login meta tenant
ret = OB_ERR_UNEXPECTED;
LOG_WARN("cannot login meta tenant", K(ret), K(tenant_id));
ret = OB_TENANT_NOT_IN_SERVER;
} else if (is_sys_tenant(tenant_id) || is_user_tenant(tenant_id)) {
MTL_SWITCH(tenant_id) {
QueueThread *mysql_queue = MTL(QueueThread *);
if (!mysql_queue->queue_.push(&req,
10000)) { // MAX_QUEUE_LEN = 10000;
ObTenant *tenant = (ObTenant *)MTL_CTX();
mysql_queue->queue_.inc_push_worker_count();
if (OB_ISNULL(tenant)) {
ret = OB_TENANT_NOT_IN_SERVER;
LOG_WARN("tenant is NULL", K(ret), K(tenant_id));
} else if (tenant->has_stopped()) {
ret = OB_TENANT_NOT_IN_SERVER;
LOG_WARN("tenant is stopped", K(ret), K(tenant_id));
} else if (OB_ISNULL(mysql_queue)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("mysql_queue is NULL", K(ret), K(tenant_id));
} else if (!mysql_queue->queue_.push(&req, MAX_QUEUE_LEN)) { // MAX_QUEUE_LEN = 10000;
ret = OB_QUEUE_OVERFLOW;
EVENT_INC(MYSQL_DELIVER_FAIL);
LOG_ERROR("deliver request fail", K(ret), K(tenant_id), K(req));
} else {
LOG_INFO("succeed to dispatch to tenant mysql queue", K(tenant_id));
}
mysql_queue->queue_.dec_push_worker_count();
// print queue length per 10s
if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
LOG_INFO("mysql login queue", K(tenant_id),
K(mysql_queue->queue_.size()));
LOG_INFO("mysql login queue", K(mysql_queue->queue_.size()));
}
// if (0 != MTL(obmysql::ObSqlNioServer *)
@ -144,6 +155,18 @@ int dispatch_req(ObRequest& req)
LOG_WARN("cannot switch to tenant", K(ret), K(tenant_id));
}
}
// failed to dispatch, push to global mysql queue
if (OB_FAIL(ret)) {
if (!global_mysql_queue->queue_.push(&req, MAX_QUEUE_LEN)) {
ret = OB_QUEUE_OVERFLOW;
EVENT_INC(MYSQL_DELIVER_FAIL);
LOG_ERROR("deliver request fail", K(req));
} else {
LOG_INFO("fail to dispatch to tenant, but push to global mysql queue", K(ret));
ret = OB_SUCCESS;
}
}
return ret;
}
@ -499,15 +522,11 @@ int ObSrvDeliver::deliver_mysql_request(ObRequest &req)
LOG_ERROR("deliver request fail", K(req));
}
} else if (OB_NOT_NULL(mysql_queue_)) {
if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread &&
OB_SUCC(dispatch_req(req))) {
// do nothing
} else {
if (OB_TENANT_NOT_IN_SERVER == ret) {
LOG_WARN("fail to dispatch to tenant", K(ret), K(req));
// set OB_SUCCESS to go normal procedure
ret = OB_SUCCESS;
if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread) {
if (OB_FAIL(dispatch_req(req, mysql_queue_))) {
LOG_ERROR("deliver request in dispatch_req fail", K(ret), K(req));
}
} else {
if (OB_SUCC(ret) && !mysql_queue_->queue_.push(&req, MAX_QUEUE_LEN)) {
ret = OB_QUEUE_OVERFLOW;
EVENT_INC(MYSQL_DELIVER_FAIL);

View File

@ -254,13 +254,13 @@ static int start_sql_nio_server(ObSqlNioServer *&sql_nio_server)
int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
ObSrvNetworkFrame *net_frame = GCTX.net_frame_;
sql_nio_server = OB_NEW(obmysql::ObSqlNioServer, "SqlNio",
obmysql::global_sm_conn_callback,
net_frame->get_mysql_handler(), tenant_id);
if (is_sys_tenant(tenant_id) || is_user_tenant(tenant_id)) {
if (NULL == sql_nio_server) {
ret = OB_NOT_INIT;
LOG_ERROR("sql_nio_server init failed", K(ret));
sql_nio_server = OB_NEW(obmysql::ObSqlNioServer, "SqlNio",
obmysql::global_sm_conn_callback,
net_frame->get_mysql_handler(), tenant_id);
if (OB_ISNULL(sql_nio_server)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_ERROR("fail to new sql_nio_server", K(ret));
} else {
int net_thread_count = 0;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
@ -268,10 +268,8 @@ static int start_sql_nio_server(ObSqlNioServer *&sql_nio_server)
net_thread_count = tenant_config->tenant_sql_net_thread_count;
}
if (0 == net_thread_count) {
ObTenant *tenant = NULL;
GCTX.omt_->get_tenant(tenant_id, tenant);
net_thread_count =
NULL == tenant ? 1 : std::max((int)tenant->unit_min_cpu(), 1);
ObTenant *tenant = (ObTenant *)MTL_CTX();
net_thread_count = tenant ? std::max((int)tenant->unit_min_cpu(), 1) : 1;
}
sql_nio_server->get_nio()->set_run_wrapper(MTL_CTX());
if (OB_FAIL(sql_nio_server->start(-1, &net_frame->get_deliver(),
@ -279,7 +277,7 @@ static int start_sql_nio_server(ObSqlNioServer *&sql_nio_server)
LOG_WARN("sql nio server start failed", K(ret));
} else {
LOG_INFO("tenant sql_nio_server mtl_start success", K(ret),
K(tenant_id));
K(tenant_id), K(net_thread_count));
}
}
}
@ -301,41 +299,40 @@ static int server_obj_pool_mtl_new(common::ObServerObjectPool<T> *&pool)
return ret;
}
static int init_mysql_queue(QueueThread *&qthread)
static int start_mysql_queue(QueueThread *&qthread)
{
int ret = OB_SUCCESS;
const uint64_t tenant_id = MTL_ID();
qthread = OB_NEW(QueueThread, ObModIds::OB_RPC, "MysqlQueueTh", tenant_id);
if (is_sys_tenant(tenant_id) || is_user_tenant(tenant_id)) {
qthread = OB_NEW(QueueThread, ObModIds::OB_RPC, "MysqlQueueTh", tenant_id);
if (OB_ISNULL(qthread)) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new qthread", K(ret), K(tenant_id));
} else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::MysqlQueueTh,
qthread->tg_id_))) {
LOG_WARN("mysql queue init failed", K(ret), K(tenant_id),
K(qthread->tg_id_));
} else {
qthread->queue_.set_qhandler(
&GCTX.net_frame_->get_deliver().get_qhandler());
ret = TG_SET_RUNNABLE_AND_START(qthread->tg_id_, qthread->thread_);
}
if (OB_SUCC(ret) && OB_NOT_NULL(qthread)) {
int sql_thread_count = 0;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
if (tenant_config.is_valid()) {
sql_thread_count = tenant_config->tenant_sql_login_thread_count;
qthread->queue_.set_qhandler(&GCTX.net_frame_->get_deliver().get_qhandler());
if (OB_FAIL(TG_SET_RUNNABLE_AND_START(qthread->tg_id_, qthread->thread_))) {
LOG_ERROR("fail to start qthread", K(ret), K(tenant_id), K(qthread->tg_id_));
} else {
int sql_thread_count = 0;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
if (tenant_config.is_valid()) {
sql_thread_count = tenant_config->tenant_sql_login_thread_count;
}
if (0 == sql_thread_count) {
ObTenant *tenant = (ObTenant *)MTL_CTX();
sql_thread_count = tenant ? std::max((int)tenant->unit_min_cpu(), 1) : 1;
}
if (OB_FAIL(qthread->set_thread_count(sql_thread_count))) {
LOG_WARN("fail to set thread count", K(ret), K(tenant_id), K(qthread->tg_id_));
} else {
LOG_INFO("tenant mysql_queue mtl_start success", K(ret),
K(tenant_id), K(qthread->tg_id_), K(sql_thread_count));
}
}
if (0 == sql_thread_count) {
ObTenant *tenant = NULL;
GCTX.omt_->get_tenant(tenant_id, tenant);
sql_thread_count =
NULL == tenant ? 1 : std::max((int)tenant->unit_min_cpu(), 1);
}
qthread->set_thread_count(sql_thread_count);
LOG_INFO("tenant mysql_queue mtl_init success", K(ret), K(tenant_id));
} else {
LOG_WARN("tenant mysql_queue mtl_init fail", K(ret), K(tenant_id),
K(qthread->tg_id_));
}
}
return ret;
@ -448,7 +445,7 @@ int ObMultiTenant::init(ObAddr myaddr,
MTL_BIND2(mtl_new_default, ObPsCache::mtl_init, nullptr, ObPsCache::mtl_stop, nullptr, mtl_destroy_default);
MTL_BIND2(server_obj_pool_mtl_new<ObPartTransCtx>, nullptr, nullptr, nullptr, nullptr, server_obj_pool_mtl_destroy<ObPartTransCtx>);
if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread) {
MTL_BIND2(nullptr, init_mysql_queue, nullptr, mtl_stop_default,
MTL_BIND2(nullptr, nullptr, start_mysql_queue, mtl_stop_default,
mtl_wait_default, mtl_destroy_default);
// MTL_BIND2(nullptr, nullptr, start_sql_nio_server, mtl_stop_default,
// mtl_wait_default, mtl_destroy_default);
@ -2127,7 +2124,6 @@ int ObSrvNetworkFrame::reload_tenant_sql_thread_config(const uint64_t tenant_id)
int ret = OB_SUCCESS;
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id));
ObTenant *tenant = NULL;
GCTX.omt_->get_tenant(tenant_id, tenant);
// reload tenant_sql_login_thread_count
int sql_login_thread_count = 0;
@ -2135,11 +2131,12 @@ int ObSrvNetworkFrame::reload_tenant_sql_thread_config(const uint64_t tenant_id)
sql_login_thread_count = tenant_config->tenant_sql_login_thread_count;
}
if (0 == sql_login_thread_count) {
sql_login_thread_count =
NULL == tenant ? 1 : std::max((int)tenant->unit_min_cpu(), 1);
ObTenant *tenant = (ObTenant *)MTL_CTX();
sql_login_thread_count = tenant ? std::max((int)tenant->unit_min_cpu(), 1) : 1;
}
MTL_SWITCH(tenant_id) {
if (OB_FAIL(MTL(QueueThread *)->set_thread_count(sql_login_thread_count))) {
QueueThread *mysql_queue = MTL(QueueThread *);
if (OB_NOT_NULL(mysql_queue) && mysql_queue->set_thread_count(sql_login_thread_count)) {
LOG_WARN("update tenant_sql_login_thread_count fail", K(ret));
}
}
@ -2167,8 +2164,8 @@ int ObSrvNetworkFrame::reload_tenant_sql_thread_config(const uint64_t tenant_id)
// }
// }
return ret;
}
return ret;
}
int ObSrvNetworkFrame::reload_sql_thread_config()
{
@ -2203,11 +2200,16 @@ int ObSrvNetworkFrame::reload_sql_thread_config()
if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread) {
omt::TenantIdList ids;
GCTX.omt_->get_tenant_ids(ids);
for (int64_t i = 0; i < ids.size(); i++) {
int tenant_id = ids[i];
if (is_sys_tenant(tenant_id) || is_user_tenant(tenant_id)) {
reload_tenant_sql_thread_config(tenant_id);
if (OB_ISNULL(GCTX.omt_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null ptr", K(ret));
} else {
GCTX.omt_->get_tenant_ids(ids);
for (int64_t i = 0; i < ids.size(); i++) {
int tenant_id = ids[i];
if (is_sys_tenant(tenant_id) || is_user_tenant(tenant_id)) {
reload_tenant_sql_thread_config(tenant_id);
}
}
}
}