From be7273da83e12bdc2d2c2d3d2acf9e05f20268dc Mon Sep 17 00:00:00 2001 From: wangbo Date: Sat, 18 Nov 2023 11:19:38 +0800 Subject: [PATCH] [refactor](executor)Refactor workload meta update to be #26710 --- be/src/agent/cgroup_cpu_ctl.cpp | 26 +++-- be/src/agent/cgroup_cpu_ctl.h | 3 + be/src/agent/topic_listener.h | 2 +- be/src/agent/topic_subscriber.cpp | 7 +- be/src/agent/workload_group_listener.cpp | 40 ++++++-- be/src/agent/workload_group_listener.h | 2 +- be/src/common/config.cpp | 4 +- be/src/common/config.h | 4 +- be/src/runtime/fragment_mgr.cpp | 53 ++++------ be/src/runtime/task_group/task_group.cpp | 97 +++++++++++++------ be/src/runtime/task_group/task_group.h | 12 ++- .../runtime/task_group/task_group_manager.cpp | 73 +++++++++----- .../runtime/task_group/task_group_manager.h | 14 ++- .../publish/WorkloadGroupPublisher.java | 8 +- .../resource/workloadgroup/WorkloadGroup.java | 47 ++++++--- .../workloadgroup/WorkloadGroupMgr.java | 7 -- .../workloadgroup/WorkloadGroupMgrTest.java | 16 +-- gensrc/thrift/BackendService.thrift | 17 +++- 18 files changed, 277 insertions(+), 155 deletions(-) diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp index d16a32b7be..ac09725cb2 100644 --- a/be/src/agent/cgroup_cpu_ctl.cpp +++ b/be/src/agent/cgroup_cpu_ctl.cpp @@ -25,14 +25,14 @@ Status CgroupCpuCtl::init() { _doris_cgroup_cpu_path = config::doris_cgroup_cpu_path; if (_doris_cgroup_cpu_path.empty()) { LOG(INFO) << "doris cgroup cpu path is not specify, path=" << _doris_cgroup_cpu_path; - return Status::InternalError("doris cgroup cpu path {} is not specify.", - _doris_cgroup_cpu_path); + return Status::InternalError("doris cgroup cpu path {} is not specify.", + _doris_cgroup_cpu_path); } if (access(_doris_cgroup_cpu_path.c_str(), F_OK) != 0) { LOG(ERROR) << "doris cgroup cpu path not exists, path=" << _doris_cgroup_cpu_path; - return Status::InternalError("doris cgroup cpu path {} not exists.", - _doris_cgroup_cpu_path); + return 
Status::InternalError("doris cgroup cpu path {} not exists.", + _doris_cgroup_cpu_path); } if (_doris_cgroup_cpu_path.back() != '/') { @@ -41,6 +41,12 @@ Status CgroupCpuCtl::init() { return Status::OK(); } +void CgroupCpuCtl::get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit) { + std::lock_guard w_lock(_lock_mutex); + *cpu_shares = this->_cpu_shares; + *cpu_hard_limit = this->_cpu_hard_limit; +} + void CgroupCpuCtl::update_cpu_hard_limit(int cpu_hard_limit) { if (!_init_succ) { return; @@ -72,14 +78,14 @@ Status CgroupCpuCtl::write_cg_sys_file(std::string file_path, int value, std::st int fd = open(file_path.c_str(), is_append ? O_RDWR | O_APPEND : O_RDWR); if (fd == -1) { LOG(ERROR) << "open path failed, path=" << file_path; - return Status::InternalError("open path failed, path={}", file_path); + return Status::InternalError("open path failed, path={}", file_path); } auto str = fmt::format("{}\n", value); int ret = write(fd, str.c_str(), str.size()); if (ret == -1) { LOG(ERROR) << msg << " write sys file failed"; - return Status::InternalError("{} write sys file failed", msg); + return Status::InternalError("{} write sys file failed", msg); } LOG(INFO) << msg << " success"; return Status::OK(); @@ -94,8 +100,8 @@ Status CgroupV1CpuCtl::init() { int ret = mkdir(_cgroup_v1_cpu_query_path.c_str(), S_IRWXU); if (ret != 0) { LOG(ERROR) << "cgroup v1 mkdir query failed, path=" << _cgroup_v1_cpu_query_path; - return Status::InternalError("cgroup v1 mkdir query failed, path=", - _cgroup_v1_cpu_query_path); + return Status::InternalError("cgroup v1 mkdir query failed, path=", + _cgroup_v1_cpu_query_path); } } @@ -105,8 +111,8 @@ Status CgroupV1CpuCtl::init() { int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU); if (ret != 0) { LOG(ERROR) << "cgroup v1 mkdir workload group failed, path=" << _cgroup_v1_cpu_tg_path; - return Status::InternalError("cgroup v1 mkdir workload group failed, path=", - _cgroup_v1_cpu_tg_path); + return 
Status::InternalError("cgroup v1 mkdir workload group failed, path=", + _cgroup_v1_cpu_tg_path); } } diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h index b98e268da0..4d78ca82ab 100644 --- a/be/src/agent/cgroup_cpu_ctl.h +++ b/be/src/agent/cgroup_cpu_ctl.h @@ -47,6 +47,9 @@ public: void update_cpu_soft_limit(int cpu_shares); + // for log + void get_cgroup_cpu_info(uint64_t* cpu_shares, uint64_t* cpu_hard_limit); + protected: Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append); diff --git a/be/src/agent/topic_listener.h b/be/src/agent/topic_listener.h index af99a78545..40cf0ba2a2 100644 --- a/be/src/agent/topic_listener.h +++ b/be/src/agent/topic_listener.h @@ -25,6 +25,6 @@ class TopicListener { public: virtual ~TopicListener() {} - virtual void handle_topic_info(const TPublishTopicRequest& topic_request) = 0; + virtual void handle_topic_info(const std::vector& topic_info_list) = 0; }; } // namespace doris diff --git a/be/src/agent/topic_subscriber.cpp b/be/src/agent/topic_subscriber.cpp index c3bcc29c62..c29533bf61 100644 --- a/be/src/agent/topic_subscriber.cpp +++ b/be/src/agent/topic_subscriber.cpp @@ -42,8 +42,11 @@ void TopicSubscriber::handle_topic_info(const TPublishTopicRequest& topic_reques std::shared_lock lock(_listener_mtx); LOG(INFO) << "begin handle topic info"; for (auto& listener_pair : _registered_listeners) { - listener_pair.second->handle_topic_info(topic_request); - LOG(INFO) << "handle topic " << listener_pair.first << " succ"; + if (topic_request.topic_map.find(listener_pair.first) != topic_request.topic_map.end()) { + listener_pair.second->handle_topic_info( + topic_request.topic_map.at(listener_pair.first)); + LOG(INFO) << "handle topic " << listener_pair.first << " succ"; + } } } } // namespace doris diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp index bf27861c28..bc1d3294f6 100644 --- 
a/be/src/agent/workload_group_listener.cpp +++ b/be/src/agent/workload_group_listener.cpp @@ -24,18 +24,44 @@ namespace doris { -void WorkloadGroupListener::handle_topic_info(const TPublishTopicRequest& topic_request) { +void WorkloadGroupListener::handle_topic_info(const std::vector& topic_info_list) { std::set current_wg_ids; - for (const TopicInfo& topic_info : topic_request.topic_list) { - if (topic_info.topic_type != doris::TTopicInfoType::type::WORKLOAD_GROUP) { + for (const TopicInfo& topic_info : topic_info_list) { + if (!topic_info.__isset.workload_group_info) { continue; } - int wg_id = 0; - auto iter2 = topic_info.info_map.find("id"); - std::from_chars(iter2->second.c_str(), iter2->second.c_str() + iter2->second.size(), wg_id); + // 1 parse topicinfo to group info + taskgroup::TaskGroupInfo task_group_info; + Status ret = taskgroup::TaskGroupInfo::parse_topic_info(topic_info.workload_group_info, + &task_group_info); + if (!ret.ok()) { + LOG(INFO) << "parse topic info failed, tg_id=" << task_group_info.id + << ", reason:" << ret.to_string(); + continue; + } + current_wg_ids.insert(task_group_info.id); - current_wg_ids.insert(wg_id); + // 2 update task group + auto tg = _exec_env->task_group_manager()->get_or_create_task_group(task_group_info); + + // 3 set cpu soft hard limit switch + _exec_env->task_group_manager()->_enable_cpu_hard_limit.store( + task_group_info.enable_cpu_hard_limit); + + // 4 create and update task scheduler + Status ret2 = + _exec_env->task_group_manager()->upsert_task_scheduler(&task_group_info, _exec_env); + if (!ret2.ok()) { + LOG(WARNING) << "upsert task sche failed, tg_id=" << task_group_info.id + << ", reason=" << ret2.to_string(); + } + + LOG(INFO) << "update task group success, tg info=" << tg->debug_string() + << ", enable_cpu_hard_limit=" + << _exec_env->task_group_manager()->enable_cpu_hard_limit() + << ", cgroup cpu_shares=" << task_group_info.cgroup_cpu_shares + << ", cgroup cpu_hard_limit=" << 
task_group_info.cgroup_cpu_hard_limit; } _exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids); diff --git a/be/src/agent/workload_group_listener.h b/be/src/agent/workload_group_listener.h index d31b1c4ef6..732f5752e4 100644 --- a/be/src/agent/workload_group_listener.h +++ b/be/src/agent/workload_group_listener.h @@ -29,7 +29,7 @@ public: ~WorkloadGroupListener() {} WorkloadGroupListener(ExecEnv* exec_env) : _exec_env(exec_env) {} - void handle_topic_info(const TPublishTopicRequest& topic_request) override; + void handle_topic_info(const std::vector& topic_info_list) override; private: ExecEnv* _exec_env; diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 870ba5c9b0..5f867429bf 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1090,8 +1090,8 @@ DEFINE_Bool(exit_on_exception, "false"); DEFINE_Bool(enable_flush_file_cache_async, "true"); // cgroup -DEFINE_String(doris_cgroup_cpu_path, ""); -DEFINE_Bool(enable_cgroup_cpu_soft_limit, "false"); +DEFINE_mString(doris_cgroup_cpu_path, ""); +DEFINE_mBool(enable_cgroup_cpu_soft_limit, "true"); DEFINE_Bool(ignore_always_true_predicate_for_segment, "true"); diff --git a/be/src/common/config.h b/be/src/common/config.h index ac560d1f1c..6b76d37387 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1163,8 +1163,8 @@ DECLARE_mInt32(tablet_schema_cache_recycle_interval); DECLARE_mBool(exit_on_exception); // cgroup -DECLARE_String(doris_cgroup_cpu_path); -DECLARE_Bool(enable_cgroup_cpu_soft_limit); +DECLARE_mString(doris_cgroup_cpu_path); +DECLARE_mBool(enable_cgroup_cpu_soft_limit); // This config controls whether the s3 file writer would flush cache asynchronously DECLARE_Bool(enable_flush_file_cache_async); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index a7a89bfd42..210de93a7e 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -654,45 +654,28 @@ Status 
FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo if constexpr (std::is_same_v) { if (params.__isset.workload_groups && !params.workload_groups.empty()) { - taskgroup::TaskGroupInfo task_group_info; - auto status = taskgroup::TaskGroupInfo::parse_group_info(params.workload_groups[0], - &task_group_info); - if (status.ok()) { - auto tg = _exec_env->task_group_manager()->get_or_create_task_group( - task_group_info); - tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker); - uint64_t tg_id = tg->id(); - std::string tg_name = tg->name(); - LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id()) - << " use task group: " << tg->debug_string() - << " cpu_hard_limit: " << task_group_info.cpu_hard_limit - << " cpu_share:" << task_group_info.cpu_share - << " enable cgroup soft cpu:" << config::enable_cgroup_cpu_soft_limit; - if (task_group_info.cpu_hard_limit > 0) { - Status ret = _exec_env->task_group_manager()->create_and_get_task_scheduler( - tg_id, tg_name, task_group_info.cpu_hard_limit, - task_group_info.cpu_share, _exec_env, query_ctx.get()); - if (!ret.ok()) { - LOG(INFO) << "workload group init failed " - << ", name=" << tg_name << ", id=" << tg_id - << ", reason=" << ret.to_string(); - } + uint64_t tg_id = params.workload_groups[0].id; + auto* tg_mgr = _exec_env->task_group_manager(); + if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id)) { + std::stringstream ss; + ss << "Query/load id: " << print_id(query_ctx->query_id()); + ss << " use task group " << task_group_ptr->debug_string(); + if (tg_mgr->enable_cpu_soft_limit() && !config::enable_cgroup_cpu_soft_limit) { + query_ctx->set_task_group(task_group_ptr); + ss << ", cpu soft limit based doris sche"; } else { - if (!config::enable_cgroup_cpu_soft_limit) { - query_ctx->set_task_group(tg); + bool ret = tg_mgr->set_task_sche_for_query_ctx(tg_id, query_ctx.get()); + if (tg_mgr->enable_cpu_hard_limit()) { + ss << ", cpu hard limit based cgroup"; } else { - Status ret = 
- _exec_env->task_group_manager()->create_and_get_task_scheduler( - tg_id, tg_name, task_group_info.cpu_hard_limit, - task_group_info.cpu_share, _exec_env, query_ctx.get()); - if (!ret.ok()) { - LOG(INFO) << "workload group cpu soft limit init failed " - << ", name=" << tg_name << ", id=" << tg_id - << ", reason=" << ret.to_string(); - } + ss << ", cpu soft limit based cgroup"; + } + if (!ret) { + ss << ", but cgroup init failed, scan or exec fallback to no group"; } } - } + LOG(INFO) << ss.str(); + } // else, query run with no group } else { VLOG_DEBUG << "Query/load id: " << print_id(query_ctx->query_id()) << " does not use task group."; diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp index 37e4b9ae59..5878e38ed3 100644 --- a/be/src/runtime/task_group/task_group.cpp +++ b/be/src/runtime/task_group/task_group.cpp @@ -113,15 +113,16 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info) _cpu_share(tg_info.cpu_share), _task_entity(this, "pipeline task entity"), _local_scan_entity(this, "local scan entity"), - _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {} + _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM), + _cpu_hard_limit(tg_info.cpu_hard_limit) {} std::string TaskGroup::debug_string() const { std::shared_lock rl {_mutex}; return fmt::format( "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = " - "{}, version = {}]", + "{}, version = {}, cpu_hard_limit = {}]", _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES), - _enable_memory_overcommit ? "true" : "false", _version); + _enable_memory_overcommit ? 
"true" : "false", _version, cpu_hard_limit()); } void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) { @@ -142,6 +143,7 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) { _memory_limit = tg_info.memory_limit; _enable_memory_overcommit = tg_info.enable_memory_overcommit; _cpu_share = tg_info.cpu_share; + _cpu_hard_limit = tg_info.cpu_hard_limit; } else { return; } @@ -185,49 +187,80 @@ void TaskGroup::task_group_info(TaskGroupInfo* tg_info) const { tg_info->version = _version; } -Status TaskGroupInfo::parse_group_info(const TPipelineWorkloadGroup& resource_group, - TaskGroupInfo* task_group_info) { - if (UNLIKELY(!check_group_info(resource_group))) { - std::stringstream ss; - ss << "incomplete resource group parameters: "; - resource_group.printTo(ss); - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); +Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_info, + taskgroup::TaskGroupInfo* task_group_info) { + // 1 id + int tg_id = 0; + if (workload_group_info.__isset.id) { + tg_id = workload_group_info.id; + } else { + return Status::InternalError("workload group id is required"); } + task_group_info->id = tg_id; - auto iter = resource_group.properties.find(CPU_SHARE); - uint64_t share = 0; - std::from_chars(iter->second.c_str(), iter->second.c_str() + iter->second.size(), share); + // 2 name + std::string name = "INVALID_NAME"; + if (workload_group_info.__isset.name) { + name = workload_group_info.name; + } + task_group_info->name = name; - int cpu_hard_limit = 0; - auto iter2 = resource_group.properties.find(CPU_HARD_LIMIT); - std::from_chars(iter2->second.c_str(), iter2->second.c_str() + iter2->second.size(), - cpu_hard_limit); + // 3 version + int version = 0; + if (workload_group_info.__isset.version) { + version = workload_group_info.version; + } else { + return Status::InternalError("workload group version is required"); + } + task_group_info->version = version; - task_group_info->id = 
resource_group.id; - task_group_info->name = resource_group.name; - task_group_info->version = resource_group.version; - task_group_info->cpu_share = share; + // 4 cpu_share + uint64_t cpu_share = 1024; + if (workload_group_info.__isset.cpu_share) { + cpu_share = workload_group_info.cpu_share; + } + task_group_info->cpu_share = cpu_share; + + // 5 cpu hard limit + int cpu_hard_limit = -1; + if (workload_group_info.__isset.cpu_hard_limit) { + cpu_hard_limit = workload_group_info.cpu_hard_limit; + } task_group_info->cpu_hard_limit = cpu_hard_limit; + // 6 mem_limit bool is_percent = true; - auto mem_limit_str = resource_group.properties.find(MEMORY_LIMIT)->second; - auto mem_limit = + std::string mem_limit_str; + if (workload_group_info.__isset.mem_limit) { + mem_limit_str = workload_group_info.mem_limit; + } else { + return Status::InternalError("workload group mem_limit is required"); + } + int64_t mem_limit = ParseUtil::parse_mem_spec(mem_limit_str, -1, MemInfo::mem_limit(), &is_percent); if (UNLIKELY(mem_limit <= 0)) { std::stringstream ss; - ss << "parse memory limit from TPipelineWorkloadGroup error, " << MEMORY_LIMIT << ": " - << mem_limit_str; + ss << "parse memory limit error, " << MEMORY_LIMIT << ": " << mem_limit_str; LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); + return Status::InternalError("invalid value for {}, val={}", MEMORY_LIMIT, + mem_limit); } task_group_info->memory_limit = mem_limit; - auto enable_memory_overcommit_iter = resource_group.properties.find(ENABLE_MEMORY_OVERCOMMIT); - task_group_info->enable_memory_overcommit = - enable_memory_overcommit_iter != resource_group.properties.end() && - enable_memory_overcommit_iter->second == - "true" /* fe guarantees it is 'true' or 'false' */; + // 7 mem overcommit + bool enable_memory_overcommit = true; + if (workload_group_info.__isset.enable_memory_overcommit) { + enable_memory_overcommit = workload_group_info.enable_memory_overcommit; + } + 
task_group_info->enable_memory_overcommit = enable_memory_overcommit; + + // 8 cpu soft limit or hard limit + bool enable_cpu_hard_limit = false; + if (workload_group_info.__isset.enable_cpu_hard_limit) { + enable_cpu_hard_limit = workload_group_info.enable_cpu_hard_limit; + } + task_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit; + return Status::OK(); } diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h index a948bf53ec..b3daf58257 100644 --- a/be/src/runtime/task_group/task_group.h +++ b/be/src/runtime/task_group/task_group.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include @@ -106,6 +107,8 @@ public: uint64_t cpu_share() const { return _cpu_share.load(); } + int cpu_hard_limit() const { return _cpu_hard_limit.load(); } + uint64_t id() const { return _id; } std::string name() const { return _name; }; @@ -147,6 +150,7 @@ private: TaskGroupPipelineTaskEntity _task_entity; TaskGroupScanTaskEntity _local_scan_entity; std::vector _mem_tracker_limiter_pool; + std::atomic _cpu_hard_limit; }; using TaskGroupPtr = std::shared_ptr; @@ -159,9 +163,13 @@ struct TaskGroupInfo { bool enable_memory_overcommit; int64_t version; int cpu_hard_limit; + bool enable_cpu_hard_limit; + // log cgroup cpu info + uint64_t cgroup_cpu_shares = 0; + uint64_t cgroup_cpu_hard_limit = 0; - static Status parse_group_info(const TPipelineWorkloadGroup& resource_group, - TaskGroupInfo* task_group_info); + static Status parse_topic_info(const TWorkloadGroupInfo& topic_info, + taskgroup::TaskGroupInfo* task_group_info); private: static bool check_group_info(const TPipelineWorkloadGroup& resource_group); diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp index fb94006978..f37ed7ff71 100644 --- a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/task_group/task_group_manager.cpp @@ -59,10 +59,38 @@ void TaskGroupManager::get_resource_groups(const 
std::function r_lock(_group_mutex); + if (_task_groups.find(tg_id) != _task_groups.end()) { + return _task_groups.at(tg_id); + } + return nullptr; +} + +bool TaskGroupManager::set_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr) { + std::lock_guard lock(_task_scheduler_lock); + if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) { + query_ctx_ptr->set_task_scheduler(_tg_sche_map.at(tg_id).get()); + } else { + return false; + } + + if (_tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) { + query_ctx_ptr->set_scan_task_scheduler(_tg_scan_sche_map.at(tg_id).get()); + } else { + return false; + } + return true; +} + +Status TaskGroupManager::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, + ExecEnv* exec_env) { + uint64_t tg_id = tg_info->id; + std::string tg_name = tg_info->name; + int cpu_hard_limit = tg_info->cpu_hard_limit; + uint64_t cpu_shares = tg_info->cpu_share; + bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit; + std::lock_guard lock(_task_scheduler_lock); // step 1: init cgroup cpu controller CgroupCpuCtl* cg_cu_ctl_ptr = nullptr; @@ -73,7 +101,7 @@ Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri cg_cu_ctl_ptr = cgroup_cpu_ctl.get(); _cgroup_ctl_map.emplace(tg_id, std::move(cgroup_cpu_ctl)); } else { - return Status::Error("cgroup init failed, gid={}", tg_id); + return Status::InternalError("cgroup init failed, gid={}", tg_id); } } @@ -92,8 +120,7 @@ Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri if (ret.ok()) { _tg_sche_map.emplace(tg_id, std::move(pipeline_task_scheduler)); } else { - return Status::Error("task scheduler start failed, gid={}", - tg_id); + return Status::InternalError("task scheduler start failed, gid={}", tg_id); } } @@ -105,27 +132,27 @@ Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, std::stri if (ret.ok()) { _tg_scan_sche_map.emplace(tg_id, std::move(scan_scheduler)); } else { - return 
Status::Error("scan scheduler start failed, gid={}", - tg_id); + return Status::InternalError("scan scheduler start failed, gid={}", tg_id); } } - // step 4 set exec/scan task scheudler to query ctx - pipeline::TaskScheduler* task_sche = _tg_sche_map.at(tg_id).get(); - query_ctx_ptr->set_task_scheduler(task_sche); - - vectorized::SimplifiedScanScheduler* scan_task_sche = _tg_scan_sche_map.at(tg_id).get(); - query_ctx_ptr->set_scan_task_scheduler(scan_task_sche); - - // step 5 update cgroup cpu if needed - if (cpu_hard_limit > 0) { - _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit); - _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE); + // step 4 update cgroup cpu if needed + if (enable_cpu_hard_limit) { + if (cpu_hard_limit > 0) { + _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit); + _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE); + } else { + return Status::InternalError("enable cpu hard limit but value is illegal"); + } } else { - _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares); - _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit( - CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit + if (config::enable_cgroup_cpu_soft_limit) { + _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares); + _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit( + CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit + } } + _cgroup_ctl_map.at(tg_id)->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares), + &(tg_info->cgroup_cpu_hard_limit)); return Status::OK(); } diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h index cf44f53544..552cddfe9d 100644 --- a/be/src/runtime/task_group/task_group_manager.h +++ b/be/src/runtime/task_group/task_group_manager.h @@ -51,14 +51,22 @@ public: void get_resource_groups(const std::function& pred, std::vector* task_groups); - Status create_and_get_task_scheduler(uint64_t wg_id, 
std::string wg_name, int cpu_hard_limit, - int cpu_shares, ExecEnv* exec_env, - QueryContext* query_ctx_ptr); + Status upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env); void delete_task_group_by_ids(std::set id_set); + TaskGroupPtr get_task_group_by_id(uint64_t tg_id); + void stop(); + std::atomic _enable_cpu_hard_limit = false; + + bool enable_cpu_soft_limit() { return !_enable_cpu_hard_limit.load(); } + + bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); } + + bool set_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr); + private: std::shared_mutex _group_mutex; std::unordered_map _task_groups; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java index 2330700ce7..6c5ce9e4c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/publish/WorkloadGroupPublisher.java @@ -19,7 +19,7 @@ package org.apache.doris.common.publish; import org.apache.doris.catalog.Env; import org.apache.doris.thrift.TPublishTopicRequest; -import org.apache.doris.thrift.TopicInfo; +import org.apache.doris.thrift.TTopicInfoType; public class WorkloadGroupPublisher implements TopicPublisher { @@ -31,9 +31,7 @@ public class WorkloadGroupPublisher implements TopicPublisher { @Override public void getTopicInfo(TPublishTopicRequest req) { - for (TopicInfo topicInfo : env.getWorkloadGroupMgr() - .getPublishTopicInfo()) { - req.addToTopicList(topicInfo); - } + req.putToTopicMap(TTopicInfoType.WORKLOAD_GROUP, + env.getWorkloadGroupMgr().getPublishTopicInfo()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index 84a526aa7f..53faa1a913 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -18,6 +18,7 @@ package org.apache.doris.resource.workloadgroup; import org.apache.doris.catalog.Env; +import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; @@ -25,7 +26,7 @@ import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.thrift.TPipelineWorkloadGroup; -import org.apache.doris.thrift.TTopicInfoType; +import org.apache.doris.thrift.TWorkloadGroupInfo; import org.apache.doris.thrift.TopicInfo; import com.google.common.base.Strings; @@ -311,20 +312,44 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { } public TPipelineWorkloadGroup toThrift() { - //note(wb) we need add a new key-value to properties and then transfer it to be, so need a copy here - // see WorkloadGroupMgr.getWorkloadGroup - HashMap clonedHashMap = new HashMap<>(); - clonedHashMap.putAll(properties); - return new TPipelineWorkloadGroup().setId(id).setName(name).setProperties(clonedHashMap).setVersion(version); + return new TPipelineWorkloadGroup().setId(id); } public TopicInfo toTopicInfo() { - HashMap newHashMap = new HashMap<>(); - newHashMap.put("id", String.valueOf(id)); + TWorkloadGroupInfo tWorkloadGroupInfo = new TWorkloadGroupInfo(); + tWorkloadGroupInfo.setId(id); + tWorkloadGroupInfo.setName(name); + tWorkloadGroupInfo.setVersion(version); + + String cpuShareStr = properties.get(CPU_SHARE); + if (cpuShareStr != null) { + tWorkloadGroupInfo.setCpuShare(Long.valueOf(cpuShareStr)); + } + + String cpuHardLimitStr = properties.get(CPU_HARD_LIMIT); + if (cpuHardLimitStr != null) { + tWorkloadGroupInfo.setCpuHardLimit(Integer.valueOf(cpuHardLimitStr)); + } + 
+ String memLimitStr = properties.get(MEMORY_LIMIT); + if (memLimitStr != null) { + tWorkloadGroupInfo.setMemLimit(memLimitStr); + } + String memOvercommitStr = properties.get(ENABLE_MEMORY_OVERCOMMIT); + if (memOvercommitStr != null) { + tWorkloadGroupInfo.setEnableMemoryOvercommit(Boolean.valueOf(memOvercommitStr)); + } + // enable_cpu_hard_limit = true, using cpu hard limit + // enable_cpu_hard_limit = false, using cpu soft limit + tWorkloadGroupInfo.setEnableCpuHardLimit(Config.enable_cpu_hard_limit); + + if (Config.enable_cpu_hard_limit && cpuHardLimit <= 0) { + LOG.warn("enable_cpu_hard_limit=true but cpuHardLimit value is illegal," + "id=" + id + ",name=" + name); + } + TopicInfo topicInfo = new TopicInfo(); - topicInfo.setTopicType(TTopicInfoType.WORKLOAD_GROUP); - topicInfo.setInfoMap(newHashMap); - topicInfo.setTopicKey(name); + topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo); return topicInfo; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 44c43c19fa..26b11f0cb8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -125,13 +125,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { throw new UserException("Workload group " + groupName + " does not exist"); } workloadGroups.add(workloadGroup.toThrift()); - // note(wb) -1 to tell be no need to not use cpu hard limit - int cpuHardLimitThriftVal = -1; - if (Config.enable_cpu_hard_limit && workloadGroup.getCpuHardLimit() > 0) { - cpuHardLimitThriftVal = workloadGroup.getCpuHardLimit(); - } - workloadGroups.get(0).getProperties().put(WorkloadGroup.CPU_HARD_LIMIT, - String.valueOf(cpuHardLimitThriftVal)); context.setWorkloadGroupName(groupName); } finally { readUnlock(); } diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java index 89062d87cd..e4501fd646 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgrTest.java @@ -30,7 +30,7 @@ import org.apache.doris.mysql.privilege.Auth; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.EditLog; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.thrift.TPipelineWorkloadGroup; +import org.apache.doris.thrift.TopicInfo; import com.google.common.collect.Maps; import mockit.Delegate; @@ -169,11 +169,11 @@ public class WorkloadGroupMgrTest { CreateWorkloadGroupStmt stmt1 = new CreateWorkloadGroupStmt(false, name1, properties1); workloadGroupMgr.createWorkloadGroup(stmt1); context.getSessionVariable().setWorkloadGroup(name1); - List tWorkloadGroups1 = workloadGroupMgr.getWorkloadGroup(context); + List tWorkloadGroups1 = workloadGroupMgr.getPublishTopicInfo(); Assert.assertEquals(1, tWorkloadGroups1.size()); - TPipelineWorkloadGroup tWorkloadGroup1 = tWorkloadGroups1.get(0); - Assert.assertEquals(name1, tWorkloadGroup1.getName()); - Assert.assertTrue(tWorkloadGroup1.getProperties().containsKey(WorkloadGroup.CPU_SHARE)); + TopicInfo tWorkloadGroup1 = tWorkloadGroups1.get(0); + Assert.assertEquals(name1, tWorkloadGroup1.getWorkloadGroupInfo().getName()); + Assert.assertTrue(tWorkloadGroup1.getWorkloadGroupInfo().getCpuShare() == 10); try { context.getSessionVariable().setWorkloadGroup("g2"); @@ -242,9 +242,9 @@ public class WorkloadGroupMgrTest { workloadGroupMgr.alterWorkloadGroup(stmt2); context.getSessionVariable().setWorkloadGroup(name); - List tWorkloadGroups = workloadGroupMgr.getWorkloadGroup(context); + List tWorkloadGroups = workloadGroupMgr.getPublishTopicInfo(); 
Assert.assertEquals(1, tWorkloadGroups.size()); - TPipelineWorkloadGroup tWorkloadGroup1 = tWorkloadGroups.get(0); - Assert.assertEquals(tWorkloadGroup1.getProperties().get(WorkloadGroup.CPU_SHARE), "5"); + TopicInfo tWorkloadGroup1 = tWorkloadGroups.get(0); + Assert.assertTrue(tWorkloadGroup1.getWorkloadGroupInfo().getCpuShare() == 5); } } diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index d35d6166d3..1f2e8185a6 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -164,14 +164,23 @@ enum TTopicInfoType { WORKLOAD_GROUP } +struct TWorkloadGroupInfo { + 1: optional i64 id + 2: optional string name + 3: optional i64 version + 4: optional i64 cpu_share + 5: optional i32 cpu_hard_limit + 6: optional string mem_limit + 7: optional bool enable_memory_overcommit + 8: optional bool enable_cpu_hard_limit +} + struct TopicInfo { - 1: optional string topic_key - 2: required TTopicInfoType topic_type - 3: optional map info_map + 1: optional TWorkloadGroupInfo workload_group_info } struct TPublishTopicRequest { - 1: required list topic_list + 1: required map> topic_map } struct TPublishTopicResult {