From 37e923b7efe302b57551dd59110770a64d07beca Mon Sep 17 00:00:00 2001
From: obdev
Date: Sat, 10 Feb 2024 04:56:48 +0000
Subject: [PATCH] [cp]fix: group request may deadlock because of nesting rpc

---
 src/observer/omt/ob_multi_level_queue.h |   1 +
 src/observer/omt/ob_tenant.cpp          | 141 +++++++++++++++++++-----
 src/observer/omt/ob_tenant.h            |  31 +++---
 src/observer/omt/ob_th_worker.cpp       |   6 +-
 4 files changed, 136 insertions(+), 43 deletions(-)

diff --git a/src/observer/omt/ob_multi_level_queue.h b/src/observer/omt/ob_multi_level_queue.h
index 1dc2e1731..ddb63bacb 100644
--- a/src/observer/omt/ob_multi_level_queue.h
+++ b/src/observer/omt/ob_multi_level_queue.h
@@ -17,6 +17,7 @@
 #include "rpc/ob_request.h"
 #define MULTI_LEVEL_QUEUE_SIZE (10)
 #define MULTI_LEVEL_THRESHOLD (2)
+#define GROUP_MULTI_LEVEL_THRESHOLD (1)
 
 namespace oceanbase
 {
diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp
index 92478e970..9bdfc159b 100644
--- a/src/observer/omt/ob_tenant.cpp
+++ b/src/observer/omt/ob_tenant.cpp
@@ -365,6 +365,7 @@ ObResourceGroup::ObResourceGroup(int32_t group_id, ObTenant* tenant, share::ObCg
     recv_req_cnt_(0),
     shrink_(false),
     token_change_ts_(0),
+    nesting_worker_cnt_(0),
     tenant_(tenant),
     cgroup_ctrl_(cgroup_ctrl)
 {
@@ -376,6 +377,8 @@ int ObResourceGroup::init()
   if (nullptr == tenant_) {
     ret = OB_ERR_UNEXPECTED;
     LOG_ERROR("group init failed");
+  } else if (FALSE_IT(multi_level_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size))) {
+    LOG_WARN("multi level queue set limit failed", K(ret), K(tenant_->id()), K(group_id_), K(*this));
   } else {
     req_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
     inited_ = true;
@@ -388,6 +391,28 @@ void ObResourceGroup::update_queue_size()
 {
   req_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
 }
+int ObResourceGroup::acquire_level_worker(int32_t level)
+{
+  int ret = OB_SUCCESS;
+  ObTenantSwitchGuard guard(tenant_);
+
+  if (level <= 0 || level > MAX_REQUEST_LEVEL) {
+    ret = OB_ERR_UNEXPECTED;
+    LOG_ERROR("unexpected level", K(level), K(tenant_->id()));
+  } else {
+    ObThWorker *w = nullptr;
+    if (OB_FAIL(create_worker(w, tenant_, group_id_, level, true /*ignore max worker limit*/, this))) {
+      LOG_WARN("create worker failed", K(ret));
+    } else if (!nesting_workers_.add_last(&w->worker_node_)) {
+      OB_ASSERT(false);
+      ret = OB_ERR_UNEXPECTED;
+      LOG_ERROR("add worker to list fail", K(ret));
+    }
+  }
+  return ret;
+}
+
+
 int ObResourceGroup::acquire_more_worker(int64_t num, int64_t &succ_num, bool force)
 {
   int ret = OB_SUCCESS;
@@ -424,6 +449,14 @@ void ObResourceGroup::check_worker_count()
 {
   int ret = OB_SUCCESS;
   if (OB_SUCC(workers_lock_.trylock())) {
+    if (is_user_group(group_id_)
+        && nesting_worker_cnt_ < (MAX_REQUEST_LEVEL - GROUP_MULTI_LEVEL_THRESHOLD)) {
+      for (int level = GROUP_MULTI_LEVEL_THRESHOLD + nesting_worker_cnt_; OB_SUCC(ret) && level < MAX_REQUEST_LEVEL; level++) {
+        if (OB_SUCC(acquire_level_worker(level))) {
+          nesting_worker_cnt_ = nesting_worker_cnt_ + 1;
+        }
+      }
+    }
     int64_t now = ObTimeUtility::current_time();
     bool enable_dynamic_worker = true;
     int64_t threshold = 3 * 1000;
@@ -508,9 +541,25 @@ int ObResourceGroup::clear_worker()
 {
   int ret = OB_SUCCESS;
   ObMutexGuard guard(workers_lock_);
-  while (req_queue_.size() > 0) {
+  while (req_queue_.size() > 0
+         || (multi_level_queue_.get_total_size() > 0)) {
     ob_usleep(10L * 1000L);
   }
+  while (nesting_workers_.get_size() > 0) {
+    int ret = OB_SUCCESS;
+    DLIST_FOREACH_REMOVESAFE(wnode, nesting_workers_) {
+      ObThWorker *w = static_cast<ObThWorker *>(wnode->get_data());
+      nesting_workers_.remove(wnode);
+      destroy_worker(w);
+    }
+    if (REACH_TIME_INTERVAL(10 * 1000L * 1000L)) {
+      LOG_INFO(
+          "Tenant has some group nesting workers need stop",
+          K(tenant_->id()),
+          "group nesting workers", nesting_workers_.get_size(),
+          "group id", get_group_id());
+    }
+  }
   while (workers_.get_size() > 0) {
     int ret = OB_SUCCESS;
     DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
@@ -523,7 +572,7 @@ int ObResourceGroup::clear_worker()
         "Tenant has some group workers need stop",
         K(tenant_->id()),
         "group workers", workers_.get_size(),
-        "group type", get_group_id());
+        "group id", get_group_id());
     }
     ob_usleep(10L * 1000L);
   }
@@ -753,11 +802,8 @@ int ObTenant::init(const ObTenantMeta &meta)
     // there must be 2 workers.
     static_cast<ObThWorker *>(workers_.get_first()->get_data())->set_priority_limit(QQ_HIGH);
     static_cast<ObThWorker *>(workers_.get_last()->get_data())->set_priority_limit(QQ_NORMAL);
-    if (!is_virtual_tenant_id(id_) && !is_meta_tenant(id_)) {
-      for (int level = MULTI_LEVEL_THRESHOLD; level < MAX_REQUEST_LEVEL; level++) {
-        if (OB_FAIL(acquire_level_worker(1, succ_cnt, level))) {
-          break;
-        }
+    for (int level = MULTI_LEVEL_THRESHOLD; OB_SUCC(ret) && level < MAX_REQUEST_LEVEL; level++) {
+      if (OB_SUCC(acquire_level_worker(1, succ_cnt, level))) {
         succ_cnt = 0L;
       }
     }
@@ -1165,21 +1211,41 @@ int ObTenant::get_new_request(
   int ret = OB_SUCCESS;
   ObLink* task = nullptr;
   req = nullptr;
+  int wk_level = 0;
   Thread::WaitGuard guard(Thread::WAIT_IN_TENANT_QUEUE);
   if (w.is_group_worker()) {
     w.set_large_query(false);
     w.set_curr_request_level(0);
-    if (OB_SUCC(w.get_group()->req_queue_.pop(task, timeout))) {
-      EVENT_INC(REQUEST_DEQUEUE_COUNT);
-      if (nullptr == req && nullptr != task) {
-        req = static_cast<rpc::ObRequest *>(task);
-        if (req->large_retry_flag()) {
-          w.set_large_query();
+    wk_level = w.get_worker_level();
+    if (wk_level < 0 || wk_level >= MAX_REQUEST_LEVEL) {
+      ret = OB_ERR_UNEXPECTED;
+      LOG_ERROR("unexpected level", K(wk_level), K(id_));
+    } else if (wk_level >= MAX_REQUEST_LEVEL - 1) {
+      ret = w.get_group()->multi_level_queue_.pop_timeup(task, wk_level, timeout);
+      if ((ret == OB_SUCCESS && nullptr == task) || ret == OB_ENTRY_NOT_EXIST) {
+        ret = OB_ENTRY_NOT_EXIST;
+        usleep(10 * 1000L);
+      } else if (ret == OB_SUCCESS) {
+        rpc::ObRequest *tmp_req = static_cast<rpc::ObRequest *>(task);
+        LOG_WARN("req is timeout and discard", "tenant_id", id_, K(tmp_req));
+      } else {
+        LOG_ERROR("pop queue err", "tenant_id", id_, K(ret));
+      }
+    } else if (w.is_level_worker()) {
+      ret = w.get_group()->multi_level_queue_.pop(task, wk_level, timeout);
+    } else {
+      for (int32_t level = MAX_REQUEST_LEVEL - 1; level >= GROUP_MULTI_LEVEL_THRESHOLD; level--) {
+        IGNORE_RETURN w.get_group()->multi_level_queue_.try_pop(task, level);
+        if (nullptr != task) {
+          ret = OB_SUCCESS;
+          break;
         }
       }
+      if (nullptr == task) {
+        ret = w.get_group()->req_queue_.pop(task, timeout);
+      }
     }
   } else {
-    int wk_level = 0;
     w.set_large_query(false);
     w.set_curr_request_level(0);
     wk_level = w.get_worker_level();
@@ -1228,21 +1294,25 @@ int ObTenant::get_new_request(
         }
       }
     }
+  }
 
-    if (OB_SUCC(ret)) {
-      EVENT_INC(REQUEST_DEQUEUE_COUNT);
-      if (nullptr == req && nullptr != task) {
-        req = static_cast<rpc::ObRequest *>(task);
+  if (OB_SUCC(ret)) {
+    EVENT_INC(REQUEST_DEQUEUE_COUNT);
+    if (nullptr == req && nullptr != task) {
+      req = static_cast<rpc::ObRequest *>(task);
+    }
+    if (nullptr != req) {
+      if (w.is_group_worker() && req->large_retry_flag()) {
+        w.set_large_query();
       }
-      if (nullptr != req && req->get_type() == ObRequest::OB_RPC) {
-        using obrpc::ObRpcPacket;
-        const ObRpcPacket &pkt
-            = static_cast<const ObRpcPacket &>(req->get_packet());
-        w.set_curr_request_level(pkt.get_request_level());
+      if (req->get_type() == ObRequest::OB_RPC) {
+        using obrpc::ObRpcPacket;
+        const ObRpcPacket &pkt
+            = static_cast<const ObRpcPacket &>(req->get_packet());
+        w.set_curr_request_level(pkt.get_request_level());
       }
     }
   }
-
   return ret;
 }
@@ -1280,6 +1350,7 @@ int ObTenant::recv_group_request(ObRequest &req, int64_t group_id)
   ObResourceGroup* group = nullptr;
   ObResourceGroupNode* node = nullptr;
   ObResourceGroupNode key(group_id);
+  int req_level = 0;
   if (OB_SUCC(GroupMap::err_code_map(group_map_.get(&key, node)))) {
     group = static_cast<ObResourceGroup *>(node);
   } else if (OB_FAIL(group_map_.create_and_insert_group(group_id, this, &cgroup_ctrl_, group))) {
@@ -1292,9 +1363,25 @@ int ObTenant::recv_group_request(ObRequest &req, int64_t group_id)
     LOG_INFO("create group successfully", K_(id), K(group_id), K(group));
   }
   if (OB_SUCC(ret)) {
-    group->atomic_inc_recv_cnt();
-    if (OB_FAIL(group->req_queue_.push(&req, 0))) {
-      LOG_ERROR("push request to queue fail", K(ret), K(this));
+    if (req.get_type() == ObRequest::OB_RPC) {
+      using obrpc::ObRpcPacket;
+      const ObRpcPacket &pkt
+          = static_cast<const ObRpcPacket &>(req.get_packet());
+      req_level = min(pkt.get_request_level(), MAX_REQUEST_LEVEL - 1);
+    }
+    if (req_level < 0) {
+      ret = OB_ERR_UNEXPECTED;
+      LOG_ERROR("unexpected level", K(req_level), K(id_), K(group_id));
+    } else if (is_user_group(group_id) && req_level >= GROUP_MULTI_LEVEL_THRESHOLD) {
+      group->recv_level_rpc_cnt_.atomic_inc(req_level);
+      if (OB_FAIL(group->multi_level_queue_.push(req, req_level, 0))) {
+        LOG_WARN("push request to queue fail", K(req_level), K(id_), K(group_id));
+      }
+    } else {
+      group->atomic_inc_recv_cnt();
+      if (OB_FAIL(group->req_queue_.push(&req, 0))) {
+        LOG_ERROR("push request to queue fail", K(id_), K(group_id));
+      }
     }
     int tmp_ret = OB_SUCCESS;
     if (!share::ObCgSet::instance().is_group_critical(group_id) && 0 == group->workers_.get_size()) {
diff --git a/src/observer/omt/ob_tenant.h b/src/observer/omt/ob_tenant.h
index 8691830eb..3c0619d12 100644
--- a/src/observer/omt/ob_tenant.h
+++ b/src/observer/omt/ob_tenant.h
@@ -287,18 +287,33 @@ public:
   int init();
   void update_queue_size();
   int acquire_more_worker(int64_t num, int64_t &succ_num, bool force = false);
+  int acquire_level_worker(int32_t level);
   void check_worker_count();
   void check_worker_count(ObThWorker &w);
   int clear_worker();
+  TO_STRING_KV("group_id", group_id_,
+               "queue_size", req_queue_.size(),
+               "recv_req_cnt", recv_req_cnt_,
+               "min_worker_cnt", min_worker_cnt(),
+               "max_worker_cnt", max_worker_cnt(),
+               K(multi_level_queue_),
+               "recv_level_rpc_cnt", recv_level_rpc_cnt_,
+               "worker_cnt", workers_.get_size(),
+               "nesting_worker_cnt", nesting_workers_.get_size(),
+               "token_change", token_change_ts_);
 private:
   lib::ObMutex& workers_lock_;
   WList workers_;
+  WList nesting_workers_;
   common::ObPriorityQueue2<0, 1> req_queue_;
+  ObMultiLevelQueue multi_level_queue_;
   bool inited_;                                    // Mark whether the container has threads and queues allocated
   volatile uint64_t recv_req_cnt_ CACHE_ALIGNED;   // Statistics requested to enqueue
   volatile bool shrink_ CACHE_ALIGNED;
   int64_t token_change_ts_;
+  MultiLevelReqCnt recv_level_rpc_cnt_;
+  int nesting_worker_cnt_;
   ObTenant *tenant_;
   share::ObCgroupCtrl *cgroup_ctrl_;
 };
@@ -323,20 +338,8 @@ public:
     while (NULL != (iter = const_cast<GroupMap *>(this)->GroupHash::quick_next(iter))) {
       group = static_cast<ObResourceGroup *>(iter);
       common::databuff_printf(buf, buf_len, pos,
-          "group_id = %d,"
-          "queue_size = %ld,"
-          "recv_req_cnt = %lu,"
-          "min_worker_cnt = %ld,"
-          "max_worker_cnt = %ld,"
-          "worker_cnt = %d,"
-          "token_change = %ld ",
-          group->group_id_,
-          group->req_queue_.size(),
-          group->recv_req_cnt_,
-          group->min_worker_cnt(),
-          group->max_worker_cnt(),
-          group->workers_.get_size(),
-          group->token_change_ts_);
+          "%s",
+          to_cstring(group));
     }
     return pos;
   }
diff --git a/src/observer/omt/ob_th_worker.cpp b/src/observer/omt/ob_th_worker.cpp
index c44cb1dfb..986ff5fa9 100644
--- a/src/observer/omt/ob_th_worker.cpp
+++ b/src/observer/omt/ob_th_worker.cpp
@@ -392,10 +392,12 @@ void ObThWorker::worker(int64_t &tenant_id, int64_t &req_recv_timestamp, int32_t
         ret = OB_SUCCESS;
       }
       IGNORE_RETURN ATOMIC_FAA(&idle_us_, (wait_end_time - wait_start_time));
-      if (this->get_worker_level() == 0 && !is_group_worker()) {
+      if (this->get_worker_level() != 0) {
+        // nesting workers are not allowed to call check_worker_count
+      } else if (this->get_group() == nullptr) {
         tenant_->check_worker_count(*this);
         tenant_->lq_end(*this);
-      } else if (this->is_group_worker()) {
+      } else {
         group_->check_worker_count(*this);
       }
     }
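
Note (illustrative commentary, not part of the patch): the change gives each user resource group its own multi level queue plus dedicated "nesting" workers, so an rpc issued from inside another rpc of the same group is served at its own nesting level instead of sitting in the group's normal queue behind the very request that is blocked on it, which is presumably the wait-for cycle the subject refers to. The sketch below is a minimal model of that routing rule under simplified assumptions: MiniGroup, kMaxLevel and kGroupLevelThreshold are hypothetical stand-ins for ObResourceGroup, MAX_REQUEST_LEVEL and GROUP_MULTI_LEVEL_THRESHOLD, and none of it is OceanBase API.

// mini_group_queue_sketch.cpp -- illustrative only; builds with -std=c++17.
#include <array>
#include <deque>
#include <iostream>
#include <optional>

constexpr int kMaxLevel = 10;            // stand-in for MAX_REQUEST_LEVEL
constexpr int kGroupLevelThreshold = 1;  // stand-in for GROUP_MULTI_LEVEL_THRESHOLD

struct Request {
  int level;  // nesting depth carried by the rpc packet
  int id;
};

class MiniGroup {
public:
  // Requests at level >= threshold go to a per-level queue, so a nested rpc
  // never queues behind level-0 work that may itself be blocked on it.
  void push(const Request &req) {
    if (req.level >= kGroupLevelThreshold) {
      level_queues_[req.level].push_back(req);
    } else {
      normal_queue_.push_back(req);
    }
  }

  // A nesting worker pinned to `level` serves only that level.
  std::optional<Request> pop_for_nesting_worker(int level) {
    return pop_from(level_queues_[level]);
  }

  // An ordinary group worker prefers the deepest pending nested request,
  // then falls back to the normal queue.
  std::optional<Request> pop_for_normal_worker() {
    for (int level = kMaxLevel - 1; level >= kGroupLevelThreshold; --level) {
      if (auto req = pop_from(level_queues_[level])) {
        return req;
      }
    }
    return pop_from(normal_queue_);
  }

private:
  static std::optional<Request> pop_from(std::deque<Request> &q) {
    if (q.empty()) {
      return std::nullopt;
    }
    Request req = q.front();
    q.pop_front();
    return req;
  }

  std::deque<Request> normal_queue_;
  std::array<std::deque<Request>, kMaxLevel> level_queues_;
};

int main() {
  MiniGroup group;
  group.push({0, 1});  // ordinary group request
  group.push({2, 2});  // nested rpc issued while request 1 is being handled
  if (auto req = group.pop_for_nesting_worker(2)) {
    std::cout << "nesting worker served request " << req->id << "\n";
  }
  if (auto req = group.pop_for_normal_worker()) {
    std::cout << "normal worker served request " << req->id << "\n";
  }
  return 0;
}

In the patch itself the analogous pieces are ObResourceGroup::multi_level_queue_, the nesting workers created by acquire_level_worker(), and the level checks in ObTenant::recv_group_request() and ObTenant::get_new_request().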