[refactor](cgroup) remove cgroup manager it is useless (#18124)

Co-authored-by: yiguolei <yiguolei@gmail.com>
This commit is contained in:
yiguolei
2023-03-27 23:02:18 +08:00
committed by GitHub
parent fa586c00a9
commit 359f5be53e
19 changed files with 4 additions and 927 deletions

View File

@ -26,14 +26,12 @@ set(AGENT_SOURCES
heartbeat_server.cpp
task_worker_pool.cpp
utils.cpp
cgroups_mgr.cpp
topic_subscriber.cpp
user_resource_listener.cpp
)
if (OS_MACOSX)
list(REMOVE_ITEM AGENT_SOURCES cgroups_mgr.cpp user_resource_listener.cpp)
list(APPEND AGENT_SOURCES cgroups_mgr_mac.cpp)
list(REMOVE_ITEM AGENT_SOURCES user_resource_listener.cpp)
endif()
add_library(Agent STATIC ${AGENT_SOURCES})

View File

@ -1,472 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "agent/cgroups_mgr.h"

#include <asm/unistd.h>
#include <linux/magic.h>
#include <sys/stat.h>
#include <sys/sysmacros.h>
#include <sys/vfs.h>
#include <unistd.h>

#include <filesystem>
#include <fstream>
#include <future>
#include <map>
#include <sstream>
#include <system_error>

#include "common/logging.h"
#include "olap/data_dir.h"
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
using std::string;
using std::map;
using std::vector;
using std::stringstream;
using apache::thrift::TException;
using apache::thrift::transport::TTransportException;
namespace doris {
// Process-wide manager instance that the static CgroupsMgr::apply_cgroup() forwards to.
// It is set by the first CgroupsMgr that gets constructed and is never reset in this file.
static CgroupsMgr* s_global_cg_mgr;
// User name and level that apply_system_cgroup() passes to apply_cgroup().
const std::string CgroupsMgr::_s_system_user = "system";
const std::string CgroupsMgr::_s_system_group = "normal";
// Maps a fetched resource type to the cgroup control file its value is written to.
// update_local_cgroups() only forwards resource types that have an entry here.
std::map<TResourceType::type, std::string> CgroupsMgr::_s_resource_cgroups = {
{TResourceType::type::TRESOURCE_CPU_SHARE, "cpu.shares"},
{TResourceType::type::TRESOURCE_IO_SHARE, "blkio.weight"}};
// Stores the root cgroup path and starts in the "not initialized" state with
// resource version -1; init_cgroups() has to succeed before cgroups are touched.
// The first instance ever constructed is also registered as the process-wide
// instance used by the static apply_cgroup(). exec_env is not used here.
CgroupsMgr::CgroupsMgr(ExecEnv* exec_env, const string& root_cgroups_path)
        : _root_cgroups_path(root_cgroups_path), _is_cgroups_init_success(false), _cur_version(-1) {
    if (s_global_cg_mgr != nullptr) {
        // A global instance is already registered; keep it.
        return;
    }
    s_global_cg_mgr = this;
}
// Nothing to release beyond the members themselves. Note that the global
// instance pointer registered by the constructor is not cleared here.
CgroupsMgr::~CgroupsMgr() = default;
// Synchronizes the local cgroup hierarchy with the user resources that were fetched.
//
// Steps:
//   1. Bail out if cgroups were never initialized, or if the fetched resource
//      version is not newer than the one already applied.
//   2. Delete the cgroups of every locally known user that is missing from the
//      fetched result.
//   3. Create/update the cgroups and the disk throttles of every fetched user.
//   4. Remember the applied resource version.
//
// Per-user failures are logged and skipped (best effort), so one failing user
// does not block the others; the method still returns OK in that case.
//
// Input parameters:
//   new_fetched_resource: per-user resource settings plus their version
//
// Returns InternalError if init_cgroups() has not succeeded, OK otherwise.
Status CgroupsMgr::update_local_cgroups(const TFetchResourceResult& new_fetched_resource) {
    std::lock_guard<std::mutex> lck(_update_cgroups_mtx);
    if (!_is_cgroups_init_success) {
        return Status::InternalError("Cgroup not inited");
    }
    if (_cur_version >= new_fetched_resource.resourceVersion) {
        return Status::OK();
    }

    const std::map<std::string, TUserResource>& new_user_resource =
            new_fetched_resource.resourceByUser;

    // Drop the cgroups of users that disappeared from the fetched result.
    for (const std::string& old_user : _local_users) {
        if (new_user_resource.count(old_user) > 0) {
            continue;
        }
        Status st = delete_user_cgroups(old_user);
        if (!st.ok()) {
            LOG(WARNING) << "Delete cgroups for user " << old_user << " failed: " << st;
        }
    }
    // Clear local users set, because it will be inserted again
    _local_users.clear();

    for (const auto& user_entry : new_user_resource) {
        const std::string& user_name = user_entry.first;
        const TUserResource& user_resource = user_entry.second;
        const std::map<std::string, int32_t>& level_share = user_resource.shareByGroup;
        const std::map<TResourceType::type, int32_t>& resource_share =
                user_resource.resource.resourceByType;

        // Translate resource types into cgroup control-file names. Types without
        // a mapping in _s_resource_cgroups are not written as shares.
        std::map<std::string, int32_t> user_share;
        for (const auto& resource : resource_share) {
            const auto cgroup_file = _s_resource_cgroups.find(resource.first);
            if (cgroup_file != _s_resource_cgroups.end()) {
                user_share[cgroup_file->second] = resource.second;
            }
        }

        Status st = modify_user_cgroups(user_name, user_share, level_share);
        if (!st.ok()) {
            LOG(WARNING) << "Modify cgroups for user " << user_name << " failed: " << st;
        }
        _config_user_disk_throttle(user_name, resource_share);
        // Insert user to local user's set
        _local_users.insert(user_name);
    }

    // Using resource version, not subscribe version
    _cur_version = new_fetched_resource.resourceVersion;
    return Status::OK();
}
void CgroupsMgr::_config_user_disk_throttle(
std::string user_name, const std::map<TResourceType::type, int32_t>& resource_share) {
int64_t hdd_read_iops =
_get_resource_value(TResourceType::type::TRESOURCE_HDD_READ_IOPS, resource_share);
int64_t hdd_write_iops =
_get_resource_value(TResourceType::type::TRESOURCE_HDD_WRITE_IOPS, resource_share);
int64_t hdd_read_mbps =
_get_resource_value(TResourceType::type::TRESOURCE_HDD_READ_MBPS, resource_share);
int64_t hdd_write_mbps =
_get_resource_value(TResourceType::type::TRESOURCE_HDD_WRITE_MBPS, resource_share);
int64_t ssd_read_iops =
_get_resource_value(TResourceType::type::TRESOURCE_SSD_READ_IOPS, resource_share);
int64_t ssd_write_iops =
_get_resource_value(TResourceType::type::TRESOURCE_SSD_WRITE_IOPS, resource_share);
int64_t ssd_read_mbps =
_get_resource_value(TResourceType::type::TRESOURCE_SSD_READ_MBPS, resource_share);
int64_t ssd_write_mbps =
_get_resource_value(TResourceType::type::TRESOURCE_SSD_WRITE_MBPS, resource_share);
_config_disk_throttle(user_name, "", hdd_read_iops, hdd_write_iops, hdd_read_mbps,
hdd_write_mbps, ssd_read_iops, ssd_write_iops, ssd_read_mbps,
ssd_write_mbps);
_config_disk_throttle(user_name, "low", hdd_read_iops, hdd_write_iops, hdd_read_mbps,
hdd_write_mbps, ssd_read_iops, ssd_write_iops, ssd_read_mbps,
ssd_write_mbps);
_config_disk_throttle(user_name, "normal", hdd_read_iops, hdd_write_iops, hdd_read_mbps,
hdd_write_mbps, ssd_read_iops, ssd_write_iops, ssd_read_mbps,
ssd_write_mbps);
_config_disk_throttle(user_name, "high", hdd_read_iops, hdd_write_iops, hdd_read_mbps,
hdd_write_mbps, ssd_read_iops, ssd_write_iops, ssd_read_mbps,
ssd_write_mbps);
}
// Looks up resource_type in resource_share.
// Returns the stored value, or -1 when the map has no entry for that type.
int64_t CgroupsMgr::_get_resource_value(
        const TResourceType::type resource_type,
        const std::map<TResourceType::type, int32_t>& resource_share) {
    const auto found = resource_share.find(resource_type);
    if (found == resource_share.end()) {
        return -1;
    }
    return found->second;
}
// Writes blkio throttle limits for one cgroup (<root>/<user_name>/<level>),
// creating the cgroup directory first if it does not exist.
//
// For every storage path returned by the storage engine, the device numbers of
// the path are looked up with stat() and one "<major>:<minor> <limit>" line is
// appended to each throttle file whose limit is configured.
//
// Input parameters:
//   user_name / level: identify the cgroup; an empty level addresses the
//                      user's own cgroup
//   hdd_*: limits applied to non-SSD stores
//   ssd_*: limits applied to SSD stores; -1 falls back to the hdd value
//   *_mbps values are given in MB and converted to bytes (<< 20) when written
//   A limit of -1 means "not configured" and nothing is written for it.
//
// Returns InternalError if the cgroup directory cannot be created, OK otherwise.
// Failures to stat a store or to open a throttle file are skipped silently.
Status CgroupsMgr::_config_disk_throttle(std::string user_name, std::string level,
                                         int64_t hdd_read_iops, int64_t hdd_write_iops,
                                         int64_t hdd_read_mbps, int64_t hdd_write_mbps,
                                         int64_t ssd_read_iops, int64_t ssd_write_iops,
                                         int64_t ssd_read_mbps, int64_t ssd_write_mbps) {
    string cgroups_path = this->_root_cgroups_path + "/" + user_name + "/" + level;
    string read_bps_path = cgroups_path + "/blkio.throttle.read_bps_device";
    string write_bps_path = cgroups_path + "/blkio.throttle.write_bps_device";
    string read_iops_path = cgroups_path + "/blkio.throttle.read_iops_device";
    string write_iops_path = cgroups_path + "/blkio.throttle.write_iops_device";

    if (!is_file_exist(cgroups_path.c_str())) {
        // Non-throwing overload: failures are reported through Status here, so an
        // OS error must not escape as std::filesystem::filesystem_error.
        std::error_code ec;
        if (!std::filesystem::create_directory(cgroups_path, ec)) {
            LOG(ERROR) << "Create cgroups: " << cgroups_path << " failed";
            return Status::InternalError("Create cgroups {} failed", cgroups_path);
        }
    }

    // Appends "<major>:<minor> <limit << shift>" to throttle_file unless the limit
    // is -1. The shift is applied only after the -1 check, so the sentinel is never shifted.
    auto write_limit = [this](int major_number, int minor_number, int64_t limit, int shift,
                              string& throttle_file) {
        if (limit == -1) {
            return;
        }
        std::stringstream ctrl_cmd;
        ctrl_cmd << major_number << ":" << minor_number << " " << (limit << shift);
        _echo_cmd_to_cgroup(ctrl_cmd, throttle_file);
    };

    // add olap engine data path here
    // (bulk load data path is already part of the data paths)
    auto stores = StorageEngine::instance()->get_stores();
    for (const auto& store : stores) {
        // check disk type
        int64_t read_iops = hdd_read_iops;
        int64_t write_iops = hdd_write_iops;
        int64_t read_mbps = hdd_read_mbps;
        int64_t write_mbps = hdd_write_mbps;
        // if user set hdd not ssd, then use hdd for ssd
        if (store->is_ssd_disk()) {
            read_iops = ssd_read_iops == -1 ? hdd_read_iops : ssd_read_iops;
            write_iops = ssd_write_iops == -1 ? hdd_write_iops : ssd_write_iops;
            read_mbps = ssd_read_mbps == -1 ? hdd_read_mbps : ssd_read_mbps;
            write_mbps = ssd_write_mbps == -1 ? hdd_write_mbps : ssd_write_mbps;
        }

        struct stat file_stat;
        if (stat(store->path().c_str(), &file_stat) != 0) {
            continue;
        }
        const int major_number = major(file_stat.st_dev);
        // Round the minor number down to a multiple of 16.
        // NOTE(review): presumably maps a partition to its whole-disk device — confirm.
        const int minor_number = (minor(file_stat.st_dev) / 16) * 16;

        write_limit(major_number, minor_number, read_iops, 0, read_iops_path);
        write_limit(major_number, minor_number, write_iops, 0, write_iops_path);
        write_limit(major_number, minor_number, read_mbps, 20, read_bps_path);
        write_limit(major_number, minor_number, write_mbps, 20, write_bps_path);
    }
    return Status::OK();
}
// Writes resource shares into the user's cgroup and into its level cgroups,
// creating missing cgroup directories on the way.
//
// Input parameters:
//   user_name:   directory name of the user's cgroup under the root cgroup path
//   user_share:  cgroup control file name -> share weight written to the user's cgroup
//   level_share: level name -> share weight; written to the same control file
//                inside each level directory under the user's cgroup
//
// Level directories are only visited while iterating user_share, so nothing is
// created or written for the levels when user_share is empty.
//
// Returns InternalError when a directory cannot be created or a control file
// cannot be opened; OK otherwise.
Status CgroupsMgr::modify_user_cgroups(const string& user_name,
                                       const map<string, int32_t>& user_share,
                                       const map<string, int32_t>& level_share) {
    // Non-throwing create_directory overloads are used below: this function
    // reports failures through Status, not through exceptions.
    std::error_code ec;

    // Check if the user's cgroups exists, if not create it
    const string user_cgroups_path = this->_root_cgroups_path + "/" + user_name;
    if (!is_file_exist(user_cgroups_path.c_str()) &&
        !std::filesystem::create_directory(user_cgroups_path, ec)) {
        LOG(ERROR) << "Create cgroups for user " << user_name << " failed";
        return Status::InternalError("Create cgroups for user {} failed", user_name);
    }

    // Traverse the user resource share map to append share value to cgroup's file
    for (const auto& user_resource : user_share) {
        const string& resource_file_name = user_resource.first;
        const int32_t user_share_weight = user_resource.second;

        // Append the share_weight value to the file
        const string user_resource_path = user_cgroups_path + "/" + resource_file_name;
        std::ofstream user_cgroups(user_resource_path.c_str(), std::ios::out | std::ios::app);
        if (!user_cgroups.is_open()) {
            return Status::InternalError("User cgroup is not open");
        }
        user_cgroups << user_share_weight << std::endl;
        user_cgroups.close();
        LOG(INFO) << "Append " << user_share_weight << " to " << user_resource_path;

        for (const auto& level_resource : level_share) {
            // Append resource share to level shares
            const string& level_name = level_resource.first;
            const int32_t level_share_weight = level_resource.second;

            // Check if the level cgroups exist, if not create it
            const string level_cgroups_path = user_cgroups_path + "/" + level_name;
            if (!is_file_exist(level_cgroups_path.c_str()) &&
                !std::filesystem::create_directory(level_cgroups_path, ec)) {
                return Status::InternalError("User level cgroups not exist");
            }

            // Append the share_weight value to the file
            const string level_resource_path = level_cgroups_path + "/" + resource_file_name;
            std::ofstream level_cgroups(level_resource_path.c_str(),
                                        std::ios::out | std::ios::app);
            if (!level_cgroups.is_open()) {
                return Status::InternalError("User level cgroup is not open");
            }
            level_cgroups << level_share_weight << std::endl;
            level_cgroups.close();
            LOG(INFO) << "Append " << level_share_weight << " to " << level_resource_path;
        }
    }
    return Status::OK();
}
// Validates the configured root cgroup path and removes every sub cgroup
// that already exists under it.
//
// The root path is accepted when
//   - it is a directory that contains a "tasks" file,
//   - (non-test builds) it lives on a cgroup file system, and
//   - the current process has write permission on it.
//
// Sets _is_cgroups_init_success accordingly. Returns OK on success and
// InternalError describing the failed check otherwise.
Status CgroupsMgr::init_cgroups() {
    const std::string root_cgroups_tasks_path = this->_root_cgroups_path + "/tasks";

    // Check if the root cgroups exists
    if (!is_directory(this->_root_cgroups_path.c_str()) ||
        !is_file_exist(root_cgroups_tasks_path.c_str())) {
        VLOG_NOTICE << "Could not find a valid cgroups path for resource isolation,"
                    << "current value is " << _root_cgroups_path << ". ignore it.";
        _is_cgroups_init_success = false;
        return Status::InternalError("Could not find a valid cgroups path for resource isolation");
    }

    // Check the folder's virtual filesystem to find whether it is a cgroup file system
#ifndef BE_TEST
    struct statfs fs_type;
    // A failed statfs() leaves fs_type unspecified, so its result must be
    // checked before f_type is read.
    if (statfs(root_cgroups_tasks_path.c_str(), &fs_type) != 0 ||
        fs_type.f_type != CGROUP_SUPER_MAGIC) {
        LOG(ERROR) << _root_cgroups_path << " is not a cgroups file system.";
        _is_cgroups_init_success = false;
        return Status::InternalError("{} is not a cgroups file system", _root_cgroups_path);
    }
#endif

    // Check if current user have write permission to cgroup folder
    if (access(_root_cgroups_path.c_str(), W_OK) != 0) {
        LOG(ERROR) << "Doris does not have write permission to " << _root_cgroups_path;
        _is_cgroups_init_success = false;
        return Status::InternalError("Doris does not have write permission to {}",
                                     _root_cgroups_path);
    }

    // If root folder exists, then delete all subfolders under it
    for (const auto& entry : std::filesystem::directory_iterator(this->_root_cgroups_path)) {
        if (!is_directory(entry.path().string().c_str())) {
            continue;
        }
        // Delete the sub folder
        if (!delete_user_cgroups(entry.path().filename().string()).ok()) {
            LOG(ERROR) << "Could not clean subfolder " << entry.path().string();
            _is_cgroups_init_success = false;
            return Status::InternalError("Could not clean subfolder {}", entry.path().string());
        }
    }

    LOG(INFO) << "Initialize doris cgroups successfully under folder " << _root_cgroups_path;
    _is_cgroups_init_success = true;
    return Status::OK();
}
// Yields the id of the calling thread by invoking the gettid system call directly.
#define gettid() syscall(__NR_gettid)
// Static convenience entry point: assigns the calling thread to the cgroup of
// (user_name, level) through the process-wide manager instance.
// Does nothing when no manager has been constructed; the Status returned by
// assign_to_cgroups() is intentionally not propagated.
void CgroupsMgr::apply_cgroup(const string& user_name, const string& level) {
    if (s_global_cg_mgr != nullptr) {
        s_global_cg_mgr->assign_to_cgroups(user_name, level);
    }
}
// Assigns the calling thread to the cgroup identified by user_name and level.
// Returns InternalError when init_cgroups() has not succeeded; otherwise the
// result of assign_thread_to_cgroups() for the caller's thread id.
Status CgroupsMgr::assign_to_cgroups(const string& user_name, const string& level) {
    if (_is_cgroups_init_success) {
        const int64_t calling_tid = gettid();
        return assign_thread_to_cgroups(calling_tid, user_name, level);
    }
    return Status::InternalError("Cgroups not inited");
}
// Appends thread_id to the "tasks" file of the cgroup selected for
// (user_name, level).
//
// Cgroup selection:
//   - the user's directory does not exist  -> <root>/<default user>/<default level>
//   - the level directory does not exist   -> the user's own cgroup
//   - otherwise                            -> <root>/<user_name>/<level>
//
// Returns InternalError when cgroups are not initialized, when the chosen
// tasks file does not exist, or when it cannot be opened; OK otherwise.
Status CgroupsMgr::assign_thread_to_cgroups(int64_t thread_id, const string& user_name,
                                            const string& level) {
    if (!_is_cgroups_init_success) {
        return Status::InternalError("Cgroups not inited");
    }

    const string user_dir = _root_cgroups_path + "/" + user_name;
    const string level_dir = user_dir + "/" + level;

    string tasks_path;
    if (!is_file_exist(user_dir)) {
        tasks_path = this->_root_cgroups_path + "/" + _default_user_name + "/" + _default_level +
                     "/tasks";
    } else if (!is_file_exist(level_dir)) {
        tasks_path = user_dir + "/tasks";
    } else {
        tasks_path = level_dir + "/tasks";
    }

    if (!is_file_exist(tasks_path.c_str())) {
        LOG(ERROR) << "Cgroups path " << tasks_path << " not exist!";
        return Status::InternalError("Cgroups path not exist");
    }

    std::ofstream tasks_file(tasks_path.c_str(), std::ios::out | std::ios::app);
    if (!tasks_file.is_open()) {
        // This means doris could not open this file. May be it does not have access to it
        LOG(ERROR) << "Echo thread: " << thread_id << " to " << tasks_path << " failed!";
        return Status::InternalError("Echo thread to cgroup failed");
    }

    // Append thread id to the tasks file directly
    tasks_file << thread_id << std::endl;
    tasks_file.close();
    return Status::OK();
}
// Removes the cgroup of user_name: first every sub directory (level cgroup)
// directly under it, then the user's cgroup itself, each via drop_cgroups().
// A user whose cgroup directory does not exist is treated as success.
// Returns the first error reported by drop_cgroups(), OK otherwise.
Status CgroupsMgr::delete_user_cgroups(const string& user_name) {
    const string user_cgroups_path = this->_root_cgroups_path + "/" + user_name;
    if (!is_file_exist(user_cgroups_path.c_str())) {
        return Status::OK();
    }

    // Delete sub cgroups --> level cgroups
    for (const auto& entry : std::filesystem::directory_iterator(user_cgroups_path)) {
        const string child_path = entry.path().string();
        if (is_directory(child_path.c_str())) {
            RETURN_IF_ERROR(drop_cgroups(child_path));
        }
    }

    // Delete user cgroups
    RETURN_IF_ERROR(drop_cgroups(user_cgroups_path));
    return Status::OK();
}
// Removes the cgroup directory deleted_cgroups_path.
// While the directory exists and rmdir() fails, the tasks listed in it are
// relocated to the root cgroup and the removal is retried, up to
// _drop_retry_times failed attempts.
// Returns InternalError once the retry budget is used up, OK otherwise
// (including when the path does not exist in the first place).
Status CgroupsMgr::drop_cgroups(const string& deleted_cgroups_path) {
// Try to delete the cgroups folder
// If failed then there maybe exist active tasks under it and try to relocate them
// Currently, try 10 times to relocate and delete the cgroups.
int32_t i = 0;
// The rmdir() attempt itself happens inside the loop condition; the body only
// runs when the directory still exists and that attempt failed.
while (is_file_exist(deleted_cgroups_path) && rmdir(deleted_cgroups_path.c_str()) < 0 &&
i < this->_drop_retry_times) {
// The Status of relocate_tasks() is ignored; the next rmdir() attempt decides.
this->relocate_tasks(deleted_cgroups_path, this->_root_cgroups_path);
++i;
#ifdef BE_TEST
// Test builds remove the directory tree directly after the relocation attempt.
std::filesystem::remove_all(deleted_cgroups_path);
#endif
// The error is returned right after the last allowed relocation, without
// another rmdir() attempt.
if (i == this->_drop_retry_times) {
LOG(ERROR) << "drop cgroups under path: " << deleted_cgroups_path << " failed.";
return Status::InternalError("Drop cgroup failed");
}
}
return Status::OK();
}
// Moves every task id listed in <src_cgroups>/tasks to <dest_cgroups>/tasks
// by appending the ids one at a time.
// Returns InternalError when either tasks file cannot be opened, OK otherwise.
// Errors while appending a single id are ignored.
Status CgroupsMgr::relocate_tasks(const string& src_cgroups, const string& dest_cgroups) {
    const string source_file = src_cgroups + "/tasks";
    const string target_file = dest_cgroups + "/tasks";

    std::ifstream reader(source_file.c_str());
    if (!reader) {
        return Status::InternalError("Src tasks is null");
    }

    std::ofstream writer(target_file.c_str(), std::ios::out | std::ios::app);
    if (!writer) {
        return Status::InternalError("Desk task is null");
    }

    for (int64_t task_id = 0; reader >> task_id;) {
        writer << task_id << std::endl;
        // If the thread id or process id not exists, then error occurs in the stream.
        // Clear the error state for every append.
        writer.clear();
    }

    reader.close();
    writer.close();
    return Status::OK();
}
// Appends the text accumulated in ctrl_cmd, followed by a newline, to the
// cgroup control file cgroups_path. Silently does nothing when the file
// cannot be opened.
void CgroupsMgr::_echo_cmd_to_cgroup(stringstream& ctrl_cmd, string& cgroups_path) {
    std::ofstream control_file(cgroups_path.c_str(), std::ios::out | std::ios::app);
    if (!control_file.is_open()) {
        return;
    }
    control_file << ctrl_cmd.str() << std::endl;
    control_file.close();
}
// Returns true when stat() succeeds on file_path and reports a directory;
// false otherwise (including when stat() fails).
bool CgroupsMgr::is_directory(const char* file_path) {
    struct stat path_info;
    const bool stat_ok = stat(file_path, &path_info) == 0;
    return stat_ok && S_ISDIR(path_info.st_mode);
}
// Returns true when stat() succeeds on file_path, i.e. the path exists
// (regardless of whether it is a file or a directory).
bool CgroupsMgr::is_file_exist(const char* file_path) {
    struct stat path_info;
    return stat(file_path, &path_info) == 0;
}
// std::string convenience overload; forwards to the const char* version.
bool CgroupsMgr::is_file_exist(const std::string& file_path) {
    const char* raw_path = file_path.c_str();
    return is_file_exist(raw_path);
}
} // namespace doris

View File

@ -1,169 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <map>
#include <mutex>
#include <string>
#include "common/status.h"
#include "gen_cpp/MasterService_types.h"
namespace doris {
class ExecEnv;
// Manages the cgroup directories under a root cgroup path: creates, updates
// and deletes per-user cgroups (and their per-level sub cgroups), writes
// share/throttle values into their control files and assigns threads to them.
//
// NOTE(review): this header uses std::set and std::stringstream but the visible
// include list has neither <set> nor <sstream> — presumably they arrive
// transitively; confirm.
class CgroupsMgr {
public:
// Input parameters:
// exec_env: global variable to get global objects
// cgroups_root_path: root cgroup allocated to doris by admin
explicit CgroupsMgr(ExecEnv* exec_env, const std::string& root_cgroups_path);
~CgroupsMgr();
// Compare the old user resource and new user resource to find deleted user
// then delete nonexisting cgroups, create new user cgroups, update all user cgroups
Status update_local_cgroups(const TFetchResourceResult& new_fetched_resource);
// Delete all existing cgroups under root path
Status init_cgroups();
// Modify cgroup resource shares under cgroups_root_path.
// Create related cgroups if it not exist.
//
// Input parameters:
// user_name: unique name for the user. it is a dir under cgroups_root_path
//
// user_share: a mapping for shares for different resource like (cpu.share, 100)
// mapping key is resource file name in cgroup; value is share weight
//
// level_share: a mapping for shares for different levels under the user.
// mapping key is level name; value is level's share. Currently, different resource using the same share.
Status modify_user_cgroups(const std::string& user_name,
const std::map<std::string, int32_t>& user_share,
const std::map<std::string, int32_t>& level_share);
// Assign the calling thread to the cgroup identified by user name and level,
// using the process-wide manager instance.
static void apply_cgroup(const std::string& user_name, const std::string& level);
// Same as apply_cgroup() with the system user and system group defined below.
static void apply_system_cgroup() { apply_cgroup(_s_system_user, _s_system_group); }
// Assign the thread calling this function to the cgroup identified by user name and level
//
// Input parameters:
// user_name&level: the user name and level used to find the cgroup
Status assign_to_cgroups(const std::string& user_name, const std::string& level);
// Assign the thread identified by thread id to the cgroup identified by user name and level
//
// Input parameters:
// thread_id: the unique id for the thread
// user_name&level: the user name and level used to find the cgroup
Status assign_thread_to_cgroups(int64_t thread_id, const std::string& user_name,
const std::string& level);
// Delete the user's cgroups and its sub level cgroups using DropCgroups
// Input parameters:
// user name: user name to be deleted
Status delete_user_cgroups(const std::string& user_name);
// Delete a cgroup
// If there are active tasks in this cgroups, they will be relocated
// to root cgroups.
// If there are sub cgroups, it will return error.
// Input parameters:
// deleted_cgroups_path: the absolute cgroups path to be deleted
Status drop_cgroups(const std::string& deleted_cgroups_path);
// Relocate all threads or processes in src cgroups to dest cgroups
// Ignore errors when echo to dest cgroups
// Input parameters:
// src_cgroups: absolute path for src cgroups folder
// dest_cgroups: absolute path for dest cgroups folder
Status relocate_tasks(const std::string& src_cgroups, const std::string& dest_cgroups);
// Returns the resource version that was applied last.
// NOTE(review): reads _cur_version without taking _update_cgroups_mtx — confirm
// callers do not race with update_local_cgroups().
int64_t get_cgroups_version() { return _cur_version; }
// set the disk throttle for the user by getting resource value from the map and echo it to the cgroups.
// currently, both the user and groups under the user are set to the same value
// because throttle does not support hierarchy.
// Input parameters:
// user_name: name for the user
// resource_share: resource value get from fe
void _config_user_disk_throttle(std::string user_name,
const std::map<TResourceType::type, int32_t>& resource_share);
// get user resource share value from the map
int64_t _get_resource_value(const TResourceType::type resource_type,
const std::map<TResourceType::type, int32_t>& resource_share);
// set disk throttle according to the parameters. currently, we set different
// values for hdd and ssd.
// Input parameters:
// hdd_read_iops: read iops number for hdd disk.
// hdd_write_iops: write iops number for hdd disk.
// hdd_read_mbps: read bps number for hdd disk, using mb not byte or kb.
// hdd_write_mbps: write bps number for hdd disk, using mb not byte or kb.
// ssd_read_iops: read iops number for ssd disk.
// ssd_write_iops: write iops number for ssd disk.
// ssd_read_mbps: read bps number for ssd disk, using mb not byte or kb.
// ssd_write_mbps: write bps number for ssd disk, using mb not byte or kb.
Status _config_disk_throttle(std::string user_name, std::string level, int64_t hdd_read_iops,
int64_t hdd_write_iops, int64_t hdd_read_mbps,
int64_t hdd_write_mbps, int64_t ssd_read_iops,
int64_t ssd_write_iops, int64_t ssd_read_mbps,
int64_t ssd_write_mbps);
// echo command in string stream to the cgroup file
// Input parameters:
// ctrl_cmd: stringstream that contains the string to echo
// cgroups_path: target cgroup file path
void _echo_cmd_to_cgroup(std::stringstream& ctrl_cmd, std::string& cgroups_path);
// check if the path exists and it is a directory
// Input parameters:
// file_path: path to the file
bool is_directory(const char* file_path);
// check if the path exists
// Input parameters:
// file_path: path to the file
bool is_file_exist(const char* file_path);
// check if the path exists
// Input parameters:
// file_path: string value of the path
bool is_file_exist(const std::string& file_path);
public:
// User name and level used by apply_system_cgroup().
const static std::string _s_system_user;
const static std::string _s_system_group;
private:
// Root cgroup directory; every user cgroup is a sub directory of it.
std::string _root_cgroups_path;
// Maximum number of relocate-and-retry rounds in drop_cgroups().
int32_t _drop_retry_times = 10;
// True only after init_cgroups() succeeded; most operations refuse to run otherwise.
bool _is_cgroups_init_success;
// Fallback user/level used by assign_thread_to_cgroups() when the requested
// user's cgroup directory does not exist.
std::string _default_user_name = "default";
std::string _default_level = "normal";
// Resource version applied by the last successful update_local_cgroups().
int64_t _cur_version;
// Users whose cgroups were configured by the last update_local_cgroups().
std::set<std::string> _local_users;
// Serializes update_local_cgroups().
std::mutex _update_cgroups_mtx;
// A static mapping from fe's resource type to cgroups file
static std::map<TResourceType::type, std::string> _s_resource_cgroups;
};
} // namespace doris

View File

@ -1,37 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "agent/cgroups_mgr.h"
// macOS stub of CgroupsMgr: cgroups do not exist on this platform, so the
// manager is a no-op. Only the members referenced by platform-independent
// code are defined here.
namespace doris {

const std::string CgroupsMgr::_s_system_user = "system";
const std::string CgroupsMgr::_s_system_group = "normal";
// No resource type is mapped to a cgroup file on this platform.
std::map<TResourceType::type, std::string> CgroupsMgr::_s_resource_cgroups;

// Both arguments are ignored. The scalar members have no default member
// initializer in the class, so they are set here to keep the object in a
// defined state (not initialized, version -1).
CgroupsMgr::CgroupsMgr(ExecEnv* exec_env, const std::string& root_cgroups_path)
        : _is_cgroups_init_success(false), _cur_version(-1) {}

CgroupsMgr::~CgroupsMgr() = default;

// Nothing to set up; always reports success.
Status CgroupsMgr::init_cgroups() {
    return Status::OK();
}

// No-op: threads are never assigned to a cgroup on this platform.
void CgroupsMgr::apply_cgroup(const std::string& user_name, const std::string& level) {}

} // namespace doris

View File

@ -39,9 +39,7 @@ using apache::thrift::transport::TTransportException;
// TRESOURCE_IOPS not mapped
UserResourceListener::UserResourceListener(ExecEnv* exec_env, const TMasterInfo& master_info)
: _master_info(master_info),
_exec_env(exec_env),
_cgroups_mgr(*(exec_env->cgroups_mgr())) {}
: _master_info(master_info), _exec_env(exec_env) {}
UserResourceListener::~UserResourceListener() {}
@ -58,9 +56,6 @@ void UserResourceListener::handle_update(const TAgentServiceVersion::type& proto
}
void UserResourceListener::update_users_resource(int64_t new_version) {
if (new_version <= _cgroups_mgr.get_cgroups_version()) {
return;
}
// Call fe to get latest user resource
Status master_status;
// using 500ms as default timeout value
@ -98,6 +93,5 @@ void UserResourceListener::update_users_resource(int64_t new_version) {
<< e.what();
return;
}
_cgroups_mgr.update_local_cgroups(new_fetched_resource);
}
} // namespace doris

View File

@ -19,7 +19,6 @@
#include <string>
#include "agent/cgroups_mgr.h"
#include "agent/topic_listener.h"
#include "gen_cpp/AgentService_types.h"
#include "gen_cpp/HeartbeatService_types.h"
@ -42,7 +41,6 @@ public:
private:
const TMasterInfo& _master_info;
ExecEnv* _exec_env;
CgroupsMgr& _cgroups_mgr;
// Call cgroups mgr to update user's cgroups resource share
// Also refresh local user resource's cache
void update_users_resource(int64_t new_version);

View File

@ -165,9 +165,6 @@ CONF_String(log_buffer_level, "");
// number of threads available to serve backend execution requests
CONF_Int32(be_service_threads, "64");
// cgroups allocated for doris
CONF_String(doris_cgroups, "");
// Controls the number of threads to run work per core. It's common to pick 2x
// or 3x the number of cores. This keeps the cores busy without causing excessive
// thrashing.

View File

@ -20,7 +20,6 @@
#include <sstream>
#include <string>
#include "agent/cgroups_mgr.h"
#include "boost/lexical_cast.hpp"
#include "common/logging.h"
#include "http/http_channel.h"
@ -45,9 +44,6 @@ ChecksumAction::ChecksumAction() {}
void ChecksumAction::handle(HttpRequest* req) {
LOG(INFO) << "accept one request " << req->debug_string();
// add tid to cgroup in order to limit read bandwidth
CgroupsMgr::apply_system_cgroup();
// Get tablet id
const std::string& tablet_id_str = req->param(TABLET_ID);
if (tablet_id_str.empty()) {

View File

@ -23,7 +23,6 @@
#include <sstream>
#include <string>
#include "agent/cgroups_mgr.h"
#include "env/env.h"
#include "http/http_channel.h"
#include "http/http_headers.h"
@ -104,9 +103,6 @@ void DownloadAction::handle_error_log(HttpRequest* req, const std::string& file_
void DownloadAction::handle(HttpRequest* req) {
VLOG_CRITICAL << "accept one download request " << req->debug_string();
// add tid to cgroup in order to limit read bandwidth
CgroupsMgr::apply_system_cgroup();
// Get 'file' parameter, then assembly file absolute path
const std::string& file_path = req->param(FILE_PARAMETER);
if (file_path.empty()) {

View File

@ -20,7 +20,6 @@
#include <sstream>
#include <string>
#include "agent/cgroups_mgr.h"
#include "boost/lexical_cast.hpp"
#include "common/logging.h"
#include "http/http_channel.h"
@ -42,10 +41,6 @@ ReloadTabletAction::ReloadTabletAction(ExecEnv* exec_env) : _exec_env(exec_env)
void ReloadTabletAction::handle(HttpRequest* req) {
LOG(INFO) << "accept one request " << req->debug_string();
// add tid to cgroup in order to limit read bandwidth
CgroupsMgr::apply_system_cgroup();
// Get path
const std::string& path = req->param(PATH);
if (path.empty()) {

View File

@ -22,7 +22,6 @@
#include <sstream>
#include <string>
#include "agent/cgroups_mgr.h"
#include "env/env.h"
#include "gutil/strings/substitute.h" // for Substitute
#include "http/http_channel.h"
@ -50,8 +49,6 @@ RestoreTabletAction::RestoreTabletAction(ExecEnv* exec_env) : _exec_env(exec_env
void RestoreTabletAction::handle(HttpRequest* req) {
LOG(INFO) << "accept one request " << req->debug_string();
// add tid to cgroup in order to limit read bandwidth
CgroupsMgr::apply_system_cgroup();
Status status = _handle(req);
std::string result = status.to_json();
LOG(INFO) << "handle request result:" << result;

View File

@ -21,7 +21,6 @@
#include <sstream>
#include <string>
#include "agent/cgroups_mgr.h"
#include "common/logging.h"
#include "gen_cpp/AgentService_types.h"
#include "http/http_channel.h"
@ -43,9 +42,6 @@ SnapshotAction::SnapshotAction() {}
void SnapshotAction::handle(HttpRequest* req) {
LOG(INFO) << "accept one request " << req->debug_string();
// add tid to cgroup in order to limit read bandwidth
CgroupsMgr::apply_system_cgroup();
// Get tablet id
const std::string& tablet_id_str = req->param(TABLET_ID);
if (tablet_id_str.empty()) {

View File

@ -25,7 +25,6 @@
#include <random>
#include <string>
#include "agent/cgroups_mgr.h"
#include "common/config.h"
#include "common/status.h"
#include "gutil/strings/substitute.h"
@ -359,10 +358,8 @@ void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& dat
do {
LOG(INFO) << "begin to produce tablet meta checkpoint tasks.";
for (auto data_dir : data_dirs) {
auto st = _tablet_meta_checkpoint_thread_pool->submit_func([data_dir, this]() {
CgroupsMgr::apply_system_cgroup();
_tablet_manager->do_tablet_meta_checkpoint(data_dir);
});
auto st = _tablet_meta_checkpoint_thread_pool->submit_func(
[data_dir, this]() { _tablet_manager->do_tablet_meta_checkpoint(data_dir); });
if (!st.ok()) {
LOG(WARNING) << "submit tablet checkpoint tasks failed.";
}
@ -653,7 +650,6 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
? _cumu_compaction_thread_pool
: _base_compaction_thread_pool;
auto st = thread_pool->submit_func([tablet, compaction_type, permits, this]() {
CgroupsMgr::apply_system_cgroup();
tablet->execute_compaction(compaction_type);
_permit_limiter.release(permits);
// reset compaction

View File

@ -34,7 +34,6 @@
#include <random>
#include <set>
#include "agent/cgroups_mgr.h"
#include "agent/task_worker_pool.h"
#include "env/env.h"
#include "env/env_util.h"

View File

@ -40,7 +40,6 @@ class BrokerMgr;
template <class T>
class BrpcClientCache;
class CgroupsMgr;
class DataStreamMgr;
class EvHttpServer;
class ExternalScanContextMgr;
@ -143,7 +142,6 @@ public:
}
return _download_cache_buf_map[token].get();
}
CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; }
FragmentMgr* fragment_mgr() { return _fragment_mgr; }
ResultCache* result_cache() { return _result_cache; }
TMasterInfo* master_info() { return _master_info; }
@ -224,7 +222,6 @@ private:
std::unique_ptr<ThreadPool> _join_node_thread_pool;
// ThreadPoolToken -> buffer
std::unordered_map<ThreadPoolToken*, std::unique_ptr<char[]>> _download_cache_buf_map;
CgroupsMgr* _cgroups_mgr = nullptr;
FragmentMgr* _fragment_mgr = nullptr;
pipeline::TaskScheduler* _pipeline_task_scheduler = nullptr;
ResultCache* _result_cache = nullptr;

View File

@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
#include "agent/cgroups_mgr.h"
#include "common/config.h"
#include "common/logging.h"
#include "gen_cpp/BackendService.h"
@ -119,8 +118,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
RETURN_IF_ERROR(init_pipeline_task_scheduler());
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
_cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups);
_fragment_mgr = new FragmentMgr(this);
_result_cache = new ResultCache(config::query_cache_max_size_mb,
config::query_cache_elasticity_size_mb);
@ -142,7 +139,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_frontend_client_cache->init_metrics("frontend");
_broker_client_cache->init_metrics("broker");
_result_mgr->init();
_cgroups_mgr->init_cgroups();
Status status = _load_path_mgr->init();
if (!status.ok()) {
LOG(ERROR) << "load path mgr init failed." << status;
@ -347,7 +343,6 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_master_info);
SAFE_DELETE(_pipeline_task_scheduler);
SAFE_DELETE(_fragment_mgr);
SAFE_DELETE(_cgroups_mgr);
SAFE_DELETE(_broker_client_cache);
SAFE_DELETE(_frontend_client_cache);
SAFE_DELETE(_backend_client_cache);

View File

@ -25,7 +25,6 @@
#include <memory>
#include <sstream>
#include "agent/cgroups_mgr.h"
#include "common/object_pool.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/PaloInternalService_types.h"
@ -223,7 +222,6 @@ Status FragmentExecState::execute() {
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
CgroupsMgr::apply_system_cgroup();
opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment");
Status st = _executor.open();
WARN_IF_ERROR(st,

View File

@ -24,7 +24,6 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/test")
set(AGENT_TEST_FILES
agent/utils_test.cpp
# agent/agent_server_test.cpp
# agent/cgroups_mgr_test.cpp
# agent/heartbeat_server_test.cpp
)
set(COMMON_TEST_FILES

View File

@ -1,196 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "agent/cgroups_mgr.h"
#include <algorithm>
#include <filesystem>
#include <fstream>
#include "gtest/gtest.h"
using ::testing::_;
using ::testing::Return;
using ::testing::SetArgPointee;
using std::string;
namespace doris {
// Storage engine handle shared by the whole test binary. It starts out null and
// is filled in by CgroupsMgrTest::SetUpTestCase() through StorageEngine::open();
// nothing in this file releases it afterwards.
StorageEngine* k_engine = nullptr;
class CgroupsMgrTest : public testing::Test {
public:
// create a mock cgroup folder
static void SetUpTestCase() {
EXPECT_TRUE(std::filesystem::remove_all(_s_cgroup_path));
// create a mock cgroup path
EXPECT_TRUE(std::filesystem::create_directory(_s_cgroup_path));
std::vector<StorePath> paths;
paths.emplace_back(config::storage_root_path, -1);
doris::EngineOptions options;
options.store_paths = paths;
Status s = doris::StorageEngine::open(options, &k_engine);
EXPECT_TRUE(s.ok()) << s.to_string();
}
// delete the mock cgroup folder
static void TearDownTestCase() { EXPECT_TRUE(std::filesystem::remove_all(_s_cgroup_path)); }
// test if a file contains specific number
static bool does_contain_number(const std::string& file_path, int32_t number) {
std::ifstream input_file(file_path.c_str());
int32_t task_id;
while (input_file >> task_id) {
if (task_id == number) {
return true;
}
}
return false;
}
static std::string _s_cgroup_path;
static CgroupsMgr _s_cgroups_mgr;
};
// Relative path of the throw-away directory that stands in for the cgroup root.
std::string CgroupsMgrTest::_s_cgroup_path = "./doris_cgroup_testxxxx123";
// Manager under test, constructed with a null first argument and the mock root
// path. Its initializer reads _s_cgroup_path, so this definition has to stay
// below the one above: objects in one translation unit are initialized in
// definition order.
CgroupsMgr CgroupsMgrTest::_s_cgroups_mgr(nullptr, CgroupsMgrTest::_s_cgroup_path);
// Checks is_directory() against an existing directory, a path expected to be
// missing, and a path expected to be a regular file.
TEST_F(CgroupsMgrTest, TestIsDirectory) {
    // The mock cgroup root is created by SetUpTestCase, so it must be accepted.
    const bool root_is_dir = _s_cgroups_mgr.is_directory(CgroupsMgrTest::_s_cgroup_path.c_str());
    EXPECT_TRUE(root_is_dir);

    // "./abc" is assumed not to exist in the working directory.
    const bool missing_is_dir = _s_cgroups_mgr.is_directory("./abc");
    EXPECT_FALSE(missing_is_dir);

    // "/etc/profile" is assumed to exist as a plain file, not a directory.
    const bool file_is_dir = _s_cgroups_mgr.is_directory("/etc/profile");
    EXPECT_FALSE(file_is_dir);
}
// Checks is_file_exist() against a path that exists and one expected not to.
TEST_F(CgroupsMgrTest, TestIsFileExist) {
    // The mock cgroup root is created by SetUpTestCase, so it must be found.
    const bool root_found = _s_cgroups_mgr.is_file_exist(CgroupsMgrTest::_s_cgroup_path.c_str());
    EXPECT_TRUE(root_found);

    // "./abc" is assumed to be absent from the working directory.
    const bool missing_found = _s_cgroups_mgr.is_file_exist("./abc");
    EXPECT_FALSE(missing_found);
}
// init_cgroups() is expected to fail while the root "tasks" file is missing and,
// once the mock hierarchy exists, to succeed and leave every task id of the
// nested cgroups in the root "tasks" file.
TEST_F(CgroupsMgrTest, TestInitCgroups) {
    // Creates (or truncates) `path` and stores one task id followed by a newline.
    const auto write_task_id = [](const std::string& path, int task_id) {
        std::ofstream out(path);
        out << task_id << std::endl;
        out.close();
    };

    // No tasks file exists yet, so initialization has to report an error.
    const Status status_without_tasks = _s_cgroups_mgr.init_cgroups();
    EXPECT_EQ(Status::DORIS_ERROR, status_without_tasks);

    // Root level: <root>/tasks
    const std::string root_tasks_path = _s_cgroup_path + "/tasks";
    write_task_id(root_tasks_path, 1111111);

    // User level: <root>/yiguolei/tasks
    EXPECT_TRUE(std::filesystem::create_directory(_s_cgroup_path + "/yiguolei"));
    write_task_id(_s_cgroup_path + "/yiguolei/tasks", 123);

    // User + level: <root>/yiguolei/low/tasks
    EXPECT_TRUE(std::filesystem::create_directory(_s_cgroup_path + "/yiguolei/low"));
    write_task_id(CgroupsMgrTest::_s_cgroup_path + "/yiguolei/low/tasks", 456);

    // With the hierarchy in place, initialization must now succeed ...
    const Status status_with_tasks = _s_cgroups_mgr.init_cgroups();
    EXPECT_EQ(Status::OK(), status_with_tasks);

    // ... and the root tasks file must hold the ids from all three levels.
    EXPECT_TRUE(does_contain_number(root_tasks_path, 1111111));
    EXPECT_TRUE(does_contain_number(root_tasks_path, 123));
    EXPECT_TRUE(does_contain_number(root_tasks_path, 456));
}
// Exercises assign_thread_to_cgroups() in three situations: an unknown user, a
// known user with an unknown level, and a known user with a known level.
TEST_F(CgroupsMgrTest, TestAssignThreadToCgroups) {
    // Creates (or truncates) `path` and stores one task id followed by a newline.
    const auto write_task_id = [](const std::string& path, int task_id) {
        std::ofstream out(path);
        out << task_id << std::endl;
        out.close();
    };

    // Neither a cgroup for user "abc" nor a default cgroup has been created, so
    // the assignment is expected to fail.
    const Status unknown_user_status = _s_cgroups_mgr.assign_thread_to_cgroups(111, "abc", "low");
    EXPECT_EQ(Status::DORIS_ERROR, unknown_user_status);

    // Create the user cgroup only; level "aaaa" has no directory, and the test
    // expects the thread id to show up in the user's own tasks file.
    EXPECT_TRUE(std::filesystem::create_directory(_s_cgroup_path + "/yiguolei2"));
    write_task_id(_s_cgroup_path + "/yiguolei2/tasks", 123);

    const Status user_only_status =
            _s_cgroups_mgr.assign_thread_to_cgroups(111, "yiguolei2", "aaaa");
    EXPECT_EQ(Status::OK(), user_only_status);
    EXPECT_TRUE(does_contain_number(_s_cgroup_path + "/yiguolei2/tasks", 111));

    // Create the level cgroup as well; now the thread id is expected in the
    // level's tasks file.
    EXPECT_TRUE(std::filesystem::create_directory(_s_cgroup_path + "/yiguolei2/low"));
    write_task_id(_s_cgroup_path + "/yiguolei2/low/tasks", 456);

    const Status user_level_status =
            _s_cgroups_mgr.assign_thread_to_cgroups(111, "yiguolei2", "low");
    EXPECT_EQ(Status::OK(), user_level_status);
    EXPECT_TRUE(does_contain_number(_s_cgroup_path + "/yiguolei2/low/tasks", 111));
}
// modify_user_cgroups() is expected to write the given share values into the
// user's directory and into each level directory beneath it.
TEST_F(CgroupsMgrTest, TestModifyUserCgroups) {
    std::string user_name = "user_modify";
    // Value written for the user as a whole, keyed by file name.
    std::map<std::string, int32_t> user_share {{"cpu.shares", 100}};
    // Value written per level, keyed by level name.
    std::map<std::string, int32_t> level_share {{"low", 100}};

    const Status modify_status =
            _s_cgroups_mgr.modify_user_cgroups(user_name, user_share, level_share);
    EXPECT_EQ(Status::OK(), modify_status);

    EXPECT_TRUE(does_contain_number(_s_cgroup_path + "/user_modify/cpu.shares", 100));
    EXPECT_TRUE(does_contain_number(_s_cgroup_path + "/user_modify/low/cpu.shares", 100));
}
// Feeds update_local_cgroups() a hand-built resource result and checks that the
// version is recorded and the share values end up in the cpu.shares files.
TEST_F(CgroupsMgrTest, TestUpdateLocalCgroups) {
    // CPU share for the user as a whole.
    TResourceGroup cpu_group;
    cpu_group.resourceByType[TResourceType::type::TRESOURCE_CPU_SHARE] = 100;

    // Per-level shares plus the user-wide resource group.
    TUserResource per_user;
    per_user.shareByGroup["low"] = 123;
    per_user.shareByGroup["normal"] = 234;
    per_user.resource = cpu_group;

    // The result as it would arrive from the frontend, at version 2.
    TFetchResourceResult fe_result;
    fe_result.resourceVersion = 2;
    fe_result.resourceByUser["yiguolei3"] = per_user;

    const Status update_status = _s_cgroups_mgr.update_local_cgroups(fe_result);
    EXPECT_EQ(Status::OK(), update_status);
    EXPECT_EQ(2, _s_cgroups_mgr._cur_version);

    EXPECT_TRUE(does_contain_number(_s_cgroup_path + "/yiguolei3/cpu.shares", 100));
    EXPECT_TRUE(does_contain_number(_s_cgroup_path + "/yiguolei3/low/cpu.shares", 123));
    EXPECT_TRUE(does_contain_number(_s_cgroup_path + "/yiguolei3/normal/cpu.shares", 234));
}
// relocate_tasks() is expected to report an error when the source cgroup path
// cannot be used; "/a/b/c/d" is assumed not to exist on the test machine.
TEST_F(CgroupsMgrTest, TestRelocateTasks) {
    const Status relocate_status = _s_cgroups_mgr.relocate_tasks("/a/b/c/d", _s_cgroup_path);
    EXPECT_EQ(Status::DORIS_ERROR, relocate_status);
}
} // namespace doris