diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 753aef2524..78b0a2c79d 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1104,6 +1104,7 @@ DEFINE_Bool(exit_on_exception, "false"); // cgroup DEFINE_String(doris_cgroup_cpu_path, ""); +DEFINE_Bool(enable_cpu_hard_limit, "false"); // clang-format off #ifdef BE_TEST diff --git a/be/src/common/config.h b/be/src/common/config.h index 3aaed3c006..72ff579a81 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1180,6 +1180,7 @@ DECLARE_mBool(exit_on_exception); // cgroup DECLARE_String(doris_cgroup_cpu_path); +DECLARE_Bool(enable_cpu_hard_limit); #ifdef BE_TEST // test s3 diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index c1257d5cd5..8061253d2e 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -211,14 +211,29 @@ void PipelineTask::set_task_queue(TaskQueue* task_queue) { _task_queue = task_queue; } +void PipelineTask::yield() { + int64_t time_spent = 0; + Defer defer {[&]() { + time_spent = time_spent * _core_num / _total_query_thread_num; + _task_queue->update_statistics(this, time_spent); + }}; + SCOPED_RAW_TIMER(&time_spent); + usleep(THREAD_TIME_SLICE_US); +} + Status PipelineTask::execute(bool* eos) { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_CPU_TIMER(_task_cpu_timer); SCOPED_TIMER(_exec_timer); SCOPED_ATTACH_TASK(_state); int64_t time_spent = 0; + + // todo(wb) use a more lightweight timer + RuntimeProfile::Counter tmp_timer(TUnit::TIME_NS); + Defer defer {[&]() { if (_task_queue) { + time_spent = tmp_timer.value(); _task_queue->update_statistics(this, time_spent); } }}; @@ -226,7 +241,7 @@ Status PipelineTask::execute(bool* eos) { *eos = false; if (!_opened) { { - SCOPED_RAW_TIMER(&time_spent); + SCOPED_CPU_TIMER(&tmp_timer); auto st = _open(); if (st.is()) { set_state(PipelineTaskState::BLOCKED_FOR_RF); @@ -261,12 +276,12 @@ Status PipelineTask::execute(bool* eos) { set_state(PipelineTaskState::BLOCKED_FOR_SINK); break; } - if (time_spent > THREAD_TIME_SLICE) { + if (tmp_timer.value() > THREAD_TIME_SLICE) { COUNTER_UPDATE(_yield_counts, 1); break; } // TODO llj: Pipeline entity should_yield - SCOPED_RAW_TIMER(&time_spent); + SCOPED_CPU_TIMER(&tmp_timer); _block->clear_column_data(_root->row_desc().num_materialized_slots()); auto* block = _block.get(); @@ -445,6 +460,9 @@ std::string PipelineTask::debug_string() { } taskgroup::TaskGroupPipelineTaskEntity* PipelineTask::get_task_group_entity() const { + if (_is_empty_task) { + return _empty_group_entity; + } return _fragment_context->get_task_group_entity(); } diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 8a31fae647..1f517d6c94 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -117,6 +117,8 @@ public: PipelineFragmentContext* fragment_context, RuntimeProfile* parent_profile); virtual ~PipelineTask() = default; + PipelineTask() = default; + virtual Status prepare(RuntimeState* state); virtual Status execute(bool* eos); @@ -193,6 +195,7 @@ public: void set_task_queue(TaskQueue* task_queue); static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL; + static constexpr auto THREAD_TIME_SLICE_US = 100000L; // 100ms // 1 used for update priority queue // note(wb) an ugly implementation, need refactor later @@ -246,6 +249,17 @@ public: TUniqueId instance_id() const { return _state->fragment_instance_id(); } + void set_empty_task(bool is_empty_task) { _is_empty_task = is_empty_task; } + + bool is_empty_task() const { return _is_empty_task; } + + void yield(); + + void set_task_group_entity( + taskgroup::TaskGroupEntity>* empty_group_entity) { + _empty_group_entity = empty_group_entity; + } + protected: void _finish_p_dependency() { for (const auto& p : _pipeline->_parents) { @@ -285,6 +299,12 @@ protected: bool _try_close_flag = false; + bool _is_empty_task = false; + taskgroup::TaskGroupEntity>* _empty_group_entity; + int _core_num = CpuInfo::num_cores(); + int _total_query_thread_num = + config::doris_scanner_thread_pool_thread_num + config::pipeline_executor_size; + RuntimeProfile* _parent_profile; std::unique_ptr _task_profile; RuntimeProfile::Counter* _task_cpu_timer; diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index a68a2ba4a7..366e299f7b 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -222,9 +222,17 @@ bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()( } TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size) - : TaskQueue(core_size), _min_tg_entity(nullptr) {} + : TaskQueue(core_size), _min_tg_entity(nullptr) { + _empty_pip_task->set_empty_task(true); + _empty_pip_task->set_task_queue(this); + _empty_pip_task->set_task_group_entity(_empty_group_entity); + _empty_group_entity->set_empty_group_entity(true); +} -TaskGroupTaskQueue::~TaskGroupTaskQueue() = default; +TaskGroupTaskQueue::~TaskGroupTaskQueue() { + delete _empty_group_entity; + delete _empty_pip_task; +} void TaskGroupTaskQueue::close() { std::unique_lock lock(_rs_mutex); @@ -248,6 +256,9 @@ Status TaskGroupTaskQueue::_push_back(PipelineTask* task) { entity->task_queue()->emplace(task); if (_group_entities.find(entity) == _group_entities.end()) { _enqueue_task_group(entity); + if (_enable_cpu_hard_limit) { + reset_empty_group_entity(); + } } _wait_task.notify_one(); return Status::OK(); @@ -270,9 +281,15 @@ PipelineTask* TaskGroupTaskQueue::take(size_t core_id) { } } } + if (entity->is_empty_group_entity()) { + return _empty_pip_task; + } DCHECK(entity->task_size() > 0); if (entity->task_size() == 1) { _dequeue_task_group(entity); + if (_enable_cpu_hard_limit) { + reset_empty_group_entity(); + } } auto task = entity->task_queue()->front(); if (task) { @@ -374,5 +391,30 @@ void TaskGroupTaskQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& tas } } +void TaskGroupTaskQueue::reset_empty_group_entity() { + int user_g_cpu_hard_limit = 0; + bool contains_empty_group = false; + for (auto* entity : _group_entities) { + if (!entity->is_empty_group_entity()) { + user_g_cpu_hard_limit += entity->cpu_share(); + } else { + contains_empty_group = true; + } + } + + // 0 <= user_g_cpu_hard_limit <= 100, bound by FE + // user_g_cpu_hard_limit = 0 means no group exists + int empty_group_cpu_share = 100 - user_g_cpu_hard_limit; + if (empty_group_cpu_share > 0 && empty_group_cpu_share < 100 && !contains_empty_group) { + _empty_group_entity->update_empty_cpu_share(empty_group_cpu_share); + _enqueue_task_group(_empty_group_entity); + } else if ((empty_group_cpu_share == 0 || empty_group_cpu_share == 100) && + contains_empty_group) { + // no need to update empty group here + // only update empty group's cpu share when exec enqueue + _dequeue_task_group(_empty_group_entity); + } +} + } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index d693cbe216..e6ed54417f 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -187,6 +187,8 @@ public: void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info, taskgroup::TGPTEntityPtr entity) override; + void reset_empty_group_entity(); + private: template Status _push_back(PipelineTask* task); @@ -209,6 +211,13 @@ private: int _total_cpu_share = 0; std::atomic _min_tg_entity = nullptr; uint64_t _min_tg_v_runtime_ns = 0; + + // empty group + taskgroup::TaskGroupEntity>* _empty_group_entity = + new taskgroup::TaskGroupEntity>(); + PipelineTask* _empty_pip_task = new PipelineTask(); + // todo(wb) support auto-switch cpu mode between soft limit and hard limit + bool _enable_cpu_hard_limit = config::enable_cpu_hard_limit; }; } // namespace pipeline diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 0a8ca74257..1a44c57a74 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -222,6 +222,10 @@ void TaskScheduler::_do_work(size_t index) { if (!task) { continue; } + if (task->is_empty_task()) { + task->yield(); + continue; + } task->set_task_queue(_task_queue.get()); auto* fragment_ctx = task->fragment_context(); signal::query_id_hi = fragment_ctx->get_query_id().hi; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 4bc94b38d1..e3be7b9483 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -650,7 +650,8 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo query_ctx->set_task_group(tg); LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id()) << " use task group: " << tg->debug_string() - << " query_cpu_hard_limit: " << query_cpu_hard_limit; + << " query_cpu_hard_limit: " << query_cpu_hard_limit + << " cpu_share:" << task_group_info.cpu_share; if (query_cpu_hard_limit > 0 && _exec_env->get_cgroup_cpu_ctl() != nullptr) { _exec_env->get_cgroup_cpu_ctl()->update_cpu_hard_limit( query_cpu_hard_limit); diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp index 641b39f3e2..3974d04561 100644 --- a/be/src/runtime/task_group/task_group.cpp +++ b/be/src/runtime/task_group/task_group.cpp @@ -42,7 +42,9 @@ namespace taskgroup { const static std::string CPU_SHARE = "cpu_share"; const static std::string MEMORY_LIMIT = "memory_limit"; const static std::string ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit"; -const static std::string QUERY_CPU_HARD_LIMIT = "query_cpu_hard_limit"; +const static std::string QUERY_CPU_HARD_LIMIT = + "query_cpu_hard_limit"; // sum of all query's cpu_hard_limit +const static std::string CPU_HARD_LIMIT = "cpu_hard_limit"; // a property for workload group template TaskGroupEntity::TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type) @@ -84,7 +86,7 @@ uint64_t TaskGroupEntity::cpu_share() const { template uint64_t TaskGroupEntity::task_group_id() const { - return _tg->id(); + return _is_empty_group_entity ? -1 : _tg->id(); } template @@ -101,6 +103,21 @@ std::string TaskGroupEntity::debug_string() const { _tg->id(), _tg->name(), _type, cpu_share(), task_size(), _vruntime_ns); } +template +void TaskGroupEntity::set_empty_group_entity(bool is_empty_group_entity) { + _is_empty_group_entity = is_empty_group_entity; +} + +template +bool TaskGroupEntity::is_empty_group_entity() { + return _is_empty_group_entity; +} + +template +void TaskGroupEntity::update_empty_cpu_share(uint64_t empty_cpu_share) { + _cpu_share = empty_cpu_share; +} + template class TaskGroupEntity>; template class TaskGroupEntity; diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h index 88da2ff637..8dd8b75fd1 100644 --- a/be/src/runtime/task_group/task_group.h +++ b/be/src/runtime/task_group/task_group.h @@ -50,6 +50,8 @@ public: explicit TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type); ~TaskGroupEntity(); + TaskGroupEntity() = default; // used for empty group entity + uint64_t vruntime_ns() const { return _vruntime_ns; } QueueType* task_queue(); @@ -68,6 +70,12 @@ public: void check_and_update_cpu_share(const TaskGroupInfo& tg_info); + void set_empty_group_entity(bool is_empty_group_entity); + + bool is_empty_group_entity(); + + void update_empty_cpu_share(uint64_t empty_cpu_share); + private: QueueType* _task_queue; @@ -81,6 +89,8 @@ private: // independent updates. int64_t _version; uint64_t _cpu_share; + + bool _is_empty_group_entity = false; }; // TODO llj tg use PriorityTaskQueue to replace std::queue diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp b/be/src/vec/exec/scan/scan_task_queue.cpp index 89235b6b7a..90171bc87c 100644 --- a/be/src/vec/exec/scan/scan_task_queue.cpp +++ b/be/src/vec/exec/scan/scan_task_queue.cpp @@ -53,8 +53,16 @@ bool ScanTaskQueue::try_get(ScanTask* scan_task, uint32_t timeout_ms) { return r; } -ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) : _core_size(core_size) {} -ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() = default; +ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) : _core_size(core_size) { + _empty_scan_task->scan_entity = _empty_group_entity; + _empty_scan_task->is_empty_task = true; + _empty_group_entity->set_empty_group_entity(true); +} + +ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() { + delete _empty_group_entity; + delete _empty_scan_task; +} void ScanTaskTaskGroupQueue::close() { std::unique_lock lock(_rs_mutex); @@ -78,9 +86,16 @@ bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) { } } } + if (entity->is_empty_group_entity()) { + *scan_task = *_empty_scan_task; + return true; + } DCHECK(entity->task_size() > 0); if (entity->task_size() == 1) { _dequeue_task_group(entity); + if (_enable_cpu_hard_limit) { + reset_empty_group_entity(); + } } return entity->task_queue()->try_get(scan_task, WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */); } @@ -95,6 +110,9 @@ bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) { } if (_group_entities.find(entity) == _group_entities.end()) { _enqueue_task_group(entity); + if (_enable_cpu_hard_limit) { + reset_empty_group_entity(); + } } _wait_task.notify_one(); return true; @@ -132,6 +150,31 @@ void ScanTaskTaskGroupQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& } } +void ScanTaskTaskGroupQueue::reset_empty_group_entity() { + int user_g_cpu_hard_limit = 0; + bool contains_empty_group = false; + for (auto* entity : _group_entities) { + if (!entity->is_empty_group_entity()) { + user_g_cpu_hard_limit += entity->cpu_share(); + } else { + contains_empty_group = true; + } + } + + // 0 <= user_g_cpu_hard_limit <= 100, bound by FE + // user_g_cpu_hard_limit = 0 means no group exists + int empty_group_cpu_share = 100 - user_g_cpu_hard_limit; + if (empty_group_cpu_share > 0 && empty_group_cpu_share < 100 && !contains_empty_group) { + _empty_group_entity->update_empty_cpu_share(empty_group_cpu_share); + _enqueue_task_group(_empty_group_entity); + } else if ((empty_group_cpu_share == 0 || empty_group_cpu_share == 100) && + contains_empty_group) { + // no need to update empty group here + // only update empty group's cpu share when exec enqueue + _dequeue_task_group(_empty_group_entity); + } +} + void ScanTaskTaskGroupQueue::_enqueue_task_group(TGSTEntityPtr tg_entity) { _total_cpu_share += tg_entity->cpu_share(); // TODO llj tg If submitted back to this queue from the scanner thread, `adjust_vruntime_ns` diff --git a/be/src/vec/exec/scan/scan_task_queue.h b/be/src/vec/exec/scan/scan_task_queue.h index c694859e3c..4afd685c79 100644 --- a/be/src/vec/exec/scan/scan_task_queue.h +++ b/be/src/vec/exec/scan/scan_task_queue.h @@ -29,6 +29,7 @@ namespace taskgroup { using WorkFunction = std::function; static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100; +static constexpr auto SCAN_THREAD_TIME_SLICE_US = 100000L; // 100ms // Like PriorityThreadPool::Task struct ScanTask { @@ -45,6 +46,7 @@ struct ScanTask { vectorized::ScannerContext* scanner_context; TGSTEntityPtr scan_entity; int priority; + bool is_empty_task = false; }; // Like pipeline::PriorityTaskQueue use BlockingPriorityQueue directly? @@ -73,6 +75,8 @@ public: void update_tg_cpu_share(const taskgroup::TaskGroupInfo&, taskgroup::TGSTEntityPtr); + void reset_empty_group_entity(); + private: TGSTEntityPtr _task_entity(ScanTask& scan_task); void _enqueue_task_group(TGSTEntityPtr); @@ -94,6 +98,11 @@ private: std::atomic _min_tg_entity = nullptr; uint64_t _min_tg_v_runtime_ns = 0; size_t _core_size; + + TaskGroupEntity* _empty_group_entity = new TaskGroupEntity(); + taskgroup::ScanTask* _empty_scan_task = new taskgroup::ScanTask(); + // todo(wb) support auto-switch cpu mode between soft limit and hard limit + bool _enable_cpu_hard_limit = config::enable_cpu_hard_limit; }; } // namespace taskgroup diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index c7155bca2a..b7f05f0ec9 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -371,8 +371,8 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext bool should_stop = false; // Has to wait at least one full block, or it will cause a lot of schedule task in priority // queue, it will affect query latency and query concurrency for example ssb 3.3. - while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold && - num_rows_in_block < state->batch_size()) { + while (!eos && raw_bytes_read < raw_bytes_threshold && + (raw_rows_read < raw_rows_threshold || num_rows_in_block < state->batch_size())) { // TODO llj task group should should_yield? if (UNLIKELY(ctx->done())) { // No need to set status on error here. @@ -447,9 +447,19 @@ void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler, auto success = scan_queue->take(&scan_task); if (success) { int64_t time_spent = 0; - { - SCOPED_RAW_TIMER(&time_spent); - scan_task.scan_func(); + if (!scan_task.is_empty_task) { + RuntimeProfile::Counter tmp_timer(TUnit::TIME_NS); + { + SCOPED_CPU_TIMER(&tmp_timer); + scan_task.scan_func(); + } + time_spent = tmp_timer.value(); + } else { + { + SCOPED_RAW_TIMER(&time_spent); + usleep(taskgroup::SCAN_THREAD_TIME_SLICE_US); + } + time_spent = time_spent * _core_num / _total_query_thread_num; } scan_queue->update_statistics(scan_task, time_spent); } diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index a07d8e5998..3a198b6c74 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -115,6 +115,10 @@ private: // true is the scheduler is closed. std::atomic_bool _is_closed = {false}; bool _is_init = false; + + int _core_num = CpuInfo::num_cores(); + int _total_query_thread_num = + config::doris_scanner_thread_pool_thread_num + config::pipeline_executor_size; }; } // namespace doris::vectorized diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java index adc6eca9c2..68115c4c84 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java @@ -192,7 +192,7 @@ public class WorkloadGroup implements Writable, GsonPostProcessable { if (properties.containsKey(CPU_HARD_LIMIT)) { String cpuHardLimit = properties.get(CPU_HARD_LIMIT); if (!StringUtils.isNumeric(cpuHardLimit) || Long.parseLong(cpuHardLimit) <= 0) { - throw new DdlException(CPU_HARD_LIMIT + " " + cpuSchedulingWeight + " requires a positive integer."); + throw new DdlException(CPU_HARD_LIMIT + " " + cpuHardLimit + " requires a positive integer."); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java index 89aae3c0bd..3f0053237a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java @@ -54,34 +54,37 @@ import java.io.DataOutput; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.concurrent.locks.ReentrantReadWriteLock; public class WorkloadGroupMgr implements Writable, GsonPostProcessable { - private static final Logger LOG = LogManager.getLogger(WorkloadGroupMgr.class); - public static final String DEFAULT_GROUP_NAME = "normal"; - public static final ImmutableList WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES = new ImmutableList.Builder() .add("Id").add("Name").add("Item").add("Value") .build(); + private static final Logger LOG = LogManager.getLogger(WorkloadGroupMgr.class); @SerializedName(value = "idToWorkloadGroup") private final Map idToWorkloadGroup = Maps.newHashMap(); - private final Map nameToWorkloadGroup = Maps.newHashMap(); - private final ResourceProcNode procNode = new ResourceProcNode(); - private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); public static final String QUERY_CPU_HARD_LIMIT = "query_cpu_hard_limit"; private int queryCPUHardLimit = 0; + // works when user not set cpu hard limit, we fill a default value + private int cpuHardLimitDefaultVal = 0; public WorkloadGroupMgr() { } + public static WorkloadGroupMgr read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, WorkloadGroupMgr.class); + } + private void readLock() { lock.readLock().lock(); } @@ -122,7 +125,17 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { } workloadGroups.add(workloadGroup.toThrift()); // note(wb) -1 to tell be no need to update cgroup - int thriftVal = Config.enable_cpu_hard_limit ? this.queryCPUHardLimit : -1; + int thriftVal = -1; + if (Config.enable_cpu_hard_limit) { + // reset cpu_share according to cpu hard limit + int cpuHardLimitShare = workloadGroup.getCpuHardLimit() == 0 + ? this.cpuHardLimitDefaultVal : workloadGroup.getCpuHardLimit(); + workloadGroups.get(0).getProperties() + .put(WorkloadGroup.CPU_SHARE, String.valueOf(cpuHardLimitShare)); + + // reset sum of all groups cpu hard limit + thriftVal = this.queryCPUHardLimit; + } workloadGroups.get(0).getProperties().put(QUERY_CPU_HARD_LIMIT, String.valueOf(thriftVal)); context.setWorkloadGroupName(groupName); } finally { @@ -187,9 +200,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { public void createWorkloadGroup(CreateWorkloadGroupStmt stmt) throws DdlException { checkWorkloadGroupEnabled(); - if (!Config.enable_cpu_hard_limit) { - stmt.getProperties().remove(WorkloadGroup.CPU_HARD_LIMIT); - } WorkloadGroup workloadGroup = WorkloadGroup.create(stmt.getWorkloadGroupName(), stmt.getProperties()); String workloadGroupName = workloadGroup.getName(); writeLock(); @@ -222,17 +232,51 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { "The sum of all workload group " + WorkloadGroup.MEMORY_LIMIT + " cannot be greater than 100.0%."); } - if (!Config.enable_cpu_hard_limit) { - return; - } - int sumCPULimit = queryCPUHardLimit + workloadGroup.getCpuHardLimit(); - if (!Objects.isNull(old)) { - sumCPULimit -= old.getCpuHardLimit(); - } - if (sumCPULimit > 100 || sumCPULimit <= 0) { + // 1, check new group + int newGroupCpuHardLimit = workloadGroup.getCpuHardLimit(); + if (newGroupCpuHardLimit > 100 || newGroupCpuHardLimit < 0) { throw new DdlException( - "The sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT - + " can not be greater than 100% or less than or equal 0%"); + "new group's " + WorkloadGroup.CPU_HARD_LIMIT + + " value can not be greater than 100% or less than or equal 0%"); + } + + // 2, calculate new query hard cpu limit + int tmpCpuHardLimit = 0; + int zeroCpuHardLimitCount = 0; + for (Map.Entry entry : idToWorkloadGroup.entrySet()) { + if (old != null && entry.getKey() == old.getId()) { + continue; + } + int cpuHardLimit = entry.getValue().getCpuHardLimit(); + if (cpuHardLimit == 0) { + zeroCpuHardLimitCount++; + } + tmpCpuHardLimit += cpuHardLimit; + } + if (newGroupCpuHardLimit == 0) { + zeroCpuHardLimitCount++; + } + tmpCpuHardLimit += newGroupCpuHardLimit; + + if (tmpCpuHardLimit > 100) { + throw new DdlException("sum of all workload group " + WorkloadGroup.CPU_HARD_LIMIT + + " can not be greater than 100% "); + } + + if (tmpCpuHardLimit == 100 && zeroCpuHardLimitCount > 0) { + throw new DdlException("some workload group may not be assigned " + + "cpu hard limit but all query cpu hard limit exceeds 100%"); + } + + int leftCpuHardLimitVal = 100 - tmpCpuHardLimit; + if (zeroCpuHardLimitCount != 0) { + int tmpCpuHardLimitDefaultVal = leftCpuHardLimitVal / zeroCpuHardLimitCount; + if (tmpCpuHardLimitDefaultVal == 0) { + throw new DdlException("remaining cpu can not be assigned to the " + + "workload group without cpu hard limit value; " + + leftCpuHardLimitVal + "%," + newGroupCpuHardLimit + + "%," + zeroCpuHardLimitCount); + } } } @@ -241,9 +285,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { String workloadGroupName = stmt.getWorkloadGroupName(); Map properties = stmt.getProperties(); - if (!Config.enable_cpu_hard_limit) { - properties.remove(WorkloadGroup.CPU_HARD_LIMIT); - } WorkloadGroup newWorkloadGroup; writeLock(); try { @@ -290,6 +331,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { long groupId = workloadGroup.getId(); idToWorkloadGroup.remove(groupId); nameToWorkloadGroup.remove(workloadGroupName); + calQueryCPUHardLimit(); Env.getCurrentEnv().getEditLog().logDropWorkloadGroup(new DropWorkloadGroupOperationLog(groupId)); } finally { writeUnlock(); @@ -302,6 +344,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { try { nameToWorkloadGroup.put(workloadGroup.getName(), workloadGroup); idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup); + calQueryCPUHardLimit(); } finally { writeUnlock(); } @@ -334,6 +377,7 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { WorkloadGroup workloadGroup = idToWorkloadGroup.get(id); nameToWorkloadGroup.remove(workloadGroup.getName()); idToWorkloadGroup.remove(id); + calQueryCPUHardLimit(); } finally { writeUnlock(); } @@ -360,8 +404,18 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { } private void calQueryCPUHardLimit() { - this.queryCPUHardLimit = - idToWorkloadGroup.values().stream().mapToInt(WorkloadGroup::getCpuHardLimit).sum(); + int zeroCpuHardLimitCount = 0; + int ret = 0; + for (Map.Entry entry : idToWorkloadGroup.entrySet()) { + if (entry.getValue().getCpuHardLimit() == 0) { + zeroCpuHardLimitCount++; + } + ret += entry.getValue().getCpuHardLimit(); + } + this.queryCPUHardLimit = ret; + if (zeroCpuHardLimitCount != 0) { + this.cpuHardLimitDefaultVal = (100 - this.queryCPUHardLimit) / zeroCpuHardLimitCount; + } } @Override @@ -370,11 +424,6 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable { Text.writeString(out, json); } - public static WorkloadGroupMgr read(DataInput in) throws IOException { - String json = Text.readString(in); - return GsonUtils.GSON.fromJson(json, WorkloadGroupMgr.class); - } - @Override public void gsonPostProcess() throws IOException { idToWorkloadGroup.forEach(