[pick]support cgroup v2 (#42465)

## Proposed changes

pick #39991   #39374  #36663
This commit is contained in:
wangbo
2024-10-25 20:13:27 +08:00
committed by GitHub
parent 4a62d9e44b
commit 9eef393e2a
11 changed files with 405 additions and 166 deletions

View File

@ -19,33 +19,154 @@
#include <fmt/format.h>
#include <sys/stat.h>
#include <unistd.h>
#include <filesystem>
#include "util/cgroup_util.h"
#include "util/defer_op.h"
namespace doris {
Status CgroupCpuCtl::init() {
_doris_cgroup_cpu_path = config::doris_cgroup_cpu_path;
if (_doris_cgroup_cpu_path.empty()) {
LOG(INFO) << "doris cgroup cpu path is not specify, path=" << _doris_cgroup_cpu_path;
return Status::InternalError<false>("doris cgroup cpu path {} is not specify.",
_doris_cgroup_cpu_path);
bool CgroupCpuCtl::is_a_valid_cgroup_path(std::string cg_path) {
if (!cg_path.empty()) {
if (cg_path.back() != '/') {
cg_path = cg_path + "/";
}
if (_is_enable_cgroup_v2_in_env) {
std::string query_path_cg_type = cg_path + "cgroup.type";
std::string query_path_ctl = cg_path + "cgroup.subtree_control";
std::string query_path_procs = cg_path + "cgroup.procs";
if (access(query_path_cg_type.c_str(), F_OK) != 0 ||
access(query_path_ctl.c_str(), F_OK) != 0 ||
access(query_path_procs.c_str(), F_OK) != 0) {
LOG(WARNING) << "[cgroup_init_path]invalid cgroup v2 path, access neccessary file "
"failed";
} else {
return true;
}
} else if (_is_enable_cgroup_v1_in_env) {
std::string query_path_tasks = cg_path + "tasks";
std::string query_path_cpu_shares = cg_path + "cpu.shares";
std::string query_path_quota = cg_path + "cpu.cfs_quota_us";
if (access(query_path_tasks.c_str(), F_OK) != 0 ||
access(query_path_cpu_shares.c_str(), F_OK) != 0 ||
access(query_path_quota.c_str(), F_OK) != 0) {
LOG(WARNING) << "[cgroup_init_path]invalid cgroup v1 path, access neccessary file "
"failed";
} else {
return true;
}
}
}
return false;
}
void CgroupCpuCtl::init_doris_cgroup_path() {
std::string conf_path = config::doris_cgroup_cpu_path;
if (conf_path.empty()) {
LOG(INFO) << "[cgroup_init_path]doris cgroup home path is not specify, if you not use "
"workload group, you can ignore this log.";
return;
}
if (access(_doris_cgroup_cpu_path.c_str(), F_OK) != 0) {
LOG(INFO) << "doris cgroup cpu path not exists, path=" << _doris_cgroup_cpu_path;
return Status::InternalError<false>("doris cgroup cpu path {} not exists.",
_doris_cgroup_cpu_path);
if (access(conf_path.c_str(), F_OK) != 0) {
LOG(INFO) << "[cgroup_init_path]doris cgroup home path not exists, path=" << conf_path;
return;
}
if (_doris_cgroup_cpu_path.back() != '/') {
_doris_cgroup_cpu_path = _doris_cgroup_cpu_path + "/";
if (conf_path.back() != '/') {
conf_path = conf_path + "/";
}
// check whether current user specified path is a valid cgroup path
std::string cg_msg = "not set cgroup in env";
if (CGroupUtil::cgroupsv2_enable()) {
_is_enable_cgroup_v2_in_env = true;
cg_msg = "cgroup v2 is enabled in env";
} else if (CGroupUtil::cgroupsv1_enable()) {
_is_enable_cgroup_v1_in_env = true;
cg_msg = "cgroup v1 is enabled in env";
}
bool is_cgroup_path_valid = CgroupCpuCtl::is_a_valid_cgroup_path(conf_path);
std::string tmp_query_path = conf_path + "query";
if (is_cgroup_path_valid) {
if (access(tmp_query_path.c_str(), F_OK) != 0) {
int ret = mkdir(tmp_query_path.c_str(), S_IRWXU);
if (ret != 0) {
LOG(ERROR) << "[cgroup_init_path]cgroup mkdir query failed, path="
<< tmp_query_path;
}
}
_is_cgroup_query_path_valid = CgroupCpuCtl::is_a_valid_cgroup_path(tmp_query_path);
}
_doris_cgroup_cpu_path = conf_path;
_doris_cgroup_cpu_query_path = tmp_query_path;
std::string query_path_msg = _is_cgroup_query_path_valid ? "cgroup query path is valid"
: "cgroup query path is not valid";
_cpu_core_num = CpuInfo::num_cores();
std::string init_cg_v2_msg = "";
if (_is_enable_cgroup_v2_in_env && _is_cgroup_query_path_valid) {
Status ret = init_cgroup_v2_query_path_public_file(_doris_cgroup_cpu_path,
_doris_cgroup_cpu_query_path);
if (!ret.ok()) {
init_cg_v2_msg = " write cgroup v2 file failed, err=" + ret.to_string_no_stack() + ". ";
} else {
init_cg_v2_msg = "write cgroup v2 public file succ.";
}
}
LOG(INFO) << "[cgroup_init_path]init cgroup home path finish, home path="
<< _doris_cgroup_cpu_path << ", query path=" << _doris_cgroup_cpu_query_path << ", "
<< cg_msg << ", " << query_path_msg << ", core_num=" << _cpu_core_num << ". "
<< init_cg_v2_msg;
}
Status CgroupCpuCtl::init_cgroup_v2_query_path_public_file(std::string home_path,
std::string query_path) {
// 1 enable cpu controller for home path's child
_doris_cgroup_cpu_path_subtree_ctl_file = home_path + "cgroup.subtree_control";
if (access(_doris_cgroup_cpu_path_subtree_ctl_file.c_str(), F_OK) != 0) {
return Status::InternalError<false>("not find cgroup v2 doris home's subtree control file");
}
RETURN_IF_ERROR(CgroupCpuCtl::write_cg_sys_file(_doris_cgroup_cpu_path_subtree_ctl_file, "+cpu",
"set cpu controller", false));
// 2 enable cpu controller for query path's child
_cgroup_v2_query_path_subtree_ctl_file = query_path + "/cgroup.subtree_control";
if (access(_cgroup_v2_query_path_subtree_ctl_file.c_str(), F_OK) != 0) {
return Status::InternalError<false>("not find cgroup v2 query path's subtree control file");
}
RETURN_IF_ERROR(CgroupCpuCtl::write_cg_sys_file(_cgroup_v2_query_path_subtree_ctl_file, "+cpu",
"set cpu controller", false));
// 3 write cgroup.procs
_doris_cg_v2_procs_file = query_path + "/cgroup.procs";
if (access(_doris_cg_v2_procs_file.c_str(), F_OK) != 0) {
return Status::InternalError<false>("not find cgroup v2 cgroup.procs file");
}
RETURN_IF_ERROR(CgroupCpuCtl::write_cg_sys_file(_doris_cg_v2_procs_file,
std::to_string(getpid()),
"set pid to cg v2 procs file", false));
return Status::OK();
}
uint64_t CgroupCpuCtl::cpu_soft_limit_default_value() {
return _is_enable_cgroup_v2_in_env ? 100 : 1024;
}
std::unique_ptr<CgroupCpuCtl> CgroupCpuCtl::create_cgroup_cpu_ctl(uint64_t wg_id) {
if (_is_enable_cgroup_v2_in_env) {
return std::make_unique<CgroupV2CpuCtl>(wg_id);
} else if (_is_enable_cgroup_v1_in_env) {
return std::make_unique<CgroupV1CpuCtl>(wg_id);
}
return nullptr;
}
void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit) {
std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
*cpu_shares = this->_cpu_shares;
@ -78,7 +199,7 @@ void CgroupCpuCtl::update_cpu_soft_limit(int cpu_shares) {
}
}
Status CgroupCpuCtl::write_cg_sys_file(std::string file_path, int value, std::string msg,
Status CgroupCpuCtl::write_cg_sys_file(std::string file_path, std::string value, std::string msg,
bool is_append) {
int fd = open(file_path.c_str(), is_append ? O_RDWR | O_APPEND : O_RDWR);
if (fd == -1) {
@ -102,82 +223,7 @@ Status CgroupCpuCtl::write_cg_sys_file(std::string file_path, int value, std::st
return Status::OK();
}
Status CgroupV1CpuCtl::init() {
RETURN_IF_ERROR(CgroupCpuCtl::init());
// query path
_cgroup_v1_cpu_query_path = _doris_cgroup_cpu_path + "query";
if (access(_cgroup_v1_cpu_query_path.c_str(), F_OK) != 0) {
int ret = mkdir(_cgroup_v1_cpu_query_path.c_str(), S_IRWXU);
if (ret != 0) {
LOG(ERROR) << "cgroup v1 mkdir query failed, path=" << _cgroup_v1_cpu_query_path;
return Status::InternalError<false>("cgroup v1 mkdir query failed, path={}",
_cgroup_v1_cpu_query_path);
}
}
// check whether current user specified path is a valid cgroup path
std::string query_path_tasks = _cgroup_v1_cpu_query_path + "/tasks";
std::string query_path_cpu_shares = _cgroup_v1_cpu_query_path + "/cpu.shares";
std::string query_path_quota = _cgroup_v1_cpu_query_path + "/cpu.cfs_quota_us";
if (access(query_path_tasks.c_str(), F_OK) != 0) {
return Status::InternalError<false>("invalid cgroup path, not find task file");
}
if (access(query_path_cpu_shares.c_str(), F_OK) != 0) {
return Status::InternalError<false>("invalid cgroup path, not find cpu share file");
}
if (access(query_path_quota.c_str(), F_OK) != 0) {
return Status::InternalError<false>("invalid cgroup path, not find cpu quota file");
}
if (_wg_id == -1) {
// means current cgroup cpu ctl is just used to clear dir,
// it does not contains workload group.
// todo(wb) rethinking whether need to refactor cgroup_cpu_ctl
_init_succ = true;
LOG(INFO) << "init cgroup cpu query path succ, path=" << _cgroup_v1_cpu_query_path;
return Status::OK();
}
// workload group path
_cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_wg_id);
if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) {
int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
if (ret != 0) {
LOG(ERROR) << "cgroup v1 mkdir workload group failed, path=" << _cgroup_v1_cpu_tg_path;
return Status::InternalError<false>("cgroup v1 mkdir workload group failed, path=",
_cgroup_v1_cpu_tg_path);
}
}
// quota file
_cgroup_v1_cpu_tg_quota_file = _cgroup_v1_cpu_tg_path + "/cpu.cfs_quota_us";
// cpu.shares file
_cgroup_v1_cpu_tg_shares_file = _cgroup_v1_cpu_tg_path + "/cpu.shares";
// task file
_cgroup_v1_cpu_tg_task_file = _cgroup_v1_cpu_tg_path + "/tasks";
LOG(INFO) << "cgroup v1 cpu path init success"
<< ", query tg path=" << _cgroup_v1_cpu_tg_path
<< ", query tg quota file path=" << _cgroup_v1_cpu_tg_quota_file
<< ", query tg tasks file path=" << _cgroup_v1_cpu_tg_task_file
<< ", core num=" << _cpu_core_num;
_init_succ = true;
return Status::OK();
}
Status CgroupV1CpuCtl::modify_cg_cpu_soft_limit_no_lock(int cpu_shares) {
std::string msg = "modify cpu shares to " + std::to_string(cpu_shares);
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_shares_file, cpu_shares, msg, false);
}
Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
int val = cpu_hard_limit > 0 ? (_cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100)
: CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE;
std::string msg = "modify cpu quota value to " + std::to_string(val);
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_quota_file, val, msg, false);
}
Status CgroupV1CpuCtl::add_thread_to_cgroup() {
Status CgroupCpuCtl::add_thread_to_cgroup(std::string task_path) {
if (!_init_succ) {
return Status::OK();
}
@ -189,18 +235,17 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() {
std::string msg =
"add thread " + std::to_string(tid) + " to group" + " " + std::to_string(_wg_id);
std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, msg, true);
return CgroupCpuCtl::write_cg_sys_file(task_path, std::to_string(tid), msg, true);
#endif
}
Status CgroupV1CpuCtl::delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) {
if (!_init_succ) {
return Status::InternalError<false>(
"cgroup cpu ctl init failed, delete can not be executed");
Status CgroupCpuCtl::delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) {
if (!_is_cgroup_query_path_valid) {
return Status::InternalError<false>("not find a valid cgroup query path");
}
// 1 get unused wg id
std::set<std::string> unused_wg_ids;
for (const auto& entry : std::filesystem::directory_iterator(_cgroup_v1_cpu_query_path)) {
for (const auto& entry : std::filesystem::directory_iterator(_doris_cgroup_cpu_query_path)) {
const std::string dir_name = entry.path().string();
struct stat st;
// == 0 means exists
@ -222,9 +267,9 @@ Status CgroupV1CpuCtl::delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids
// 2 delete unused cgroup path
int failed_count = 0;
std::string query_path = _cgroup_v1_cpu_query_path.back() != '/'
? _cgroup_v1_cpu_query_path + "/"
: _cgroup_v1_cpu_query_path;
std::string query_path = _doris_cgroup_cpu_query_path.back() != '/'
? _doris_cgroup_cpu_query_path + "/"
: _doris_cgroup_cpu_query_path;
for (const std::string& unused_wg_id : unused_wg_ids) {
std::string wg_path = query_path + unused_wg_id;
int ret = rmdir(wg_path.c_str());
@ -240,4 +285,138 @@ Status CgroupV1CpuCtl::delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids
return Status::OK();
}
Status CgroupV1CpuCtl::init() {
if (!_is_cgroup_query_path_valid) {
return Status::InternalError<false>("cgroup query path is not valid");
}
if (_wg_id <= 0) {
return Status::InternalError<false>("find an invalid wg_id {}", _wg_id);
}
// workload group path
_cgroup_v1_cpu_tg_path = _doris_cgroup_cpu_query_path + "/" + std::to_string(_wg_id);
if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) {
int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
if (ret != 0) {
LOG(ERROR) << "cgroup v1 mkdir workload group failed, path=" << _cgroup_v1_cpu_tg_path;
return Status::InternalError<false>("cgroup v1 mkdir workload group failed, path={}",
_cgroup_v1_cpu_tg_path);
}
}
_cgroup_v1_cpu_tg_quota_file = _cgroup_v1_cpu_tg_path + "/cpu.cfs_quota_us";
if (access(_cgroup_v1_cpu_tg_quota_file.c_str(), F_OK) != 0) {
return Status::InternalError<false>("not find cgroup v1 cpu.cfs_quota_us file");
}
_cgroup_v1_cpu_tg_shares_file = _cgroup_v1_cpu_tg_path + "/cpu.shares";
if (access(_cgroup_v1_cpu_tg_shares_file.c_str(), F_OK) != 0) {
return Status::InternalError<false>("not find cgroup v1 cpu.shares file");
}
_cgroup_v1_cpu_tg_task_file = _cgroup_v1_cpu_tg_path + "/tasks";
if (access(_cgroup_v1_cpu_tg_task_file.c_str(), F_OK) != 0) {
return Status::InternalError<false>("not find cgroup v1 cpu.shares file");
}
LOG(INFO) << "cgroup v1 cpu path init success"
<< ", query tg path=" << _cgroup_v1_cpu_tg_path
<< ", query wg quota file path=" << _cgroup_v1_cpu_tg_quota_file
<< ", query wg share file path=" << _cgroup_v1_cpu_tg_shares_file
<< ", query wg tasks file path=" << _cgroup_v1_cpu_tg_task_file
<< ", core num=" << _cpu_core_num;
_init_succ = true;
return Status::OK();
}
Status CgroupV1CpuCtl::modify_cg_cpu_soft_limit_no_lock(int cpu_shares) {
std::string cpu_share_str = std::to_string(cpu_shares);
std::string msg = "modify cpu shares to " + cpu_share_str;
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_shares_file, cpu_share_str, msg,
false);
}
Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
int val = cpu_hard_limit > 0 ? (_cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100)
: CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE;
std::string str_val = std::to_string(val);
std::string msg = "modify cpu quota value to " + str_val;
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_quota_file, str_val, msg, false);
}
Status CgroupV1CpuCtl::add_thread_to_cgroup() {
return CgroupCpuCtl::add_thread_to_cgroup(_cgroup_v1_cpu_tg_task_file);
}
Status CgroupV2CpuCtl::init() {
if (!_is_cgroup_query_path_valid) {
return Status::InternalError<false>(" cgroup query path is empty");
}
if (_wg_id <= 0) {
return Status::InternalError<false>("find an invalid wg_id {}", _wg_id);
}
// wg path
_cgroup_v2_query_wg_path = _doris_cgroup_cpu_query_path + "/" + std::to_string(_wg_id);
if (access(_cgroup_v2_query_wg_path.c_str(), F_OK) != 0) {
int ret = mkdir(_cgroup_v2_query_wg_path.c_str(), S_IRWXU);
if (ret != 0) {
return Status::InternalError<false>("cgroup v2 mkdir wg failed, path={}",
_cgroup_v2_query_wg_path);
}
}
_cgroup_v2_query_wg_cpu_max_file = _cgroup_v2_query_wg_path + "/cpu.max";
if (access(_cgroup_v2_query_wg_cpu_max_file.c_str(), F_OK) != 0) {
return Status::InternalError<false>("not find cgroup v2 wg cpu.max file");
}
_cgroup_v2_query_wg_cpu_weight_file = _cgroup_v2_query_wg_path + "/cpu.weight";
if (access(_cgroup_v2_query_wg_cpu_weight_file.c_str(), F_OK) != 0) {
return Status::InternalError<false>("not find cgroup v2 wg cpu.weight file");
}
_cgroup_v2_query_wg_thread_file = _cgroup_v2_query_wg_path + "/cgroup.threads";
if (access(_cgroup_v2_query_wg_thread_file.c_str(), F_OK) != 0) {
return Status::InternalError<false>("not find cgroup v2 wg cgroup.threads file");
}
_cgroup_v2_query_wg_type_file = _cgroup_v2_query_wg_path + "/cgroup.type";
if (access(_cgroup_v2_query_wg_type_file.c_str(), F_OK) != 0) {
return Status::InternalError<false>("not find cgroup v2 wg cgroup.type file");
}
RETURN_IF_ERROR(CgroupCpuCtl::write_cg_sys_file(_cgroup_v2_query_wg_type_file, "threaded",
"set cgroup type", false));
LOG(INFO) << "cgroup v2 cpu path init success"
<< ", query wg path=" << _cgroup_v2_query_wg_path
<< ", cpu.max file = " << _cgroup_v2_query_wg_cpu_max_file
<< ", cgroup.threads file = " << _cgroup_v2_query_wg_thread_file
<< ", core num=" << _cpu_core_num;
_init_succ = true;
return Status::OK();
}
Status CgroupV2CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
std::string value = "";
if (cpu_hard_limit > 0) {
uint64_t int_val = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100;
value = std::to_string(int_val) + " 100000";
} else {
value = CGROUP_V2_CPU_HARD_LIMIT_DEFAULT_VALUE;
}
std::string msg = "modify cpu.max to [" + value + "]";
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v2_query_wg_cpu_max_file, value, msg, false);
}
Status CgroupV2CpuCtl::modify_cg_cpu_soft_limit_no_lock(int cpu_weight) {
std::string cpu_weight_str = std::to_string(cpu_weight);
std::string msg = "modify cpu.weight to " + cpu_weight_str;
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v2_query_wg_cpu_weight_file, cpu_weight_str, msg,
false);
}
Status CgroupV2CpuCtl::add_thread_to_cgroup() {
return CgroupCpuCtl::add_thread_to_cgroup(_cgroup_v2_query_wg_thread_file);
}
} // namespace doris

View File

@ -30,14 +30,14 @@ namespace doris {
// cgroup cpu.cfs_quota_us default value, it means disable cpu hard limit
const static int CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
const static std::string CGROUP_V2_CPU_HARD_LIMIT_DEFAULT_VALUE = "max 100000";
class CgroupCpuCtl {
public:
virtual ~CgroupCpuCtl() = default;
CgroupCpuCtl() = default;
CgroupCpuCtl(uint64_t wg_id) { _wg_id = wg_id; }
virtual Status init();
virtual Status init() = 0;
virtual Status add_thread_to_cgroup() = 0;
@ -48,18 +48,44 @@ public:
// for log
void get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit);
virtual Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) = 0;
static void init_doris_cgroup_path();
static Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids);
static std::unique_ptr<CgroupCpuCtl> create_cgroup_cpu_ctl(uint64_t wg_id);
static bool is_a_valid_cgroup_path(std::string cg_path);
static uint64_t cpu_soft_limit_default_value();
protected:
Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append);
virtual Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) = 0;
virtual Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) = 0;
std::string _doris_cgroup_cpu_path;
uint64_t _cpu_core_num = CpuInfo::num_cores();
uint64_t _cpu_cfs_period_us = 100000;
Status add_thread_to_cgroup(std::string task_file);
static Status write_cg_sys_file(std::string file_path, std::string value, std::string msg,
bool is_append);
static Status init_cgroup_v2_query_path_public_file(std::string home_path,
std::string query_path);
protected:
inline static uint64_t _cpu_core_num;
const static uint64_t _cpu_cfs_period_us = 100000;
inline static std::string _doris_cgroup_cpu_path = "";
inline static std::string _doris_cgroup_cpu_query_path = "";
inline static bool _is_enable_cgroup_v1_in_env = false;
inline static bool _is_enable_cgroup_v2_in_env = false;
inline static bool _is_cgroup_query_path_valid = false;
// cgroup v2 public file
inline static std::string _doris_cgroup_cpu_path_subtree_ctl_file = "";
inline static std::string _cgroup_v2_query_path_subtree_ctl_file = "";
inline static std::string _doris_cg_v2_procs_file = "";
protected:
int _cpu_hard_limit = 0;
std::shared_mutex _lock_mutex;
bool _init_succ = false;
@ -96,20 +122,65 @@ protected:
class CgroupV1CpuCtl : public CgroupCpuCtl {
public:
CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
CgroupV1CpuCtl() = default;
Status init() override;
Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override;
Status add_thread_to_cgroup() override;
Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) override;
private:
std::string _cgroup_v1_cpu_query_path;
std::string _cgroup_v1_cpu_tg_path; // workload group path
std::string _cgroup_v1_cpu_tg_quota_file;
std::string _cgroup_v1_cpu_tg_shares_file;
std::string _cgroup_v1_cpu_tg_task_file;
};
/*
NOTE: cgroup v2 directory structure
1 root path:
/sys/fs/cgroup
2 doris home path:
/sys/fs/cgroup/{doris_home}/
3 doris home subtree_control file:
/sys/fs/cgroup/{doris_home}/cgroup.subtree_control
4 query path:
/sys/fs/cgroup/{doris_home}/query/
5 query path subtree_control file:
/sys/fs/cgroup/{doris_home}/query/cgroup.subtree_control
6 query path procs file:
/sys/fs/cgroup/{doris_home}/query/cgroup.procs
7 workload group path:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}
8 workload grou cpu.max file:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cpu.max
9 workload grou cpu.weight file:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cpu.weight
10 workload group cgroup type file:
/sys/fs/cgroup/{doris_home}/query/{workload_group_id}/cgroup.type
*/
class CgroupV2CpuCtl : public CgroupCpuCtl {
public:
CgroupV2CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
Status init() override;
Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override;
Status add_thread_to_cgroup() override;
private:
std::string _cgroup_v2_query_wg_path;
std::string _cgroup_v2_query_wg_cpu_max_file;
std::string _cgroup_v2_query_wg_cpu_weight_file;
std::string _cgroup_v2_query_wg_thread_file;
std::string _cgroup_v2_query_wg_type_file;
};
} // namespace doris

View File

@ -80,7 +80,7 @@ class MemTracker;
class StorageEngine;
class ResultBufferMgr;
class ResultQueueMgr;
class RuntimeQueryStatiticsMgr;
class RuntimeQueryStatisticsMgr;
class TMasterInfo;
class LoadChannelMgr;
class LoadStreamMgr;
@ -162,7 +162,7 @@ public:
pipeline::TaskScheduler* pipeline_task_scheduler() { return _without_group_task_scheduler; }
WorkloadGroupMgr* workload_group_mgr() { return _workload_group_manager; }
WorkloadSchedPolicyMgr* workload_sched_policy_mgr() { return _workload_sched_mgr; }
RuntimeQueryStatiticsMgr* runtime_query_statistics_mgr() {
RuntimeQueryStatisticsMgr* runtime_query_statistics_mgr() {
return _runtime_query_statistics_mgr;
}
@ -458,7 +458,7 @@ private:
WorkloadSchedPolicyMgr* _workload_sched_mgr = nullptr;
RuntimeQueryStatiticsMgr* _runtime_query_statistics_mgr = nullptr;
RuntimeQueryStatisticsMgr* _runtime_query_statistics_mgr = nullptr;
std::unique_ptr<pipeline::PipelineTracerContext> _pipeline_tracer_ctx;
std::unique_ptr<segment_v2::TmpFileDirs> _tmp_file_dirs;

View File

@ -211,7 +211,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
// NOTE: runtime query statistics mgr could be visited by query and daemon thread
// so it should be created before all query begin and deleted after all query and daemon thread stoppped
_runtime_query_statistics_mgr = new RuntimeQueryStatiticsMgr();
_runtime_query_statistics_mgr = new RuntimeQueryStatisticsMgr();
CgroupCpuCtl::init_doris_cgroup_path();
std::vector<doris::CachePath> cache_paths;
init_file_cache_factory(cache_paths);

View File

@ -35,10 +35,10 @@ void QueryStatisticsCtx::collect_query_statistics(TQueryStatistics* tq_s) {
tq_s->__set_workload_group_id(_wg_id);
}
void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id,
std::shared_ptr<QueryStatistics> qs_ptr,
TNetworkAddress fe_addr,
TQueryType::type query_type) {
void RuntimeQueryStatisticsMgr::register_query_statistics(std::string query_id,
std::shared_ptr<QueryStatistics> qs_ptr,
TNetworkAddress fe_addr,
TQueryType::type query_type) {
std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
if (_query_statistics_ctx_map.find(query_id) == _query_statistics_ctx_map.end()) {
_query_statistics_ctx_map[query_id] =
@ -47,7 +47,7 @@ void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id,
_query_statistics_ctx_map.at(query_id)->_qs_list.push_back(qs_ptr);
}
void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
// 1 get query statistics map
std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>> fe_qs_map;
@ -166,7 +166,7 @@ void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
}
}
void RuntimeQueryStatiticsMgr::set_query_finished(std::string query_id) {
void RuntimeQueryStatisticsMgr::set_query_finished(std::string query_id) {
// NOTE: here must be a write lock
std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
// when a query get query_ctx succ, but failed before create node/operator,
@ -178,7 +178,7 @@ void RuntimeQueryStatiticsMgr::set_query_finished(std::string query_id) {
}
}
std::shared_ptr<QueryStatistics> RuntimeQueryStatiticsMgr::get_runtime_query_statistics(
std::shared_ptr<QueryStatistics> RuntimeQueryStatisticsMgr::get_runtime_query_statistics(
std::string query_id) {
std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
if (_query_statistics_ctx_map.find(query_id) == _query_statistics_ctx_map.end()) {
@ -191,7 +191,7 @@ std::shared_ptr<QueryStatistics> RuntimeQueryStatiticsMgr::get_runtime_query_sta
return qs_ptr;
}
void RuntimeQueryStatiticsMgr::get_metric_map(
void RuntimeQueryStatisticsMgr::get_metric_map(
std::string query_id, std::map<WorkloadMetricType, std::string>& metric_map) {
QueryStatistics ret_qs;
int64_t query_time_ms = 0;
@ -212,7 +212,7 @@ void RuntimeQueryStatiticsMgr::get_metric_map(
std::to_string(ret_qs.get_current_used_memory_bytes()));
}
void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64_t wg_id) {
void RuntimeQueryStatisticsMgr::set_workload_group_id(std::string query_id, int64_t wg_id) {
// wg id just need eventual consistency, read lock is ok
std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
if (_query_statistics_ctx_map.find(query_id) != _query_statistics_ctx_map.end()) {
@ -220,7 +220,7 @@ void RuntimeQueryStatiticsMgr::set_workload_group_id(std::string query_id, int64
}
}
void RuntimeQueryStatiticsMgr::get_active_be_tasks_block(vectorized::Block* block) {
void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* block) {
std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;

View File

@ -54,10 +54,10 @@ public:
int64_t _query_start_time;
};
class RuntimeQueryStatiticsMgr {
class RuntimeQueryStatisticsMgr {
public:
RuntimeQueryStatiticsMgr() = default;
~RuntimeQueryStatiticsMgr() = default;
RuntimeQueryStatisticsMgr() = default;
~RuntimeQueryStatisticsMgr() = default;
void register_query_statistics(std::string query_id, std::shared_ptr<QueryStatistics> qs_ptr,
TNetworkAddress fe_addr, TQueryType::type query_type);

View File

@ -44,11 +44,9 @@
namespace doris {
const static uint64_t CPU_SHARE_DEFAULT_VALUE = 1024;
const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024;
const static int SPILL_LOW_WATERMARK_DEFAULT_VALUE = 50;
const static int SPILL_HIGH_WATERMARK_DEFAULT_VALUE = 80;
@ -329,7 +327,7 @@ Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_g
workload_group_info->version = version;
// 4 cpu_share
uint64_t cpu_share = CPU_SHARE_DEFAULT_VALUE;
uint64_t cpu_share = CgroupCpuCtl::cpu_soft_limit_default_value();
if (tworkload_group_info.__isset.cpu_share) {
cpu_share = tworkload_group_info.cpu_share;
}
@ -433,14 +431,18 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) {
std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>(tg_id);
Status ret = cgroup_cpu_ctl->init();
if (ret.ok()) {
_cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id;
std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = CgroupCpuCtl::create_cgroup_cpu_ctl(tg_id);
if (cgroup_cpu_ctl) {
Status ret = cgroup_cpu_ctl->init();
if (ret.ok()) {
_cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
LOG(INFO) << "[upsert wg thread pool] cgroup init success, wg_id=" << tg_id;
} else {
LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id=" << tg_id
<< ", reason=" << ret.to_string();
}
} else {
LOG(INFO) << "[upsert wg thread pool] cgroup init failed, wg_id= " << tg_id
<< ", reason=" << ret.to_string();
LOG(INFO) << "[upsert wg thread pool] create cgroup cpu ctl for " << tg_id << " failed";
}
}
@ -533,7 +535,8 @@ void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
if (enable_cpu_hard_limit) {
if (cpu_hard_limit > 0) {
_cgroup_cpu_ctl->update_cpu_hard_limit(cpu_hard_limit);
_cgroup_cpu_ctl->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
_cgroup_cpu_ctl->update_cpu_soft_limit(
CgroupCpuCtl::cpu_soft_limit_default_value());
} else {
LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: "
<< cpu_hard_limit << ", gid=" << tg_id;

View File

@ -92,7 +92,8 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
}
// wg is shutdown and running rum = 0, its resource can be released in BE
if (workload_group_ptr->can_be_dropped()) {
LOG(INFO) << "[topic_publish_wg]There is no query in wg" << wg_id << ", delete it.";
LOG(INFO) << "[topic_publish_wg]There is no query in wg " << wg_id
<< ", delete it.";
deleted_task_groups.push_back(workload_group_ptr);
}
}
@ -121,30 +122,16 @@ void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_i
// Using cgdelete has no such issue.
{
if (config::doris_cgroup_cpu_path != "") {
std::lock_guard<std::shared_mutex> write_lock(_init_cg_ctl_lock);
if (!_cg_cpu_ctl) {
_cg_cpu_ctl = std::make_unique<CgroupV1CpuCtl>();
}
if (!_is_init_succ) {
Status ret = _cg_cpu_ctl->init();
if (ret.ok()) {
_is_init_succ = true;
} else {
LOG(INFO) << "[topic_publish_wg]init workload group mgr cpu ctl failed, "
<< ret.to_string();
}
}
if (_is_init_succ) {
Status ret = _cg_cpu_ctl->delete_unused_cgroup_path(used_wg_id);
if (!ret.ok()) {
LOG(WARNING) << "[topic_publish_wg]" << ret.to_string();
}
std::lock_guard<std::shared_mutex> write_lock(_clear_cgroup_lock);
Status ret = CgroupCpuCtl::delete_unused_cgroup_path(used_wg_id);
if (!ret.ok()) {
LOG(WARNING) << "[topic_publish_wg]" << ret.to_string();
}
}
}
int64_t time_cost_ms = MonotonicMillis() - begin_time;
LOG(INFO) << "[topic_publish_wg]finish clear unused workload group, time cost: " << time_cost_ms
<< "ms, deleted group size:" << deleted_task_groups.size()
<< " ms, deleted group size:" << deleted_task_groups.size()
<< ", before wg size=" << old_wg_size << ", after wg size=" << new_wg_size;
}

View File

@ -66,9 +66,7 @@ private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
std::shared_mutex _init_cg_ctl_lock;
std::unique_ptr<CgroupCpuCtl> _cg_cpu_ctl;
bool _is_init_succ = false;
std::shared_mutex _clear_cgroup_lock;
};
} // namespace doris

View File

@ -479,7 +479,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
row.add(val + "%");
}
} else if (CPU_SHARE.equals(key) && !properties.containsKey(key)) {
row.add("1024");
row.add("-1");
} else if (MEMORY_LIMIT.equals(key) && !properties.containsKey(key)) {
row.add("0%");
} else if (ENABLE_MEMORY_OVERCOMMIT.equals(key) && !properties.containsKey(key)) {

View File

@ -54,16 +54,16 @@ normal 20 50% true 2147483647 0 0 1% 16
test_group 10 11% false 100 0 0 20% -1
-- !show_spill_1 --
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 10% 10%
spill_group_test -1 0% true 2147483647 0 0 -1 -1 10% 10%
-- !show_spill_1 --
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 -1 10%
spill_group_test -1 0% true 2147483647 0 0 -1 -1 -1 10%
-- !show_spill_2 --
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 5% 10%
spill_group_test -1 0% true 2147483647 0 0 -1 -1 5% 10%
-- !show_spill_3 --
spill_group_test 1024 0% true 2147483647 0 0 -1 -1 5% 40%
spill_group_test -1 0% true 2147483647 0 0 -1 -1 5% 40%
-- !show_wg_tag --
tag1_mem_wg1 50% -1 mem_tag1