modify tenant queue size from 65536 to 8192 and support dynamic modification

obdev authored on 2023-02-09 17:37:18 +00:00 · committed by ob-robot
parent 048e2eef5d · commit 93dfd6021f
5 changed files with 26 additions and 8 deletions
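In short: the default size of each tenant's request queue drops from 65536 to 8192, and the limit now tracks the tenant_task_queue_size cluster parameter at runtime. ObMultiLevelQueue::init() becomes a void set_limit(), ObResourceGroup and ObTenant gain an update_queue_size(), and ObTenant::timeup() re-applies the configured value on every tick, so a changed parameter takes effect without restarting the observer.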

View File

@@ -23,13 +23,11 @@ using namespace oceanbase::rpc;
 using namespace oceanbase::obrpc;
-int ObMultiLevelQueue::init(int64_t limit)
+void ObMultiLevelQueue::set_limit(int64_t limit)
 {
-  int ret = OB_SUCCESS;
   for (int32_t level = 0; level < MULTI_LEVEL_QUEUE_SIZE; level++) {
     queue_[level].set_limit(limit);
   }
-  return ret;
 }
 int ObMultiLevelQueue::push(ObRequest &req, const int32_t level, const int32_t prio)
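The signature change above is the crux of the commit: queue sizing is no longer a fallible, one-shot init() but an idempotent setter that can be re-applied whenever the configuration changes. A minimal sketch of the pattern, using illustrative names (BoundedQueue, LEVEL_COUNT) that are not OceanBase source:

#include <atomic>
#include <cstdint>

// Hypothetical stand-in for the per-level sub-queue; only the limit logic is shown.
class BoundedQueue {
public:
  // Idempotent and safe to call repeatedly at runtime, unlike a one-shot init().
  void set_limit(int64_t limit) { limit_.store(limit, std::memory_order_relaxed); }
  bool push() {
    // Reject when size has reached the (possibly just-updated) limit.
    if (size_.load(std::memory_order_relaxed) >= limit_.load(std::memory_order_relaxed)) {
      return false;
    }
    size_.fetch_add(1, std::memory_order_relaxed);
    return true;
  }
private:
  std::atomic<int64_t> limit_{0};
  std::atomic<int64_t> size_{0};
};

// Mirrors the hunk above: one cap fanned out to every priority level.
class MultiLevelQueue {
public:
  static constexpr int32_t LEVEL_COUNT = 8;  // illustrative; the real constant is MULTI_LEVEL_QUEUE_SIZE
  void set_limit(int64_t limit) {
    for (int32_t level = 0; level < LEVEL_COUNT; level++) {
      queue_[level].set_limit(limit);
    }
  }
private:
  BoundedQueue queue_[LEVEL_COUNT];
};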

View File

@@ -26,7 +26,7 @@ namespace omt
 class ObMultiLevelQueue {
 public:
-  int init(const int64_t limit);
+  void set_limit(const int64_t limit);
   int push(rpc::ObRequest &req, const int32_t level, const int32_t prio);
   int pop(common::ObLink *&task, const int32_t level, const int64_t timeout_us);
   int pop_timeup(common::ObLink *&task, const int32_t level, const int64_t timeout_us);

View File

@@ -317,6 +317,11 @@ int ObResourceGroup::init()
   return ret;
 }
+void ObResourceGroup::update_queue_size()
+{
+  req_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
+}
 int ObResourceGroup::acquire_more_worker(int64_t num, int64_t &succ_num)
 {
   int ret = OB_SUCCESS;
@@ -686,9 +691,7 @@ int ObTenant::init(const ObTenantMeta &meta)
   } else if (OB_ISNULL(multi_level_queue_ = OB_NEW(ObMultiLevelQueue, ObModIds::OMT_TENANT))) {
     ret = OB_ALLOCATE_MEMORY_FAILED;
     LOG_WARN("alloc ObMultiLevelQueue failed", K(ret), K(*this));
-  } else if (OB_FAIL(multi_level_queue_->init(
-      common::ObServerConfig::get_instance().tenant_task_queue_size))) {
-    LOG_WARN("ObMultiLevelQueue init failed", K(ret), K_(id), K(*this));
+  } else if (FALSE_IT(multi_level_queue_->set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size))) {
   } else if (OB_ISNULL(rpc_stat_info_ = OB_NEW(RpcStatInfo, ObModIds::OMT_TENANT, id_))) {
     ret = OB_ALLOCATE_MEMORY_FAILED;
     LOG_WARN("alloc RpcStatInfo failed", K(ret), K(*this));
@@ -1367,6 +1370,7 @@ int ObTenant::timeup()
   calibrate_worker_count();
   handle_retry_req();
   calibrate_token_count();
+  update_queue_size();
   return ret;
 }
@@ -1384,6 +1388,20 @@ void ObTenant::handle_retry_req()
   }
 }
+void ObTenant::update_queue_size()
+{
+  ObResourceGroupNode* iter = NULL;
+  ObResourceGroup* group = nullptr;
+  while (NULL != (iter = group_map_.quick_next(iter))) {
+    group = static_cast<ObResourceGroup*>(iter);
+    group->update_queue_size();
+  }
+  req_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
+  if (nullptr != multi_level_queue_) {
+    multi_level_queue_->set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size);
+  }
+}
 void ObTenant::calibrate_token_count()
 {
   if (dynamic_modify_token_ || OB_DATA_TENANT_ID == id_) {
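Taken together with the timeup() hunk, this is what makes the parameter dynamic: every tick re-reads tenant_task_queue_size and pushes it into the tenant-level req_queue_, the multi-level queue, and each resource group found via group_map_.quick_next(). A self-contained sketch of that periodic publish/refresh pattern, with illustrative names (Group, Tenant, g_tenant_task_queue_size) standing in for the real types:

#include <atomic>
#include <cstdint>
#include <vector>

// Stand-in for ObServerConfig::get_instance().tenant_task_queue_size.
std::atomic<int64_t> g_tenant_task_queue_size{8192};

struct Group {
  int64_t limit = 0;
  void update_queue_size(int64_t v) { limit = v; }  // per-group req_queue_ cap
};

struct Tenant {
  std::vector<Group> groups;
  int64_t req_queue_limit = 0;
  int64_t multi_level_limit = 0;

  // Invoked from the periodic timeup() tick; cheap enough to run every time,
  // since applying the limit is an idempotent store.
  void update_queue_size() {
    const int64_t v = g_tenant_task_queue_size.load(std::memory_order_relaxed);
    for (auto &g : groups) g.update_queue_size(v);
    req_queue_limit = v;
    multi_level_limit = v;
  }
};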

View File

@@ -308,6 +308,7 @@ public:
   share::ObCgroupCtrl *get_cgroup_ctrl() { return cgroup_ctrl_; }
   int init();
+  void update_queue_size();
   int acquire_more_worker(int64_t num, int64_t &succ_num);
   void calibrate_token_count();
   void check_worker_count();
@@ -487,6 +488,7 @@ public:
   int recv_large_request(rpc::ObRequest &req);
   int push_retry_queue(rpc::ObRequest &req, const uint64_t idx);
   void handle_retry_req();
+  void update_queue_size();
   void calibrate_token_count();
   void calibrate_group_token_count();

View File

@@ -72,7 +72,7 @@ DEF_INT(high_priority_net_thread_count, OB_CLUSTER_PARAMETER, "0", "[0,64]",
 DEF_INT(rdma_io_thread_count, OB_CLUSTER_PARAMETER, "0", "[0,8]",
         "the number of RDMA I/O threads for Libreasy. Range: [0, 8] in integer, 0 stands for RDMA being disabled.",
         ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::STATIC_EFFECTIVE));
-DEF_INT(tenant_task_queue_size, OB_CLUSTER_PARAMETER, "65536", "[1024,]",
+DEF_INT(tenant_task_queue_size, OB_CLUSTER_PARAMETER, "8192", "[1024,]",
        "the size of the task queue for each tenant. Range: [1024,+∞)",
        ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
 _DEF_PARAMETER_SCOPE_CHECKER_EASY(private, Capacity, memory_limit, OB_CLUSTER_PARAMETER, "0",
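Since the item stays EditLevel::DYNAMIC_EFFECTIVE, operators can override the new 8192 default on a live cluster, e.g. ALTER SYSTEM SET tenant_task_queue_size = 16384; (value illustrative), and the timeup() path above applies it on the next tick, with no observer restart required.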