diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp new file mode 100644 index 0000000000..7e1b6b8149 --- /dev/null +++ b/be/src/agent/cgroup_cpu_ctl.cpp @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "agent/cgroup_cpu_ctl.h" + +namespace doris { + +Status CgroupCpuCtl::init() { + _doris_cgroup_cpu_path = config::doris_cgroup_cpu_path; + if (_doris_cgroup_cpu_path.empty()) { + LOG(INFO) << "doris cgroup cpu path is not specify, path=" << _doris_cgroup_cpu_path; + return Status::InternalError("doris cgroup cpu path {} is not specify.", + _doris_cgroup_cpu_path); + } + + if (access(_doris_cgroup_cpu_path.c_str(), F_OK) != 0) { + LOG(ERROR) << "doris cgroup cpu path not exists, path=" << _doris_cgroup_cpu_path; + return Status::InternalError("doris cgroup cpu path {} not exists.", + _doris_cgroup_cpu_path); + } + + if (_doris_cgroup_cpu_path.back() != '/') { + _doris_cgroup_cpu_path = _doris_cgroup_cpu_path + "/"; + } + return Status::OK(); +} + +void CgroupCpuCtl::update_cpu_hard_limit(int cpu_hard_limit) { + if (!_init_succ) { + return; + } + std::lock_guard w_lock(_lock_mutex); + if (_cpu_hard_limit != cpu_hard_limit) { + Status ret = modify_cg_cpu_hard_limit_no_lock(cpu_hard_limit); + if (ret.ok()) { + _cpu_hard_limit = cpu_hard_limit; + } + } +} + +Status CgroupCpuCtl::write_cg_sys_file(std::string file_path, int value, std::string msg, + bool is_append) { + int fd = open(file_path.c_str(), is_append ? O_RDWR | O_APPEND : O_RDWR); + if (fd == -1) { + LOG(ERROR) << "open path failed, path=" << file_path; + return Status::InternalError("open path failed, path={}", file_path); + } + + std::stringstream ss; + ss << value << std::endl; + const std::string& str = ss.str(); + int ret = write(fd, str.c_str(), str.size()); + if (ret == -1) { + LOG(ERROR) << msg << " write sys file failed"; + return Status::InternalError("{} write sys file failed", msg); + } + LOG(INFO) << msg << " success"; + return Status::OK(); +} + +Status CgroupV1CpuCtl::init() { + RETURN_IF_ERROR(CgroupCpuCtl::init()); + + // query path + _cgroup_v1_cpu_query_path = _doris_cgroup_cpu_path + "query"; + if (access(_cgroup_v1_cpu_query_path.c_str(), F_OK) != 0) { + int ret = mkdir(_cgroup_v1_cpu_query_path.c_str(), S_IRWXU); + if (ret != 0) { + LOG(ERROR) << "cgroup v1 mkdir query failed, path=" << _cgroup_v1_cpu_query_path; + return Status::InternalError("cgroup v1 mkdir query failed, path=", + _cgroup_v1_cpu_query_path); + } + } + + // quota path + _cgroup_v1_cpu_query_quota_path = _cgroup_v1_cpu_query_path + "/cpu.cfs_quota_us"; + // task path + _cgroup_v1_cpu_query_task_path = _cgroup_v1_cpu_query_path + "/tasks"; + LOG(INFO) << "cgroup v1 cpu path init success" + << ", query path=" << _cgroup_v1_cpu_query_path + << ", query quota path=" << _cgroup_v1_cpu_query_quota_path + << ", query tasks path=" << _cgroup_v1_cpu_query_task_path + << ", core num=" << _cpu_core_num; + _init_succ = true; + return Status::OK(); +} + +Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) { + int val = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100; + std::string msg = "modify cpu quota value to " + std::to_string(val); + return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_query_quota_path, val, msg, false); +} + +Status CgroupV1CpuCtl::add_thread_to_cgroup() { + if (!_init_succ) { + return Status::OK(); + } + int tid = static_cast(syscall(SYS_gettid)); + std::string msg = "add thread " + std::to_string(tid) + " to group"; + std::lock_guard w_lock(_lock_mutex); + return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_query_task_path, tid, msg, true); +} +} // namespace doris \ No newline at end of file diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h new file mode 100644 index 0000000000..21eb367e44 --- /dev/null +++ b/be/src/agent/cgroup_cpu_ctl.h @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include + +#include + +#include "common/config.h" +#include "common/status.h" +#include "util/cpu_info.h" + +namespace doris { + +class CgroupCpuCtl { +public: + CgroupCpuCtl() {} + virtual ~CgroupCpuCtl() {} + + virtual Status init(); + + virtual Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) = 0; + + virtual Status add_thread_to_cgroup() = 0; + + void update_cpu_hard_limit(int cpu_hard_limit); + +protected: + Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append); + + std::string _doris_cgroup_cpu_path; + uint64_t _cpu_core_num = CpuInfo::num_cores(); + uint64_t _cpu_cfs_period_us = 100000; + uint64_t _cpu_hard_limit = 0; + std::shared_mutex _lock_mutex; + bool _init_succ = false; +}; + +/* + NOTE: directory structure + 1 sys cgroup root path: + /sys/fs/cgroup + + 2 sys cgroup cpu controller path: + /sys/fs/cgroup/cpu + + 3 doris home path: + /sys/fs/cgroup/cpu/{doris_home}/ + + 4 doris query path + /sys/fs/cgroup/cpu/{doris_home}/query + + 5 doris query quota file: + /sys/fs/cgroup/cpu/{doris_home}/query/cpu.cfs_quota_us + + 6 doris query tasks file: + /sys/fs/cgroup/cpu/{doris_home}/query/tasks +*/ +class CgroupV1CpuCtl : public CgroupCpuCtl { +public: + Status init() override; + Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override; + Status add_thread_to_cgroup() override; + +private: + // todo(wb) support load/compaction path + std::string _cgroup_v1_cpu_query_path; + std::string _cgroup_v1_cpu_query_quota_path; + std::string _cgroup_v1_cpu_query_task_path; +}; + +} // namespace doris \ No newline at end of file diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index c8f07f9095..5df93c8798 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1092,7 +1092,10 @@ DEFINE_Int32(group_commit_insert_threads, "10"); DEFINE_mInt32(scan_thread_nice_value, "0"); DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400"); -DEFINE_Bool(exit_on_exception, "false") +DEFINE_Bool(exit_on_exception, "false"); + +// cgroup +DEFINE_String(doris_cgroup_cpu_path, ""); // clang-format off #ifdef BE_TEST diff --git a/be/src/common/config.h b/be/src/common/config.h index a4d7eaf438..e2b8481d02 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1171,6 +1171,9 @@ DECLARE_mInt32(tablet_schema_cache_recycle_interval); // Use `LOG(FATAL)` to replace `throw` when true DECLARE_mBool(exit_on_exception); +// cgroup +DECLARE_String(doris_cgroup_cpu_path); + #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index c4278c3807..51da7d811c 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -199,6 +199,7 @@ Status TaskScheduler::start() { .set_min_threads(cores) .set_max_threads(cores) .set_max_queue_size(0) + .set_cgroup_cpu_ctl(_cgroup_cpu_ctl) .build(&_fix_thread_pool); _markers.reserve(cores); for (size_t i = 0; i < cores; ++i) { diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 13b9e734d6..8238968750 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -77,11 +77,13 @@ private: class TaskScheduler { public: TaskScheduler(ExecEnv* exec_env, std::shared_ptr b_scheduler, - std::shared_ptr task_queue, std::string name) + std::shared_ptr task_queue, std::string name, + CgroupCpuCtl* cgroup_cpu_ctl) : _task_queue(std::move(task_queue)), _blocked_task_scheduler(std::move(b_scheduler)), _shutdown(false), - _name(name) {} + _name(name), + _cgroup_cpu_ctl(cgroup_cpu_ctl) {} ~TaskScheduler(); @@ -100,6 +102,7 @@ private: std::shared_ptr _blocked_task_scheduler; std::atomic _shutdown; std::string _name; + CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; void _do_work(size_t index); // after _try_close_task, task maybe destructed. diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 02801cfe12..ed59a81916 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -261,6 +261,8 @@ public: return _inverted_index_query_cache; } + CgroupCpuCtl* get_cgroup_cpu_ctl() { return _cgroup_cpu_ctl.get(); } + private: ExecEnv(); @@ -362,6 +364,8 @@ private: CacheManager* _cache_manager = nullptr; segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr; segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr; + + std::unique_ptr _cgroup_cpu_ctl = nullptr; }; template <> diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index ecaf52a133..ebdf0dd4fc 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -222,7 +222,7 @@ Status ExecEnv::_init(const std::vector& store_paths, } _broker_mgr->init(); _small_file_mgr->init(); - status = _scanner_scheduler->init(); + status = _scanner_scheduler->init(this); if (!status.ok()) { LOG(ERROR) << "Scanner scheduler init failed. " << status; return status; @@ -271,17 +271,27 @@ Status ExecEnv::init_pipeline_task_scheduler() { executors_size = CpuInfo::num_cores(); } + if (!config::doris_cgroup_cpu_path.empty()) { + _cgroup_cpu_ctl = std::make_unique(); + Status ret = _cgroup_cpu_ctl->init(); + if (!ret.ok()) { + LOG(ERROR) << "init cgroup cpu controller failed"; + } + } else { + LOG(INFO) << "cgroup cpu controller is not inited"; + } + // TODO pipeline task group combie two blocked schedulers. auto t_queue = std::make_shared(executors_size); auto b_scheduler = std::make_shared(t_queue); - _pipeline_task_scheduler = - new pipeline::TaskScheduler(this, b_scheduler, t_queue, "WithoutGroupTaskSchePool"); + _pipeline_task_scheduler = new pipeline::TaskScheduler(this, b_scheduler, t_queue, + "WithoutGroupTaskSchePool", nullptr); RETURN_IF_ERROR(_pipeline_task_scheduler->start()); auto tg_queue = std::make_shared(executors_size); auto tg_b_scheduler = std::make_shared(tg_queue); - _pipeline_task_group_scheduler = - new pipeline::TaskScheduler(this, tg_b_scheduler, tg_queue, "WithGroupTaskSchePool"); + _pipeline_task_group_scheduler = new pipeline::TaskScheduler( + this, tg_b_scheduler, tg_queue, "WithGroupTaskSchePool", _cgroup_cpu_ctl.get()); RETURN_IF_ERROR(_pipeline_task_group_scheduler->start()); return Status::OK(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 812ba1bb3d..dcded46aa4 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -616,15 +616,21 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo if constexpr (std::is_same_v) { if (params.__isset.workload_groups && !params.workload_groups.empty()) { taskgroup::TaskGroupInfo task_group_info; - auto status = taskgroup::TaskGroupInfo::parse_group_info(params.workload_groups[0], - &task_group_info); + int query_cpu_hard_limit = -1; + auto status = taskgroup::TaskGroupInfo::parse_group_info( + params.workload_groups[0], &task_group_info, &query_cpu_hard_limit); if (status.ok()) { auto tg = _exec_env->task_group_manager()->get_or_create_task_group( task_group_info); tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker); query_ctx->set_task_group(tg); LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id()) - << " use task group: " << tg->debug_string(); + << " use task group: " << tg->debug_string() + << " query_cpu_hard_limit: " << query_cpu_hard_limit; + if (query_cpu_hard_limit > 0 && _exec_env->get_cgroup_cpu_ctl() != nullptr) { + _exec_env->get_cgroup_cpu_ctl()->update_cpu_hard_limit( + query_cpu_hard_limit); + } } } else { VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id()) diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp index 7c3d8ff42b..641b39f3e2 100644 --- a/be/src/runtime/task_group/task_group.cpp +++ b/be/src/runtime/task_group/task_group.cpp @@ -42,6 +42,7 @@ namespace taskgroup { const static std::string CPU_SHARE = "cpu_share"; const static std::string MEMORY_LIMIT = "memory_limit"; const static std::string ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit"; +const static std::string QUERY_CPU_HARD_LIMIT = "query_cpu_hard_limit"; template TaskGroupEntity::TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type) @@ -185,7 +186,7 @@ void TaskGroup::task_group_info(TaskGroupInfo* tg_info) const { } Status TaskGroupInfo::parse_group_info(const TPipelineWorkloadGroup& resource_group, - TaskGroupInfo* task_group_info) { + TaskGroupInfo* task_group_info, int* query_cpu_hard_limit) { if (UNLIKELY(!check_group_info(resource_group))) { std::stringstream ss; ss << "incomplete resource group parameters: "; @@ -198,6 +199,10 @@ Status TaskGroupInfo::parse_group_info(const TPipelineWorkloadGroup& resource_gr uint64_t share = 0; std::from_chars(iter->second.c_str(), iter->second.c_str() + iter->second.size(), share); + auto iter2 = resource_group.properties.find(QUERY_CPU_HARD_LIMIT); + std::from_chars(iter2->second.c_str(), iter2->second.c_str() + iter2->second.size(), + *query_cpu_hard_limit); + task_group_info->id = resource_group.id; task_group_info->name = resource_group.name; task_group_info->version = resource_group.version; diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h index cd547c9c7e..88da2ff637 100644 --- a/be/src/runtime/task_group/task_group.h +++ b/be/src/runtime/task_group/task_group.h @@ -160,7 +160,7 @@ struct TaskGroupInfo { int64_t version; static Status parse_group_info(const TPipelineWorkloadGroup& resource_group, - TaskGroupInfo* task_group_info); + TaskGroupInfo* task_group_info, int* query_cpu_hard_limit); private: static bool check_group_info(const TPipelineWorkloadGroup& resource_group); diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp index 6ac02e5cbd..5eddabaa68 100644 --- a/be/src/util/threadpool.cpp +++ b/be/src/util/threadpool.cpp @@ -75,6 +75,11 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) { return *this; } +ThreadPoolBuilder& ThreadPoolBuilder::set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl) { + _cgroup_cpu_ctl = cgroup_cpu_ctl; + return *this; +} + ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode, int max_concurrency) : _mode(mode), @@ -240,6 +245,7 @@ ThreadPool::ThreadPool(const ThreadPoolBuilder& builder) _num_threads_pending_start(0), _active_threads(0), _total_queued_tasks(0), + _cgroup_cpu_ctl(builder._cgroup_cpu_ctl), _tokenless(new_token(ExecutionMode::CONCURRENT)) {} ThreadPool::~ThreadPool() { @@ -470,6 +476,10 @@ void ThreadPool::dispatch_thread() { _num_threads++; _num_threads_pending_start--; + if (_cgroup_cpu_ctl != nullptr) { + _cgroup_cpu_ctl->add_thread_to_cgroup(); + } + // Owned by this worker thread and added/removed from _idle_threads as needed. IdleThread me; diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index 3bb0a76c1c..c842870bce 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -37,6 +37,7 @@ #include #include +#include "agent/cgroup_cpu_ctl.h" #include "common/status.h" #include "util/work_thread_pool.hpp" @@ -106,6 +107,7 @@ public: ThreadPoolBuilder& set_min_threads(int min_threads); ThreadPoolBuilder& set_max_threads(int max_threads); ThreadPoolBuilder& set_max_queue_size(int max_queue_size); + ThreadPoolBuilder& set_cgroup_cpu_ctl(CgroupCpuCtl* cgroup_cpu_ctl); template ThreadPoolBuilder& set_idle_timeout(const std::chrono::duration& idle_timeout) { _idle_timeout = std::chrono::duration_cast(idle_timeout); @@ -131,6 +133,7 @@ private: int _min_threads; int _max_threads; int _max_queue_size; + CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; std::chrono::milliseconds _idle_timeout; ThreadPoolBuilder(const ThreadPoolBuilder&) = delete; @@ -335,6 +338,8 @@ private: // Protected by _lock. int _total_queued_tasks; + CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; + // All allocated tokens. // // Protected by _lock. diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index d3370925e9..ed130d4425 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -94,7 +94,7 @@ void ScannerScheduler::stop() { LOG(INFO) << "ScannerScheduler stopped"; } -Status ScannerScheduler::init() { +Status ScannerScheduler::init(ExecEnv* env) { // 1. scheduling thread pool and scheduling queues ThreadPoolBuilder("SchedulingThreadPool") .set_min_threads(QUEUE_NUM) @@ -134,6 +134,7 @@ Status ScannerScheduler::init() { ThreadPoolBuilder("local_scan_group") .set_min_threads(config::doris_scanner_thread_pool_thread_num) .set_max_threads(config::doris_scanner_thread_pool_thread_num) + .set_cgroup_cpu_ctl(env->get_cgroup_cpu_ctl()) .build(&_group_local_scan_thread_pool); for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) { _group_local_scan_thread_pool->submit_func([this] { diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index 366275eb41..37d85e1b29 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -64,7 +64,7 @@ public: ScannerScheduler(); ~ScannerScheduler(); - [[nodiscard]] Status init(); + [[nodiscard]] Status init(ExecEnv* env); [[nodiscard]] Status submit(ScannerContext* ctx); diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 6e9eff5905..92fd50ede1 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1579,6 +1579,9 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static boolean enable_query_queue = true; + @ConfField(mutable = true, varType = VariableAnnotation.EXPERIMENTAL) + public static boolean enable_cpu_hard_limit = false; + @ConfField(mutable = true) public static boolean disable_shared_scan = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index 31b6d36037..adc6eca9c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -45,6 +45,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { public static final String CPU_SHARE = "cpu_share"; + public static final String CPU_HARD_LIMIT = "cpu_hard_limit"; + public static final String MEMORY_LIMIT = "memory_limit"; public static final String ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit"; @@ -60,7 +62,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { private static final ImmutableSet ALL_PROPERTIES_NAME = new ImmutableSet.Builder() .add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY) - .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).build(); + .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).build(); @SerializedName(value = "id") private long id; @@ -82,6 +84,8 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { private int maxQueueSize = 0; private int queueTimeout = 0; + private int cpuHardLimit = 0; + private WorkloadGroup(long id, String name, Map properties) { this(id, name, properties, 0); } @@ -96,6 +100,9 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { if (properties.containsKey(ENABLE_MEMORY_OVERCOMMIT)) { properties.put(ENABLE_MEMORY_OVERCOMMIT, properties.get(ENABLE_MEMORY_OVERCOMMIT).toLowerCase()); } + if (properties.containsKey(CPU_HARD_LIMIT)) { + this.cpuHardLimit = Integer.parseInt(properties.get(CPU_HARD_LIMIT)); + } } // called when first create a resource group, load from image or user new create a group @@ -182,6 +189,13 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { throw new DdlException(CPU_SHARE + " " + cpuSchedulingWeight + " requires a positive integer."); } + if (properties.containsKey(CPU_HARD_LIMIT)) { + String cpuHardLimit = properties.get(CPU_HARD_LIMIT); + if (!StringUtils.isNumeric(cpuHardLimit) || Long.parseLong(cpuHardLimit) <= 0) { + throw new DdlException(CPU_HARD_LIMIT + " " + cpuSchedulingWeight + " requires a positive integer."); + } + } + String memoryLimit = properties.get(MEMORY_LIMIT); if (!memoryLimit.endsWith("%")) { throw new DdlException(MEMORY_LIMIT + " " + memoryLimit + " requires a percentage and ends with a '%'"); @@ -259,13 +273,21 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { } } + public int getCpuHardLimit() { + return cpuHardLimit; + } + @Override public String toString() { return GsonUtils.GSON.toJson(this); } public TPipelineWorkloadGroup toThrift() { - return new TPipelineWorkloadGroup().setId(id).setName(name).setProperties(properties).setVersion(version); + //note(wb) we need add a new key-value to properties and then transfer it to be, so need a copy here + // see WorkloadGroupMgr.getWorkloadGroup + HashMap clonedHashMap = new HashMap<>(); + clonedHashMap.putAll(properties); + return new TPipelineWorkloadGroup().setId(id).setName(name).setProperties(clonedHashMap).setVersion(version); } @Override @@ -289,6 +311,9 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { this.memoryLimitPercent = 100; this.properties.put(MEMORY_LIMIT, "100%"); } + if (properties.containsKey(CPU_HARD_LIMIT)) { + this.cpuHardLimit = Integer.parseInt(properties.get(CPU_HARD_LIMIT)); + } this.initQueryQueue(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 13948442fc..89aae3c0bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -76,6 +76,9 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + public static final String QUERY_CPU_HARD_LIMIT = "query_cpu_hard_limit"; + private int queryCPUHardLimit = 0; + public WorkloadGroupMgr() { } @@ -118,6 +121,9 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { throw new UserException("Workload group " + groupName + " does not exist"); } workloadGroups.add(workloadGroup.toThrift()); + // note(wb) -1 to tell be no need to update cgroup + int thriftVal = Config.enable_cpu_hard_limit ? this.queryCPUHardLimit : -1; + workloadGroups.get(0).getProperties().put(QUERY_CPU_HARD_LIMIT, String.valueOf(thriftVal)); context.setWorkloadGroupName(groupName); } finally { readUnlock(); @@ -181,6 +187,9 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { public void createWorkloadGroup(CreateWorkloadGroupStmt stmt) throws DdlException { checkWorkloadGroupEnabled(); + if (!Config.enable_cpu_hard_limit) { + stmt.getProperties().remove(WorkloadGroup.CPU_HARD_LIMIT); + } WorkloadGroup workloadGroup = WorkloadGroup.create(stmt.getWorkloadGroupName(), stmt.getProperties()); String workloadGroupName = workloadGroup.getName(); writeLock(); @@ -194,6 +203,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { checkGlobalUnlock(workloadGroup, null); nameToWorkloadGroup.put(workloadGroupName, workloadGroup); idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup); + calQueryCPUHardLimit(); Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(workloadGroup); } finally { writeUnlock(); @@ -211,6 +221,19 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { throw new DdlException( "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " cannot be greater than 100.0%."); } + + if (!Config.enable_cpu_hard_limit) { + return; + } + int sumCPULimit = queryCPUHardLimit + workloadGroup.getCpuHardLimit(); + if (!Objects.isNull(old)) { + sumCPULimit -= old.getCpuHardLimit(); + } + if (sumCPULimit > 100 || sumCPULimit <= 0) { + throw new DdlException( + "The sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + + " can not be greater than 100% or less than or equal 0%"); + } } public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws DdlException { @@ -218,6 +241,9 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { String workloadGroupName = stmt.getWorkloadGroupName(); Map properties = stmt.getProperties(); + if (!Config.enable_cpu_hard_limit) { + properties.remove(WorkloadGroup.CPU_HARD_LIMIT); + } WorkloadGroup newWorkloadGroup; writeLock(); try { @@ -229,6 +255,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { checkGlobalUnlock(newWorkloadGroup, workloadGroup); nameToWorkloadGroup.put(workloadGroupName, newWorkloadGroup); idToWorkloadGroup.put(newWorkloadGroup.getId(), newWorkloadGroup); + calQueryCPUHardLimit(); Env.getCurrentEnv().getEditLog().logAlterWorkloadGroup(newWorkloadGroup); } finally { writeUnlock(); @@ -332,6 +359,11 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { return idToWorkloadGroup; } + private void calQueryCPUHardLimit() { + this.queryCPUHardLimit = + idToWorkloadGroup.values().stream().mapToInt(WorkloadGroup::getCpuHardLimit).sum(); + } + @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); @@ -347,6 +379,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { public void gsonPostProcess() throws IOException { idToWorkloadGroup.forEach( (id, workloadGroup) -> nameToWorkloadGroup.put(workloadGroup.getName(), workloadGroup)); + calQueryCPUHardLimit(); } public class ResourceProcNode {