From 54780c62e0d0e8a03cc9a45d1a7edd1386fb5ede Mon Sep 17 00:00:00 2001
From: wangbo
Date: Thu, 19 Oct 2023 18:56:26 +0800
Subject: [PATCH] [improvement](executor)Using cgroup to implement cpu hard
 limit (#25489)

* Using cgroup to implement cpu hard limit

* code style
---
 be/src/agent/cgroup_cpu_ctl.cpp               | 25 ++++--
 be/src/agent/cgroup_cpu_ctl.h                 | 23 +++---
 be/src/pipeline/pipeline_fragment_context.cpp |  4 +-
 be/src/pipeline/pipeline_task.h               |  1 +
 be/src/pipeline/task_scheduler.cpp            | 12 ++-
 be/src/pipeline/task_scheduler.h              |  6 +-
 be/src/runtime/exec_env.h                     | 14 +++-
 be/src/runtime/exec_env_init.cpp              | 29 ++++---
 be/src/runtime/fragment_mgr.cpp               | 24 ++++--
 be/src/runtime/query_context.h                | 15 ++++
 be/src/runtime/task_group/task_group.cpp      | 11 +--
 be/src/runtime/task_group/task_group.h        |  3 +-
 .../runtime/task_group/task_group_manager.cpp | 72 +++++++++++++++++
 .../runtime/task_group/task_group_manager.h   | 26 +++++++
 be/src/vec/exec/scan/scanner_context.cpp      |  4 +
 be/src/vec/exec/scan/scanner_context.h        |  5 ++
 be/src/vec/exec/scan/scanner_scheduler.cpp    | 16 +++-
 be/src/vec/exec/scan/scanner_scheduler.h      | 60 ++++++++++++++
 .../workloadgroup/WorkloadGroupMgr.java       | 78 +++---------------
 19 files changed, 297 insertions(+), 131 deletions(-)

diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index d8a18c8c13..4a9aed2bb0 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -86,14 +86,25 @@ Status CgroupV1CpuCtl::init() {
         }
     }
 
+    // workload group path
+    _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_tg_id);
+    if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) {
+        int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
+        if (ret != 0) {
+            LOG(ERROR) << "cgroup v1 mkdir workload group failed, path=" << _cgroup_v1_cpu_tg_path;
+            return Status::InternalError("cgroup v1 mkdir workload group failed, path=",
+                                         _cgroup_v1_cpu_tg_path);
+        }
+    }
+
     // quota path
-    _cgroup_v1_cpu_query_quota_path = _cgroup_v1_cpu_query_path + "/cpu.cfs_quota_us";
+    _cgroup_v1_cpu_tg_quota_file = _cgroup_v1_cpu_tg_path + "/cpu.cfs_quota_us";
     // task path
-    _cgroup_v1_cpu_query_task_path = _cgroup_v1_cpu_query_path + "/tasks";
+    _cgroup_v1_cpu_tg_task_file = _cgroup_v1_cpu_tg_path + "/tasks";
     LOG(INFO) << "cgroup v1 cpu path init success"
-              << ", query path=" << _cgroup_v1_cpu_query_path
-              << ", query quota path=" << _cgroup_v1_cpu_query_quota_path
-              << ", query tasks path=" << _cgroup_v1_cpu_query_task_path
+              << ", query tg path=" << _cgroup_v1_cpu_tg_path
+              << ", query tg quota file path=" << _cgroup_v1_cpu_tg_quota_file
+              << ", query tg tasks file path=" << _cgroup_v1_cpu_tg_task_file
               << ", core num=" << _cpu_core_num;
     _init_succ = true;
     return Status::OK();
@@ -102,7 +113,7 @@
 Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
     int val = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100;
     std::string msg = "modify cpu quota value to " + std::to_string(val);
-    return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_query_quota_path, val, msg, false);
+    return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_quota_file, val, msg, false);
 }
 
 Status CgroupV1CpuCtl::add_thread_to_cgroup() {
@@ -112,6 +123,6 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() {
     int tid = static_cast<int>(syscall(SYS_gettid));
     std::string msg = "add thread " + std::to_string(tid) + " to group";
     std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
-    return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_query_task_path, tid, msg, true);
+    return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, msg, true);
 }
 } // namespace doris
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 21eb367e44..c3a3066014 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -30,8 +30,8 @@ namespace doris {
 
 class CgroupCpuCtl {
 public:
-    CgroupCpuCtl() {}
-    virtual ~CgroupCpuCtl() {}
+    virtual ~CgroupCpuCtl() = default;
+    CgroupCpuCtl(uint64_t tg_id) { _tg_id = tg_id; }
 
     virtual Status init();
 
@@ -50,6 +50,7 @@ protected:
    uint64_t _cpu_hard_limit = 0;
     std::shared_mutex _lock_mutex;
     bool _init_succ = false;
+    uint64_t _tg_id; // workload group id
 };
 
 /*
@@ -66,23 +67,27 @@
    4 doris query path
     /sys/fs/cgroup/cpu/{doris_home}/query
 
-    5 doris query quota file:
-    /sys/fs/cgroup/cpu/{doris_home}/query/cpu.cfs_quota_us
+    5 workload group path
+    /sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}
 
-    6 doris query tasks file:
-    /sys/fs/cgroup/cpu/{doris_home}/query/tasks
+    6 workload group quota file:
+    /sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/cpu.cfs_quota_us
+
+    7 workload group tasks file:
+    /sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/tasks
 */
 class CgroupV1CpuCtl : public CgroupCpuCtl {
 public:
+    CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
     Status init() override;
     Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
     Status add_thread_to_cgroup() override;
 
 private:
-    // todo(wb) support load/compaction path
     std::string _cgroup_v1_cpu_query_path;
-    std::string _cgroup_v1_cpu_query_quota_path;
-    std::string _cgroup_v1_cpu_query_task_path;
+    std::string _cgroup_v1_cpu_tg_path; // workload group path
+    std::string _cgroup_v1_cpu_tg_quota_file;
+    std::string _cgroup_v1_cpu_tg_task_file;
 };
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index a9eedb3a48..22a533c233 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -687,7 +687,9 @@ Status PipelineFragmentContext::submit() {
     int submit_tasks = 0;
     Status st;
     auto* scheduler = _exec_env->pipeline_task_scheduler();
-    if (_task_group_entity) {
+    if (_query_ctx->get_task_scheduler()) {
+        scheduler = _query_ctx->get_task_scheduler();
+    } else if (_task_group_entity) {
         scheduler = _exec_env->pipeline_task_group_scheduler();
     }
     for (auto& task : _tasks) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 1f517d6c94..417fab35b9 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -193,6 +193,7 @@ public:
     taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const;
 
     void set_task_queue(TaskQueue* task_queue);
+    TaskQueue* get_task_queue() { return _task_queue; }
 
     static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL;
     static constexpr auto THREAD_TIME_SLICE_US = 100000L; // 100ms
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 1a44c57a74..c58e4c5cf0 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -45,14 +45,12 @@
 
 namespace doris::pipeline {
 
-BlockedTaskScheduler::BlockedTaskScheduler(std::shared_ptr<TaskQueue> task_queue)
-        : _task_queue(std::move(task_queue)), _started(false), _shutdown(false) {}
+BlockedTaskScheduler::BlockedTaskScheduler() : _started(false), _shutdown(false) {}
 
-Status BlockedTaskScheduler::start() {
+Status BlockedTaskScheduler::start(std::string sche_name) {
     LOG(INFO) << "BlockedTaskScheduler start";
     RETURN_IF_ERROR(Thread::create(
-            "BlockedTaskScheduler", "schedule_blocked_pipeline", [this]() { this->_schedule(); },
-            &_thread));
+            "BlockedTaskScheduler", sche_name, [this]() { this->_schedule(); }, &_thread));
     while (!this->_started.load()) {
         std::this_thread::sleep_for(std::chrono::milliseconds(5));
     }
@@ -185,7 +183,7 @@ void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
     auto task = *task_itr;
     task->set_state(t_state);
     local_tasks.erase(task_itr++);
-    static_cast<void>(_task_queue->push_back(task));
+    static_cast<void>(task->get_task_queue()->push_back(task));
 }
 
 TaskScheduler::~TaskScheduler() {
@@ -207,7 +205,7 @@ Status TaskScheduler::start() {
         RETURN_IF_ERROR(
                 _fix_thread_pool->submit_func(std::bind(&TaskScheduler::_do_work, this, i)));
     }
-    return _blocked_task_scheduler->start();
+    return Status::OK();
 }
 
 Status TaskScheduler::schedule_task(PipelineTask* task) {
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 943b11a8a2..9b85ec420e 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -46,17 +46,15 @@ namespace doris::pipeline {
 
 class BlockedTaskScheduler {
 public:
-    explicit BlockedTaskScheduler(std::shared_ptr<TaskQueue> task_queue);
+    explicit BlockedTaskScheduler();
 
     ~BlockedTaskScheduler() = default;
-    Status start();
+    Status start(std::string sche_name);
     void shutdown();
     Status add_blocked_task(PipelineTask* task);
 
 private:
-    std::shared_ptr<TaskQueue> _task_queue;
-
     std::mutex _task_mutex;
     std::condition_variable _task_cond;
     std::list<PipelineTask*> _blocked_tasks;
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index c1e1e77f87..c6aa8216d9 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -45,7 +45,8 @@ class DeltaWriterV2Pool;
 } // namespace vectorized
 namespace pipeline {
 class TaskScheduler;
-}
+class BlockedTaskScheduler;
+} // namespace pipeline
 namespace taskgroup {
 class TaskGroupManager;
 }
@@ -268,7 +269,9 @@
         return _inverted_index_query_cache;
     }
 
-    CgroupCpuCtl* get_cgroup_cpu_ctl() { return _cgroup_cpu_ctl.get(); }
+    std::shared_ptr<pipeline::BlockedTaskScheduler> get_global_block_scheduler() {
+        return _global_block_scheduler;
+    }
 
 private:
     ExecEnv();
@@ -375,7 +378,12 @@
     segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = nullptr;
     segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
 
-    std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl = nullptr;
+    // used for query with group cpu hard limit
+    std::shared_ptr<pipeline::BlockedTaskScheduler> _global_block_scheduler;
+    // used for query without workload group
+    std::shared_ptr<pipeline::BlockedTaskScheduler> _without_group_block_scheduler;
+    // used for query with workload group cpu soft limit
+    std::shared_ptr<pipeline::BlockedTaskScheduler> _with_group_block_scheduler;
 };
 
 template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index d9f7f31f7d..c526c836ef 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -280,29 +280,23 @@ Status ExecEnv::init_pipeline_task_scheduler() {
         executors_size = CpuInfo::num_cores();
     }
 
-    if (!config::doris_cgroup_cpu_path.empty()) {
-        _cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>();
-        Status ret = _cgroup_cpu_ctl->init();
-        if (!ret.ok()) {
-            LOG(ERROR) << "init cgroup cpu controller failed";
-        }
-    } else {
-        LOG(INFO) << "cgroup cpu controller is not inited";
-    }
-
     // TODO pipeline task group combie two blocked schedulers.
     auto t_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
-    auto b_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>(t_queue);
-    _pipeline_task_scheduler = new pipeline::TaskScheduler(this, b_scheduler, t_queue,
-                                                           "WithoutGroupTaskSchePool", nullptr);
+    _without_group_block_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>();
+    _pipeline_task_scheduler = new pipeline::TaskScheduler(
+            this, _without_group_block_scheduler, t_queue, "WithoutGroupTaskSchePool", nullptr);
     RETURN_IF_ERROR(_pipeline_task_scheduler->start());
+    RETURN_IF_ERROR(_without_group_block_scheduler->start("WithoutGroupBlockSche"));
 
     auto tg_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
-    auto tg_b_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>(tg_queue);
+    _with_group_block_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>();
     _pipeline_task_group_scheduler = new pipeline::TaskScheduler(
-            this, tg_b_scheduler, tg_queue, "WithGroupTaskSchePool", _cgroup_cpu_ctl.get());
+            this, _with_group_block_scheduler, tg_queue, "WithGroupTaskSchePool", nullptr);
     RETURN_IF_ERROR(_pipeline_task_group_scheduler->start());
+    RETURN_IF_ERROR(_with_group_block_scheduler->start("WithGroupBlockSche"));
+    _global_block_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>();
+    RETURN_IF_ERROR(_global_block_scheduler->start("GlobalBlockSche"));
 
     return Status::OK();
 }
@@ -547,6 +541,7 @@ void ExecEnv::destroy() {
     SAFE_STOP(_routine_load_task_executor);
     SAFE_STOP(_pipeline_task_scheduler);
     SAFE_STOP(_pipeline_task_group_scheduler);
+    SAFE_STOP(_task_group_manager);
     SAFE_STOP(_external_scan_context_mgr);
     SAFE_STOP(_fragment_mgr);
     // NewLoadStreamMgr should be destoried before storage_engine & after fragment_mgr stopped.
@@ -644,6 +639,10 @@ void ExecEnv::destroy() {
     // info is deconstructed then BE process will core at coordinator back method in fragment mgr.
     SAFE_DELETE(_master_info);
 
+    SAFE_SHUTDOWN(_global_block_scheduler.get());
+    SAFE_SHUTDOWN(_without_group_block_scheduler.get());
+    SAFE_SHUTDOWN(_with_group_block_scheduler.get());
+
     LOG(INFO) << "Doris exec envorinment is destoried.";
 }
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e3be7b9483..71297e0d4c 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -640,21 +640,29 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
     if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
         if (params.__isset.workload_groups && !params.workload_groups.empty()) {
             taskgroup::TaskGroupInfo task_group_info;
-            int query_cpu_hard_limit = -1;
-            auto status = taskgroup::TaskGroupInfo::parse_group_info(
-                    params.workload_groups[0], &task_group_info, &query_cpu_hard_limit);
+            auto status = taskgroup::TaskGroupInfo::parse_group_info(params.workload_groups[0],
+                                                                     &task_group_info);
             if (status.ok()) {
                 auto tg = _exec_env->task_group_manager()->get_or_create_task_group(
                         task_group_info);
                 tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
-                query_ctx->set_task_group(tg);
+                uint64_t tg_id = tg->id();
+                std::string tg_name = tg->name();
                 LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
                           << " use task group: " << tg->debug_string()
-                          << " query_cpu_hard_limit: " << query_cpu_hard_limit
+                          << " cpu_hard_limit: " << task_group_info.cpu_hard_limit
                           << " cpu_share:" << task_group_info.cpu_share;
-                if (query_cpu_hard_limit > 0 && _exec_env->get_cgroup_cpu_ctl() != nullptr) {
-                    _exec_env->get_cgroup_cpu_ctl()->update_cpu_hard_limit(
-                            query_cpu_hard_limit);
+                if (task_group_info.cpu_hard_limit > 0) {
+                    Status ret = _exec_env->task_group_manager()->create_and_get_task_scheduler(
+                            tg_id, tg_name, task_group_info.cpu_hard_limit, _exec_env,
+                            query_ctx.get());
+                    if (!ret.ok()) {
+                        LOG(INFO) << "workload group init failed "
+                                  << ", name=" << tg_name << ", id=" << tg_id
+                                  << ", reason=" << ret.to_string();
+                    }
+                } else {
+                    query_ctx->set_task_group(tg);
                 }
             }
         } else {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 2791291bf4..24bb52b163 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -225,6 +225,18 @@ public:
 
     TUniqueId query_id() const { return _query_id; }
 
+    void set_task_scheduler(pipeline::TaskScheduler* task_scheduler) {
+        _task_scheduler = task_scheduler;
+    }
+
+    pipeline::TaskScheduler* get_task_scheduler() { return _task_scheduler; }
+
+    void set_scan_task_scheduler(vectorized::SimplifiedScanScheduler* scan_task_scheduler) {
+        _scan_task_scheduler = scan_task_scheduler;
+    }
+
+    vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return _scan_task_scheduler; }
+
 public:
     DescriptorTbl* desc_tbl;
     bool set_rsc_info = false;
@@ -283,6 +295,9 @@
     // All pipeline tasks use the same query context to report status. So we need a `_exec_status`
     // to report the real message if failed.
     Status _exec_status = Status::OK();
+
+    pipeline::TaskScheduler* _task_scheduler = nullptr;
+    vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp
index 758e543b03..7abc08d3c2 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -42,8 +42,7 @@ namespace taskgroup {
 const static std::string CPU_SHARE = "cpu_share";
 const static std::string MEMORY_LIMIT = "memory_limit";
 const static std::string ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit";
-const static std::string QUERY_CPU_HARD_LIMIT =
-        "query_cpu_hard_limit"; // sum of all query's cpu_hard_limit
+const static std::string CPU_HARD_LIMIT = "cpu_hard_limit";
 
 template <typename QueueType>
 TaskGroupEntity<QueueType>::TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type)
@@ -202,7 +201,7 @@ void TaskGroup::task_group_info(TaskGroupInfo* tg_info) const {
 }
 
 Status TaskGroupInfo::parse_group_info(const TPipelineWorkloadGroup& resource_group,
-                                       TaskGroupInfo* task_group_info, int* query_cpu_hard_limit) {
+                                       TaskGroupInfo* task_group_info) {
     if (UNLIKELY(!check_group_info(resource_group))) {
         std::stringstream ss;
         ss << "incomplete resource group parameters: ";
@@ -215,14 +214,16 @@
     uint64_t share = 0;
     std::from_chars(iter->second.c_str(), iter->second.c_str() + iter->second.size(), share);
 
-    auto iter2 = resource_group.properties.find(QUERY_CPU_HARD_LIMIT);
+    int cpu_hard_limit = 0;
+    auto iter2 = resource_group.properties.find(CPU_HARD_LIMIT);
     std::from_chars(iter2->second.c_str(), iter2->second.c_str() + iter2->second.size(),
-                    *query_cpu_hard_limit);
+                    cpu_hard_limit);
 
     task_group_info->id = resource_group.id;
     task_group_info->name = resource_group.name;
     task_group_info->version = resource_group.version;
     task_group_info->cpu_share = share;
+    task_group_info->cpu_hard_limit = cpu_hard_limit;
 
     bool is_percent = true;
     auto mem_limit_str = resource_group.properties.find(MEMORY_LIMIT)->second;
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
index 8dd8b75fd1..1dc634469e 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -168,9 +168,10 @@ struct TaskGroupInfo {
     int64_t memory_limit;
     bool enable_memory_overcommit;
     int64_t version;
+    int cpu_hard_limit;
 
     static Status parse_group_info(const TPipelineWorkloadGroup& resource_group,
-                                   TaskGroupInfo* task_group_info, int* query_cpu_hard_limit);
+                                   TaskGroupInfo* task_group_info);
 
 private:
     static bool check_group_info(const TPipelineWorkloadGroup& resource_group);
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index 179bf8911a..e6ed60148e 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -59,4 +59,76 @@ void TaskGroupManager::get_resource_groups(const std::function
     }
 }
 
+Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::string tg_name,
+                                                       int cpu_hard_limit, ExecEnv* exec_env,
+                                                       QueryContext* query_ctx_ptr) {
+    std::lock_guard<std::mutex> lock(_task_scheduler_lock);
+    // step 1: init cgroup cpu controller
+    CgroupCpuCtl* cg_cu_ctl_ptr = nullptr;
+    if (_cgroup_ctl_map.find(tg_id) == _cgroup_ctl_map.end()) {
+        std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>(tg_id);
+        Status ret = cgroup_cpu_ctl->init();
+        if (ret.ok()) {
+            cg_cu_ctl_ptr = cgroup_cpu_ctl.get();
+            _cgroup_ctl_map.emplace(tg_id, std::move(cgroup_cpu_ctl));
+        } else {
+            return Status::Error("cgroup init failed, gid={}", tg_id);
+        }
+    }
+
+    // step 2: init task scheduler
+    if (_tg_sche_map.find(tg_id) == _tg_sche_map.end()) {
+        int32_t executors_size = config::pipeline_executor_size;
+        if (executors_size <= 0) {
+            executors_size = CpuInfo::num_cores();
+        }
+        auto task_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
+
+        auto pipeline_task_scheduler = std::make_unique<pipeline::TaskScheduler>(
+                exec_env, exec_env->get_global_block_scheduler(), std::move(task_queue),
+                "Exec_" + tg_name, cg_cu_ctl_ptr);
+        Status ret = pipeline_task_scheduler->start();
+        if (ret.ok()) {
+            _tg_sche_map.emplace(tg_id, std::move(pipeline_task_scheduler));
+        } else {
+            return Status::Error("task scheduler start failed, gid={}",
+                                 tg_id);
+        }
+    }
+
+    // step 3: init scan scheduler
+    if (_tg_scan_sche_map.find(tg_id) == _tg_scan_sche_map.end()) {
+        auto scan_scheduler =
+                std::make_unique<vectorized::SimplifiedScanScheduler>(tg_name, cg_cu_ctl_ptr);
+        Status ret = scan_scheduler->start();
+        if (ret.ok()) {
+            _tg_scan_sche_map.emplace(tg_id, std::move(scan_scheduler));
+        } else {
+            return Status::Error("scan scheduler start failed, gid={}",
+                                 tg_id);
+        }
+    }
+
+    // step 4 set exec/scan task scheudler to query ctx
+    pipeline::TaskScheduler* task_sche = _tg_sche_map.at(tg_id).get();
+    query_ctx_ptr->set_task_scheduler(task_sche);
+
+    vectorized::SimplifiedScanScheduler* scan_task_sche = _tg_scan_sche_map.at(tg_id).get();
+    query_ctx_ptr->set_scan_task_scheduler(scan_task_sche);
+
+    // step 5 update cgroup cpu if needed
+    _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
+
+    return Status::OK();
+}
+
+void TaskGroupManager::stop() {
+    for (auto& task_sche : _tg_sche_map) {
+        task_sche.second->stop();
+    }
+    for (auto& task_sche : _tg_scan_sche_map) {
+        task_sche.second->stop();
+    }
+}
+
 } // namespace doris::taskgroup
diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h
index baa6579b15..e45cdeca7e 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -21,10 +21,24 @@
 #include
 #include
 
+#include "pipeline/task_queue.h"
+#include "pipeline/task_scheduler.h"
 #include "task_group.h"
 
 namespace doris {
 class ExecEnv;
+class QueryContext;
+class CgroupCpuCtl;
+
+namespace vectorized {
+class SimplifiedScanScheduler;
+}
+
+namespace pipeline {
+class TaskScheduler;
+class MultiCoreTaskQueue;
+} // namespace pipeline
+
 namespace taskgroup {
 
 class TaskGroupManager {
@@ -37,9 +51,21 @@ public:
     void get_resource_groups(const std::function<bool(const TaskGroupPtr& ptr)>& pred,
                              std::vector<TaskGroupPtr>* task_groups);
 
+    Status create_and_get_task_scheduler(uint64_t wg_id, std::string wg_name, int cpu_hard_limit,
+                                         ExecEnv* exec_env, QueryContext* query_ctx_ptr);
+
+    void stop();
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
+
+    // map for workload group id and task scheduler pool
+    // used for cpu hard limit
+    std::mutex _task_scheduler_lock;
+    std::map<uint64_t, std::unique_ptr<pipeline::TaskScheduler>> _tg_sche_map;
+    std::map<uint64_t, std::unique_ptr<vectorized::SimplifiedScanScheduler>> _tg_scan_sche_map;
+    std::map<uint64_t, std::unique_ptr<CgroupCpuCtl>> _cgroup_ctl_map;
 };
 
 } // namespace taskgroup
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 9363759941..e1c29d569a 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -123,6 +123,10 @@ Status ScannerContext::init() {
     // 3. get thread token
     if (_state->get_query_ctx()) {
         thread_token = _state->get_query_ctx()->get_token();
+        _simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
+        if (_simple_scan_scheduler) {
+            _should_reset_thread_name = false;
+        }
     }
 #endif
 
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 07f9f05551..932fd294ff 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -57,6 +57,7 @@ namespace vectorized {
 class VScanner;
 class VScanNode;
 class ScannerScheduler;
+class SimplifiedScanScheduler;
 
 // ScannerContext is responsible for recording the execution status
 // of a group of Scanners corresponding to a ScanNode.
@@ -164,6 +165,7 @@ public:
     }
 
     taskgroup::TaskGroup* get_task_group() const;
+    SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; }
 
     void reschedule_scanner_ctx();
 
@@ -173,6 +175,8 @@ public:
     ThreadPoolToken* thread_token = nullptr;
 
     std::vector<bthread_t> _btids;
+
+    bool _should_reset_thread_name = true;
+
 private:
     template <typename Parent>
     Status _close_and_clear_scanners(Parent* parent, RuntimeState* state);
@@ -252,6 +256,7 @@ protected:
     const int64_t _max_bytes_in_queue;
 
     doris::vectorized::ScannerScheduler* _scanner_scheduler;
+    SimplifiedScanScheduler* _simple_scan_scheduler = nullptr; // used for cpu hard limit
 
     // List "scanners" saves all "unfinished" scanners.
     // The scanner scheduler will pop scanners from this list, run scanner,
     // and then if the scanner is not finished, will be pushed back to this list.
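Note on the arithmetic the BE-side hard limit relies on: CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock() above writes cpu.cfs_quota_us = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100 into the per-workload-group directory created in init(). The standalone sketch below is not part of the patch; it only replays that formula and the path layout documented in cgroup_cpu_ctl.h, with the period, core count, group id, and helper name chosen as illustrative assumptions.

// Illustration only (not patch code): how a cpu_hard_limit percentage maps to a
// CFS quota for one workload group's cgroup v1 directory.
#include <cstdint>
#include <iostream>
#include <string>

// Mirrors the quota math in CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock().
int64_t cfs_quota_us(int64_t cfs_period_us, int cpu_core_num, int cpu_hard_limit_percent) {
    return cfs_period_us * cpu_core_num * cpu_hard_limit_percent / 100;
}

int main() {
    const int64_t period_us = 100000; // assumed cpu.cfs_period_us value
    const int cores = 16;             // example core count
    const int hard_limit = 30;        // workload group cpu_hard_limit = 30%
    const uint64_t tg_id = 10001;     // hypothetical workload group id

    // Per-group layout documented in cgroup_cpu_ctl.h:
    //   /sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/cpu.cfs_quota_us
    std::string quota_file =
            "/sys/fs/cgroup/cpu/doris/query/" + std::to_string(tg_id) + "/cpu.cfs_quota_us";

    // 100000 * 16 * 30 / 100 = 480000 microseconds of CPU per period for the whole group.
    std::cout << "write " << cfs_quota_us(period_us, cores, hard_limit) << " to " << quota_file
              << std::endl;
    return 0;
}

Every thread started by the group's TaskScheduler and SimplifiedScanScheduler is registered in that group's tasks file (via add_thread_to_cgroup / set_cgroup_cpu_ctl), so all of them share the single quota computed this way.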
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index b7f05f0ec9..7c0064a1f4 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -136,7 +136,6 @@ Status ScannerScheduler::init(ExecEnv* env) {
     static_cast<void>(ThreadPoolBuilder("local_scan_group")
                               .set_min_threads(config::doris_scanner_thread_pool_thread_num)
                               .set_max_threads(config::doris_scanner_thread_pool_thread_num)
-                              .set_cgroup_cpu_ctl(env->get_cgroup_cpu_ctl())
                               .build(&_group_local_scan_thread_pool));
     for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) {
         static_cast<void>(_group_local_scan_thread_pool->submit_func([this] {
@@ -237,7 +236,13 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
         TabletStorageType type = (*iter)->get_storage_type();
         bool ret = false;
         if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
-            if (ctx->get_task_group() && config::enable_workload_group_for_scan) {
+            if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
+                auto work_func = [this, scanner = *iter, ctx] {
+                    this->_scanner_scan(this, ctx, scanner);
+                };
+                SimplifiedScanTask simple_scan_task = {work_func, ctx};
+                ret = scan_sche->get_scan_queue()->try_put(simple_scan_task);
+            } else if (ctx->get_task_group() && config::enable_workload_group_for_scan) {
                 auto work_func = [this, scanner = *iter, ctx] {
                     this->_scanner_scan(this, ctx, scanner);
                 };
@@ -312,10 +317,13 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* ctx) {
 void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext* ctx,
                                      VScannerSPtr scanner) {
     SCOPED_ATTACH_TASK(scanner->runtime_state());
+    // for cpu hard limit, thread name should not be reset
 #if !defined(USE_BTHREAD_SCANNER)
-    Thread::set_self_name("_scanner_scan");
+    if (ctx->_should_reset_thread_name) {
+        Thread::set_self_name("_scanner_scan");
+    }
 #else
-    if (dynamic_cast(scanner) == nullptr) {
+    if (dynamic_cast(scanner) == nullptr && ctx->_should_reset_thread_name) {
         Thread::set_self_name("_scanner_scan");
     }
 #endif
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 3a198b6c74..ad6f86c4f1 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -121,4 +121,64 @@ private:
             config::doris_scanner_thread_pool_thread_num + config::pipeline_executor_size;
 };
 
+struct SimplifiedScanTask {
+    SimplifiedScanTask() = default;
+    SimplifiedScanTask(std::function<void()> scan_func,
+                       vectorized::ScannerContext* scanner_context) {
+        this->scan_func = scan_func;
+        this->scanner_context = scanner_context;
+    }
+
+    std::function<void()> scan_func;
+    vectorized::ScannerContext* scanner_context;
+};
+
+// used for cpu hard limit
+class SimplifiedScanScheduler {
+public:
+    SimplifiedScanScheduler(std::string wg_name, CgroupCpuCtl* cgroup_cpu_ctl) {
+        _scan_task_queue = std::make_unique<BlockingQueue<SimplifiedScanTask>>(
+                config::doris_scanner_thread_pool_queue_size);
+        _is_stop.store(false);
+        _cgroup_cpu_ctl = cgroup_cpu_ctl;
+        _wg_name = wg_name;
+    }
+
+    void stop() {
+        _is_stop.store(true);
+        _scan_thread_pool->shutdown();
+        _scan_thread_pool->wait();
+    }
+
+    Status start() {
+        RETURN_IF_ERROR(ThreadPoolBuilder("Scan_" + _wg_name)
+                                .set_min_threads(config::doris_scanner_thread_pool_thread_num)
+                                .set_max_threads(config::doris_scanner_thread_pool_thread_num)
+                                .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
+                                .build(&_scan_thread_pool));
+
+        for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) {
+            RETURN_IF_ERROR(_scan_thread_pool->submit_func([this] { this->_work(); }));
+        }
+        return Status::OK();
+    }
+
+    BlockingQueue<SimplifiedScanTask>* get_scan_queue() { return _scan_task_queue.get(); }
+
+private:
+    void _work() {
+        while (!_is_stop.load()) {
+            SimplifiedScanTask scan_task;
+            _scan_task_queue->blocking_get(&scan_task);
+            scan_task.scan_func();
+        }
+    }
+
+    std::unique_ptr<ThreadPool> _scan_thread_pool;
+    std::unique_ptr<BlockingQueue<SimplifiedScanTask>> _scan_task_queue;
+    std::atomic<bool> _is_stop;
+    CgroupCpuCtl* _cgroup_cpu_ctl;
+    std::string _wg_name;
+};
+
 } // namespace doris::vectorized
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 3f0053237a..1ec47d053f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -72,11 +72,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
     private final ResourceProcNode procNode = new ResourceProcNode();
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
-    public static final String QUERY_CPU_HARD_LIMIT = "query_cpu_hard_limit";
-    private int queryCPUHardLimit = 0;
-    // works when user not set cpu hard limit, we fill a default value
-    private int cpuHardLimitDefaultVal = 0;
-
     public WorkloadGroupMgr() {
     }
 
@@ -124,19 +119,13 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
                 throw new UserException("Workload group " + groupName + " does not exist");
             }
             workloadGroups.add(workloadGroup.toThrift());
-            // note(wb) -1 to tell be no need to update cgroup
-            int thriftVal = -1;
-            if (Config.enable_cpu_hard_limit) {
-                // reset cpu_share according to cpu hard limit
-                int cpuHardLimitShare = workloadGroup.getCpuHardLimit() == 0
-                        ? this.cpuHardLimitDefaultVal : workloadGroup.getCpuHardLimit();
-                workloadGroups.get(0).getProperties()
-                        .put(WorkloadGroup.CPU_SHARE, String.valueOf(cpuHardLimitShare));
-
-                // reset sum of all groups cpu hard limit
-                thriftVal = this.queryCPUHardLimit;
+            // note(wb) -1 to tell be no need to not use cpu hard limit
+            int cpuHardLimitThriftVal = -1;
+            if (Config.enable_cpu_hard_limit && workloadGroup.getCpuHardLimit() > 0) {
+                cpuHardLimitThriftVal = workloadGroup.getCpuHardLimit();
             }
-            workloadGroups.get(0).getProperties().put(QUERY_CPU_HARD_LIMIT, String.valueOf(thriftVal));
+            workloadGroups.get(0).getProperties().put(WorkloadGroup.CPU_HARD_LIMIT,
+                    String.valueOf(cpuHardLimitThriftVal));
             context.setWorkloadGroupName(groupName);
         } finally {
             readUnlock();
@@ -213,7 +202,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
             checkGlobalUnlock(workloadGroup, null);
             nameToWorkloadGroup.put(workloadGroupName, workloadGroup);
             idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup);
-            calQueryCPUHardLimit();
             Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(workloadGroup);
         } finally {
             writeUnlock();
@@ -240,44 +228,20 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
                     + " value can not be greater than 100% or less than or equal 0%");
         }
 
-        // 2, calculate new query hard cpu limit
-        int tmpCpuHardLimit = 0;
-        int zeroCpuHardLimitCount = 0;
+        // 2, check sum of all cpu hard limit
+        int sumOfAllCpuHardLimit = 0;
         for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
             if (old != null && entry.getKey() == old.getId()) {
                 continue;
             }
-            int cpuHardLimit = entry.getValue().getCpuHardLimit();
-            if (cpuHardLimit == 0) {
-                zeroCpuHardLimitCount++;
-            }
-            tmpCpuHardLimit += cpuHardLimit;
+            sumOfAllCpuHardLimit += entry.getValue().getCpuHardLimit();
         }
-        if (newGroupCpuHardLimit == 0) {
-            zeroCpuHardLimitCount++;
-        }
-        tmpCpuHardLimit += newGroupCpuHardLimit;
+        sumOfAllCpuHardLimit += newGroupCpuHardLimit;
 
-        if (tmpCpuHardLimit > 100) {
+        if (sumOfAllCpuHardLimit > 100) {
             throw new DdlException("sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT
                     + " can not be greater than 100% ");
         }
-
-        if (tmpCpuHardLimit == 100 && zeroCpuHardLimitCount > 0) {
-            throw new DdlException("some workload group may not be assigned "
-                    + "cpu hard limit but all query cpu hard limit exceeds 100%");
-        }
-
-        int leftCpuHardLimitVal = 100 - tmpCpuHardLimit;
-        if (zeroCpuHardLimitCount != 0) {
-            int tmpCpuHardLimitDefaultVal = leftCpuHardLimitVal / zeroCpuHardLimitCount;
-            if (tmpCpuHardLimitDefaultVal == 0) {
-                throw new DdlException("remaining cpu can not be assigned to the "
-                        + "workload group without cpu hard limit value; "
-                        + leftCpuHardLimitVal + "%," + newGroupCpuHardLimit
-                        + "%," + zeroCpuHardLimitCount);
-            }
-        }
     }
 
 public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws DdlException {
@@ -296,7 +260,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
             checkGlobalUnlock(newWorkloadGroup, workloadGroup);
             nameToWorkloadGroup.put(workloadGroupName, newWorkloadGroup);
             idToWorkloadGroup.put(newWorkloadGroup.getId(), newWorkloadGroup);
-            calQueryCPUHardLimit();
             Env.getCurrentEnv().getEditLog().logAlterWorkloadGroup(newWorkloadGroup);
         } finally {
             writeUnlock();
@@ -331,7 +294,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
             long groupId = workloadGroup.getId();
             idToWorkloadGroup.remove(groupId);
             nameToWorkloadGroup.remove(workloadGroupName);
-            calQueryCPUHardLimit();
             Env.getCurrentEnv().getEditLog().logDropWorkloadGroup(new DropWorkloadGroupOperationLog(groupId));
         } finally {
             writeUnlock();
@@ -344,7 +306,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
         try {
             nameToWorkloadGroup.put(workloadGroup.getName(), workloadGroup);
             idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup);
-            calQueryCPUHardLimit();
         } finally {
             writeUnlock();
         }
@@ -377,7 +338,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
             WorkloadGroup workloadGroup = idToWorkloadGroup.get(id);
             nameToWorkloadGroup.remove(workloadGroup.getName());
             idToWorkloadGroup.remove(id);
-            calQueryCPUHardLimit();
         } finally {
             writeUnlock();
         }
@@ -403,21 +363,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
         return idToWorkloadGroup;
     }
 
-    private void calQueryCPUHardLimit() {
-        int zeroCpuHardLimitCount = 0;
-        int ret = 0;
-        for (Map.Entry<Long, WorkloadGroup> entry : idToWorkloadGroup.entrySet()) {
-            if (entry.getValue().getCpuHardLimit() == 0) {
-                zeroCpuHardLimitCount++;
-            }
-            ret += entry.getValue().getCpuHardLimit();
-        }
-        this.queryCPUHardLimit = ret;
-        if (zeroCpuHardLimitCount != 0) {
-            this.cpuHardLimitDefaultVal = (100 - this.queryCPUHardLimit) / zeroCpuHardLimitCount;
-        }
-    }
-
     @Override
     public void write(DataOutput out) throws IOException {
         String json = GsonUtils.GSON.toJson(this);
@@ -428,7 +373,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
     public void gsonPostProcess() throws IOException {
         idToWorkloadGroup.forEach(
                 (id, workloadGroup) -> nameToWorkloadGroup.put(workloadGroup.getName(), workloadGroup));
-        calQueryCPUHardLimit();
     }
 
     public class ResourceProcNode {
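Note on the scan side of the hard limit: when a group sets cpu_hard_limit > 0, FragmentMgr attaches a per-group pipeline TaskScheduler and a SimplifiedScanScheduler to the query, and the scan scheduler's worker threads simply block on a bounded task queue (scanner_scheduler.h above). The sketch below is an illustrative stand-in for that consumer loop only, using std::thread and std::condition_variable in place of Doris's ThreadPool and BlockingQueue; the class and parameter names are invented for the example.

// Illustration only (not patch code): fixed workers draining scan tasks from a queue,
// the same shape as SimplifiedScanScheduler::_work() in the patch.
#include <atomic>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

struct ScanTask {
    std::function<void()> scan_func;
};

class TinyScanScheduler {
public:
    explicit TinyScanScheduler(int workers) {
        for (int i = 0; i < workers; ++i) {
            _workers.emplace_back([this] { _work(); });
        }
    }

    void submit(ScanTask task) {
        {
            std::lock_guard<std::mutex> lock(_mu);
            _queue.push(std::move(task));
        }
        _cv.notify_one();
    }

    void stop() {
        _stop.store(true);
        _cv.notify_all();
        for (auto& w : _workers) {
            w.join();
        }
    }

private:
    void _work() {
        while (true) {
            ScanTask task;
            {
                std::unique_lock<std::mutex> lock(_mu);
                _cv.wait(lock, [this] { return _stop.load() || !_queue.empty(); });
                if (_queue.empty()) {
                    return; // stopped and drained
                }
                task = std::move(_queue.front());
                _queue.pop();
            }
            task.scan_func(); // run the scan work outside the lock
        }
    }

    std::mutex _mu;
    std::condition_variable _cv;
    std::queue<ScanTask> _queue;
    std::atomic<bool> _stop {false};
    std::vector<std::thread> _workers;
};

int main() {
    TinyScanScheduler scheduler(4);
    for (int i = 0; i < 8; ++i) {
        scheduler.submit({[i] { std::cout << "scan task " << i << " done\n"; }});
    }
    scheduler.stop();
    return 0;
}

The real SimplifiedScanScheduler additionally caps the queue at config::doris_scanner_thread_pool_queue_size and builds its pool with set_cgroup_cpu_ctl(), which is what places the scan threads inside the workload group's cgroup and therefore under its CPU quota.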