From 0559b6d6990f33923c06ecf73ac9eeea917cd1a8 Mon Sep 17 00:00:00 2001 From: zhjc1124 Date: Mon, 21 Aug 2023 11:10:32 +0000 Subject: [PATCH] add create worker forcely for each group --- src/observer/omt/ob_tenant.cpp | 14 +++++++------- src/observer/omt/ob_tenant.h | 6 +++--- src/observer/omt/ob_th_worker.cpp | 4 ++-- src/observer/omt/ob_th_worker.h | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp index f689f807fc..31191a8dc0 100644 --- a/src/observer/omt/ob_tenant.cpp +++ b/src/observer/omt/ob_tenant.cpp @@ -337,7 +337,7 @@ void ObResourceGroup::update_queue_size() req_queue_.set_limit(common::ObServerConfig::get_instance().tenant_task_queue_size); } -int ObResourceGroup::acquire_more_worker(int64_t num, int64_t &succ_num) +int ObResourceGroup::acquire_more_worker(int64_t num, int64_t &succ_num, bool force) { int ret = OB_SUCCESS; ObTenantSwitchGuard guard(tenant_); @@ -347,7 +347,7 @@ int ObResourceGroup::acquire_more_worker(int64_t num, int64_t &succ_num) while (OB_SUCC(ret) && need_num > succ_num) { ObThWorker *w = nullptr; - if (OB_FAIL(create_worker(w, tenant_, group_id_, INT32_MAX, this))) { + if (OB_FAIL(create_worker(w, tenant_, group_id_, INT32_MAX, force, this))) { LOG_WARN("create worker failed", K(ret)); } else if (!workers_.add_last(&w->worker_node_)) { OB_ASSERT(false); @@ -401,7 +401,7 @@ void ObResourceGroup::check_worker_count() const auto diff = min_worker_cnt() - workers_.get_size(); token_change_ts_ = now; ATOMIC_STORE(&shrink_, false); - acquire_more_worker(diff, succ_num); + acquire_more_worker(diff, succ_num, /* force */ true); LOG_INFO("worker thread created", K(tenant_->id()), K(group_id_), K(token)); } else if (OB_UNLIKELY(token > workers_.get_size()) && OB_LIKELY(ObMallocAllocator::get_instance()->get_tenant_remain(tenant_->id()) > ObMallocAllocator::get_instance()->get_tenant_limit(tenant_->id()) * 0.05)) { @@ -1402,7 +1402,7 @@ void ObTenant::check_worker_count() const auto diff = min_worker_cnt() - workers_.get_size(); token_change_ts_ = now; ATOMIC_STORE(&shrink_, false); - acquire_more_worker(diff, succ_num); + acquire_more_worker(diff, succ_num, /* force */ true); LOG_INFO("worker thread created", K(id_), K(token)); } else if (OB_UNLIKELY(token > workers_.get_size()) && OB_LIKELY(ObMallocAllocator::get_instance()->get_tenant_remain(id_) > ObMallocAllocator::get_instance()->get_tenant_limit(id_) * 0.05)) { @@ -1465,7 +1465,7 @@ int ObTenant::acquire_level_worker(int64_t num, int64_t &succ_num, int32_t level } else { while (OB_SUCC(ret) && need_num > succ_num) { ObThWorker *w = nullptr; - if (OB_FAIL(create_worker(w, this, 0, level))) { + if (OB_FAIL(create_worker(w, this, 0, level, true))) { LOG_WARN("create worker failed", K(ret)); } else if (!nesting_workers_.add_last(&w->worker_node_)) { OB_ASSERT(false); @@ -1489,7 +1489,7 @@ int ObTenant::acquire_level_worker(int64_t num, int64_t &succ_num, int32_t level } // This interface is unnecessary after adding htap -int ObTenant::acquire_more_worker(int64_t num, int64_t &succ_num) +int ObTenant::acquire_more_worker(int64_t num, int64_t &succ_num, bool force) { int ret = OB_SUCCESS; succ_num = 0; @@ -1497,7 +1497,7 @@ int ObTenant::acquire_more_worker(int64_t num, int64_t &succ_num) ObTenantSwitchGuard guard(this); while (OB_SUCC(ret) && num > succ_num) { ObThWorker *w = nullptr; - if (OB_FAIL(create_worker(w, this, 0, 0))) { + if (OB_FAIL(create_worker(w, this, 0, 0, force))) { LOG_WARN("create worker failed", K(ret)); } else if (!workers_.add_last(&w->worker_node_)) { OB_ASSERT(false); diff --git a/src/observer/omt/ob_tenant.h b/src/observer/omt/ob_tenant.h index dbbbc29674..9c41ce7acb 100644 --- a/src/observer/omt/ob_tenant.h +++ b/src/observer/omt/ob_tenant.h @@ -280,7 +280,7 @@ public: int init(); void update_queue_size(); - int acquire_more_worker(int64_t num, int64_t &succ_num); + int acquire_more_worker(int64_t num, int64_t &succ_num, bool force = false); void check_worker_count(); void check_worker_count(ObThWorker &w); int clear_worker(); @@ -359,7 +359,7 @@ class ObTenant : public share::ObTenantBase friend class ObResourceGroup; friend int ::select_dump_tenant_info(lua_State*); friend int create_worker(ObThWorker* &worker, ObTenant *tenant, int32_t group_id, - int32_t level, ObResourceGroup *group); + int32_t level, bool force, ObResourceGroup *group); friend int destroy_worker(ObThWorker *worker); using WListNode = common::ObDLinkNode; using WList = common::ObDList; @@ -506,7 +506,7 @@ private: void check_group_worker_count(); // alloc NUM worker int acquire_level_worker(int64_t num, int64_t &succ_num, int32_t level); - int acquire_more_worker(int64_t num, int64_t &succ_num); + int acquire_more_worker(int64_t num, int64_t &succ_num, bool force = false); int64_t worker_count() const { return workers_.get_size(); } diff --git a/src/observer/omt/ob_th_worker.cpp b/src/observer/omt/ob_th_worker.cpp index 5621c9aa36..158a17c411 100644 --- a/src/observer/omt/ob_th_worker.cpp +++ b/src/observer/omt/ob_th_worker.cpp @@ -46,10 +46,10 @@ extern TLOCAL(bool, TLOCAL_NEED_WAIT_IN_LOCK_WAIT_MGR); namespace omt { int create_worker(ObThWorker* &worker, ObTenant *tenant, int32_t group_id, - int32_t level, ObResourceGroup *group) + int32_t level, bool force, ObResourceGroup *group) { int ret = OB_SUCCESS; - if (tenant->total_worker_cnt() >= tenant->max_worker_cnt()) { + if (!force && tenant->total_worker_cnt() >= tenant->max_worker_cnt()) { ret = OB_RESOURCE_OUT; LOG_WARN("create worker fail", K(ret), K(tenant->id()), K(group_id), K(level), K(tenant->total_worker_cnt()), K(tenant->max_worker_cnt())); diff --git a/src/observer/omt/ob_th_worker.h b/src/observer/omt/ob_th_worker.h index f436cbda7f..08c00dd0a0 100644 --- a/src/observer/omt/ob_th_worker.h +++ b/src/observer/omt/ob_th_worker.h @@ -169,7 +169,7 @@ level: set worker's level, in ObResourceGroup level = INT32_MAX, in ObTenant lev group: set worker's group, in ObResourceGroup level = this, in ObTenant level = nullptr, */ int create_worker(ObThWorker* &worker, ObTenant *tenant, int32_t group_id, - int32_t level = INT32_MAX, ObResourceGroup *group = nullptr); + int32_t level = INT32_MAX, bool force = false, ObResourceGroup *group = nullptr); // defalut level=INT32_MAX, group=nullptr int destroy_worker(ObThWorker *worker);