Add enable_cgroup_cpu_soft_limit (#26510)
This commit is contained in:
@ -54,6 +54,19 @@ void CgroupCpuCtl::update_cpu_hard_limit(int cpu_hard_limit) {
|
||||
}
|
||||
}
|
||||
|
||||
void CgroupCpuCtl::update_cpu_soft_limit(int cpu_shares) {
|
||||
if (!_init_succ) {
|
||||
return;
|
||||
}
|
||||
std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
|
||||
if (_cpu_shares != cpu_shares) {
|
||||
Status ret = modify_cg_cpu_soft_limit_no_lock(cpu_shares);
|
||||
if (ret.ok()) {
|
||||
_cpu_shares = cpu_shares;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Status CgroupCpuCtl::write_cg_sys_file(std::string file_path, int value, std::string msg,
|
||||
bool is_append) {
|
||||
int fd = open(file_path.c_str(), is_append ? O_RDWR | O_APPEND : O_RDWR);
|
||||
@ -97,9 +110,11 @@ Status CgroupV1CpuCtl::init() {
|
||||
}
|
||||
}
|
||||
|
||||
// quota path
|
||||
// quota file
|
||||
_cgroup_v1_cpu_tg_quota_file = _cgroup_v1_cpu_tg_path + "/cpu.cfs_quota_us";
|
||||
// task path
|
||||
// cpu.shares file
|
||||
_cgroup_v1_cpu_tg_shares_file = _cgroup_v1_cpu_tg_path + "/cpu.shares";
|
||||
// task file
|
||||
_cgroup_v1_cpu_tg_task_file = _cgroup_v1_cpu_tg_path + "/tasks";
|
||||
LOG(INFO) << "cgroup v1 cpu path init success"
|
||||
<< ", query tg path=" << _cgroup_v1_cpu_tg_path
|
||||
@ -110,6 +125,11 @@ Status CgroupV1CpuCtl::init() {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status CgroupV1CpuCtl::modify_cg_cpu_soft_limit_no_lock(int cpu_shares) {
|
||||
std::string msg = "modify cpu shares to " + std::to_string(cpu_shares);
|
||||
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_shares_file, cpu_shares, msg, false);
|
||||
}
|
||||
|
||||
Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
|
||||
int val = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100;
|
||||
std::string msg = "modify cpu quota value to " + std::to_string(val);
|
||||
|
||||
@ -28,6 +28,12 @@
|
||||
|
||||
namespace doris {
|
||||
|
||||
// cgroup cpu.cfs_quota_us default value, it means disable cpu hard limit
|
||||
const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
|
||||
|
||||
// cgroup cpu.shares default value
|
||||
const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024;
|
||||
|
||||
class CgroupCpuCtl {
|
||||
public:
|
||||
virtual ~CgroupCpuCtl() = default;
|
||||
@ -35,15 +41,19 @@ public:
|
||||
|
||||
virtual Status init();
|
||||
|
||||
virtual Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) = 0;
|
||||
|
||||
virtual Status add_thread_to_cgroup() = 0;
|
||||
|
||||
void update_cpu_hard_limit(int cpu_hard_limit);
|
||||
|
||||
void update_cpu_soft_limit(int cpu_shares);
|
||||
|
||||
protected:
|
||||
Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append);
|
||||
|
||||
virtual Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) = 0;
|
||||
|
||||
virtual Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) = 0;
|
||||
|
||||
std::string _doris_cgroup_cpu_path;
|
||||
uint64_t _cpu_core_num = CpuInfo::num_cores();
|
||||
uint64_t _cpu_cfs_period_us = 100000;
|
||||
@ -51,6 +61,7 @@ protected:
|
||||
std::shared_mutex _lock_mutex;
|
||||
bool _init_succ = false;
|
||||
uint64_t _tg_id; // workload group id
|
||||
uint64_t _cpu_shares = 0;
|
||||
};
|
||||
|
||||
/*
|
||||
@ -73,20 +84,25 @@ protected:
|
||||
6 workload group quota file:
|
||||
/sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/cpu.cfs_quota_us
|
||||
|
||||
7 workload group tasks file:
|
||||
7 workload group tasks file:
|
||||
/sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/tasks
|
||||
|
||||
8 workload group cpu.shares file:
|
||||
/sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/cpu.shares
|
||||
*/
|
||||
class CgroupV1CpuCtl : public CgroupCpuCtl {
|
||||
public:
|
||||
CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
|
||||
Status init() override;
|
||||
Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
|
||||
Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override;
|
||||
Status add_thread_to_cgroup() override;
|
||||
|
||||
private:
|
||||
std::string _cgroup_v1_cpu_query_path;
|
||||
std::string _cgroup_v1_cpu_tg_path; // workload group path
|
||||
std::string _cgroup_v1_cpu_tg_quota_file;
|
||||
std::string _cgroup_v1_cpu_tg_shares_file;
|
||||
std::string _cgroup_v1_cpu_tg_task_file;
|
||||
};
|
||||
|
||||
|
||||
@ -1113,6 +1113,7 @@ DEFINE_Bool(enable_flush_file_cache_async, "true");
|
||||
|
||||
// cgroup
|
||||
DEFINE_String(doris_cgroup_cpu_path, "");
|
||||
DEFINE_Bool(enable_cgroup_cpu_soft_limit, "false");
|
||||
|
||||
DEFINE_Bool(ignore_always_true_predicate_for_segment, "true");
|
||||
|
||||
|
||||
@ -1183,6 +1183,8 @@ DECLARE_mBool(exit_on_exception);
|
||||
|
||||
// cgroup
|
||||
DECLARE_String(doris_cgroup_cpu_path);
|
||||
DECLARE_Bool(enable_cgroup_cpu_soft_limit);
|
||||
|
||||
// This config controls whether the s3 file writer would flush cache asynchronously
|
||||
DECLARE_Bool(enable_flush_file_cache_async);
|
||||
|
||||
|
||||
@ -671,18 +671,31 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
|
||||
LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
|
||||
<< " use task group: " << tg->debug_string()
|
||||
<< " cpu_hard_limit: " << task_group_info.cpu_hard_limit
|
||||
<< " cpu_share:" << task_group_info.cpu_share;
|
||||
<< " cpu_share:" << task_group_info.cpu_share
|
||||
<< " enable cgroup soft cpu:" << config::enable_cgroup_cpu_soft_limit;
|
||||
if (task_group_info.cpu_hard_limit > 0) {
|
||||
Status ret = _exec_env->task_group_manager()->create_and_get_task_scheduler(
|
||||
tg_id, tg_name, task_group_info.cpu_hard_limit, _exec_env,
|
||||
query_ctx.get());
|
||||
tg_id, tg_name, task_group_info.cpu_hard_limit,
|
||||
task_group_info.cpu_share, _exec_env, query_ctx.get());
|
||||
if (!ret.ok()) {
|
||||
LOG(INFO) << "workload group init failed "
|
||||
<< ", name=" << tg_name << ", id=" << tg_id
|
||||
<< ", reason=" << ret.to_string();
|
||||
}
|
||||
} else {
|
||||
query_ctx->set_task_group(tg);
|
||||
if (!config::enable_cgroup_cpu_soft_limit) {
|
||||
query_ctx->set_task_group(tg);
|
||||
} else {
|
||||
Status ret =
|
||||
_exec_env->task_group_manager()->create_and_get_task_scheduler(
|
||||
tg_id, tg_name, task_group_info.cpu_hard_limit,
|
||||
task_group_info.cpu_share, _exec_env, query_ctx.get());
|
||||
if (!ret.ok()) {
|
||||
LOG(INFO) << "workload group cpu soft limit init failed "
|
||||
<< ", name=" << tg_name << ", id=" << tg_id
|
||||
<< ", reason=" << ret.to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@ -60,7 +60,8 @@ void TaskGroupManager::get_resource_groups(const std::function<bool(const TaskGr
|
||||
}
|
||||
|
||||
Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::string tg_name,
|
||||
int cpu_hard_limit, ExecEnv* exec_env,
|
||||
int cpu_hard_limit, int cpu_shares,
|
||||
ExecEnv* exec_env,
|
||||
QueryContext* query_ctx_ptr) {
|
||||
std::lock_guard<std::mutex> lock(_task_scheduler_lock);
|
||||
// step 1: init cgroup cpu controller
|
||||
@ -117,7 +118,14 @@ Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri
|
||||
query_ctx_ptr->set_scan_task_scheduler(scan_task_sche);
|
||||
|
||||
// step 5 update cgroup cpu if needed
|
||||
_cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
|
||||
if (cpu_hard_limit > 0) {
|
||||
_cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
|
||||
_cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
|
||||
} else {
|
||||
_cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares);
|
||||
_cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(
|
||||
CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -52,7 +52,8 @@ public:
|
||||
std::vector<TaskGroupPtr>* task_groups);
|
||||
|
||||
Status create_and_get_task_scheduler(uint64_t wg_id, std::string wg_name, int cpu_hard_limit,
|
||||
ExecEnv* exec_env, QueryContext* query_ctx_ptr);
|
||||
int cpu_shares, ExecEnv* exec_env,
|
||||
QueryContext* query_ctx_ptr);
|
||||
|
||||
void delete_task_group_by_ids(std::set<uint64_t> id_set);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user