diff --git a/src/observer/omt/ob_multi_level_queue.cpp b/src/observer/omt/ob_multi_level_queue.cpp index 5892084a9a..bac2681e43 100644 --- a/src/observer/omt/ob_multi_level_queue.cpp +++ b/src/observer/omt/ob_multi_level_queue.cpp @@ -23,13 +23,11 @@ using namespace oceanbase::rpc; using namespace oceanbase::obrpc; -int ObMultiLevelQueue::init(int64_t limit) +void ObMultiLevelQueue::set_limit(int64_t limit) { - int ret = OB_SUCCESS; for (int32_t level = 0; level < MULTI_LEVEL_QUEUE_SIZE; level++) { queue_[level].set_limit(limit); } - return ret; } int ObMultiLevelQueue::push(ObRequest &req, const int32_t level, const int32_t prio) diff --git a/src/observer/omt/ob_multi_level_queue.h b/src/observer/omt/ob_multi_level_queue.h index 0a0f781eef..1dc2e1731a 100644 --- a/src/observer/omt/ob_multi_level_queue.h +++ b/src/observer/omt/ob_multi_level_queue.h @@ -26,7 +26,7 @@ namespace omt class ObMultiLevelQueue { public: - int init(const int64_t limit); + void set_limit(const int64_t limit); int push(rpc::ObRequest &req, const int32_t level, const int32_t prio); int pop(common::ObLink *&task, const int32_t level, const int64_t timeout_us); int pop_timeup(common::ObLink *&task, const int32_t level, const int64_t timeout_us); diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index 6c8f12a77b..1814f38a6d 100644 --- a/src/observer/omt/ob_tenant.cpp +++ b/src/observer/omt/ob_tenant.cpp @@ -317,6 +317,11 @@ int ObResourceGroup::init() return ret; } +void ObResourceGroup::update_queue_size() +{ + req_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size); +} + int ObResourceGroup::acquire_more_worker(int64_t num, int64_t &succ_num) { int ret = OB_SUCCESS; @@ -686,9 +691,7 @@ int ObTenant::init(const ObTenantMeta &meta) } else if (OB_ISNULL(multi_level_queue_ = OB_NEW(ObMultiLevelQueue, ObModIds::OMT_TENANT))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("alloc ObMultiLevelQueue failed", K(ret), K(*this)); - } else if (OB_FAIL(multi_level_queue_->init( - common::ObServerConfig::get_instance().tenant_task_queue_size))) { - LOG_WARN("ObMultiLevelQueue init failed", K(ret), K_(id), K(*this)); + } else if (FALSE_IT(multi_level_queue_->set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size))) { } else if (OB_ISNULL(rpc_stat_info_ = OB_NEW(RpcStatInfo, ObModIds::OMT_TENANT, id_))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("alloc RpcStatInfo failed", K(ret), K(*this)); @@ -1367,6 +1370,7 @@ int ObTenant::timeup() calibrate_worker_count(); handle_retry_req(); calibrate_token_count(); + update_queue_size(); return ret; } @@ -1384,6 +1388,20 @@ void ObTenant::handle_retry_req() } } +void ObTenant::update_queue_size() +{ + ObResourceGroupNode* iter = NULL; + ObResourceGroup* group = nullptr; + while (NULL != (iter = group_map_.quick_next(iter))) { + group = static_cast(iter); + group->update_queue_size(); + } + req_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size); + if (nullptr != multi_level_queue_) { + multi_level_queue_->set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size); + } +} + void ObTenant::calibrate_token_count() { if (dynamic_modify_token_ || OB_DATA_TENANT_ID == id_) { diff --git a/src/observer/omt/ob_tenant.h b/src/observer/omt/ob_tenant.h index 598072aad1..75e9b9be7f 100644 --- a/src/observer/omt/ob_tenant.h +++ b/src/observer/omt/ob_tenant.h @@ -308,6 +308,7 @@ public: share::ObCgroupCtrl *get_cgroup_ctrl() { return cgroup_ctrl_; } int init(); + void update_queue_size(); int acquire_more_worker(int64_t num, int64_t &succ_num); void calibrate_token_count(); void check_worker_count(); @@ -487,6 +488,7 @@ public: int recv_large_request(rpc::ObRequest &req); int push_retry_queue(rpc::ObRequest &req, const uint64_t idx); void handle_retry_req(); + void update_queue_size(); void calibrate_token_count(); void calibrate_group_token_count(); diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index 26e6a4f01d..d196f24c1c 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -72,7 +72,7 @@ DEF_INT(high_priority_net_thread_count, OB_CLUSTER_PARAMETER, "0", "[0,64]", DEF_INT(rdma_io_thread_count, OB_CLUSTER_PARAMETER, "0", "[0,8]", "the number of RDMA I/O threads for Libreasy. Range: [0, 8] in integer, 0 stands for RDMA being disabled.", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::STATIC_EFFECTIVE)); -DEF_INT(tenant_task_queue_size, OB_CLUSTER_PARAMETER, "65536", "[1024,]", +DEF_INT(tenant_task_queue_size, OB_CLUSTER_PARAMETER, "8192", "[1024,]", "the size of the task queue for each tenant. Range: [1024,+∞)", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); _DEF_PARAMETER_SCOPE_CHECKER_EASY(private, Capacity, memory_limit, OB_CLUSTER_PARAMETER, "0",