// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. #include "task_group.h" #include #include #include #include #include #include #include #include "common/logging.h" #include "pipeline/task_queue.h" #include "pipeline/task_scheduler.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker_limiter.h" #include "util/mem_info.h" #include "util/parse_util.h" #include "vec/exec/scan/scanner_scheduler.h" namespace doris { namespace taskgroup { const static uint64_t CPU_SHARE_DEFAULT_VALUE = 1024; const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%"; const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true; const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1; template TaskGroupEntity::TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type) : _tg(tg), _type(type), _version(tg->version()), _cpu_share(tg->cpu_share()) { _task_queue = new QueueType(); } template TaskGroupEntity::~TaskGroupEntity() { delete _task_queue; } template QueueType* TaskGroupEntity::task_queue() { return _task_queue; } template void TaskGroupEntity::incr_runtime_ns(uint64_t runtime_ns) { auto v_time = runtime_ns / _cpu_share; _vruntime_ns += v_time; } template void TaskGroupEntity::adjust_vruntime_ns(uint64_t vruntime_ns) { VLOG_DEBUG << "adjust " << debug_string() << "vtime to " << vruntime_ns; _vruntime_ns = vruntime_ns; } template size_t TaskGroupEntity::task_size() const { return _task_queue->size(); } template uint64_t TaskGroupEntity::cpu_share() const { return _cpu_share; } template uint64_t TaskGroupEntity::task_group_id() const { return _tg->id(); } template void TaskGroupEntity::check_and_update_cpu_share(const TaskGroupInfo& tg_info) { if (tg_info.version > _version) { _cpu_share = tg_info.cpu_share; _version = tg_info.version; } } template std::string TaskGroupEntity::debug_string() const { return fmt::format("TGE[id = {}, name = {}-{}, cpu_share = {}, task size: {}, v_time:{} ns]", _tg->id(), _tg->name(), _type, cpu_share(), task_size(), _vruntime_ns); } template class TaskGroupEntity>; TaskGroup::TaskGroup(const TaskGroupInfo& tg_info) : _id(tg_info.id), _name(tg_info.name), _version(tg_info.version), _memory_limit(tg_info.memory_limit), _enable_memory_overcommit(tg_info.enable_memory_overcommit), _cpu_share(tg_info.cpu_share), _task_entity(this, "pipeline task entity"), _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM), _cpu_hard_limit(tg_info.cpu_hard_limit) {} std::string TaskGroup::debug_string() const { std::shared_lock rl {_mutex}; return fmt::format( "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = " "{}, version = {}, cpu_hard_limit = {}]", _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES), _enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit()); } void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) { if (UNLIKELY(tg_info.id != _id)) { return; } { std::shared_lock rl {_mutex}; if (LIKELY(tg_info.version <= _version)) { return; } } { std::lock_guard wl {_mutex}; if (tg_info.version > _version) { _name = tg_info.name; _version = tg_info.version; _memory_limit = tg_info.memory_limit; _enable_memory_overcommit = tg_info.enable_memory_overcommit; _cpu_share = tg_info.cpu_share; _cpu_hard_limit = tg_info.cpu_hard_limit; } else { return; } } ExecEnv::GetInstance()->pipeline_task_group_scheduler()->task_queue()->update_tg_cpu_share( tg_info, &_task_entity); } int64_t TaskGroup::memory_used() { int64_t used_memory = 0; for (auto& mem_tracker_group : _mem_tracker_limiter_pool) { std::lock_guard l(mem_tracker_group.group_lock); for (const auto& tracker : mem_tracker_group.trackers) { used_memory += tracker->is_query_cancelled() ? 0 : tracker->consumption(); } } return used_memory; } void TaskGroup::add_mem_tracker_limiter(std::shared_ptr mem_tracker_ptr) { auto group_num = mem_tracker_ptr->group_num(); std::lock_guard l(_mem_tracker_limiter_pool[group_num].group_lock); _mem_tracker_limiter_pool[group_num].trackers.insert(mem_tracker_ptr); } void TaskGroup::remove_mem_tracker_limiter(std::shared_ptr mem_tracker_ptr) { auto group_num = mem_tracker_ptr->group_num(); std::lock_guard l(_mem_tracker_limiter_pool[group_num].group_lock); _mem_tracker_limiter_pool[group_num].trackers.erase(mem_tracker_ptr); } void TaskGroup::task_group_info(TaskGroupInfo* tg_info) const { std::shared_lock r_lock(_mutex); tg_info->id = _id; tg_info->name = _name; tg_info->cpu_share = _cpu_share; tg_info->memory_limit = _memory_limit; tg_info->enable_memory_overcommit = _enable_memory_overcommit; tg_info->version = _version; } Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_info, taskgroup::TaskGroupInfo* task_group_info) { // 1 id int tg_id = 0; if (workload_group_info.__isset.id) { tg_id = workload_group_info.id; } else { return Status::InternalError("workload group id is required"); } task_group_info->id = tg_id; // 2 name std::string name = "INVALID_NAME"; if (workload_group_info.__isset.name) { name = workload_group_info.name; } task_group_info->name = name; // 3 version int version = 0; if (workload_group_info.__isset.version) { version = workload_group_info.version; } else { return Status::InternalError("workload group version is required"); } task_group_info->version = version; // 4 cpu_share uint64_t cpu_share = CPU_SHARE_DEFAULT_VALUE; if (workload_group_info.__isset.cpu_share) { cpu_share = workload_group_info.cpu_share; } task_group_info->cpu_share = cpu_share; // 5 cpu hard limit int cpu_hard_limit = CPU_HARD_LIMIT_DEFAULT_VALUE; if (workload_group_info.__isset.cpu_hard_limit) { cpu_hard_limit = workload_group_info.cpu_hard_limit; } task_group_info->cpu_hard_limit = cpu_hard_limit; // 6 mem_limit std::string mem_limit_str = MEMORY_LIMIT_DEFAULT_VALUE; if (workload_group_info.__isset.mem_limit) { mem_limit_str = workload_group_info.mem_limit; } bool is_percent = true; int64_t mem_limit = ParseUtil::parse_mem_spec(mem_limit_str, -1, MemInfo::mem_limit(), &is_percent); task_group_info->memory_limit = mem_limit; // 7 mem overcommit bool enable_memory_overcommit = ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE; if (workload_group_info.__isset.enable_memory_overcommit) { enable_memory_overcommit = workload_group_info.enable_memory_overcommit; } task_group_info->enable_memory_overcommit = enable_memory_overcommit; // 8 cpu soft limit or hard limit bool enable_cpu_hard_limit = false; if (workload_group_info.__isset.enable_cpu_hard_limit) { enable_cpu_hard_limit = workload_group_info.enable_cpu_hard_limit; } task_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit; return Status::OK(); } } // namespace taskgroup } // namespace doris