diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index c94a3c05f1..5263c06053 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -172,7 +172,7 @@ Status CgroupV1CpuCtl::modify_cg_cpu_soft_limit_no_lock(int cpu_shares) {
 
 Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
     int val = cpu_hard_limit > 0 ? (_cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100)
-                                 : CPU_HARD_LIMIT_DEFAULT_VALUE;
+                                 : CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE;
     std::string msg = "modify cpu quota value to " + std::to_string(val);
     return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_quota_file, val, msg, false);
 }
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 94514c8e2e..1289f26307 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -29,10 +29,7 @@
 namespace doris {
 
 // cgroup cpu.cfs_quota_us default value, it means disable cpu hard limit
-const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
-
-// cgroup cpu.shares default value
-const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024;
+const static int CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
 
 class CgroupCpuCtl {
 public:
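For context on the cgroup change above: the value written to cpu.cfs_quota_us scales the CFS period by the core count and the hard-limit percentage, with -1 (the renamed CGROUP_CPU_HARD_LIMIT_DEFAULT_VALUE) meaning "no quota". A minimal standalone sketch of that arithmetic, outside of Doris:

```cpp
#include <iostream>

// Mirrors the formula in CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock:
// quota_us = cfs_period_us * core_num * hard_limit_percent / 100,
// or -1 to disable the hard limit (cgroup treats -1 as "unlimited").
int compute_cfs_quota_us(int cfs_period_us, int core_num, int hard_limit_percent) {
    return hard_limit_percent > 0
                   ? cfs_period_us * core_num * hard_limit_percent / 100
                   : -1;
}

int main() {
    // A 100ms period on a 16-core machine with the group capped at 50%:
    // 100000 * 16 * 50 / 100 = 800000us of CPU per period across all cores.
    std::cout << compute_cfs_quota_us(100000, 16, 50) << std::endl; // 800000
}
```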
diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp
index 5f9da64bd2..1d5a8544e1 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -50,7 +50,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
                 task_group_info.enable_cpu_hard_limit);
 
         // 4 create and update task scheduler
-        _exec_env->task_group_manager()->upsert_cg_task_scheduler(&task_group_info, _exec_env);
+        tg->upsert_task_scheduler(&task_group_info, _exec_env);
 
         LOG(INFO) << "update task group finish, tg info=" << tg->debug_string()
                   << ", enable_cpu_hard_limit="
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index dadb6ada17..2c25d37d14 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -210,8 +210,8 @@ Status QueryContext::set_task_group(taskgroup::TaskGroupPtr& tg) {
     // see task_group_manager::delete_task_group_by_ids
     RETURN_IF_ERROR(_task_group->add_query(_query_id));
     _task_group->add_mem_tracker_limiter(query_mem_tracker);
-    _exec_env->task_group_manager()->get_query_scheduler(
-            _task_group->id(), &_task_scheduler, &_scan_task_scheduler, &_non_pipe_thread_pool);
+    _task_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler,
+                                     &_non_pipe_thread_pool, &_remote_scan_task_scheduler);
     return Status::OK();
 }
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index d7b3813dce..a639268c55 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -196,6 +196,10 @@ public:
 
     vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return _scan_task_scheduler; }
 
+    vectorized::SimplifiedScanScheduler* get_remote_scan_scheduler() {
+        return _remote_scan_task_scheduler;
+    }
+
     pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); }
 
     void register_query_statistics(std::shared_ptr<QueryStatistics> qs);
@@ -283,6 +287,7 @@ private:
     doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
     vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
     ThreadPool* _non_pipe_thread_pool = nullptr;
+    vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
     std::unique_ptr<pipeline::Dependency> _execution_dependency;
 
     std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr;
diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp
index ddddf39dbc..53534444a5 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -34,6 +34,7 @@
 #include "util/mem_info.h"
 #include "util/parse_util.h"
 #include "util/runtime_profile.h"
+#include "util/threadpool.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 
 namespace doris {
@@ -43,6 +44,7 @@ const static uint64_t CPU_SHARE_DEFAULT_VALUE = 1024;
 const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
 const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
 const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
+const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024;
 
 TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
         : _id(tg_info.id),
@@ -53,16 +55,19 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
           _cpu_share(tg_info.cpu_share),
           _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM),
           _cpu_hard_limit(tg_info.cpu_hard_limit),
-          _scan_thread_num(tg_info.scan_thread_num) {}
+          _scan_thread_num(tg_info.scan_thread_num),
+          _max_remote_scan_thread_num(tg_info.max_remote_scan_thread_num),
+          _min_remote_scan_thread_num(tg_info.min_remote_scan_thread_num) {}
 
 std::string TaskGroup::debug_string() const {
     std::shared_lock<std::shared_mutex> rl {_mutex};
     return fmt::format(
             "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = "
-            "{}, version = {}, cpu_hard_limit = {}, scan_thread_num = {}]",
+            "{}, version = {}, cpu_hard_limit = {}, scan_thread_num = "
+            "{}, max_remote_scan_thread_num = {}, min_remote_scan_thread_num = {}]",
             _id, _name, cpu_share(), PrettyPrinter::print(_memory_limit, TUnit::BYTES),
             _enable_memory_overcommit ? "true" : "false", _version, cpu_hard_limit(),
-            _scan_thread_num);
+            _scan_thread_num, _max_remote_scan_thread_num, _min_remote_scan_thread_num);
 }
 
 void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
@@ -85,6 +90,8 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
         _cpu_share = tg_info.cpu_share;
         _cpu_hard_limit = tg_info.cpu_hard_limit;
         _scan_thread_num = tg_info.scan_thread_num;
+        _max_remote_scan_thread_num = tg_info.max_remote_scan_thread_num;
+        _min_remote_scan_thread_num = tg_info.min_remote_scan_thread_num;
     } else {
         return;
     }
@@ -266,8 +273,167 @@ Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_
         task_group_info->scan_thread_num = workload_group_info.scan_thread_num;
     }
 
+    // 10 max remote scan thread num
+    task_group_info->max_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+    if (workload_group_info.__isset.max_remote_scan_thread_num &&
+        workload_group_info.max_remote_scan_thread_num > 0) {
+        task_group_info->max_remote_scan_thread_num =
+                workload_group_info.max_remote_scan_thread_num;
+    }
+
+    // 11 min remote scan thread num
+    task_group_info->min_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+    if (workload_group_info.__isset.min_remote_scan_thread_num &&
+        workload_group_info.min_remote_scan_thread_num > 0) {
+        task_group_info->min_remote_scan_thread_num =
+                workload_group_info.min_remote_scan_thread_num;
+    }
+
     return Status::OK();
 }
 
+void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env) {
+    uint64_t tg_id = tg_info->id;
+    std::string tg_name = tg_info->name;
+    int cpu_hard_limit = tg_info->cpu_hard_limit;
+    uint64_t cpu_shares = tg_info->cpu_share;
+    bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
+    int scan_thread_num = tg_info->scan_thread_num;
+    int max_remote_scan_thread_num = tg_info->max_remote_scan_thread_num;
+    int min_remote_scan_thread_num = tg_info->min_remote_scan_thread_num;
+
+    std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+    if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) {
+        std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>(tg_id);
+        Status ret = cgroup_cpu_ctl->init();
+        if (ret.ok()) {
+            _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
+            LOG(INFO) << "[upsert wg thread pool] cgroup init success";
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= " << tg_id
+                      << ", reason=" << ret.to_string();
+        }
+    }
+
+    CgroupCpuCtl* cg_cpu_ctl_ptr = _cgroup_cpu_ctl.get();
+
+    if (_task_sched == nullptr) {
+        int32_t executors_size = config::pipeline_executor_size;
+        if (executors_size <= 0) {
+            executors_size = CpuInfo::num_cores();
+        }
+        auto task_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
+        std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
+                std::make_unique<pipeline::TaskScheduler>(
+                        exec_env, exec_env->get_global_block_scheduler(), std::move(task_queue),
+                        "Pipe_" + tg_name, cg_cpu_ctl_ptr);
+        Status ret = pipeline_task_scheduler->start();
+        if (ret.ok()) {
+            _task_sched = std::move(pipeline_task_scheduler);
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, gid= " << tg_id;
+        }
+    }
+
+    if (_scan_task_sched == nullptr) {
+        std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
+                std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" + tg_name,
+                                                                      cg_cpu_ctl_ptr);
+        Status ret = scan_scheduler->start();
+        if (ret.ok()) {
+            _scan_task_sched = std::move(scan_scheduler);
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, gid=" << tg_id;
+        }
+    }
+    if (scan_thread_num > 0 && _scan_task_sched) {
+        _scan_task_sched->reset_thread_num(scan_thread_num);
+    }
+
+    if (_remote_scan_task_sched == nullptr) {
+        std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler =
+                std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + tg_name,
+                                                                      cg_cpu_ctl_ptr);
+        Status ret = remote_scan_scheduler->start();
+        if (ret.ok()) {
+            _remote_scan_task_sched = std::move(remote_scan_scheduler);
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] remote scan scheduler start failed, gid="
+                      << tg_id;
+        }
+    }
+    if (max_remote_scan_thread_num > 0 && _remote_scan_task_sched) {
+        _remote_scan_task_sched->reset_max_thread_num(max_remote_scan_thread_num);
+    }
+    if (min_remote_scan_thread_num > 0 && _remote_scan_task_sched) {
+        _remote_scan_task_sched->reset_min_thread_num(min_remote_scan_thread_num);
+    }
+
+    if (_non_pipe_thread_pool == nullptr) {
+        std::unique_ptr<ThreadPool> thread_pool = nullptr;
+        auto ret = ThreadPoolBuilder("nonPip_" + tg_name)
+                           .set_min_threads(1)
+                           .set_max_threads(config::fragment_pool_thread_num_max)
+                           .set_max_queue_size(config::fragment_pool_queue_size)
+                           .set_cgroup_cpu_ctl(cg_cpu_ctl_ptr)
+                           .build(&thread_pool);
+        if (!ret.ok()) {
+            LOG(INFO) << "[upsert wg thread pool] create non-pipline thread pool failed, gid="
+                      << tg_id;
+        } else {
+            _non_pipe_thread_pool = std::move(thread_pool);
+        }
+    }
+
+    // step 6: update cgroup cpu if needed
+    if (_cgroup_cpu_ctl) {
+        if (enable_cpu_hard_limit) {
+            if (cpu_hard_limit > 0) {
+                _cgroup_cpu_ctl->update_cpu_hard_limit(cpu_hard_limit);
+                _cgroup_cpu_ctl->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
+            } else {
+                LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: "
                          << cpu_hard_limit << ", gid=" << tg_id;
+            }
+        } else {
+            if (config::enable_cgroup_cpu_soft_limit) {
+                _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
+                _cgroup_cpu_ctl->update_cpu_hard_limit(
+                        CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
+            }
+        }
+        _cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
+                                             &(tg_info->cgroup_cpu_hard_limit));
+    }
+}
+
+void TaskGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
+                                    vectorized::SimplifiedScanScheduler** scan_sched,
+                                    ThreadPool** non_pipe_thread_pool,
+                                    vectorized::SimplifiedScanScheduler** remote_scan_sched) {
+    std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
+    *exec_sched = _task_sched.get();
+    *scan_sched = _scan_task_sched.get();
+    *remote_scan_sched = _remote_scan_task_sched.get();
+    *non_pipe_thread_pool = _non_pipe_thread_pool.get();
+}
+
+void TaskGroup::try_stop_schedulers() {
+    std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
+    if (_task_sched) {
+        _task_sched->stop();
+    }
+    if (_scan_task_sched) {
+        _scan_task_sched->stop();
+    }
+    if (_remote_scan_task_sched) {
+        _remote_scan_task_sched->stop();
+    }
+    if (_non_pipe_thread_pool) {
+        _non_pipe_thread_pool->shutdown();
+        _non_pipe_thread_pool->wait();
+    }
+}
+
 } // namespace taskgroup
 } // namespace doris
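upsert_task_scheduler is called on every topic update, so each pool is created lazily exactly once under _task_sched_lock and only its thread counts are tuned on later calls; a failed start leaves the member null and is retried on the next upsert. A reduced sketch of that create-then-tune shape (stand-in types, not the Doris classes):

```cpp
#include <memory>
#include <mutex>

struct Pool { int threads = 0; };  // stand-in for a scheduler or thread pool

class Group {
public:
    void upsert(int thread_num) {
        std::lock_guard<std::mutex> lock(_lock);
        if (_pool == nullptr) {
            _pool = std::make_unique<Pool>(); // first upsert: create the pool
        }
        if (thread_num > 0 && _pool != nullptr) {
            _pool->threads = thread_num;      // later upserts: only retune it
        }
    }

private:
    std::mutex _lock;             // plays the role of _task_sched_lock
    std::unique_ptr<Pool> _pool;  // plays the role of _scan_task_sched etc.
};
```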
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
index 7604ee4512..c54fce2ab6 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -36,9 +36,17 @@ namespace doris {
 
 class MemTrackerLimiter;
 class RuntimeProfile;
+class ThreadPool;
+class ExecEnv;
+class CgroupCpuCtl;
+
+namespace vectorized {
+class SimplifiedScanScheduler;
+}
 
 namespace pipeline {
 class PipelineTask;
+class TaskScheduler;
 } // namespace pipeline
 
 namespace taskgroup {
@@ -121,6 +129,15 @@ public:
 
     int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile);
 
+    void upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env);
+
+    void get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
+                             vectorized::SimplifiedScanScheduler** scan_sched,
+                             ThreadPool** non_pipe_thread_pool,
+                             vectorized::SimplifiedScanScheduler** remote_scan_sched);
+
+    void try_stop_schedulers();
+
 private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
     const uint64_t _id;
@@ -132,12 +149,21 @@ private:
     std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
     std::atomic<int> _cpu_hard_limit;
     std::atomic<int> _scan_thread_num;
+    std::atomic<int> _max_remote_scan_thread_num;
+    std::atomic<int> _min_remote_scan_thread_num;
 
     // means task group is mark dropped
     // new query can not submit
     // waiting running query to be cancelled or finish
     bool _is_shutdown = false;
     std::unordered_set<TUniqueId> _query_id_set;
+
+    std::shared_mutex _task_sched_lock;
+    std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl = nullptr;
+    std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched = nullptr;
+    std::unique_ptr<vectorized::SimplifiedScanScheduler> _scan_task_sched = nullptr;
+    std::unique_ptr<vectorized::SimplifiedScanScheduler> _remote_scan_task_sched = nullptr;
+    std::unique_ptr<ThreadPool> _non_pipe_thread_pool = nullptr;
 };
 
 using TaskGroupPtr = std::shared_ptr<TaskGroup>;
@@ -152,6 +178,8 @@ struct TaskGroupInfo {
     int cpu_hard_limit;
     bool enable_cpu_hard_limit;
     int scan_thread_num;
+    int max_remote_scan_thread_num;
+    int min_remote_scan_thread_num;
     // log cgroup cpu info
    uint64_t cgroup_cpu_shares = 0;
     int cgroup_cpu_hard_limit = 0;
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index b0b84a0eb8..819e63c855 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -21,9 +21,10 @@
 #include <...>
 
 #include "pipeline/task_scheduler.h"
-#include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/task_group/task_group.h"
+#include "util/threadpool.h"
+#include "util/time.h"
 #include "vec/exec/scan/scanner_scheduler.h"
 
 namespace doris::taskgroup {
@@ -68,126 +69,6 @@ TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
     return nullptr;
 }
 
-void TaskGroupManager::get_query_scheduler(uint64_t tg_id,
-                                           doris::pipeline::TaskScheduler** exec_sched,
-                                           vectorized::SimplifiedScanScheduler** scan_sched,
-                                           ThreadPool** non_pipe_thread_pool) {
-    std::shared_lock<std::shared_mutex> r_lock(_task_scheduler_lock);
-    auto tg_sche_it = _tg_sche_map.find(tg_id);
-    if (tg_sche_it != _tg_sche_map.end()) {
-        *exec_sched = tg_sche_it->second.get();
-    }
-
-    auto tg_scan_sche_it = _tg_scan_sche_map.find(tg_id);
-    if (tg_scan_sche_it != _tg_scan_sche_map.end()) {
-        *scan_sched = tg_scan_sche_it->second.get();
-    }
-
-    auto non_pipe_thread_pool_iter = _non_pipe_thread_pool_map.find(tg_id);
-    if (non_pipe_thread_pool_iter != _non_pipe_thread_pool_map.end()) {
-        *non_pipe_thread_pool = non_pipe_thread_pool_iter->second.get();
-    }
-}
-
-void TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info,
-                                                ExecEnv* exec_env) {
-    uint64_t tg_id = tg_info->id;
-    std::string tg_name = tg_info->name;
-    int cpu_hard_limit = tg_info->cpu_hard_limit;
-    uint64_t cpu_shares = tg_info->cpu_share;
-    bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
-    int scan_thread_num = tg_info->scan_thread_num;
-
-    std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
-    // step 1: init cgroup cpu controller
-    CgroupCpuCtl* cg_cu_ctl_ptr = nullptr;
-    if (config::doris_cgroup_cpu_path != "" &&
-        _cgroup_ctl_map.find(tg_id) == _cgroup_ctl_map.end()) {
-        std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>(tg_id);
-        Status ret = cgroup_cpu_ctl->init();
-        if (ret.ok()) {
-            cg_cu_ctl_ptr = cgroup_cpu_ctl.get();
-            _cgroup_ctl_map.emplace(tg_id, std::move(cgroup_cpu_ctl));
-            LOG(INFO) << "[upsert wg thread pool] cgroup init success";
-        } else {
-            LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= " << tg_id
-                      << ", reason=" << ret.to_string();
-        }
-    }
-
-    // step 2: init task scheduler
-    if (_tg_sche_map.find(tg_id) == _tg_sche_map.end()) {
-        int32_t executors_size = config::pipeline_executor_size;
-        if (executors_size <= 0) {
-            executors_size = CpuInfo::num_cores();
-        }
-        auto task_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
-
-        auto pipeline_task_scheduler = std::make_unique<pipeline::TaskScheduler>(
-                exec_env, exec_env->get_global_block_scheduler(), std::move(task_queue),
-                "Exec_" + tg_name, cg_cu_ctl_ptr);
-        Status ret = pipeline_task_scheduler->start();
-        if (ret.ok()) {
-            _tg_sche_map.emplace(tg_id, std::move(pipeline_task_scheduler));
-        } else {
-            LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, gid= " << tg_id;
-        }
-    }
-
-    // step 3: init scan scheduler
-    if (_tg_scan_sche_map.find(tg_id) == _tg_scan_sche_map.end()) {
-        auto scan_scheduler =
-                std::make_unique<vectorized::SimplifiedScanScheduler>(tg_name, cg_cu_ctl_ptr);
-        Status ret = scan_scheduler->start();
-        if (ret.ok()) {
-            _tg_scan_sche_map.emplace(tg_id, std::move(scan_scheduler));
-        } else {
-            LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, gid=" << tg_id;
-        }
-    }
-    if (scan_thread_num > 0 && _tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
-        _tg_scan_sche_map.at(tg_id)->reset_thread_num(scan_thread_num);
-    }
-
-    // step 4: init non-pipe scheduler
-    if (_non_pipe_thread_pool_map.find(tg_id) == _non_pipe_thread_pool_map.end()) {
-        std::unique_ptr<ThreadPool> thread_pool = nullptr;
-        auto ret = ThreadPoolBuilder("nonPip_" + tg_name)
-                           .set_min_threads(1)
-                           .set_max_threads(config::fragment_pool_thread_num_max)
-                           .set_max_queue_size(config::fragment_pool_queue_size)
-                           .set_cgroup_cpu_ctl(cg_cu_ctl_ptr)
-                           .build(&thread_pool);
-        if (!ret.ok()) {
-            LOG(INFO) << "[upsert wg thread pool] create non-pipline thread pool failed, gid="
-                      << tg_id;
-        } else {
-            _non_pipe_thread_pool_map.emplace(tg_id, std::move(thread_pool));
-        }
-    }
-
-    // step 5: update cgroup cpu if needed
-    if (_cgroup_ctl_map.find(tg_id) != _cgroup_ctl_map.end()) {
-        if (enable_cpu_hard_limit) {
-            if (cpu_hard_limit > 0) {
-                _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
-                _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
-            } else {
-                LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: "
-                          << cpu_hard_limit << ", gid=" << tg_id;
-            }
-        } else {
-            if (config::enable_cgroup_cpu_soft_limit) {
-                _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares);
-                _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(
-                        CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
-            }
-        }
-        _cgroup_ctl_map.at(tg_id)->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
-                                                       &(tg_info->cgroup_cpu_hard_limit));
-    }
-}
-
 void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
     int64_t begin_time = MonotonicMillis();
     // 1 get delete group without running queries
@@ -209,45 +90,11 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
     }
 
     // 2 stop active thread
-    std::vector<doris::pipeline::TaskScheduler*> task_sched_to_stop;
-    std::vector<vectorized::SimplifiedScanScheduler*> scan_task_sched_to_stop;
-    std::vector<ThreadPool*> non_pip_thread_pool_to_stop;
-    {
-        std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
-        for (uint64_t tg_id : deleted_tg_ids) {
-            if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
-                task_sched_to_stop.emplace_back(_tg_sche_map.at(tg_id).get());
-            }
-            if (_tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
-                scan_task_sched_to_stop.emplace_back(_tg_scan_sche_map.at(tg_id).get());
-            }
-            if (_non_pipe_thread_pool_map.find(tg_id) != _non_pipe_thread_pool_map.end()) {
-                non_pip_thread_pool_to_stop.emplace_back(_non_pipe_thread_pool_map.at(tg_id).get());
-            }
-        }
-    }
-    for (auto* ptr1 : task_sched_to_stop) {
-        ptr1->stop();
-    }
-    for (auto* ptr2 : scan_task_sched_to_stop) {
-        ptr2->stop();
-    }
-    for (auto& ptr3 : non_pip_thread_pool_to_stop) {
-        ptr3->shutdown();
-        ptr3->wait();
+    for (uint64_t tg_id : deleted_tg_ids) {
+        _task_groups.at(tg_id)->try_stop_schedulers();
     }
 
     // 3 release resource in memory
-    {
-        std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
-        for (uint64_t tg_id : deleted_tg_ids) {
-            _tg_sche_map.erase(tg_id);
-            _tg_scan_sche_map.erase(tg_id);
-            _cgroup_ctl_map.erase(tg_id);
-            _non_pipe_thread_pool_map.erase(tg_id);
-        }
-    }
-
     {
         std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
         for (uint64_t tg_id : deleted_tg_ids) {
@@ -286,14 +133,8 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
 }
 
 void TaskGroupManager::stop() {
-    for (auto& task_sche : _tg_sche_map) {
-        task_sche.second->stop();
-    }
-    for (auto& task_sche : _tg_scan_sche_map) {
-        task_sche.second->stop();
-    }
-    for (auto& no_pip_sche : _non_pipe_thread_pool_map) {
-        no_pip_sche.second->shutdown();
+    for (auto iter = _task_groups.begin(); iter != _task_groups.end(); iter++) {
+        iter->second->try_stop_schedulers();
     }
 }
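With the pools owned by TaskGroup, deletion becomes stop-then-erase: try_stop_schedulers() joins each doomed group's threads first, and only afterwards are the groups dropped from _task_groups under _group_mutex. A compact mock of that two-phase teardown:

```cpp
#include <cstdint>
#include <map>
#include <memory>
#include <set>

struct MockGroup {
    void try_stop_schedulers() { stopped = true; }  // joins pool threads in Doris
    bool stopped = false;
};

void delete_groups(std::map<uint64_t, std::shared_ptr<MockGroup>>& groups,
                   const std::set<uint64_t>& doomed_ids) {
    // Phase 1: stop threads while the shared_ptrs still keep the groups alive.
    for (uint64_t id : doomed_ids) {
        groups.at(id)->try_stop_schedulers();
    }
    // Phase 2: drop the map references; a group destructs once the last
    // query releases its shared_ptr.
    for (uint64_t id : doomed_ids) {
        groups.erase(id);
    }
}
```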
diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h
index 29e5c30a59..21772bd3bc 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -21,18 +21,11 @@
 #include <...>
 #include <...>
 
-#include "pipeline/task_queue.h"
-#include "pipeline/task_scheduler.h"
 #include "task_group.h"
 
 namespace doris {
-class ExecEnv;
-class QueryContext;
-class CgroupCpuCtl;
-namespace vectorized {
-class SimplifiedScanScheduler;
-}
+class CgroupCpuCtl;
 
 namespace pipeline {
 class TaskScheduler;
@@ -51,8 +44,6 @@ public:
     void get_related_taskgroups(const std::function<bool(const TaskGroupPtr& ptr)>& pred,
                                 std::vector<TaskGroupPtr>* task_groups);
 
-    void upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env);
-
     void delete_task_group_by_ids(std::set<uint64_t> id_set);
 
     TaskGroupPtr get_task_group_by_id(uint64_t tg_id);
@@ -65,22 +56,10 @@ public:
 
     bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); }
 
-    void get_query_scheduler(uint64_t tg_id, doris::pipeline::TaskScheduler** exec_sched,
-                             vectorized::SimplifiedScanScheduler** scan_sched,
-                             ThreadPool** non_pipe_thread_pool);
-
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
 
-    // map for workload group id and task scheduler pool
-    // used for cpu hard limit
-    std::shared_mutex _task_scheduler_lock;
-    std::map<uint64_t, std::unique_ptr<doris::pipeline::TaskScheduler>> _tg_sche_map;
-    std::map<uint64_t, std::unique_ptr<vectorized::SimplifiedScanScheduler>> _tg_scan_sche_map;
-    std::map<uint64_t, std::unique_ptr<CgroupCpuCtl>> _cgroup_ctl_map;
-    std::map<uint64_t, std::unique_ptr<ThreadPool>> _non_pipe_thread_pool_map;
-
     std::shared_mutex _init_cg_ctl_lock;
     std::unique_ptr<CgroupCpuCtl> _cg_cpu_ctl;
     bool _is_init_succ = false;
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 1edf77798d..82e95708ef 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -128,6 +128,7 @@ Status ScannerContext::init() {
         if (_simple_scan_scheduler) {
             _should_reset_thread_name = false;
         }
+        _remote_scan_task_scheduler = _state->get_query_ctx()->get_remote_scan_scheduler();
     }
 #endif
 
diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h
index 393920a746..1052e77ae0 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -147,6 +147,8 @@ public:
 
     SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; }
 
+    SimplifiedScanScheduler* get_remote_scan_scheduler() { return _remote_scan_task_scheduler; }
+
     void stop_scanners(RuntimeState* state);
 
     int32_t get_max_thread_num() const { return _max_thread_num; }
@@ -205,6 +207,7 @@ protected:
     int64_t _max_bytes_in_queue;
     doris::vectorized::ScannerScheduler* _scanner_scheduler;
     SimplifiedScanScheduler* _simple_scan_scheduler = nullptr;
+    SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
     moodycamel::ConcurrentQueue<std::weak_ptr<ScannerDelegate>> _scanners;
     int32_t _num_scheduled_scanners = 0;
     int32_t _num_finished_scanners = 0;
"pipeline/task_queue.h" -#include "pipeline/task_scheduler.h" #include "task_group.h" namespace doris { -class ExecEnv; -class QueryContext; -class CgroupCpuCtl; -namespace vectorized { -class SimplifiedScanScheduler; -} +class CgroupCpuCtl; namespace pipeline { class TaskScheduler; @@ -51,8 +44,6 @@ public: void get_related_taskgroups(const std::function& pred, std::vector* task_groups); - void upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env); - void delete_task_group_by_ids(std::set id_set); TaskGroupPtr get_task_group_by_id(uint64_t tg_id); @@ -65,22 +56,10 @@ public: bool enable_cpu_hard_limit() { return _enable_cpu_hard_limit.load(); } - void get_query_scheduler(uint64_t tg_id, doris::pipeline::TaskScheduler** exec_sched, - vectorized::SimplifiedScanScheduler** scan_sched, - ThreadPool** non_pipe_thread_pool); - private: std::shared_mutex _group_mutex; std::unordered_map _task_groups; - // map for workload group id and task scheduler pool - // used for cpu hard limit - std::shared_mutex _task_scheduler_lock; - std::map> _tg_sche_map; - std::map> _tg_scan_sche_map; - std::map> _cgroup_ctl_map; - std::map> _non_pipe_thread_pool_map; - std::shared_mutex _init_cg_ctl_lock; std::unique_ptr _cg_cpu_ctl; bool _is_init_succ = false; diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 1edf77798d..82e95708ef 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -128,6 +128,7 @@ Status ScannerContext::init() { if (_simple_scan_scheduler) { _should_reset_thread_name = false; } + _remote_scan_task_scheduler = _state->get_query_ctx()->get_remote_scan_scheduler(); } #endif diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 393920a746..1052e77ae0 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -147,6 +147,8 @@ public: SimplifiedScanScheduler* get_simple_scan_scheduler() { return _simple_scan_scheduler; } + SimplifiedScanScheduler* get_remote_scan_scheduler() { return _remote_scan_task_scheduler; } + void stop_scanners(RuntimeState* state); int32_t get_max_thread_num() const { return _max_thread_num; } @@ -205,6 +207,7 @@ protected: int64_t _max_bytes_in_queue; doris::vectorized::ScannerScheduler* _scanner_scheduler; SimplifiedScanScheduler* _simple_scan_scheduler = nullptr; + SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr; moodycamel::ConcurrentQueue> _scanners; int32_t _num_scheduled_scanners = 0; int32_t _num_finished_scanners = 0; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 75bd04b0e7..dd3be9f8f3 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -157,12 +157,12 @@ void ScannerScheduler::submit(std::shared_ptr ctx, TabletStorageType type = scanner_delegate->_scanner->get_storage_type(); bool ret = false; if (type == TabletStorageType::STORAGE_TYPE_LOCAL) { - if (auto* scan_sche = ctx->get_simple_scan_scheduler()) { + if (auto* scan_sched = ctx->get_simple_scan_scheduler()) { auto work_func = [this, scanner_ref = scan_task, ctx]() { this->_scanner_scan(ctx, scanner_ref); }; SimplifiedScanTask simple_scan_task = {work_func, ctx}; - ret = scan_sche->submit_scan_task(simple_scan_task); + ret = scan_sched->submit_scan_task(simple_scan_task); } else { PriorityThreadPool::Task task; task.work_function = [this, scanner_ref = scan_task, ctx]() { 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 746aa34ff9..f3f9caaa4d 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -101,15 +101,15 @@ struct SimplifiedScanTask {
 
 class SimplifiedScanScheduler {
 public:
-    SimplifiedScanScheduler(std::string wg_name, CgroupCpuCtl* cgroup_cpu_ctl) {
+    SimplifiedScanScheduler(std::string sched_name, CgroupCpuCtl* cgroup_cpu_ctl) {
         _is_stop.store(false);
         _cgroup_cpu_ctl = cgroup_cpu_ctl;
-        _wg_name = wg_name;
+        _sched_name = sched_name;
     }
 
     ~SimplifiedScanScheduler() {
         stop();
-        LOG(INFO) << "Scanner sche " << _wg_name << " shutdown";
+        LOG(INFO) << "Scanner sche " << _sched_name << " shutdown";
     }
 
     void stop() {
@@ -119,7 +119,7 @@ public:
     }
 
     Status start() {
-        RETURN_IF_ERROR(ThreadPoolBuilder("Scan_" + _wg_name)
+        RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name)
                                 .set_min_threads(config::doris_scanner_thread_pool_thread_num)
                                 .set_max_threads(config::doris_scanner_thread_pool_thread_num)
                                 .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
@@ -131,7 +131,7 @@ public:
         if (!_is_stop) {
             return _scan_thread_pool->submit_func([scan_task] { scan_task.scan_func(); });
         } else {
-            return Status::InternalError("scanner pool {} is shutdown.", _wg_name);
+            return Status::InternalError("scanner pool {} is shutdown.", _sched_name);
         }
     }
 
@@ -148,11 +148,33 @@ public:
         }
     }
 
+    void reset_max_thread_num(int thread_num) {
+        int max_thread_num = _scan_thread_pool->max_threads();
+
+        if (max_thread_num != thread_num) {
+            Status st = _scan_thread_pool->set_max_threads(thread_num);
+            if (!st.ok()) {
+                LOG(INFO) << "reset max thread num failed, sche name=" << _sched_name;
+            }
+        }
+    }
+
+    void reset_min_thread_num(int thread_num) {
+        int min_thread_num = _scan_thread_pool->min_threads();
+
+        if (min_thread_num != thread_num) {
+            Status st = _scan_thread_pool->set_min_threads(thread_num);
+            if (!st.ok()) {
+                LOG(INFO) << "reset min thread num failed, sche name=" << _sched_name;
+            }
+        }
+    }
+
 private:
     std::unique_ptr<ThreadPool> _scan_thread_pool;
     std::atomic<bool> _is_stop;
     CgroupCpuCtl* _cgroup_cpu_ctl = nullptr;
-    std::string _wg_name;
+    std::string _sched_name;
 };
 
 } // namespace doris::vectorized
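One caveat worth noting: reset_max_thread_num and reset_min_thread_num adjust the two bounds independently, and upsert_task_scheduler applies max before min; if a single update lowers max below the old min, the first set_max_threads call may be rejected until the next upsert, depending on how ThreadPool validates the min <= max invariant. A standalone sketch of an ordering that keeps the invariant at every step (a hypothetical helper, not part of this patch):

```cpp
#include <cassert>

struct PoolRange { int min_threads; int max_threads; };  // toy pool bounds

// Apply (new_min, new_max) so that min <= max holds between the two writes.
void reset_range(PoolRange* pool, int new_min, int new_max) {
    assert(new_min <= new_max);
    if (new_max < pool->min_threads) {
        pool->min_threads = new_min;  // shrink min first, then max can drop
        pool->max_threads = new_max;
    } else {
        pool->max_threads = new_max;  // raise max first, then min can follow
        pool->min_threads = new_min;
    }
}
```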
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 0def933c8c..b14b3afec0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -63,12 +63,17 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
 
     public static final String SCAN_THREAD_NUM = "scan_thread_num";
 
+    public static final String MAX_REMOTE_SCAN_THREAD_NUM = "max_remote_scan_thread_num";
+
+    public static final String MIN_REMOTE_SCAN_THREAD_NUM = "min_remote_scan_thread_num";
+
     // NOTE(wb): all property is not required, some properties default value is set in be
     // default value is as followed
     // cpu_share=1024, memory_limit=0%(0 means not limit), enable_memory_overcommit=true
     private static final ImmutableSet<String> ALL_PROPERTIES_NAME = new ImmutableSet.Builder<String>()
             .add(CPU_SHARE).add(MEMORY_LIMIT).add(ENABLE_MEMORY_OVERCOMMIT).add(MAX_CONCURRENCY)
-            .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM).build();
+            .add(MAX_QUEUE_SIZE).add(QUEUE_TIMEOUT).add(CPU_HARD_LIMIT).add(SCAN_THREAD_NUM)
+            .add(MAX_REMOTE_SCAN_THREAD_NUM).add(MIN_REMOTE_SCAN_THREAD_NUM).build();
 
     @SerializedName(value = "id")
     private long id;
@@ -225,6 +230,32 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
         }
     }
 
+        if (properties.containsKey(MAX_REMOTE_SCAN_THREAD_NUM)) {
+            String value = properties.get(MAX_REMOTE_SCAN_THREAD_NUM);
+            try {
+                int intValue = Integer.parseInt(value);
+                if (intValue <= 0 && intValue != -1) {
+                    throw new NumberFormatException();
+                }
+            } catch (NumberFormatException e) {
+                throw new DdlException(
+                        MAX_REMOTE_SCAN_THREAD_NUM + " must be a positive integer or -1. but input value is " + value);
+            }
+        }
+
+        if (properties.containsKey(MIN_REMOTE_SCAN_THREAD_NUM)) {
+            String value = properties.get(MIN_REMOTE_SCAN_THREAD_NUM);
+            try {
+                int intValue = Integer.parseInt(value);
+                if (intValue <= 0 && intValue != -1) {
+                    throw new NumberFormatException();
+                }
+            } catch (NumberFormatException e) {
+                throw new DdlException(
+                        MIN_REMOTE_SCAN_THREAD_NUM + " must be a positive integer or -1. but input value is " + value);
+            }
+        }
+
         // check queue property
         if (properties.containsKey(MAX_CONCURRENCY)) {
             try {
@@ -309,6 +340,10 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
                 row.add("true");
             } else if (SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) {
                 row.add("-1");
+            } else if (MAX_REMOTE_SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) {
+                row.add("-1");
+            } else if (MIN_REMOTE_SCAN_THREAD_NUM.equals(key) && !properties.containsKey(key)) {
+                row.add("-1");
             } else {
                 row.add(properties.get(key));
             }
@@ -369,6 +404,16 @@ public class WorkloadGroup implements Writable, GsonPostProcessable {
             tWorkloadGroupInfo.setScanThreadNum(Integer.parseInt(scanThreadNumStr));
         }
 
+        String maxRemoteScanThreadNumStr = properties.get(MAX_REMOTE_SCAN_THREAD_NUM);
+        if (maxRemoteScanThreadNumStr != null) {
+            tWorkloadGroupInfo.setMaxRemoteScanThreadNum(Integer.parseInt(maxRemoteScanThreadNumStr));
+        }
+
+        String minRemoteScanThreadNumStr = properties.get(MIN_REMOTE_SCAN_THREAD_NUM);
+        if (minRemoteScanThreadNumStr != null) {
+            tWorkloadGroupInfo.setMinRemoteScanThreadNum(Integer.parseInt(minRemoteScanThreadNumStr));
+        }
+
         TopicInfo topicInfo = new TopicInfo();
         topicInfo.setWorkloadGroupInfo(tWorkloadGroupInfo);
         return topicInfo;
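Both new properties accept either a positive integer or -1, where -1 means "unset, fall back to the BE default". The same rule, expressed as a standalone predicate (C++ here for consistency with the BE snippets; the FE check above is the authoritative one):

```cpp
#include <charconv>
#include <string_view>

// True when `value` is an integer that is either > 0 or exactly -1,
// matching the FE validation for {max,min}_remote_scan_thread_num.
bool is_valid_remote_scan_thread_num(std::string_view value) {
    int parsed = 0;
    const char* end = value.data() + value.size();
    auto [ptr, ec] = std::from_chars(value.data(), end, parsed);
    if (ec != std::errc() || ptr != end) {
        return false;  // not a plain integer
    }
    return parsed > 0 || parsed == -1;
}
```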
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 60d7e296ed..a7c26f7cec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -70,7 +70,8 @@ public class WorkloadGroupMgr implements Writable, GsonPostProcessable {
             .add(WorkloadGroup.ENABLE_MEMORY_OVERCOMMIT)
             .add(WorkloadGroup.MAX_CONCURRENCY).add(WorkloadGroup.MAX_QUEUE_SIZE)
             .add(WorkloadGroup.QUEUE_TIMEOUT).add(WorkloadGroup.CPU_HARD_LIMIT)
-            .add(WorkloadGroup.SCAN_THREAD_NUM)
+            .add(WorkloadGroup.SCAN_THREAD_NUM).add(WorkloadGroup.MAX_REMOTE_SCAN_THREAD_NUM)
+            .add(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM)
             .add(QueryQueue.RUNNING_QUERY_NUM).add(QueryQueue.WAITING_QUERY_NUM)
             .build();
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index 6feb8fa94f..8a3df743e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -378,14 +378,18 @@ public class MetadataGenerator {
             trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(1))); // name
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(2)))); // cpu_share
             trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(3))); // mem_limit
-            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(4))); //mem overcommit
+            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(4))); // mem overcommit
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(5)))); // max concurrent
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(6)))); // max queue size
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(7)))); // queue timeout
             trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(8))); // cpu hard limit
             trow.addToColumnValue(new TCell().setIntVal(Integer.parseInt(rGroupsInfo.get(9)))); // scan thread num
-            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10)))); // running query num
-            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(11)))); // waiting query num
+            // max remote scan thread num
+            trow.addToColumnValue(new TCell().setIntVal(Integer.parseInt(rGroupsInfo.get(10))));
+            // min remote scan thread num
+            trow.addToColumnValue(new TCell().setIntVal(Integer.parseInt(rGroupsInfo.get(11))));
+            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(12)))); // running query num
+            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(13)))); // waiting query num
 
             dataBatch.add(trow);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java
index a50fb7ca85..27e011d60d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/WorkloadGroupsTableValuedFunction.java
@@ -50,6 +50,8 @@ public class WorkloadGroupsTableValuedFunction extends MetadataTableValuedFuncti
             new Column(WorkloadGroup.QUEUE_TIMEOUT, ScalarType.createType(PrimitiveType.BIGINT)),
             new Column(WorkloadGroup.CPU_HARD_LIMIT, ScalarType.createStringType()),
             new Column(WorkloadGroup.SCAN_THREAD_NUM, ScalarType.createType(PrimitiveType.INT)),
+            new Column(WorkloadGroup.MAX_REMOTE_SCAN_THREAD_NUM, ScalarType.createType(PrimitiveType.INT)),
+            new Column(WorkloadGroup.MIN_REMOTE_SCAN_THREAD_NUM, ScalarType.createType(PrimitiveType.INT)),
             new Column(QueryQueue.RUNNING_QUERY_NUM, ScalarType.createType(PrimitiveType.BIGINT)),
             new Column(QueryQueue.WAITING_QUERY_NUM, ScalarType.createType(PrimitiveType.BIGINT)));
 
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 24edaefc10..b803618af4 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -177,6 +177,8 @@ struct TWorkloadGroupInfo {
     7: optional bool enable_memory_overcommit
     8: optional bool enable_cpu_hard_limit
     9: optional i32 scan_thread_num
+    10: optional i32 max_remote_scan_thread_num
+    11: optional i32 min_remote_scan_thread_num
 }
 
 enum TWorkloadMetricType {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 88cf25b9c6..1e433d42f5 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -412,7 +412,7 @@ struct TQueryStatistics {
 
 struct TReportWorkloadRuntimeStatusParams {
     1: optional i64 backend_id
-    2: map<i64, TQueryStatistics> query_statistics_map
+    2: optional map<i64, TQueryStatistics> query_statistics_map
 }
 
 // The results of an INSERT query, sent to the coordinator as part of
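Both thrift fields are optional, so a newer FE talking to an older BE (or the reverse) simply leaves them unset, and parse_topic_info above falls back to config::doris_scanner_thread_pool_thread_num. A sketch of that __isset-guarded defaulting, with a hypothetical struct standing in for the generated thrift code:

```cpp
// Thrift generates an __isset bitmap for optional fields; the BE honors a
// value only when it is both set and positive. Hypothetical mirror:
struct TWorkloadGroupInfoLike {
    struct { bool max_remote_scan_thread_num = false; } __isset;
    int max_remote_scan_thread_num = 0;
};

int resolve_max_remote_scan_threads(const TWorkloadGroupInfoLike& info,
                                    int config_default) {
    if (info.__isset.max_remote_scan_thread_num &&
        info.max_remote_scan_thread_num > 0) {
        return info.max_remote_scan_thread_num;
    }
    return config_default;
}
```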