diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp index 5263c06053..3fe0778ecb 100644 --- a/be/src/agent/cgroup_cpu_ctl.cpp +++ b/be/src/agent/cgroup_cpu_ctl.cpp @@ -132,7 +132,7 @@ Status CgroupV1CpuCtl::init() { if (_tg_id == -1) { // means current cgroup cpu ctl is just used to clear dir, - // it does not contains task group. + // it does not contains workload group. // todo(wb) rethinking whether need to refactor cgroup_cpu_ctl _init_succ = true; LOG(INFO) << "init cgroup cpu query path succ, path=" << _cgroup_v1_cpu_query_path; diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp index 1d5a8544e1..f98315fa43 100644 --- a/be/src/agent/workload_group_listener.cpp +++ b/be/src/agent/workload_group_listener.cpp @@ -17,8 +17,8 @@ #include "agent/workload_group_listener.h" -#include "runtime/task_group/task_group.h" -#include "runtime/task_group/task_group_manager.h" +#include "runtime/workload_group/workload_group.h" +#include "runtime/workload_group/workload_group_manager.h" #include "util/mem_info.h" #include "util/parse_util.h" @@ -32,36 +32,37 @@ void WorkloadGroupListener::handle_topic_info(const std::vector& topi } // 1 parse topicinfo to group info - taskgroup::TaskGroupInfo task_group_info; - Status ret = taskgroup::TaskGroupInfo::parse_topic_info(topic_info.workload_group_info, - &task_group_info); + WorkloadGroupInfo workload_group_info; + Status ret = WorkloadGroupInfo::parse_topic_info(topic_info.workload_group_info, + &workload_group_info); if (!ret.ok()) { - LOG(INFO) << "parse topic info failed, tg_id=" << task_group_info.id + LOG(INFO) << "parse topic info failed, tg_id=" << workload_group_info.id << ", reason:" << ret.to_string(); continue; } - current_wg_ids.insert(task_group_info.id); + current_wg_ids.insert(workload_group_info.id); - // 2 update task group - auto tg = _exec_env->task_group_manager()->get_or_create_task_group(task_group_info); + // 2 update workload group + auto tg = + _exec_env->workload_group_mgr()->get_or_create_workload_group(workload_group_info); // 3 set cpu soft hard limit switch - _exec_env->task_group_manager()->_enable_cpu_hard_limit.store( - task_group_info.enable_cpu_hard_limit); + _exec_env->workload_group_mgr()->_enable_cpu_hard_limit.store( + workload_group_info.enable_cpu_hard_limit); // 4 create and update task scheduler - tg->upsert_task_scheduler(&task_group_info, _exec_env); + tg->upsert_task_scheduler(&workload_group_info, _exec_env); - LOG(INFO) << "update task group finish, tg info=" << tg->debug_string() + LOG(INFO) << "update workload group finish, tg info=" << tg->debug_string() << ", enable_cpu_hard_limit=" - << (_exec_env->task_group_manager()->enable_cpu_hard_limit() ? "true" : "false") - << ", cgroup cpu_shares=" << task_group_info.cgroup_cpu_shares - << ", cgroup cpu_hard_limit=" << task_group_info.cgroup_cpu_hard_limit + << (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false") + << ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares + << ", cgroup cpu_hard_limit=" << workload_group_info.cgroup_cpu_hard_limit << ", enable_cgroup_cpu_soft_limit=" << (config::enable_cgroup_cpu_soft_limit ? "true" : "false") << ", cgroup home path=" << config::doris_cgroup_cpu_path; } - _exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids); + _exec_env->workload_group_mgr()->delete_workload_group_by_ids(current_wg_ids); } } // namespace doris \ No newline at end of file diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index cb493ea807..4735c41c72 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -50,7 +50,7 @@ #include "runtime/memory/mem_tracker.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/runtime_query_statistics_mgr.h" -#include "runtime/task_group/task_group_manager.h" +#include "runtime/workload_group/workload_group_manager.h" #include "util/cpu_info.h" #include "util/debug_util.h" #include "util/disk_info.h" diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 517d6b8a8d..b9a5cb06ff 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -26,7 +26,7 @@ #include "common/status.h" #include "exec/operator.h" #include "pipeline.h" -#include "runtime/task_group/task_group.h" +#include "runtime/workload_group/workload_group.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" #include "vec/core/block.h" diff --git a/be/src/pipeline/pipeline_tracing.cpp b/be/src/pipeline/pipeline_tracing.cpp index 94675f77f6..f635f49682 100644 --- a/be/src/pipeline/pipeline_tracing.cpp +++ b/be/src/pipeline/pipeline_tracing.cpp @@ -46,10 +46,10 @@ void PipelineTracerContext::record(ScheduleRecord record) { } } -void PipelineTracerContext::end_query(TUniqueId query_id, uint64_t task_group) { +void PipelineTracerContext::end_query(TUniqueId query_id, uint64_t workload_group) { { std::unique_lock l(_tg_lock); - _id_to_taskgroup[query_id] = task_group; + _id_to_workload_group[query_id] = workload_group; } if (_dump_type == RecordType::PerQuery) { _dump(query_id); @@ -113,7 +113,7 @@ void PipelineTracerContext::_dump(TUniqueId query_id) { uint64_t v = 0; { std::unique_lock l(_tg_lock); - v = _id_to_taskgroup[query_id]; + v = _id_to_workload_group[query_id]; } auto tmp_str = record.to_string(v); auto text = Slice {tmp_str}; @@ -140,7 +140,7 @@ void PipelineTracerContext::_dump(TUniqueId query_id) { uint64_t v = 0; { std::unique_lock l(_tg_lock); - v = _id_to_taskgroup[query_id]; + v = _id_to_workload_group[query_id]; } auto tmp_str = record.to_string(v); auto text = Slice {tmp_str}; @@ -156,7 +156,7 @@ void PipelineTracerContext::_dump(TUniqueId query_id) { _datas.erase(query_id); { std::unique_lock l(_tg_lock); - _id_to_taskgroup.erase(query_id); + _id_to_workload_group.erase(query_id); } } } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_tracing.h b/be/src/pipeline/pipeline_tracing.h index 3160148c57..adebd84b05 100644 --- a/be/src/pipeline/pipeline_tracing.h +++ b/be/src/pipeline/pipeline_tracing.h @@ -61,7 +61,7 @@ public: }; void record(ScheduleRecord record); // record one schedule record void end_query(TUniqueId query_id, - uint64_t task_group); // tell context this query is end. may leads to dump. + uint64_t workload_group); // tell context this query is end. may leads to dump. Status change_record_params(const std::map& params); bool enabled() const { return !(_dump_type == RecordType::None); } @@ -72,7 +72,7 @@ private: std::mutex _data_lock; // lock for map, not map items. phmap::flat_hash_map _datas; std::mutex _tg_lock; //TODO: use an lockfree DS - phmap::flat_hash_map _id_to_taskgroup; + phmap::flat_hash_map _id_to_workload_group; RecordType _dump_type = RecordType::None; std::filesystem::path _dir = config::pipeline_tracing_log_dir; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 8707de4b54..c754af645e 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -28,7 +28,7 @@ #include "pipeline/pipeline.h" #include "pipeline/pipeline_task.h" #include "pipeline/pipeline_x/dependency.h" -#include "runtime/task_group/task_group.h" +#include "runtime/workload_group/workload_group.h" #include "util/runtime_profile.h" #include "util/stopwatch.hpp" #include "vec/core/block.h" diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index 9440fc18d8..a6799fdb0c 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -30,7 +30,7 @@ #include "common/status.h" #include "pipeline_task.h" -#include "runtime/task_group/task_group.h" +#include "runtime/workload_group/workload_group.h" namespace doris { namespace pipeline { diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 3074cf02af..5bbf85fad4 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -30,7 +30,7 @@ #include "common/status.h" #include "gutil/ref_counted.h" #include "pipeline_task.h" -#include "runtime/task_group/task_group.h" +#include "runtime/workload_group/workload_group.h" #include "util/thread.h" namespace doris { diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 0b7e2101a4..9767b9bfc6 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -51,9 +51,7 @@ class TaskScheduler; class BlockedTaskScheduler; struct RuntimeFilterTimerQueue; } // namespace pipeline -namespace taskgroup { -class TaskGroupManager; -} +class WorkloadGroupMgr; namespace io { class FileCacheFactory; } // namespace io @@ -156,7 +154,7 @@ public: ClientCache* broker_client_cache() { return _broker_client_cache; } pipeline::TaskScheduler* pipeline_task_scheduler() { return _without_group_task_scheduler; } - taskgroup::TaskGroupManager* task_group_manager() { return _task_group_manager; } + WorkloadGroupMgr* workload_group_mgr() { return _workload_group_manager; } WorkloadSchedPolicyMgr* workload_sched_policy_mgr() { return _workload_sched_mgr; } RuntimeQueryStatiticsMgr* runtime_query_statistics_mgr() { return _runtime_query_statistics_mgr; @@ -338,7 +336,7 @@ private: FragmentMgr* _fragment_mgr = nullptr; pipeline::TaskScheduler* _without_group_task_scheduler = nullptr; - taskgroup::TaskGroupManager* _task_group_manager = nullptr; + WorkloadGroupMgr* _workload_group_manager = nullptr; ResultCache* _result_cache = nullptr; TMasterInfo* _master_info = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 04c2faeed2..b3d290e53b 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -75,9 +75,9 @@ #include "runtime/small_file_mgr.h" #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_executor.h" -#include "runtime/task_group/task_group_manager.h" #include "runtime/thread_context.h" #include "runtime/user_function_cache.h" +#include "runtime/workload_group/workload_group_manager.h" #include "runtime/workload_management/workload_sched_policy_mgr.h" #include "service/backend_options.h" #include "service/backend_service.h" @@ -210,7 +210,7 @@ Status ExecEnv::_init(const std::vector& store_paths, init_file_cache_factory(); _pipeline_tracer_ctx = std::make_unique(); // before query RETURN_IF_ERROR(init_pipeline_task_scheduler()); - _task_group_manager = new taskgroup::TaskGroupManager(); + _workload_group_manager = new WorkloadGroupMgr(); _scanner_scheduler = new doris::vectorized::ScannerScheduler(); _fragment_mgr = new FragmentMgr(this); _result_cache = new ResultCache(config::query_cache_max_size_mb, @@ -295,7 +295,7 @@ Status ExecEnv::init_pipeline_task_scheduler() { executors_size = CpuInfo::num_cores(); } - // TODO pipeline task group combie two blocked schedulers. + // TODO pipeline workload group combie two blocked schedulers. auto t_queue = std::make_shared(executors_size); _without_group_block_scheduler = std::make_shared("PipeNoGSchePool"); @@ -541,7 +541,7 @@ void ExecEnv::destroy() { SAFE_STOP(_without_group_task_scheduler); // stop pipline step 2, cgroup execution SAFE_SHUTDOWN(_global_block_scheduler.get()); - SAFE_STOP(_task_group_manager); + SAFE_STOP(_workload_group_manager); SAFE_STOP(_external_scan_context_mgr); SAFE_STOP(_fragment_mgr); @@ -606,7 +606,7 @@ void ExecEnv::destroy() { SAFE_DELETE(_result_cache); SAFE_DELETE(_fragment_mgr); SAFE_DELETE(_workload_sched_mgr); - SAFE_DELETE(_task_group_manager); + SAFE_DELETE(_workload_group_manager); SAFE_DELETE(_file_cache_factory); SAFE_DELETE(_runtime_filter_timer_queue); // TODO(zhiqiang): Maybe we should call shutdown before release thread pool? diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 1d905d5390..f6552f67fc 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -70,10 +70,10 @@ #include "runtime/stream_load/new_load_stream_mgr.h" #include "runtime/stream_load/stream_load_context.h" #include "runtime/stream_load/stream_load_executor.h" -#include "runtime/task_group/task_group.h" -#include "runtime/task_group/task_group_manager.h" #include "runtime/thread_context.h" #include "runtime/types.h" +#include "runtime/workload_group/workload_group.h" +#include "runtime/workload_group/workload_group_manager.h" #include "runtime/workload_management/workload_query_info.h" #include "service/backend_options.h" #include "util/debug_util.h" @@ -639,15 +639,15 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo if (params.__isset.workload_groups && !params.workload_groups.empty()) { uint64_t tg_id = params.workload_groups[0].id; - taskgroup::TaskGroupPtr task_group_ptr = - _exec_env->task_group_manager()->get_task_group_by_id(tg_id); - if (task_group_ptr != nullptr) { - RETURN_IF_ERROR(query_ctx->set_task_group(task_group_ptr)); + WorkloadGroupPtr workload_group_ptr = + _exec_env->workload_group_mgr()->get_task_group_by_id(tg_id); + if (workload_group_ptr != nullptr) { + RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr)); _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id), tg_id); LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id()) - << ", use task group: " << task_group_ptr->debug_string() + << ", use workload group: " << workload_group_ptr->debug_string() << ", is pipeline: " << ((int)is_pipeline) << ", enable cgroup soft limit: " << ((int)config::enable_cgroup_cpu_soft_limit); @@ -657,7 +657,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo } } // There is some logic in query ctx's dctor, we could not check if exists and delete the - // temp query ctx now. For example, the query id maybe removed from task group's queryset. + // temp query ctx now. For example, the query id maybe removed from workload group's queryset. _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx)); LOG(INFO) << "Register query/load memory tracker, query/load id: " << print_id(query_ctx->query_id()) diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 680c2917cb..83b81f5160 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -30,8 +30,8 @@ #include "olap/memtable_memory_limiter.h" #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" -#include "runtime/task_group/task_group.h" #include "runtime/thread_context.h" +#include "runtime/workload_group/workload_group.h" #include "service/backend_options.h" #include "util/mem_info.h" #include "util/perf_counters.h" @@ -396,8 +396,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, } int64_t MemTrackerLimiter::tg_free_top_memory_query( - int64_t min_free_mem, Type type, - std::vector& tracker_groups, + int64_t min_free_mem, Type type, std::vector& tracker_groups, const std::function& cancel_msg, RuntimeProfile* profile, GCType gctype) { return free_top_memory_query(min_free_mem, type, tracker_groups, cancel_msg, profile, gctype); @@ -531,8 +530,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, } int64_t MemTrackerLimiter::tg_free_top_overcommit_query( - int64_t min_free_mem, Type type, - std::vector& tracker_groups, + int64_t min_free_mem, Type type, std::vector& tracker_groups, const std::function& cancel_msg, RuntimeProfile* profile, GCType gctype) { return free_top_overcommit_query(min_free_mem, type, tracker_groups, cancel_msg, profile, diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 9703981af6..b10f41f669 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -45,11 +45,9 @@ class RuntimeProfile; constexpr auto MEM_TRACKER_GROUP_NUM = 1000; -namespace taskgroup { -struct TgTrackerLimiterGroup; -class TaskGroup; -using TaskGroupPtr = std::shared_ptr; -} // namespace taskgroup +struct WgTrackerLimiterGroup; +class WorkloadGroup; +using WorkloadGroupPtr = std::shared_ptr; // Track and limit the memory usage of process and query. // Contains an limit, arranged into a tree structure. @@ -195,8 +193,7 @@ public: RuntimeProfile* profile, GCType gctype); static int64_t tg_free_top_memory_query( - int64_t min_free_mem, Type type, - std::vector& tracker_groups, + int64_t min_free_mem, Type type, std::vector& tracker_groups, const std::function& cancel_msg, RuntimeProfile* profile, GCType gctype); @@ -219,8 +216,7 @@ public: RuntimeProfile* profile, GCType gctype); static int64_t tg_free_top_overcommit_query( - int64_t min_free_mem, Type type, - std::vector& tracker_groups, + int64_t min_free_mem, Type type, std::vector& tracker_groups, const std::function& cancel_msg, RuntimeProfile* profile, GCType gctype); diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 7dce8488ec..56001c4498 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -23,7 +23,7 @@ #include "pipeline/pipeline_fragment_context.h" #include "pipeline/pipeline_x/dependency.h" #include "runtime/runtime_query_statistics_mgr.h" -#include "runtime/task_group/task_group_manager.h" +#include "runtime/workload_group/workload_group_manager.h" #include "util/mem_info.h" namespace doris { @@ -102,10 +102,10 @@ QueryContext::~QueryContext() { MemTracker::print_bytes(query_mem_tracker->peak_consumption())); } uint64_t group_id = 0; - if (_task_group) { - group_id = _task_group->id(); // before remove - _task_group->remove_mem_tracker_limiter(query_mem_tracker); - _task_group->remove_query(_query_id); + if (_workload_group) { + group_id = _workload_group->id(); // before remove + _workload_group->remove_mem_tracker_limiter(query_mem_tracker); + _workload_group->remove_query(_query_id); } _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id)); @@ -205,7 +205,7 @@ void QueryContext::register_cpu_statistics() { } doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() { - if (_task_group) { + if (_workload_group) { if (_task_scheduler) { return _task_scheduler; } @@ -214,21 +214,21 @@ doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() { } ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() { - if (_task_group) { + if (_workload_group) { return _non_pipe_thread_pool; } else { return nullptr; } } -Status QueryContext::set_task_group(taskgroup::TaskGroupPtr& tg) { - _task_group = tg; - // Should add query first, then the task group will not be deleted. - // see task_group_manager::delete_task_group_by_ids - RETURN_IF_ERROR(_task_group->add_query(_query_id)); - _task_group->add_mem_tracker_limiter(query_mem_tracker); - _task_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler, - &_non_pipe_thread_pool, &_remote_scan_task_scheduler); +Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) { + _workload_group = tg; + // Should add query first, then the workload group will not be deleted. + // see task_group_manager::delete_workload_group_by_ids + RETURN_IF_ERROR(_workload_group->add_query(_query_id)); + _workload_group->add_mem_tracker_limiter(query_mem_tracker); + _workload_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler, + &_non_pipe_thread_pool, &_remote_scan_task_scheduler); return Status::OK(); } diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 36c4f6ae11..e47c42b439 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -32,11 +32,11 @@ #include "runtime/query_statistics.h" #include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_predicate.h" -#include "task_group/task_group.h" #include "util/threadpool.h" #include "vec/exec/scan/scanner_scheduler.h" #include "vec/runtime/shared_hash_table_controller.h" #include "vec/runtime/shared_scanner_controller.h" +#include "workload_group/workload_group.h" namespace doris { @@ -163,7 +163,7 @@ public: } } - Status set_task_group(taskgroup::TaskGroupPtr& tg); + Status set_workload_group(WorkloadGroupPtr& tg); int execution_timeout() const { return _query_options.__isset.execution_timeout ? _query_options.execution_timeout @@ -292,7 +292,7 @@ private: std::shared_ptr _shared_scanner_controller; std::unordered_map _runtime_predicates; - taskgroup::TaskGroupPtr _task_group = nullptr; + WorkloadGroupPtr _workload_group = nullptr; std::unique_ptr _runtime_filter_mgr; const TQueryOptions _query_options; diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/workload_group/workload_group.cpp similarity index 81% rename from be/src/runtime/task_group/task_group.cpp rename to be/src/runtime/workload_group/workload_group.cpp index 53534444a5..88e5a1221e 100644 --- a/be/src/runtime/task_group/task_group.cpp +++ b/be/src/runtime/workload_group/workload_group.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "task_group.h" +#include "workload_group.h" #include #include @@ -38,7 +38,6 @@ #include "vec/exec/scan/scanner_scheduler.h" namespace doris { -namespace taskgroup { const static uint64_t CPU_SHARE_DEFAULT_VALUE = 1024; const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%"; @@ -46,7 +45,7 @@ const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true; const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1; const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024; -TaskGroup::TaskGroup(const TaskGroupInfo& tg_info) +WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info) : _id(tg_info.id), _name(tg_info.name), _version(tg_info.version), @@ -59,7 +58,7 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info) _max_remote_scan_thread_num(tg_info.max_remote_scan_thread_num), _min_remote_scan_thread_num(tg_info.min_remote_scan_thread_num) {} -std::string TaskGroup::debug_string() const { +std::string WorkloadGroup::debug_string() const { std::shared_lock rl {_mutex}; return fmt::format( "TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = " @@ -70,7 +69,7 @@ std::string TaskGroup::debug_string() const { _scan_thread_num, _max_remote_scan_thread_num, _min_remote_scan_thread_num); } -void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) { +void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) { if (UNLIKELY(tg_info.id != _id)) { return; } @@ -98,7 +97,7 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) { } } -int64_t TaskGroup::memory_used() { +int64_t WorkloadGroup::memory_used() { int64_t used_memory = 0; for (auto& mem_tracker_group : _mem_tracker_limiter_pool) { std::lock_guard l(mem_tracker_group.group_lock); @@ -109,19 +108,19 @@ int64_t TaskGroup::memory_used() { return used_memory; } -void TaskGroup::add_mem_tracker_limiter(std::shared_ptr mem_tracker_ptr) { +void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr mem_tracker_ptr) { auto group_num = mem_tracker_ptr->group_num(); std::lock_guard l(_mem_tracker_limiter_pool[group_num].group_lock); _mem_tracker_limiter_pool[group_num].trackers.insert(mem_tracker_ptr); } -void TaskGroup::remove_mem_tracker_limiter(std::shared_ptr mem_tracker_ptr) { +void WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr mem_tracker_ptr) { auto group_num = mem_tracker_ptr->group_num(); std::lock_guard l(_mem_tracker_limiter_pool[group_num].group_lock); _mem_tracker_limiter_pool[group_num].trackers.erase(mem_tracker_ptr); } -int64_t TaskGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile) { +int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile) { if (need_free_mem <= 0) { return 0; } @@ -202,97 +201,97 @@ int64_t TaskGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile) { return freed_mem; } -Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_info, - taskgroup::TaskGroupInfo* task_group_info) { +Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_group_info, + WorkloadGroupInfo* workload_group_info) { // 1 id int tg_id = 0; - if (workload_group_info.__isset.id) { - tg_id = workload_group_info.id; + if (tworkload_group_info.__isset.id) { + tg_id = tworkload_group_info.id; } else { return Status::InternalError("workload group id is required"); } - task_group_info->id = tg_id; + workload_group_info->id = tg_id; // 2 name std::string name = "INVALID_NAME"; - if (workload_group_info.__isset.name) { - name = workload_group_info.name; + if (tworkload_group_info.__isset.name) { + name = tworkload_group_info.name; } - task_group_info->name = name; + workload_group_info->name = name; // 3 version int version = 0; - if (workload_group_info.__isset.version) { - version = workload_group_info.version; + if (tworkload_group_info.__isset.version) { + version = tworkload_group_info.version; } else { return Status::InternalError("workload group version is required"); } - task_group_info->version = version; + workload_group_info->version = version; // 4 cpu_share uint64_t cpu_share = CPU_SHARE_DEFAULT_VALUE; - if (workload_group_info.__isset.cpu_share) { - cpu_share = workload_group_info.cpu_share; + if (tworkload_group_info.__isset.cpu_share) { + cpu_share = tworkload_group_info.cpu_share; } - task_group_info->cpu_share = cpu_share; + workload_group_info->cpu_share = cpu_share; // 5 cpu hard limit int cpu_hard_limit = CPU_HARD_LIMIT_DEFAULT_VALUE; - if (workload_group_info.__isset.cpu_hard_limit) { - cpu_hard_limit = workload_group_info.cpu_hard_limit; + if (tworkload_group_info.__isset.cpu_hard_limit) { + cpu_hard_limit = tworkload_group_info.cpu_hard_limit; } - task_group_info->cpu_hard_limit = cpu_hard_limit; + workload_group_info->cpu_hard_limit = cpu_hard_limit; // 6 mem_limit std::string mem_limit_str = MEMORY_LIMIT_DEFAULT_VALUE; - if (workload_group_info.__isset.mem_limit) { - mem_limit_str = workload_group_info.mem_limit; + if (tworkload_group_info.__isset.mem_limit) { + mem_limit_str = tworkload_group_info.mem_limit; } bool is_percent = true; int64_t mem_limit = ParseUtil::parse_mem_spec(mem_limit_str, -1, MemInfo::mem_limit(), &is_percent); - task_group_info->memory_limit = mem_limit; + workload_group_info->memory_limit = mem_limit; // 7 mem overcommit bool enable_memory_overcommit = ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE; - if (workload_group_info.__isset.enable_memory_overcommit) { - enable_memory_overcommit = workload_group_info.enable_memory_overcommit; + if (tworkload_group_info.__isset.enable_memory_overcommit) { + enable_memory_overcommit = tworkload_group_info.enable_memory_overcommit; } - task_group_info->enable_memory_overcommit = enable_memory_overcommit; + workload_group_info->enable_memory_overcommit = enable_memory_overcommit; // 8 cpu soft limit or hard limit bool enable_cpu_hard_limit = false; - if (workload_group_info.__isset.enable_cpu_hard_limit) { - enable_cpu_hard_limit = workload_group_info.enable_cpu_hard_limit; + if (tworkload_group_info.__isset.enable_cpu_hard_limit) { + enable_cpu_hard_limit = tworkload_group_info.enable_cpu_hard_limit; } - task_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit; + workload_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit; // 9 scan thread num - task_group_info->scan_thread_num = config::doris_scanner_thread_pool_thread_num; - if (workload_group_info.__isset.scan_thread_num && workload_group_info.scan_thread_num > 0) { - task_group_info->scan_thread_num = workload_group_info.scan_thread_num; + workload_group_info->scan_thread_num = config::doris_scanner_thread_pool_thread_num; + if (tworkload_group_info.__isset.scan_thread_num && tworkload_group_info.scan_thread_num > 0) { + workload_group_info->scan_thread_num = tworkload_group_info.scan_thread_num; } // 10 max remote scan thread num - task_group_info->max_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num; - if (workload_group_info.__isset.max_remote_scan_thread_num && - workload_group_info.max_remote_scan_thread_num > 0) { - task_group_info->max_remote_scan_thread_num = - workload_group_info.max_remote_scan_thread_num; + workload_group_info->max_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num; + if (tworkload_group_info.__isset.max_remote_scan_thread_num && + tworkload_group_info.max_remote_scan_thread_num > 0) { + workload_group_info->max_remote_scan_thread_num = + tworkload_group_info.max_remote_scan_thread_num; } // 11 min remote scan thread num - task_group_info->min_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num; - if (workload_group_info.__isset.min_remote_scan_thread_num && - workload_group_info.min_remote_scan_thread_num > 0) { - task_group_info->min_remote_scan_thread_num = - workload_group_info.min_remote_scan_thread_num; + workload_group_info->min_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num; + if (tworkload_group_info.__isset.min_remote_scan_thread_num && + tworkload_group_info.min_remote_scan_thread_num > 0) { + workload_group_info->min_remote_scan_thread_num = + tworkload_group_info.min_remote_scan_thread_num; } return Status::OK(); } -void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env) { +void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env) { uint64_t tg_id = tg_info->id; std::string tg_name = tg_info->name; int cpu_hard_limit = tg_info->cpu_hard_limit; @@ -407,10 +406,10 @@ void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv } } -void TaskGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched, - vectorized::SimplifiedScanScheduler** scan_sched, - ThreadPool** non_pipe_thread_pool, - vectorized::SimplifiedScanScheduler** remote_scan_sched) { +void WorkloadGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched, + vectorized::SimplifiedScanScheduler** scan_sched, + ThreadPool** non_pipe_thread_pool, + vectorized::SimplifiedScanScheduler** remote_scan_sched) { std::shared_lock rlock(_task_sched_lock); *exec_sched = _task_sched.get(); *scan_sched = _scan_task_sched.get(); @@ -418,7 +417,7 @@ void TaskGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched, *non_pipe_thread_pool = _non_pipe_thread_pool.get(); } -void TaskGroup::try_stop_schedulers() { +void WorkloadGroup::try_stop_schedulers() { std::shared_lock rlock(_task_sched_lock); if (_task_sched) { _task_sched->stop(); @@ -435,5 +434,4 @@ void TaskGroup::try_stop_schedulers() { } } -} // namespace taskgroup } // namespace doris diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/workload_group/workload_group.h similarity index 87% rename from be/src/runtime/task_group/task_group.h rename to be/src/runtime/workload_group/workload_group.h index 938f73ac20..af77be493b 100644 --- a/be/src/runtime/task_group/task_group.h +++ b/be/src/runtime/workload_group/workload_group.h @@ -49,18 +49,16 @@ class PipelineTask; class TaskScheduler; } // namespace pipeline -namespace taskgroup { - -class TaskGroup; -struct TaskGroupInfo; -struct TgTrackerLimiterGroup { +class WorkloadGroup; +struct WorkloadGroupInfo; +struct WgTrackerLimiterGroup { std::unordered_set> trackers; std::mutex group_lock; }; -class TaskGroup : public std::enable_shared_from_this { +class WorkloadGroup : public std::enable_shared_from_this { public: - explicit TaskGroup(const TaskGroupInfo& tg_info); + explicit WorkloadGroup(const WorkloadGroupInfo& tg_info); int64_t version() const { return _version; } @@ -86,7 +84,7 @@ public: std::string debug_string() const; - void check_and_update(const TaskGroupInfo& tg_info); + void check_and_update(const WorkloadGroupInfo& tg_info); void add_mem_tracker_limiter(std::shared_ptr mem_tracker_ptr); @@ -102,7 +100,7 @@ public: Status add_query(TUniqueId query_id) { std::unique_lock wlock(_mutex); if (_is_shutdown) { - // If the task group is set shutdown, then should not run any more, + // If the workload group is set shutdown, then should not run any more, // because the scheduler pool and other pointer may be released. return Status::InternalError( "Failed add query to workload group, the workload group is shutdown. host: {}", @@ -129,7 +127,7 @@ public: int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile); - void upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env); + void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env); void get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched, vectorized::SimplifiedScanScheduler** scan_sched, @@ -146,13 +144,13 @@ private: int64_t _memory_limit; // bytes bool _enable_memory_overcommit; std::atomic _cpu_share; - std::vector _mem_tracker_limiter_pool; + std::vector _mem_tracker_limiter_pool; std::atomic _cpu_hard_limit; std::atomic _scan_thread_num; std::atomic _max_remote_scan_thread_num; std::atomic _min_remote_scan_thread_num; - // means task group is mark dropped + // means workload group is mark dropped // new query can not submit // waiting running query to be cancelled or finish bool _is_shutdown = false; @@ -166,9 +164,9 @@ private: std::unique_ptr _non_pipe_thread_pool = nullptr; }; -using TaskGroupPtr = std::shared_ptr; +using WorkloadGroupPtr = std::shared_ptr; -struct TaskGroupInfo { +struct WorkloadGroupInfo { uint64_t id; std::string name; uint64_t cpu_share; @@ -184,9 +182,8 @@ struct TaskGroupInfo { uint64_t cgroup_cpu_shares = 0; int cgroup_cpu_hard_limit = 0; - static Status parse_topic_info(const TWorkloadGroupInfo& topic_info, - taskgroup::TaskGroupInfo* task_group_info); + static Status parse_topic_info(const TWorkloadGroupInfo& tworkload_group_info, + WorkloadGroupInfo* workload_group_info); }; -} // namespace taskgroup } // namespace doris diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp similarity index 59% rename from be/src/runtime/task_group/task_group_manager.cpp rename to be/src/runtime/workload_group/workload_group_manager.cpp index a336cccd3d..7ec0838754 100644 --- a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/workload_group/workload_group_manager.cpp @@ -15,75 +15,76 @@ // specific language governing permissions and limitations // under the License. -#include "task_group_manager.h" +#include "workload_group_manager.h" #include #include #include "pipeline/task_scheduler.h" #include "runtime/memory/mem_tracker_limiter.h" -#include "runtime/task_group/task_group.h" +#include "runtime/workload_group/workload_group.h" #include "util/threadpool.h" #include "util/time.h" #include "vec/exec/scan/scanner_scheduler.h" -namespace doris::taskgroup { +namespace doris { -TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo& task_group_info) { +WorkloadGroupPtr WorkloadGroupMgr::get_or_create_workload_group( + const WorkloadGroupInfo& workload_group_info) { { std::shared_lock r_lock(_group_mutex); - if (LIKELY(_task_groups.count(task_group_info.id))) { - auto task_group = _task_groups[task_group_info.id]; - task_group->check_and_update(task_group_info); - return task_group; + if (LIKELY(_workload_groups.count(workload_group_info.id))) { + auto workload_group = _workload_groups[workload_group_info.id]; + workload_group->check_and_update(workload_group_info); + return workload_group; } } - auto new_task_group = std::make_shared(task_group_info); + auto new_task_group = std::make_shared(workload_group_info); std::lock_guard w_lock(_group_mutex); - if (_task_groups.count(task_group_info.id)) { - auto task_group = _task_groups[task_group_info.id]; - task_group->check_and_update(task_group_info); - return task_group; + if (_workload_groups.count(workload_group_info.id)) { + auto workload_group = _workload_groups[workload_group_info.id]; + workload_group->check_and_update(workload_group_info); + return workload_group; } - _task_groups[task_group_info.id] = new_task_group; + _workload_groups[workload_group_info.id] = new_task_group; return new_task_group; } -void TaskGroupManager::get_related_taskgroups( - const std::function& pred, - std::vector* task_groups) { +void WorkloadGroupMgr::get_related_workload_groups( + const std::function& pred, + std::vector* task_groups) { std::shared_lock r_lock(_group_mutex); - for (const auto& [id, task_group] : _task_groups) { - if (pred(task_group)) { - task_groups->push_back(task_group); + for (const auto& [id, workload_group] : _workload_groups) { + if (pred(workload_group)) { + task_groups->push_back(workload_group); } } } -TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) { +WorkloadGroupPtr WorkloadGroupMgr::get_task_group_by_id(uint64_t tg_id) { std::shared_lock r_lock(_group_mutex); - if (_task_groups.find(tg_id) != _task_groups.end()) { - return _task_groups.at(tg_id); + if (_workload_groups.find(tg_id) != _workload_groups.end()) { + return _workload_groups.at(tg_id); } return nullptr; } -void TaskGroupManager::delete_task_group_by_ids(std::set used_wg_id) { +void WorkloadGroupMgr::delete_workload_group_by_ids(std::set used_wg_id) { int64_t begin_time = MonotonicMillis(); // 1 get delete group without running queries - std::vector deleted_task_groups; + std::vector deleted_task_groups; { std::lock_guard write_lock(_group_mutex); - for (auto iter = _task_groups.begin(); iter != _task_groups.end(); iter++) { + for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) { uint64_t tg_id = iter->first; - auto task_group_ptr = iter->second; + auto workload_group_ptr = iter->second; if (used_wg_id.find(tg_id) == used_wg_id.end()) { - task_group_ptr->shutdown(); - // only when no query running in task group, its resource can be released in BE - if (task_group_ptr->query_num() == 0) { + workload_group_ptr->shutdown(); + // only when no query running in workload group, its resource can be released in BE + if (workload_group_ptr->query_num() == 0) { LOG(INFO) << "There is no query in wg " << tg_id << ", delete it."; - deleted_task_groups.push_back(task_group_ptr); + deleted_task_groups.push_back(workload_group_ptr); } } } @@ -100,7 +101,7 @@ void TaskGroupManager::delete_task_group_by_ids(std::set used_wg_id) { { std::lock_guard write_lock(_group_mutex); for (auto& tg : deleted_task_groups) { - _task_groups.erase(tg->id()); + _workload_groups.erase(tg->id()); } } @@ -119,7 +120,7 @@ void TaskGroupManager::delete_task_group_by_ids(std::set used_wg_id) { if (ret.ok()) { _is_init_succ = true; } else { - LOG(INFO) << "init task group mgr cpu ctl failed, " << ret.to_string(); + LOG(INFO) << "init workload group mgr cpu ctl failed, " << ret.to_string(); } } if (_is_init_succ) { @@ -130,14 +131,14 @@ void TaskGroupManager::delete_task_group_by_ids(std::set used_wg_id) { } } int64_t time_cost_ms = MonotonicMillis() - begin_time; - LOG(INFO) << "finish clear unused task group, time cost: " << time_cost_ms + LOG(INFO) << "finish clear unused workload group, time cost: " << time_cost_ms << "ms, deleted group size:" << deleted_task_groups.size(); } -void TaskGroupManager::stop() { - for (auto iter = _task_groups.begin(); iter != _task_groups.end(); iter++) { +void WorkloadGroupMgr::stop() { + for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) { iter->second->try_stop_schedulers(); } } -} // namespace doris::taskgroup +} // namespace doris diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h similarity index 70% rename from be/src/runtime/task_group/task_group_manager.h rename to be/src/runtime/workload_group/workload_group_manager.h index 21772bd3bc..1f680eb17c 100644 --- a/be/src/runtime/task_group/task_group_manager.h +++ b/be/src/runtime/workload_group/workload_group_manager.h @@ -21,7 +21,7 @@ #include #include -#include "task_group.h" +#include "workload_group.h" namespace doris { @@ -32,21 +32,19 @@ class TaskScheduler; class MultiCoreTaskQueue; } // namespace pipeline -namespace taskgroup { - -class TaskGroupManager { +class WorkloadGroupMgr { public: - TaskGroupManager() = default; - ~TaskGroupManager() = default; + WorkloadGroupMgr() = default; + ~WorkloadGroupMgr() = default; - TaskGroupPtr get_or_create_task_group(const TaskGroupInfo& task_group_info); + WorkloadGroupPtr get_or_create_workload_group(const WorkloadGroupInfo& workload_group_info); - void get_related_taskgroups(const std::function& pred, - std::vector* task_groups); + void get_related_workload_groups(const std::function& pred, + std::vector* task_groups); - void delete_task_group_by_ids(std::set id_set); + void delete_workload_group_by_ids(std::set id_set); - TaskGroupPtr get_task_group_by_id(uint64_t tg_id); + WorkloadGroupPtr get_task_group_by_id(uint64_t tg_id); void stop(); @@ -58,12 +56,11 @@ public: private: std::shared_mutex _group_mutex; - std::unordered_map _task_groups; + std::unordered_map _workload_groups; std::shared_mutex _init_cg_ctl_lock; std::unique_ptr _cg_cpu_ctl; bool _is_init_succ = false; }; -} // namespace taskgroup } // namespace doris diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 3d9c4c4b06..a5ca3b70f1 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -42,8 +42,8 @@ #include "runtime/exec_env.h" #include "runtime/memory/cache_manager.h" #include "runtime/memory/mem_tracker_limiter.h" -#include "runtime/task_group/task_group.h" -#include "runtime/task_group/task_group_manager.h" +#include "runtime/workload_group/workload_group.h" +#include "runtime/workload_group/workload_group_manager.h" #include "util/cgroup_util.h" #include "util/defer_op.h" #include "util/parse_util.h" @@ -241,23 +241,24 @@ bool MemInfo::process_full_gc() { int64_t MemInfo::tg_not_enable_overcommit_group_gc() { MonotonicStopWatch watch; watch.start(); - std::vector task_groups; + std::vector task_groups; std::unique_ptr tg_profile = std::make_unique("WorkloadGroup"); int64_t total_free_memory = 0; - ExecEnv::GetInstance()->task_group_manager()->get_related_taskgroups( - [](const taskgroup::TaskGroupPtr& task_group) { - return task_group->is_mem_limit_valid() && !task_group->enable_memory_overcommit(); + ExecEnv::GetInstance()->workload_group_mgr()->get_related_workload_groups( + [](const WorkloadGroupPtr& workload_group) { + return workload_group->is_mem_limit_valid() && + !workload_group->enable_memory_overcommit(); }, &task_groups); if (task_groups.empty()) { return 0; } - std::vector task_groups_overcommit; - for (const auto& task_group : task_groups) { - if (task_group->memory_used() > task_group->memory_limit()) { - task_groups_overcommit.push_back(task_group); + std::vector task_groups_overcommit; + for (const auto& workload_group : task_groups) { + if (workload_group->memory_used() > workload_group->memory_limit()) { + task_groups_overcommit.push_back(workload_group); } } if (task_groups_overcommit.empty()) { @@ -283,10 +284,10 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() { } }}; - for (const auto& task_group : task_groups_overcommit) { - auto used = task_group->memory_used(); + for (const auto& workload_group : task_groups_overcommit) { + auto used = workload_group->memory_used(); total_free_memory += - task_group->gc_memory(used - task_group->memory_limit(), tg_profile.get()); + workload_group->gc_memory(used - workload_group->memory_limit(), tg_profile.get()); } return total_free_memory; } @@ -295,10 +296,11 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory, RuntimeProfile* profile) { MonotonicStopWatch watch; watch.start(); - std::vector task_groups; - ExecEnv::GetInstance()->task_group_manager()->get_related_taskgroups( - [](const taskgroup::TaskGroupPtr& task_group) { - return task_group->is_mem_limit_valid() && task_group->enable_memory_overcommit(); + std::vector task_groups; + ExecEnv::GetInstance()->workload_group_mgr()->get_related_workload_groups( + [](const WorkloadGroupPtr& workload_group) { + return workload_group->is_mem_limit_valid() && + workload_group->enable_memory_overcommit(); }, &task_groups); if (task_groups.empty()) { @@ -308,9 +310,9 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory, int64_t total_exceeded_memory = 0; std::vector used_memorys; std::vector exceeded_memorys; - for (const auto& task_group : task_groups) { - int64_t used_memory = task_group->memory_used(); - int64_t exceeded = used_memory - task_group->memory_limit(); + for (const auto& workload_group : task_groups) { + int64_t used_memory = workload_group->memory_used(); + int64_t exceeded = used_memory - workload_group->memory_limit(); int64_t exceeded_memory = exceeded > 0 ? exceeded : 0; total_exceeded_memory += exceeded_memory; used_memorys.emplace_back(used_memory); @@ -356,8 +358,8 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory, gc_all_exceeded ? exceeded_memorys[i] : static_cast(exceeded_memorys[i]) / total_exceeded_memory * request_free_memory /* exceeded memory as a weight */; - auto task_group = task_groups[i]; - total_free_memory += task_group->gc_memory(tg_need_free_memory, profile); + auto workload_group = task_groups[i]; + total_free_memory += workload_group->gc_memory(tg_need_free_memory, profile); } return total_free_memory; } diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index 1052e77ae0..2488905409 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -39,16 +39,13 @@ namespace doris { class ThreadPoolToken; class RuntimeState; class TupleDescriptor; +class WorkloadGroup; namespace pipeline { class ScanLocalStateBase; class Dependency; } // namespace pipeline -namespace taskgroup { -class TaskGroup; -} // namespace taskgroup - namespace vectorized { class VScanner;