[CP] add flags parameter for CGID_DEF

This commit is contained in:
zhjc1124 2024-02-07 20:10:18 +00:00 committed by ob-robot
parent d358962098
commit f3e0f2fa6b
3 changed files with 105 additions and 30 deletions

View File

@ -65,6 +65,7 @@ using namespace oceanbase::obrpc;
#define EXPAND_INTERVAL (1 * 1000 * 1000)
#define SHRINK_INTERVAL (1 * 1000 * 1000)
#define SLEEP_INTERVAL (60 * 1000 * 1000)
int64_t FASTSTACK_REQ_QUEUE_SIZE_THRESHOLD = INT64_MAX;
@ -379,7 +380,6 @@ void ObResourceGroup::check_worker_count()
{
int ret = OB_SUCCESS;
if (OB_SUCC(workers_lock_.trylock())) {
int64_t token = 1;
int64_t now = ObTimeUtility::current_time();
bool enable_dynamic_worker = true;
int64_t threshold = 3 * 1000;
@ -388,6 +388,7 @@ void ObResourceGroup::check_worker_count()
enable_dynamic_worker = tenant_config.is_valid() ? tenant_config->_ob_enable_dynamic_worker : true;
threshold = tenant_config.is_valid() ? tenant_config->_stall_threshold_for_dynamic_worker : 3 * 1000;
}
int64_t blocking_cnt = 0;
DLIST_FOREACH_REMOVESAFE(wnode, workers_) {
const auto w = static_cast<ObThWorker*>(wnode->get_data());
if (w->has_set_stop()) {
@ -397,28 +398,47 @@ void ObResourceGroup::check_worker_count()
&& 0 != w->blocking_ts()
&& now - w->blocking_ts() >= threshold
&& enable_dynamic_worker) {
++token;
++blocking_cnt;
}
}
int64_t target_min = 0;
int64_t token = 0;
bool is_group_critical = share::ObCgSet::instance().is_group_critical(group_id_);
if (is_group_critical) {
target_min = min_worker_cnt();
token = 1 + blocking_cnt;
token = std::min(token, max_worker_cnt());
token = std::max(token, target_min);
} else {
target_min = std::min(req_queue_.size(), min_worker_cnt());
if (blocking_cnt == 0 && req_queue_.size() == 0) {
token = 0;
} else {
token = 1 + blocking_cnt;
token = std::min(token, max_worker_cnt());
}
}
int64_t succ_num = 0L;
token = std::max(token, min_worker_cnt());
token = std::min(token, max_worker_cnt());
if (OB_UNLIKELY(workers_.get_size() < min_worker_cnt())) {
const auto diff = min_worker_cnt() - workers_.get_size();
int64_t shrink_ts =
(!is_group_critical && workers_.get_size() == 1 && token == 0) ? SLEEP_INTERVAL : SHRINK_INTERVAL;
if (OB_UNLIKELY(workers_.get_size() < target_min)) {
const int64_t diff = target_min - workers_.get_size();
token_change_ts_ = now;
ATOMIC_STORE(&shrink_, false);
acquire_more_worker(diff, succ_num, /* force */ true);
LOG_INFO("worker thread created", K(tenant_->id()), K(group_id_), K(token));
} else if (OB_UNLIKELY(token > workers_.get_size())
&& OB_LIKELY(ObMallocAllocator::get_instance()->get_tenant_remain(tenant_->id()) > ObMallocAllocator::get_instance()->get_tenant_limit(tenant_->id()) * 0.05)) {
} else if (OB_UNLIKELY(workers_.get_size() < token) &&
OB_LIKELY(ObMallocAllocator::get_instance()->get_tenant_remain(tenant_->id()) >
ObMallocAllocator::get_instance()->get_tenant_limit(tenant_->id()) * 0.05)) {
ATOMIC_STORE(&shrink_, false);
if (OB_LIKELY(now - token_change_ts_ >= EXPAND_INTERVAL)) {
token_change_ts_ = now;
acquire_more_worker(1, succ_num);
LOG_INFO("worker thread created", K(tenant_->id()), K(group_id_), K(token));
}
} else if (OB_UNLIKELY(token < workers_.get_size())
&& OB_LIKELY(now - token_change_ts_ >= SHRINK_INTERVAL)) {
} else if (OB_UNLIKELY(workers_.get_size() > token) && OB_LIKELY(now - token_change_ts_ >= shrink_ts)) {
token_change_ts_ = now;
ATOMIC_STORE(&shrink_, true);
LOG_INFO("worker thread began to shrink", K(tenant_->id()), K(group_id_), K(token));
@ -1214,7 +1234,8 @@ inline bool is_warmup(const ObRpcPacket &pkt)
int ObTenant::recv_group_request(ObRequest &req, int64_t group_id)
{
int ret = OB_SUCCESS;
req.set_enqueue_timestamp(ObTimeUtility::current_time());
int64_t now = ObTimeUtility::current_time();
req.set_enqueue_timestamp(now);
ObResourceGroup* group = nullptr;
ObResourceGroupNode* node = nullptr;
ObResourceGroupNode key(group_id);
@ -1234,6 +1255,21 @@ int ObTenant::recv_group_request(ObRequest &req, int64_t group_id)
if (OB_FAIL(group->req_queue_.push(&req, 0))) {
LOG_ERROR("push request to queue fail", K(ret), K(this));
}
int tmp_ret = OB_SUCCESS;
if (!share::ObCgSet::instance().is_group_critical(group_id) && 0 == group->workers_.get_size()) {
if (OB_SUCCESS == (tmp_ret = group->workers_lock_.trylock())) {
if (0 == group->workers_.get_size()) {
int64_t succ_num = 0L;
group->token_change_ts_ = now;
ATOMIC_STORE(&group->shrink_, false);
group->acquire_more_worker(1, succ_num, /* force */ true);
LOG_INFO("worker thread created", K(id()), K(group->group_id_));
}
IGNORE_RETURN group->workers_lock_.unlock();
} else {
LOG_WARN("failed to lock group workers", K(ret), K(id_), K(group_id));
}
}
}
return ret;
}

View File

@ -25,19 +25,43 @@ namespace share
{
class ObGroupName;
typedef enum : uint64_t {
DEFAULT = 0,
CRITICAL = 1 << 0,
INVALID = UINT64_MAX
} group_flags_t;
enum ObCgId
{
#define CGID_DEF(name, id, worker_concurrency...) name = id,
#define CGID_DEF(name, id, ...) name = id,
#include "ob_group_list.h"
#undef CGID_DEF
OBCG_MAXNUM,
};
class ObCgWorkerConcurrency
class ObCgInfo
{
public:
ObCgWorkerConcurrency() : worker_concurrency_(1) {}
ObCgInfo() : name_(nullptr), is_critical_(false), worker_concurrency_(1) {}
void set_name(const char *name) { name_ = name; }
void set_args(group_flags_t flags = DEFAULT, uint64_t worker_concurrency = 1)
{
set_flags(flags);
set_worker_concurrency(worker_concurrency);
}
void set_flags(group_flags_t flags = DEFAULT)
{
if (DEFAULT == flags || INVALID == flags) {
// do nothing
} else {
if (CRITICAL & flags) {
is_critical_ = true;
}
}
}
void set_worker_concurrency(uint64_t worker_concurrency = 1) { worker_concurrency_ = worker_concurrency; }
const char *name_;
bool is_critical_;
uint64_t worker_concurrency_;
};
@ -45,7 +69,7 @@ class ObCgSet
{
ObCgSet()
{
#define CGID_DEF(name, id, worker_concurrency...) names_[id] = #name; worker_concurrency_[id].set_worker_concurrency(worker_concurrency);
#define CGID_DEF(name, id, args...) group_infos_[id].set_name(#name); group_infos_[id].set_args(args);
#include "ob_group_list.h"
#undef CGID_DEF
}
@ -56,7 +80,7 @@ public:
{
const char *name = "OBCG_DEFAULT";
if (id >= 0 && id < OBCG_MAXNUM) {
name = names_[id];
name = group_infos_[id].name_;
}
return name;
}
@ -64,12 +88,21 @@ public:
uint64_t get_worker_concurrency(int64_t id) const
{
uint64_t worker_concurrency = 1;
if (id > 0 && id < OBCG_MAXNUM) {
worker_concurrency = worker_concurrency_[id].worker_concurrency_;
if (id >= 0 && id < OBCG_MAXNUM) {
worker_concurrency = group_infos_[id].worker_concurrency_;
}
return worker_concurrency;
}
bool is_group_critical(int64_t id) const
{
bool is_group_critical = false;
if (id >= 0 && id < OBCG_MAXNUM) {
is_group_critical = group_infos_[id].is_critical_;
}
return is_group_critical;
}
static ObCgSet &instance()
{
return instance_;
@ -77,9 +110,7 @@ public:
private:
static ObCgSet instance_;
const char *names_[OBCG_MAXNUM];
ObCgWorkerConcurrency worker_concurrency_[OBCG_MAXNUM];
ObCgInfo group_infos_[OBCG_MAXNUM];
};
struct OBGroupIOInfo final

View File

@ -12,16 +12,21 @@
// [0, 100) for inner group
//CGID_DEF(group_name, group_id, worker_concurrency)
// CGID_DEF(group_name, group_id[, flags=DEFAULT][, worker_concurrency=1])
// example: CGID_DEF(OBCG_EXAMPLE1, 1, CRITICAL)
// CGID_DEF(OBCG_EXAMPLE2, 2, DEFAULT, 4)
// flags option:
// DEFAULT. No flags.
// CRITICAL. If a group is not critical, the thread num of it can be set to 0 when idle.
CGID_DEF(OBCG_DEFAULT, 0)
CGID_DEF(OBCG_CLOG, 1)
CGID_DEF(OBCG_ELECTION, 2)
CGID_DEF(OBCG_ID_SERVICE, 5)
CGID_DEF(OBCG_ID_SQL_REQ_LEVEL1, 6, 4)
CGID_DEF(OBCG_ID_SQL_REQ_LEVEL2, 7, 4)
CGID_DEF(OBCG_ID_SQL_REQ_LEVEL3, 8, 4)
CGID_DEF(OBCG_DETECT_RS, 9)
CGID_DEF(OBCG_LOC_CACHE, 10)
CGID_DEF(OBCG_ELECTION, 2, CRITICAL)
CGID_DEF(OBCG_ID_SERVICE, 5, CRITICAL)
CGID_DEF(OBCG_ID_SQL_REQ_LEVEL1, 6, DEFAULT, 4)
CGID_DEF(OBCG_ID_SQL_REQ_LEVEL2, 7, DEFAULT, 4)
CGID_DEF(OBCG_ID_SQL_REQ_LEVEL3, 8, DEFAULT, 4)
CGID_DEF(OBCG_DETECT_RS, 9, CRITICAL)
CGID_DEF(OBCG_LOC_CACHE, 10, CRITICAL)
CGID_DEF(OBCG_SQL_NIO, 11)
CGID_DEF(OBCG_MYSQL_LOGIN, 12)
CGID_DEF(OBCG_CDCSERVICE, 13)
@ -29,9 +34,12 @@ CGID_DEF(OBCG_DIAG_TENANT, 14)
CGID_DEF(OBCG_WR, 15)
CGID_DEF(OBCG_STORAGE_HA_LEVEL1, 16)
CGID_DEF(OBCG_STORAGE_HA_LEVEL2, 17)
CGID_DEF(OBCG_DBA_COMMAND, 18, 1)
CGID_DEF(OBCG_DBA_COMMAND, 18)
CGID_DEF(OBCG_STORAGE, 19)
CGID_DEF(OBCG_LOCK, 20)
CGID_DEF(OBCG_UNLOCK, 21)
CGID_DEF(OBCG_DIRECT_LOAD_HIGH_PRIO, 22)
// 100 for CG_LQ
CGID_DEF(OBCG_LQ, 100)