diff --git a/deps/oblib/src/rpc/frame/ob_req_queue_thread.cpp b/deps/oblib/src/rpc/frame/ob_req_queue_thread.cpp index 04ce2b6ff4..b8e5cf37a9 100644 --- a/deps/oblib/src/rpc/frame/ob_req_queue_thread.cpp +++ b/deps/oblib/src/rpc/frame/ob_req_queue_thread.cpp @@ -33,6 +33,7 @@ using namespace oceanbase::lib; ObReqQueue::ObReqQueue(int queue_capacity) : wait_finish_(true), + push_worker_count_(0), queue_(), qhandler_(NULL), host_() @@ -73,6 +74,17 @@ bool ObReqQueue::push(ObRequest *req, int max_queue_len, bool block) return bret; } +oceanbase::rpc::ObRequest *ObReqQueue::pop() +{ + void *task = NULL; + int64_t timeout = 0; + ObRequest *req = NULL; + if (queue_.size() > 0 && OB_LIKELY(OB_SUCCESS == queue_.pop(task, timeout)) && OB_NOT_NULL(task)) { + req = reinterpret_cast(task); + } + return req; +} + void ObReqQueue::set_host(const ObAddr &host) { host_ = host; @@ -165,6 +177,7 @@ void ObReqQueue::loop() if (!wait_finish_) { LOG_INFO("exiting queue thread without wait finish", K(queue_.size())); } else { + while(get_push_worker_count() != 0); // wait to push finish LOG_INFO("exiting queue thread and wait remain finish", K(queue_.size())); // Process remains if we should wait until all task has been // processed before exiting this thread. Previous return code diff --git a/deps/oblib/src/rpc/frame/ob_req_queue_thread.h b/deps/oblib/src/rpc/frame/ob_req_queue_thread.h index c9be527177..36912defff 100644 --- a/deps/oblib/src/rpc/frame/ob_req_queue_thread.h +++ b/deps/oblib/src/rpc/frame/ob_req_queue_thread.h @@ -41,7 +41,7 @@ public: void set_qhandler(ObiReqQHandler *handler); bool push(ObRequest *req, int max_queue_len, bool block = true); - + ObRequest * pop(); // only for dispatch_req of mysql queue. pop req when tenant is stopped. void set_host(const common::ObAddr &host); void loop(); @@ -50,13 +50,28 @@ public: return queue_.size(); } + void inc_push_worker_count() + { + ATOMIC_INC(&push_worker_count_); + } + void dec_push_worker_count() + { + ATOMIC_DEC(&push_worker_count_); + } + bool get_push_worker_count() + { + return ATOMIC_LOAD(&push_worker_count_); + } + private: int process_task(void *task); DISALLOW_COPY_AND_ASSIGN(ObReqQueue); protected: + bool wait_finish_; + int push_worker_count_; common::ObLightyQueue queue_; ObiReqQHandler *qhandler_; diff --git a/src/observer/ob_srv_deliver.cpp b/src/observer/ob_srv_deliver.cpp index ca1dfff3d4..8033bf87e9 100644 --- a/src/observer/ob_srv_deliver.cpp +++ b/src/observer/ob_srv_deliver.cpp @@ -107,31 +107,42 @@ int extract_tenant_id(ObRequest &req, uint64_t &tenant_id) return ret; } -int dispatch_req(ObRequest& req) +int dispatch_req(ObRequest &req, QueueThread *global_mysql_queue) { int ret = OB_SUCCESS; + static const int64_t MAX_QUEUE_LEN = 10000; uint64_t tenant_id = OB_INVALID_ID; if (OB_FAIL(extract_tenant_id(req, tenant_id))) { LOG_WARN("extract tenant_id fail", K(ret), K(tenant_id), K(req)); - // handle all error by OB_TENANT_NOT_IN_SERVER - ret = OB_TENANT_NOT_IN_SERVER; } else if (is_meta_tenant(tenant_id)) { // cannot login meta tenant + ret = OB_ERR_UNEXPECTED; LOG_WARN("cannot login meta tenant", K(ret), K(tenant_id)); - ret = OB_TENANT_NOT_IN_SERVER; } else if (is_sys_tenant(tenant_id) || is_user_tenant(tenant_id)) { MTL_SWITCH(tenant_id) { QueueThread *mysql_queue = MTL(QueueThread *); - if (!mysql_queue->queue_.push(&req, - 10000)) { // MAX_QUEUE_LEN = 10000; + ObTenant *tenant = (ObTenant *)MTL_CTX(); + mysql_queue->queue_.inc_push_worker_count(); + if (OB_ISNULL(tenant)) { + ret = OB_TENANT_NOT_IN_SERVER; + LOG_WARN("tenant is NULL", K(ret), K(tenant_id)); + } else if (tenant->has_stopped()) { + ret = OB_TENANT_NOT_IN_SERVER; + LOG_WARN("tenant is stopped", K(ret), K(tenant_id)); + } else if (OB_ISNULL(mysql_queue)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("mysql_queue is NULL", K(ret), K(tenant_id)); + } else if (!mysql_queue->queue_.push(&req, MAX_QUEUE_LEN)) { // MAX_QUEUE_LEN = 10000; ret = OB_QUEUE_OVERFLOW; EVENT_INC(MYSQL_DELIVER_FAIL); LOG_ERROR("deliver request fail", K(ret), K(tenant_id), K(req)); + } else { + LOG_INFO("succeed to dispatch to tenant mysql queue", K(tenant_id)); } + mysql_queue->queue_.dec_push_worker_count(); // print queue length per 10s if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) { - LOG_INFO("mysql login queue", K(tenant_id), - K(mysql_queue->queue_.size())); + LOG_INFO("mysql login queue", K(mysql_queue->queue_.size())); } // if (0 != MTL(obmysql::ObSqlNioServer *) @@ -144,6 +155,18 @@ int dispatch_req(ObRequest& req) LOG_WARN("cannot switch to tenant", K(ret), K(tenant_id)); } } + + // failed to dispatch, push to global mysql queue + if (OB_FAIL(ret)) { + if (!global_mysql_queue->queue_.push(&req, MAX_QUEUE_LEN)) { + ret = OB_QUEUE_OVERFLOW; + EVENT_INC(MYSQL_DELIVER_FAIL); + LOG_ERROR("deliver request fail", K(req)); + } else { + LOG_INFO("fail to dispatch to tenant, but push to global mysql queue", K(ret)); + ret = OB_SUCCESS; + } + } return ret; } @@ -499,15 +522,11 @@ int ObSrvDeliver::deliver_mysql_request(ObRequest &req) LOG_ERROR("deliver request fail", K(req)); } } else if (OB_NOT_NULL(mysql_queue_)) { - if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread && - OB_SUCC(dispatch_req(req))) { - // do nothing - } else { - if (OB_TENANT_NOT_IN_SERVER == ret) { - LOG_WARN("fail to dispatch to tenant", K(ret), K(req)); - // set OB_SUCCESS to go normal procedure - ret = OB_SUCCESS; + if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread) { + if (OB_FAIL(dispatch_req(req, mysql_queue_))) { + LOG_ERROR("deliver request in dispatch_req fail", K(ret), K(req)); } + } else { if (OB_SUCC(ret) && !mysql_queue_->queue_.push(&req, MAX_QUEUE_LEN)) { ret = OB_QUEUE_OVERFLOW; EVENT_INC(MYSQL_DELIVER_FAIL); diff --git a/src/observer/omt/ob_multi_tenant.cpp b/src/observer/omt/ob_multi_tenant.cpp index 093d7dcda0..51cd40df97 100644 --- a/src/observer/omt/ob_multi_tenant.cpp +++ b/src/observer/omt/ob_multi_tenant.cpp @@ -254,13 +254,13 @@ static int start_sql_nio_server(ObSqlNioServer *&sql_nio_server) int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); ObSrvNetworkFrame *net_frame = GCTX.net_frame_; - sql_nio_server = OB_NEW(obmysql::ObSqlNioServer, "SqlNio", - obmysql::global_sm_conn_callback, - net_frame->get_mysql_handler(), tenant_id); if (is_sys_tenant(tenant_id) || is_user_tenant(tenant_id)) { - if (NULL == sql_nio_server) { - ret = OB_NOT_INIT; - LOG_ERROR("sql_nio_server init failed", K(ret)); + sql_nio_server = OB_NEW(obmysql::ObSqlNioServer, "SqlNio", + obmysql::global_sm_conn_callback, + net_frame->get_mysql_handler(), tenant_id); + if (OB_ISNULL(sql_nio_server)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_ERROR("fail to new sql_nio_server", K(ret)); } else { int net_thread_count = 0; omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id)); @@ -268,10 +268,8 @@ static int start_sql_nio_server(ObSqlNioServer *&sql_nio_server) net_thread_count = tenant_config->tenant_sql_net_thread_count; } if (0 == net_thread_count) { - ObTenant *tenant = NULL; - GCTX.omt_->get_tenant(tenant_id, tenant); - net_thread_count = - NULL == tenant ? 1 : std::max((int)tenant->unit_min_cpu(), 1); + ObTenant *tenant = (ObTenant *)MTL_CTX(); + net_thread_count = tenant ? std::max((int)tenant->unit_min_cpu(), 1) : 1; } sql_nio_server->get_nio()->set_run_wrapper(MTL_CTX()); if (OB_FAIL(sql_nio_server->start(-1, &net_frame->get_deliver(), @@ -279,7 +277,7 @@ static int start_sql_nio_server(ObSqlNioServer *&sql_nio_server) LOG_WARN("sql nio server start failed", K(ret)); } else { LOG_INFO("tenant sql_nio_server mtl_start success", K(ret), - K(tenant_id)); + K(tenant_id), K(net_thread_count)); } } } @@ -301,41 +299,40 @@ static int server_obj_pool_mtl_new(common::ObServerObjectPool *&pool) return ret; } -static int init_mysql_queue(QueueThread *&qthread) +static int start_mysql_queue(QueueThread *&qthread) { int ret = OB_SUCCESS; const uint64_t tenant_id = MTL_ID(); - qthread = OB_NEW(QueueThread, ObModIds::OB_RPC, "MysqlQueueTh", tenant_id); if (is_sys_tenant(tenant_id) || is_user_tenant(tenant_id)) { + qthread = OB_NEW(QueueThread, ObModIds::OB_RPC, "MysqlQueueTh", tenant_id); if (OB_ISNULL(qthread)) { ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to new qthread", K(ret), K(tenant_id)); } else if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::MysqlQueueTh, qthread->tg_id_))) { LOG_WARN("mysql queue init failed", K(ret), K(tenant_id), K(qthread->tg_id_)); } else { - qthread->queue_.set_qhandler( - &GCTX.net_frame_->get_deliver().get_qhandler()); - ret = TG_SET_RUNNABLE_AND_START(qthread->tg_id_, qthread->thread_); - } - - if (OB_SUCC(ret) && OB_NOT_NULL(qthread)) { - int sql_thread_count = 0; - omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id)); - if (tenant_config.is_valid()) { - sql_thread_count = tenant_config->tenant_sql_login_thread_count; + qthread->queue_.set_qhandler(&GCTX.net_frame_->get_deliver().get_qhandler()); + if (OB_FAIL(TG_SET_RUNNABLE_AND_START(qthread->tg_id_, qthread->thread_))) { + LOG_ERROR("fail to start qthread", K(ret), K(tenant_id), K(qthread->tg_id_)); + } else { + int sql_thread_count = 0; + omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id)); + if (tenant_config.is_valid()) { + sql_thread_count = tenant_config->tenant_sql_login_thread_count; + } + if (0 == sql_thread_count) { + ObTenant *tenant = (ObTenant *)MTL_CTX(); + sql_thread_count = tenant ? std::max((int)tenant->unit_min_cpu(), 1) : 1; + } + if (OB_FAIL(qthread->set_thread_count(sql_thread_count))) { + LOG_WARN("fail to set thread count", K(ret), K(tenant_id), K(qthread->tg_id_)); + } else { + LOG_INFO("tenant mysql_queue mtl_start success", K(ret), + K(tenant_id), K(qthread->tg_id_), K(sql_thread_count)); + } } - if (0 == sql_thread_count) { - ObTenant *tenant = NULL; - GCTX.omt_->get_tenant(tenant_id, tenant); - sql_thread_count = - NULL == tenant ? 1 : std::max((int)tenant->unit_min_cpu(), 1); - } - qthread->set_thread_count(sql_thread_count); - LOG_INFO("tenant mysql_queue mtl_init success", K(ret), K(tenant_id)); - } else { - LOG_WARN("tenant mysql_queue mtl_init fail", K(ret), K(tenant_id), - K(qthread->tg_id_)); } } return ret; @@ -448,7 +445,7 @@ int ObMultiTenant::init(ObAddr myaddr, MTL_BIND2(mtl_new_default, ObPsCache::mtl_init, nullptr, ObPsCache::mtl_stop, nullptr, mtl_destroy_default); MTL_BIND2(server_obj_pool_mtl_new, nullptr, nullptr, nullptr, nullptr, server_obj_pool_mtl_destroy); if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread) { - MTL_BIND2(nullptr, init_mysql_queue, nullptr, mtl_stop_default, + MTL_BIND2(nullptr, nullptr, start_mysql_queue, mtl_stop_default, mtl_wait_default, mtl_destroy_default); // MTL_BIND2(nullptr, nullptr, start_sql_nio_server, mtl_stop_default, // mtl_wait_default, mtl_destroy_default); @@ -2127,7 +2124,6 @@ int ObSrvNetworkFrame::reload_tenant_sql_thread_config(const uint64_t tenant_id) int ret = OB_SUCCESS; omt::ObTenantConfigGuard tenant_config(TENANT_CONF(tenant_id)); ObTenant *tenant = NULL; - GCTX.omt_->get_tenant(tenant_id, tenant); // reload tenant_sql_login_thread_count int sql_login_thread_count = 0; @@ -2135,11 +2131,12 @@ int ObSrvNetworkFrame::reload_tenant_sql_thread_config(const uint64_t tenant_id) sql_login_thread_count = tenant_config->tenant_sql_login_thread_count; } if (0 == sql_login_thread_count) { - sql_login_thread_count = - NULL == tenant ? 1 : std::max((int)tenant->unit_min_cpu(), 1); + ObTenant *tenant = (ObTenant *)MTL_CTX(); + sql_login_thread_count = tenant ? std::max((int)tenant->unit_min_cpu(), 1) : 1; } MTL_SWITCH(tenant_id) { - if (OB_FAIL(MTL(QueueThread *)->set_thread_count(sql_login_thread_count))) { + QueueThread *mysql_queue = MTL(QueueThread *); + if (OB_NOT_NULL(mysql_queue) && mysql_queue->set_thread_count(sql_login_thread_count)) { LOG_WARN("update tenant_sql_login_thread_count fail", K(ret)); } } @@ -2167,8 +2164,8 @@ int ObSrvNetworkFrame::reload_tenant_sql_thread_config(const uint64_t tenant_id) // } // } - return ret; - } + return ret; +} int ObSrvNetworkFrame::reload_sql_thread_config() { @@ -2203,11 +2200,16 @@ int ObSrvNetworkFrame::reload_sql_thread_config() if (GCONF._enable_new_sql_nio && GCONF._enable_tenant_sql_net_thread) { omt::TenantIdList ids; - GCTX.omt_->get_tenant_ids(ids); - for (int64_t i = 0; i < ids.size(); i++) { - int tenant_id = ids[i]; - if (is_sys_tenant(tenant_id) || is_user_tenant(tenant_id)) { - reload_tenant_sql_thread_config(tenant_id); + if (OB_ISNULL(GCTX.omt_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("null ptr", K(ret)); + } else { + GCTX.omt_->get_tenant_ids(ids); + for (int64_t i = 0; i < ids.size(); i++) { + int tenant_id = ids[i]; + if (is_sys_tenant(tenant_id) || is_user_tenant(tenant_id)) { + reload_tenant_sql_thread_config(tenant_id); + } } } }