diff --git a/src/observer/omt/ob_tenant.cpp b/src/observer/omt/ob_tenant.cpp
index 947e039c71..10335c9a61 100644
--- a/src/observer/omt/ob_tenant.cpp
+++ b/src/observer/omt/ob_tenant.cpp
@@ -65,6 +65,7 @@ using namespace oceanbase::obrpc;
 
 #define EXPAND_INTERVAL (1 * 1000 * 1000)
 #define SHRINK_INTERVAL (1 * 1000 * 1000)
+#define SLEEP_INTERVAL (60 * 1000 * 1000)
 
 int64_t FASTSTACK_REQ_QUEUE_SIZE_THRESHOLD = INT64_MAX;
 
@@ -379,7 +380,6 @@ void ObResourceGroup::check_worker_count()
 {
   int ret = OB_SUCCESS;
   if (OB_SUCC(workers_lock_.trylock())) {
-    int64_t token = 1;
     int64_t now = ObTimeUtility::current_time();
     bool enable_dynamic_worker = true;
     int64_t threshold = 3 * 1000;
@@ -388,6 +388,7 @@ void ObResourceGroup::check_worker_count()
       enable_dynamic_worker = tenant_config.is_valid() ? tenant_config->_ob_enable_dynamic_worker : true;
       threshold = tenant_config.is_valid() ? tenant_config->_stall_threshold_for_dynamic_worker : 3 * 1000;
     }
+    int64_t blocking_cnt = 0;
     DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
       const auto w = static_cast<ObThWorker*>(wnode->get_data());
       if (w->has_set_stop()) {
@@ -397,28 +398,47 @@ void ObResourceGroup::check_worker_count()
                  && 0 != w->blocking_ts()
                  && now - w->blocking_ts() >= threshold
                  && enable_dynamic_worker) {
-        ++token;
+        ++blocking_cnt;
       }
     }
+
+    int64_t target_min = 0;
+    int64_t token = 0;
+    bool is_group_critical = share::ObCgSet::instance().is_group_critical(group_id_);
+    if (is_group_critical) {
+      target_min = min_worker_cnt();
+      token = 1 + blocking_cnt;
+      token = std::min(token, max_worker_cnt());
+      token = std::max(token, target_min);
+    } else {
+      target_min = std::min(req_queue_.size(), min_worker_cnt());
+      if (blocking_cnt == 0 && req_queue_.size() == 0) {
+        token = 0;
+      } else {
+        token = 1 + blocking_cnt;
+        token = std::min(token, max_worker_cnt());
+      }
+    }
+
     int64_t succ_num = 0L;
-    token = std::max(token, min_worker_cnt());
-    token = std::min(token, max_worker_cnt());
-    if (OB_UNLIKELY(workers_.get_size() < min_worker_cnt())) {
-      const auto diff = min_worker_cnt() - workers_.get_size();
+    int64_t shrink_ts =
+        (!is_group_critical && workers_.get_size() == 1 && token == 0) ? SLEEP_INTERVAL : SHRINK_INTERVAL;
+    if (OB_UNLIKELY(workers_.get_size() < target_min)) {
+      const int64_t diff = target_min - workers_.get_size();
       token_change_ts_ = now;
       ATOMIC_STORE(&shrink_, false);
       acquire_more_worker(diff, succ_num, /* force */ true);
       LOG_INFO("worker thread created", K(tenant_->id()), K(group_id_), K(token));
-    } else if (OB_UNLIKELY(token > workers_.get_size())
-        && OB_LIKELY(ObMallocAllocator::get_instance()->get_tenant_remain(tenant_->id()) > ObMallocAllocator::get_instance()->get_tenant_limit(tenant_->id()) * 0.05)) {
+    } else if (OB_UNLIKELY(workers_.get_size() < token) &&
+               OB_LIKELY(ObMallocAllocator::get_instance()->get_tenant_remain(tenant_->id()) >
+                         ObMallocAllocator::get_instance()->get_tenant_limit(tenant_->id()) * 0.05)) {
       ATOMIC_STORE(&shrink_, false);
       if (OB_LIKELY(now - token_change_ts_ >= EXPAND_INTERVAL)) {
         token_change_ts_ = now;
         acquire_more_worker(1, succ_num);
         LOG_INFO("worker thread created", K(tenant_->id()), K(group_id_), K(token));
       }
-    } else if (OB_UNLIKELY(token < workers_.get_size())
-        && OB_LIKELY(now - token_change_ts_ >= SHRINK_INTERVAL)) {
+    } else if (OB_UNLIKELY(workers_.get_size() > token) && OB_LIKELY(now - token_change_ts_ >= shrink_ts)) {
       token_change_ts_ = now;
       ATOMIC_STORE(&shrink_, true);
       LOG_INFO("worker thread began to shrink", K(tenant_->id()), K(group_id_), K(token));
@@ -1214,7 +1234,8 @@ inline bool is_warmup(const ObRpcPacket &pkt)
 int ObTenant::recv_group_request(ObRequest &req, int64_t group_id)
 {
   int ret = OB_SUCCESS;
-  req.set_enqueue_timestamp(ObTimeUtility::current_time());
+  int64_t now = ObTimeUtility::current_time();
+  req.set_enqueue_timestamp(now);
   ObResourceGroup* group = nullptr;
   ObResourceGroupNode* node = nullptr;
   ObResourceGroupNode key(group_id);
@@ -1234,6 +1255,21 @@ int ObTenant::recv_group_request(ObRequest &req, int64_t group_id)
     if (OB_FAIL(group->req_queue_.push(&req, 0))) {
       LOG_ERROR("push request to queue fail", K(ret), K(this));
     }
+    int tmp_ret = OB_SUCCESS;
+    if (!share::ObCgSet::instance().is_group_critical(group_id) && 0 == group->workers_.get_size()) {
+      if (OB_SUCCESS == (tmp_ret = group->workers_lock_.trylock())) {
+        if (0 == group->workers_.get_size()) {
+          int64_t succ_num = 0L;
+          group->token_change_ts_ = now;
+          ATOMIC_STORE(&group->shrink_, false);
+          group->acquire_more_worker(1, succ_num, /* force */ true);
+          LOG_INFO("worker thread created", K(id()), K(group->group_id_));
+        }
+        IGNORE_RETURN group->workers_lock_.unlock();
+      } else {
+        LOG_WARN("failed to lock group workers", K(tmp_ret), K(id_), K(group_id));
+      }
+    }
   }
   return ret;
 }
diff --git a/src/share/resource_manager/ob_cgroup_ctrl.h b/src/share/resource_manager/ob_cgroup_ctrl.h
index 5cb936fbe1..83187e8fc6 100644
--- a/src/share/resource_manager/ob_cgroup_ctrl.h
+++ b/src/share/resource_manager/ob_cgroup_ctrl.h
@@ -25,19 +25,43 @@ namespace share
 {
 class ObGroupName;
 
+typedef enum : uint64_t {
+  DEFAULT = 0,
+  CRITICAL = 1 << 0,
+  INVALID = UINT64_MAX
+} group_flags_t;
+
 enum ObCgId
 {
-#define CGID_DEF(name, id, worker_concurrency...) name = id,
+#define CGID_DEF(name, id, ...) name = id,
 #include "ob_group_list.h"
 #undef CGID_DEF
   OBCG_MAXNUM,
 };
 
-class ObCgWorkerConcurrency
+class ObCgInfo
 {
 public:
-  ObCgWorkerConcurrency() : worker_concurrency_(1) {}
+  ObCgInfo() : name_(nullptr), is_critical_(false), worker_concurrency_(1) {}
+  void set_name(const char *name) { name_ = name; }
+  void set_args(group_flags_t flags = DEFAULT, uint64_t worker_concurrency = 1)
+  {
+    set_flags(flags);
+    set_worker_concurrency(worker_concurrency);
+  }
+  void set_flags(group_flags_t flags = DEFAULT)
+  {
+    if (DEFAULT == flags || INVALID == flags) {
+      // do nothing
+    } else {
+      if (CRITICAL & flags) {
+        is_critical_ = true;
+      }
+    }
+  }
   void set_worker_concurrency(uint64_t worker_concurrency = 1) { worker_concurrency_ = worker_concurrency; }
+  const char *name_;
+  bool is_critical_;
   uint64_t worker_concurrency_;
 };
 
@@ -45,7 +69,7 @@ class ObCgSet
 {
   ObCgSet()
   {
-#define CGID_DEF(name, id, worker_concurrency...) names_[id] = #name; worker_concurrency_[id].set_worker_concurrency(worker_concurrency);
+#define CGID_DEF(name, id, args...) group_infos_[id].set_name(#name); group_infos_[id].set_args(args);
 #include "ob_group_list.h"
 #undef CGID_DEF
   }
@@ -56,7 +80,7 @@ public:
   {
     const char *name = "OBCG_DEFAULT";
     if (id >= 0 && id < OBCG_MAXNUM) {
-      name = names_[id];
+      name = group_infos_[id].name_;
     }
     return name;
   }
@@ -64,12 +88,21 @@ public:
   uint64_t get_worker_concurrency(int64_t id) const
   {
     uint64_t worker_concurrency = 1;
-    if (id > 0 && id < OBCG_MAXNUM) {
-      worker_concurrency = worker_concurrency_[id].worker_concurrency_;
+    if (id >= 0 && id < OBCG_MAXNUM) {
+      worker_concurrency = group_infos_[id].worker_concurrency_;
     }
     return worker_concurrency;
   }
+  bool is_group_critical(int64_t id) const
+  {
+    bool is_group_critical = false;
+    if (id >= 0 && id < OBCG_MAXNUM) {
+      is_group_critical = group_infos_[id].is_critical_;
+    }
+    return is_group_critical;
+  }
+
   static ObCgSet &instance()
   {
     return instance_;
   }
@@ -77,9 +110,7 @@ public:
 
 private:
   static ObCgSet instance_;
-
-  const char *names_[OBCG_MAXNUM];
-  ObCgWorkerConcurrency worker_concurrency_[OBCG_MAXNUM];
+  ObCgInfo group_infos_[OBCG_MAXNUM];
 };
 
 struct OBGroupIOInfo final
diff --git a/src/share/resource_manager/ob_group_list.h b/src/share/resource_manager/ob_group_list.h
index c9d8e5dcfc..7862364c02 100644
--- a/src/share/resource_manager/ob_group_list.h
+++ b/src/share/resource_manager/ob_group_list.h
@@ -12,16 +12,21 @@
 
 // [0, 100) for inner group
-//CGID_DEF(group_name, group_id, worker_concurrency)
+// CGID_DEF(group_name, group_id[, flags=DEFAULT][, worker_concurrency=1])
+// example: CGID_DEF(OBCG_EXAMPLE1, 1, CRITICAL)
+//          CGID_DEF(OBCG_EXAMPLE2, 2, DEFAULT, 4)
+// flags option:
+// DEFAULT. No flags.
+// CRITICAL. If a group is not critical, the thread num of it can be set to 0 when idle.
 
 CGID_DEF(OBCG_DEFAULT, 0)
 CGID_DEF(OBCG_CLOG, 1)
-CGID_DEF(OBCG_ELECTION, 2)
-CGID_DEF(OBCG_ID_SERVICE, 5)
-CGID_DEF(OBCG_ID_SQL_REQ_LEVEL1, 6, 4)
-CGID_DEF(OBCG_ID_SQL_REQ_LEVEL2, 7, 4)
-CGID_DEF(OBCG_ID_SQL_REQ_LEVEL3, 8, 4)
-CGID_DEF(OBCG_DETECT_RS, 9)
-CGID_DEF(OBCG_LOC_CACHE, 10)
+CGID_DEF(OBCG_ELECTION, 2, CRITICAL)
+CGID_DEF(OBCG_ID_SERVICE, 5, CRITICAL)
+CGID_DEF(OBCG_ID_SQL_REQ_LEVEL1, 6, DEFAULT, 4)
+CGID_DEF(OBCG_ID_SQL_REQ_LEVEL2, 7, DEFAULT, 4)
+CGID_DEF(OBCG_ID_SQL_REQ_LEVEL3, 8, DEFAULT, 4)
+CGID_DEF(OBCG_DETECT_RS, 9, CRITICAL)
+CGID_DEF(OBCG_LOC_CACHE, 10, CRITICAL)
 CGID_DEF(OBCG_SQL_NIO, 11)
 CGID_DEF(OBCG_MYSQL_LOGIN, 12)
 CGID_DEF(OBCG_CDCSERVICE, 13)
@@ -29,9 +34,12 @@ CGID_DEF(OBCG_DIAG_TENANT, 14)
 CGID_DEF(OBCG_WR, 15)
 CGID_DEF(OBCG_STORAGE_HA_LEVEL1, 16)
 CGID_DEF(OBCG_STORAGE_HA_LEVEL2, 17)
-CGID_DEF(OBCG_DBA_COMMAND, 18, 1)
+CGID_DEF(OBCG_DBA_COMMAND, 18)
 CGID_DEF(OBCG_STORAGE, 19)
 CGID_DEF(OBCG_LOCK, 20)
 CGID_DEF(OBCG_UNLOCK, 21)
 CGID_DEF(OBCG_DIRECT_LOAD_HIGH_PRIO, 22)
+
+
+// 100 for CG_LQ
 CGID_DEF(OBCG_LQ, 100)