[workload](pipeline) Add cgroup cpu controller (#24052)

wangbo
2023-09-21 21:49:33 +08:00
committed by GitHub
parent 94907ca0f2
commit c9b2f4cb92
18 changed files with 334 additions and 17 deletions

View File

@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "agent/cgroup_cpu_ctl.h"
namespace doris {
Status CgroupCpuCtl::init() {
_doris_cgroup_cpu_path = config::doris_cgroup_cpu_path;
if (_doris_cgroup_cpu_path.empty()) {
LOG(INFO) << "doris cgroup cpu path is not specify, path=" << _doris_cgroup_cpu_path;
return Status::InternalError("doris cgroup cpu path {} is not specify.",
_doris_cgroup_cpu_path);
}
if (access(_doris_cgroup_cpu_path.c_str(), F_OK) != 0) {
LOG(ERROR) << "doris cgroup cpu path not exists, path=" << _doris_cgroup_cpu_path;
return Status::InternalError("doris cgroup cpu path {} not exists.",
_doris_cgroup_cpu_path);
}
if (_doris_cgroup_cpu_path.back() != '/') {
_doris_cgroup_cpu_path = _doris_cgroup_cpu_path + "/";
}
return Status::OK();
}
void CgroupCpuCtl::update_cpu_hard_limit(int cpu_hard_limit) {
if (!_init_succ) {
return;
}
std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
if (_cpu_hard_limit != cpu_hard_limit) {
Status ret = modify_cg_cpu_hard_limit_no_lock(cpu_hard_limit);
if (ret.ok()) {
_cpu_hard_limit = cpu_hard_limit;
}
}
}
Status CgroupCpuCtl::write_cg_sys_file(std::string file_path, int value, std::string msg,
bool is_append) {
int fd = open(file_path.c_str(), is_append ? O_RDWR | O_APPEND : O_RDWR);
if (fd == -1) {
LOG(ERROR) << "open path failed, path=" << file_path;
return Status::InternalError("open path failed, path={}", file_path);
}
std::stringstream ss;
ss << value << std::endl;
const std::string& str = ss.str();
ssize_t ret = write(fd, str.c_str(), str.size());
if (ret == -1) {
LOG(ERROR) << msg << " write sys file failed";
close(fd); // do not leak the descriptor on the error path
return Status::InternalError("{} write sys file failed", msg);
}
close(fd);
LOG(INFO) << msg << " success";
return Status::OK();
}
Status CgroupV1CpuCtl::init() {
RETURN_IF_ERROR(CgroupCpuCtl::init());
// query path
_cgroup_v1_cpu_query_path = _doris_cgroup_cpu_path + "query";
if (access(_cgroup_v1_cpu_query_path.c_str(), F_OK) != 0) {
int ret = mkdir(_cgroup_v1_cpu_query_path.c_str(), S_IRWXU);
if (ret != 0) {
LOG(ERROR) << "cgroup v1 mkdir query failed, path=" << _cgroup_v1_cpu_query_path;
return Status::InternalError("cgroup v1 mkdir query failed, path=",
_cgroup_v1_cpu_query_path);
}
}
// quota path
_cgroup_v1_cpu_query_quota_path = _cgroup_v1_cpu_query_path + "/cpu.cfs_quota_us";
// task path
_cgroup_v1_cpu_query_task_path = _cgroup_v1_cpu_query_path + "/tasks";
LOG(INFO) << "cgroup v1 cpu path init success"
<< ", query path=" << _cgroup_v1_cpu_query_path
<< ", query quota path=" << _cgroup_v1_cpu_query_quota_path
<< ", query tasks path=" << _cgroup_v1_cpu_query_task_path
<< ", core num=" << _cpu_core_num;
_init_succ = true;
return Status::OK();
}
Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
int val = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100;
std::string msg = "modify cpu quota value to " + std::to_string(val);
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_query_quota_path, val, msg, false);
}
Status CgroupV1CpuCtl::add_thread_to_cgroup() {
if (!_init_succ) {
return Status::OK();
}
int tid = static_cast<int>(syscall(SYS_gettid));
std::string msg = "add thread " + std::to_string(tid) + " to group";
std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_query_task_path, tid, msg, true);
}
} // namespace doris
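For intuition, the hard limit applied by modify_cg_cpu_hard_limit_no_lock is a percentage of the whole machine converted into a CFS quota per scheduling period. A minimal worked sketch of that arithmetic, assuming the default 100000 us period and a hypothetical 16-core host (not part of the commit):

#include <cinttypes>
#include <cstdint>
#include <cstdio>

// quota_us = cfs_period_us * core_num * hard_limit_percent / 100
int main() {
    uint64_t cfs_period_us = 100000; // default _cpu_cfs_period_us (100 ms)
    uint64_t core_num = 16;          // stand-in for CpuInfo::num_cores()
    uint64_t hard_limit = 25;        // cpu_hard_limit, as a percentage
    uint64_t quota_us = cfs_period_us * core_num * hard_limit / 100;
    // 100000 * 16 * 25 / 100 = 400000: the query cgroup may consume at most
    // 400 ms of CPU time per 100 ms period, i.e. 4 of the 16 cores.
    std::printf("cpu.cfs_quota_us = %" PRIu64 "\n", quota_us);
    return 0;
}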

View File

@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <shared_mutex>
#include "common/config.h"
#include "common/status.h"
#include "util/cpu_info.h"
namespace doris {
class CgroupCpuCtl {
public:
CgroupCpuCtl() {}
virtual ~CgroupCpuCtl() {}
virtual Status init();
virtual Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) = 0;
virtual Status add_thread_to_cgroup() = 0;
void update_cpu_hard_limit(int cpu_hard_limit);
protected:
Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append);
std::string _doris_cgroup_cpu_path;
uint64_t _cpu_core_num = CpuInfo::num_cores();
uint64_t _cpu_cfs_period_us = 100000;
uint64_t _cpu_hard_limit = 0;
std::shared_mutex _lock_mutex;
bool _init_succ = false;
};
/*
NOTE: directory structure
1 sys cgroup root path:
/sys/fs/cgroup
2 sys cgroup cpu controller path:
/sys/fs/cgroup/cpu
3 doris home path:
/sys/fs/cgroup/cpu/{doris_home}/
4 doris query path
/sys/fs/cgroup/cpu/{doris_home}/query
5 doris query quota file:
/sys/fs/cgroup/cpu/{doris_home}/query/cpu.cfs_quota_us
6 doris query tasks file:
/sys/fs/cgroup/cpu/{doris_home}/query/tasks
*/
class CgroupV1CpuCtl : public CgroupCpuCtl {
public:
Status init() override;
Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
Status add_thread_to_cgroup() override;
private:
// todo(wb) support load/compaction path
std::string _cgroup_v1_cpu_query_path;
std::string _cgroup_v1_cpu_query_quota_path;
std::string _cgroup_v1_cpu_query_task_path;
};
} // namespace doris
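Taken together, the intended call pattern for this class looks roughly like the following. This is a hedged sketch against the header above; example_wiring is hypothetical and error handling is elided:

#include "agent/cgroup_cpu_ctl.h"

// Assumes config::doris_cgroup_cpu_path points at a writable cgroup v1 cpu
// hierarchy, e.g. /sys/fs/cgroup/cpu/doris.
void example_wiring() {
    doris::CgroupV1CpuCtl ctl;
    if (!ctl.init().ok()) { // creates {doris_cgroup_cpu_path}/query if missing
        return;             // on failure the controller stays a no-op
    }
    // Cap the query cgroup at 50% of the machine; rewrites
    // {doris_cgroup_cpu_path}/query/cpu.cfs_quota_us.
    ctl.update_cpu_hard_limit(50);
    // Each worker thread registers itself once, from its own context, by
    // appending its tid to {doris_cgroup_cpu_path}/query/tasks.
    static_cast<void>(ctl.add_thread_to_cgroup());
}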

View File

@ -1092,7 +1092,10 @@ DEFINE_Int32(group_commit_insert_threads, "10");
DEFINE_mInt32(scan_thread_nice_value, "0");
DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400");
DEFINE_Bool(exit_on_exception, "false")
DEFINE_Bool(exit_on_exception, "false");
// cgroup
DEFINE_String(doris_cgroup_cpu_path, "");
// clang-format off
#ifdef BE_TEST

View File

@ -1171,6 +1171,9 @@ DECLARE_mInt32(tablet_schema_cache_recycle_interval);
// Use `LOG(FATAL)` to replace `throw` when true
DECLARE_mBool(exit_on_exception);
// cgroup
DECLARE_String(doris_cgroup_cpu_path);
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);

View File

@ -199,6 +199,7 @@ Status TaskScheduler::start() {
.set_min_threads(cores)
.set_max_threads(cores)
.set_max_queue_size(0)
.set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
.build(&_fix_thread_pool);
_markers.reserve(cores);
for (size_t i = 0; i < cores; ++i) {

View File

@ -77,11 +77,13 @@ private:
class TaskScheduler {
public:
TaskScheduler(ExecEnv* exec_env, std::shared_ptr<BlockedTaskScheduler> b_scheduler,
std::shared_ptr<TaskQueue> task_queue, std::string name)
std::shared_ptr<TaskQueue> task_queue, std::string name,
CgroupCpuCtl* cgroup_cpu_ctl)
: _task_queue(std::move(task_queue)),
_blocked_task_scheduler(std::move(b_scheduler)),
_shutdown(false),
_name(name) {}
_name(name),
_cgroup_cpu_ctl(cgroup_cpu_ctl) {}
~TaskScheduler();
@ -100,6 +102,7 @@ private:
std::shared_ptr<BlockedTaskScheduler> _blocked_task_scheduler;
std::atomic<bool> _shutdown;
std::string _name;
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
void _do_work(size_t index);
// after _try_close_task, the task may already be destructed.

View File

@ -261,6 +261,8 @@ public:
return _inverted_index_query_cache;
}
CgroupCpuCtl* get_cgroup_cpu_ctl() { return _cgroup_cpu_ctl.get(); }
private:
ExecEnv();
@ -362,6 +364,8 @@ private:
CacheManager* _cache_manager = nullptr;
segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr;
segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl = nullptr;
};
template <>

View File

@ -222,7 +222,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
}
_broker_mgr->init();
_small_file_mgr->init();
status = _scanner_scheduler->init();
status = _scanner_scheduler->init(this);
if (!status.ok()) {
LOG(ERROR) << "Scanner scheduler init failed. " << status;
return status;
@ -271,17 +271,27 @@ Status ExecEnv::init_pipeline_task_scheduler() {
executors_size = CpuInfo::num_cores();
}
if (!config::doris_cgroup_cpu_path.empty()) {
_cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>();
Status ret = _cgroup_cpu_ctl->init();
if (!ret.ok()) {
LOG(ERROR) << "init cgroup cpu controller failed";
}
} else {
LOG(INFO) << "cgroup cpu controller is not inited";
}
// TODO: pipeline task group should combine the two blocked schedulers.
auto t_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
auto b_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>(t_queue);
_pipeline_task_scheduler =
new pipeline::TaskScheduler(this, b_scheduler, t_queue, "WithoutGroupTaskSchePool");
_pipeline_task_scheduler = new pipeline::TaskScheduler(this, b_scheduler, t_queue,
"WithoutGroupTaskSchePool", nullptr);
RETURN_IF_ERROR(_pipeline_task_scheduler->start());
auto tg_queue = std::make_shared<pipeline::TaskGroupTaskQueue>(executors_size);
auto tg_b_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>(tg_queue);
_pipeline_task_group_scheduler =
new pipeline::TaskScheduler(this, tg_b_scheduler, tg_queue, "WithGroupTaskSchePool");
_pipeline_task_group_scheduler = new pipeline::TaskScheduler(
this, tg_b_scheduler, tg_queue, "WithGroupTaskSchePool", _cgroup_cpu_ctl.get());
RETURN_IF_ERROR(_pipeline_task_group_scheduler->start());
return Status::OK();

View File

@ -616,15 +616,21 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
if (params.__isset.workload_groups && !params.workload_groups.empty()) {
taskgroup::TaskGroupInfo task_group_info;
auto status = taskgroup::TaskGroupInfo::parse_group_info(params.workload_groups[0],
&task_group_info);
int query_cpu_hard_limit = -1;
auto status = taskgroup::TaskGroupInfo::parse_group_info(
params.workload_groups[0], &task_group_info, &query_cpu_hard_limit);
if (status.ok()) {
auto tg = _exec_env->task_group_manager()->get_or_create_task_group(
task_group_info);
tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
query_ctx->set_task_group(tg);
LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
<< " use task group: " << tg->debug_string();
<< " use task group: " << tg->debug_string()
<< " query_cpu_hard_limit: " << query_cpu_hard_limit;
if (query_cpu_hard_limit > 0 && _exec_env->get_cgroup_cpu_ctl() != nullptr) {
_exec_env->get_cgroup_cpu_ctl()->update_cpu_hard_limit(
query_cpu_hard_limit);
}
}
} else {
VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id())

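The FE/BE contract around this value deserves a note: the FE attaches query_cpu_hard_limit to the first workload group's properties, sending -1 when Config.enable_cpu_hard_limit is off, and the BE only touches the cgroup for positive values. A condensed sketch of that branch (update_cgroup_if_needed is a hypothetical helper, not in the commit):

// A non-positive limit from the FE means "leave the cgroup untouched".
void update_cgroup_if_needed(doris::CgroupCpuCtl* ctl, int query_cpu_hard_limit) {
    if (query_cpu_hard_limit > 0 && ctl != nullptr) {
        ctl->update_cpu_hard_limit(query_cpu_hard_limit);
    }
}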
View File

@ -42,6 +42,7 @@ namespace taskgroup {
const static std::string CPU_SHARE = "cpu_share";
const static std::string MEMORY_LIMIT = "memory_limit";
const static std::string ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit";
const static std::string QUERY_CPU_HARD_LIMIT = "query_cpu_hard_limit";
template <typename QueueType>
TaskGroupEntity<QueueType>::TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type)
@ -185,7 +186,7 @@ void TaskGroup::task_group_info(TaskGroupInfo* tg_info) const {
}
Status TaskGroupInfo::parse_group_info(const TPipelineWorkloadGroup& resource_group,
TaskGroupInfo* task_group_info) {
TaskGroupInfo* task_group_info, int* query_cpu_hard_limit) {
if (UNLIKELY(!check_group_info(resource_group))) {
std::stringstream ss;
ss << "incomplete resource group parameters: ";
@ -198,6 +199,10 @@ Status TaskGroupInfo::parse_group_info(const TPipelineWorkloadGroup& resource_gr
uint64_t share = 0;
std::from_chars(iter->second.c_str(), iter->second.c_str() + iter->second.size(), share);
auto iter2 = resource_group.properties.find(QUERY_CPU_HARD_LIMIT);
if (iter2 != resource_group.properties.end()) {
std::from_chars(iter2->second.c_str(), iter2->second.c_str() + iter2->second.size(),
*query_cpu_hard_limit);
}
task_group_info->id = resource_group.id;
task_group_info->name = resource_group.name;
task_group_info->version = resource_group.version;

View File

@ -160,7 +160,7 @@ struct TaskGroupInfo {
int64_t version;
static Status parse_group_info(const TPipelineWorkloadGroup& resource_group,
TaskGroupInfo* task_group_info);
TaskGroupInfo* task_group_info, int* query_cpu_hard_limit);
private:
static bool check_group_info(const TPipelineWorkloadGroup& resource_group);

View File

@ -75,6 +75,11 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) {
return *this;
}
ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl) {
_cgroup_cpu_ctl = cgroup_cpu_ctl;
return *this;
}
ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode,
int max_concurrency)
: _mode(mode),
@ -240,6 +245,7 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder)
_num_threads_pending_start(0),
_active_threads(0),
_total_queued_tasks(0),
_cgroup_cpu_ctl(builder._cgroup_cpu_ctl),
_tokenless(new_token(ExecutionMode::CONCURRENT)) {}
ThreadPool::~ThreadPool() {
@ -470,6 +476,10 @@ void ThreadPool::dispatch_thread() {
_num_threads++;
_num_threads_pending_start--;
if (_cgroup_cpu_ctl != nullptr) {
_cgroup_cpu_ctl->add_thread_to_cgroup();
}
// Owned by this worker thread and added/removed from _idle_threads as needed.
IdleThread me;

View File

@ -37,6 +37,7 @@
#include <string>
#include <unordered_set>
#include "agent/cgroup_cpu_ctl.h"
#include "common/status.h"
#include "util/work_thread_pool.hpp"
@ -106,6 +107,7 @@ public:
ThreadPoolBuilder& set_min_threads(int min_threads);
ThreadPoolBuilder& set_max_threads(int max_threads);
ThreadPoolBuilder& set_max_queue_size(int max_queue_size);
ThreadPoolBuilder& set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl);
template <class Rep, class Period>
ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration<Rep, Period>& idle_timeout) {
_idle_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(idle_timeout);
@ -131,6 +133,7 @@ private:
int _min_threads;
int _max_threads;
int _max_queue_size;
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
std::chrono::milliseconds _idle_timeout;
ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
@ -335,6 +338,8 @@ private:
// Protected by _lock.
int _total_queued_tasks;
CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
// All allocated tokens.
//
// Protected by _lock.

View File

@ -94,7 +94,7 @@ void ScannerScheduler::stop() {
LOG(INFO) << "ScannerScheduler stopped";
}
Status ScannerScheduler::init() {
Status ScannerScheduler::init(ExecEnv* env) {
// 1. scheduling thread pool and scheduling queues
ThreadPoolBuilder("SchedulingThreadPool")
.set_min_threads(QUEUE_NUM)
@ -134,6 +134,7 @@ Status ScannerScheduler::init() {
ThreadPoolBuilder("local_scan_group")
.set_min_threads(config::doris_scanner_thread_pool_thread_num)
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
.set_cgroup_cpu_ctl(env->get_cgroup_cpu_ctl())
.build(&_group_local_scan_thread_pool);
for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) {
_group_local_scan_thread_pool->submit_func([this] {

View File

@ -64,7 +64,7 @@ public:
ScannerScheduler();
~ScannerScheduler();
[[nodiscard]] Status init();
[[nodiscard]] Status init(ExecEnv* env);
[[nodiscard]] Status submit(ScannerContext* ctx);

View File

@ -1579,6 +1579,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static boolean enable_query_queue = true;
@ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL)
public static boolean enable_cpu_hard_limit = false;
@ConfField(mutable = true)
public static boolean disable_shared_scan = false;

View File

@ -45,6 +45,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
public static final String CPU_SHARE = "cpu_share";
public static final String CPU_HARD_LIMIT = "cpu_hard_limit";
public static final String MEMORY_LIMIT = "memory_limit";
public static final String ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit";
@ -60,7 +62,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
.add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).build();
.add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).build();
@SerializedName(value = "id")
private long id;
@ -82,6 +84,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
private int maxQueueSize = 0;
private int queueTimeout = 0;
private int cpuHardLimit = 0;
private WorkloadGroup(long id, String name, Map<String, String> properties) {
this(id, name, properties, 0);
}
@ -96,6 +100,9 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) {
properties.put(ENABLE_MEMORY_OVERCOMMIT, properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase());
}
if (properties.containsKey(CPU_HARD_LIMIT)) {
this.cpuHardLimit = Integer.parseInt(properties.get(CPU_HARD_LIMIT));
}
}
// called when first create a resource group, load from image or user new create a group
@ -182,6 +189,13 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
throw new DdlException(CPU_SHARE + " " + cpuSchedulingWeight + " requires a positive integer.");
}
if (properties.containsKey(CPU_HARD_LIMIT)) {
String cpuHardLimit = properties.get(CPU_HARD_LIMIT);
if (!StringUtils.isNumeric(cpuHardLimit) || Long.parseLong(cpuHardLimit) <= 0) {
throw new DdlException(CPU_HARD_LIMIT + " " + cpuHardLimit + " requires a positive integer.");
}
}
String memoryLimit = properties.get(MEMORY_LIMIT);
if (!memoryLimit.endsWith("%")) {
throw new DdlException(MEMORY_LIMIT + " " + memoryLimit + " requires a percentage and ends with a '%'");
@ -259,13 +273,21 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
}
}
public int getCpuHardLimit() {
return cpuHardLimit;
}
@Override
public String toString() {
return GsonUtils.GSON.toJson(this);
}
public TPipelineWorkloadGroup toThrift() {
return new TPipelineWorkloadGroup().setId(id).setName(name).setProperties(properties).setVersion(version);
// note(wb): we need to add a new key-value pair to the properties before sending them to the BE, so we copy the map here;
// see WorkloadGroupMgr.getWorkloadGroup
HashMap<String, String> clonedHashMap = new HashMap<>(properties);
return new TPipelineWorkloadGroup().setId(id).setName(name).setProperties(clonedHashMap).setVersion(version);
}
@Override
@ -289,6 +311,9 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
this.memoryLimitPercent = 100;
this.properties.put(MEMORY_LIMIT, "100%");
}
if (properties.containsKey(CPU_HARD_LIMIT)) {
this.cpuHardLimit = Integer.parseInt(properties.get(CPU_HARD_LIMIT));
}
this.initQueryQueue();
}
}

View File

@ -76,6 +76,9 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public static final String QUERY_CPU_HARD_LIMIT = "query_cpu_hard_limit";
private int queryCPUHardLimit = 0;
public WorkloadGroupMgr() {
}
@ -118,6 +121,9 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
throw new UserException("Workload group " + groupName + " does not exist");
}
workloadGroups.add(workloadGroup.toThrift());
// note(wb): -1 tells the BE that it does not need to update the cgroup
int thriftVal = Config.enable_cpu_hard_limit ? this.queryCPUHardLimit : -1;
workloadGroups.get(0).getProperties().put(QUERY_CPU_HARD_LIMIT, String.valueOf(thriftVal));
context.setWorkloadGroupName(groupName);
} finally {
readUnlock();
@ -181,6 +187,9 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
public void createWorkloadGroup(CreateWorkloadGroupStmt stmt) throws DdlException {
checkWorkloadGroupEnabled();
if (!Config.enable_cpu_hard_limit) {
stmt.getProperties().remove(WorkloadGroup.CPU_HARD_LIMIT);
}
WorkloadGroup workloadGroup = WorkloadGroup.create(stmt.getWorkloadGroupName(), stmt.getProperties());
String workloadGroupName = workloadGroup.getName();
writeLock();
@ -194,6 +203,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
checkGlobalUnlock(workloadGroup, null);
nameToWorkloadGroup.put(workloadGroupName, workloadGroup);
idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup);
calQueryCPUHardLimit();
Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(workloadGroup);
} finally {
writeUnlock();
@ -211,6 +221,19 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
throw new DdlException(
"The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " cannot be greater than 100.0%.");
}
if (!Config.enable_cpu_hard_limit) {
return;
}
int sumCPULimit = queryCPUHardLimit + workloadGroup.getCpuHardLimit();
if (!Objects.isNull(old)) {
sumCPULimit -= old.getCpuHardLimit();
}
if (sumCPULimit > 100 || sumCPULimit <= 0) {
throw new DdlException(
"The sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT
+ " can not be greater than 100% or less than or equal 0%");
}
}
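For example, if existing groups hold cpu_hard_limit values of 30 and 40 (so queryCPUHardLimit is 70), creating a new group with 50 is rejected because 70 + 50 = 120 exceeds 100, while altering the 40 group down to 20 passes because the old value is subtracted first: 70 + 20 - 40 = 50.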
public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws DdlException {
@ -218,6 +241,9 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
String workloadGroupName = stmt.getWorkloadGroupName();
Map<String, String> properties = stmt.getProperties();
if (!Config.enable_cpu_hard_limit) {
properties.remove(WorkloadGroup.CPU_HARD_LIMIT);
}
WorkloadGroup newWorkloadGroup;
writeLock();
try {
@ -229,6 +255,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
checkGlobalUnlock(newWorkloadGroup, workloadGroup);
nameToWorkloadGroup.put(workloadGroupName, newWorkloadGroup);
idToWorkloadGroup.put(newWorkloadGroup.getId(), newWorkloadGroup);
calQueryCPUHardLimit();
Env.getCurrentEnv().getEditLog().logAlterWorkloadGroup(newWorkloadGroup);
} finally {
writeUnlock();
@ -332,6 +359,11 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
return idToWorkloadGroup;
}
private void calQueryCPUHardLimit() {
this.queryCPUHardLimit =
idToWorkloadGroup.values().stream().mapToInt(WorkloadGroup::getCpuHardLimit).sum();
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
@ -347,6 +379,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
public void gsonPostProcess() throws IOException {
idToWorkloadGroup.forEach(
(id, workloadGroup) -> nameToWorkloadGroup.put(workloadGroup.getName(), workloadGroup));
calQueryCPUHardLimit();
}
public class ResourceProcNode {