[refactor](rename) rename task group to workload group in be (#32204)

---------

Co-authored-by: yiguolei <yiguolei@gmail.com>
This commit is contained in:
yiguolei
2024-03-14 14:56:47 +08:00
committed by yiguolei
parent 0578b28d54
commit 62023d705d
22 changed files with 213 additions and 228 deletions

View File

@ -132,7 +132,7 @@ Status CgroupV1CpuCtl::init() {
if (_tg_id == -1) {
// means current cgroup cpu ctl is just used to clear dir,
// it does not contains task group.
// it does not contain a workload group.
// todo(wb) rethink whether cgroup_cpu_ctl needs to be refactored
_init_succ = true;
LOG(INFO) << "init cgroup cpu query path succ, path=" << _cgroup_v1_cpu_query_path;

View File

@ -17,8 +17,8 @@
#include "agent/workload_group_listener.h"
#include "runtime/task_group/task_group.h"
#include "runtime/task_group/task_group_manager.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/mem_info.h"
#include "util/parse_util.h"
@ -32,36 +32,37 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
}
// 1 parse topicinfo to group info
taskgroup::TaskGroupInfo task_group_info;
Status ret = taskgroup::TaskGroupInfo::parse_topic_info(topic_info.workload_group_info,
&task_group_info);
WorkloadGroupInfo workload_group_info;
Status ret = WorkloadGroupInfo::parse_topic_info(topic_info.workload_group_info,
&workload_group_info);
if (!ret.ok()) {
LOG(INFO) << "parse topic info failed, tg_id=" << task_group_info.id
LOG(INFO) << "parse topic info failed, tg_id=" << workload_group_info.id
<< ", reason:" << ret.to_string();
continue;
}
current_wg_ids.insert(task_group_info.id);
current_wg_ids.insert(workload_group_info.id);
// 2 update task group
auto tg = _exec_env->task_group_manager()->get_or_create_task_group(task_group_info);
// 2 update workload group
auto tg =
_exec_env->workload_group_mgr()->get_or_create_workload_group(workload_group_info);
// 3 set cpu soft hard limit switch
_exec_env->task_group_manager()->_enable_cpu_hard_limit.store(
task_group_info.enable_cpu_hard_limit);
_exec_env->workload_group_mgr()->_enable_cpu_hard_limit.store(
workload_group_info.enable_cpu_hard_limit);
// 4 create and update task scheduler
tg->upsert_task_scheduler(&task_group_info, _exec_env);
tg->upsert_task_scheduler(&workload_group_info, _exec_env);
LOG(INFO) << "update task group finish, tg info=" << tg->debug_string()
LOG(INFO) << "update workload group finish, tg info=" << tg->debug_string()
<< ", enable_cpu_hard_limit="
<< (_exec_env->task_group_manager()->enable_cpu_hard_limit() ? "true" : "false")
<< ", cgroup cpu_shares=" << task_group_info.cgroup_cpu_shares
<< ", cgroup cpu_hard_limit=" << task_group_info.cgroup_cpu_hard_limit
<< (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false")
<< ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares
<< ", cgroup cpu_hard_limit=" << workload_group_info.cgroup_cpu_hard_limit
<< ", enable_cgroup_cpu_soft_limit="
<< (config::enable_cgroup_cpu_soft_limit ? "true" : "false")
<< ", cgroup home path=" << config::doris_cgroup_cpu_path;
}
_exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
_exec_env->workload_group_mgr()->delete_workload_group_by_ids(current_wg_ids);
}
} // namespace doris

View File

@ -50,7 +50,7 @@
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/task_group/task_group_manager.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/cpu_info.h"
#include "util/debug_util.h"
#include "util/disk_info.h"

View File

@ -26,7 +26,7 @@
#include "common/status.h"
#include "exec/operator.h"
#include "pipeline.h"
#include "runtime/task_group/task_group.h"
#include "runtime/workload_group/workload_group.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/core/block.h"

View File

@ -46,10 +46,10 @@ void PipelineTracerContext::record(ScheduleRecord record) {
}
}
void PipelineTracerContext::end_query(TUniqueId query_id, uint64_t task_group) {
void PipelineTracerContext::end_query(TUniqueId query_id, uint64_t workload_group) {
{
std::unique_lock<std::mutex> l(_tg_lock);
_id_to_taskgroup[query_id] = task_group;
_id_to_workload_group[query_id] = workload_group;
}
if (_dump_type == RecordType::PerQuery) {
_dump(query_id);
@ -113,7 +113,7 @@ void PipelineTracerContext::_dump(TUniqueId query_id) {
uint64_t v = 0;
{
std::unique_lock<std::mutex> l(_tg_lock);
v = _id_to_taskgroup[query_id];
v = _id_to_workload_group[query_id];
}
auto tmp_str = record.to_string(v);
auto text = Slice {tmp_str};
@ -140,7 +140,7 @@ void PipelineTracerContext::_dump(TUniqueId query_id) {
uint64_t v = 0;
{
std::unique_lock<std::mutex> l(_tg_lock);
v = _id_to_taskgroup[query_id];
v = _id_to_workload_group[query_id];
}
auto tmp_str = record.to_string(v);
auto text = Slice {tmp_str};
@ -156,7 +156,7 @@ void PipelineTracerContext::_dump(TUniqueId query_id) {
_datas.erase(query_id);
{
std::unique_lock<std::mutex> l(_tg_lock);
_id_to_taskgroup.erase(query_id);
_id_to_workload_group.erase(query_id);
}
}
} // namespace doris::pipeline

View File

@ -61,7 +61,7 @@ public:
};
void record(ScheduleRecord record); // record one schedule record
void end_query(TUniqueId query_id,
uint64_t task_group); // tell context this query is end. may leads to dump.
uint64_t workload_group); // tell the context this query has ended; may lead to a dump.
Status change_record_params(const std::map<std::string, std::string>& params);
bool enabled() const { return !(_dump_type == RecordType::None); }
@ -72,7 +72,7 @@ private:
std::mutex _data_lock; // lock for map, not map items.
phmap::flat_hash_map<TUniqueId, OneQueryTraces> _datas;
std::mutex _tg_lock; // TODO: use a lock-free data structure
phmap::flat_hash_map<TUniqueId, uint64_t> _id_to_taskgroup;
phmap::flat_hash_map<TUniqueId, uint64_t> _id_to_workload_group;
RecordType _dump_type = RecordType::None;
std::filesystem::path _dir = config::pipeline_tracing_log_dir;

View File

@ -28,7 +28,7 @@
#include "pipeline/pipeline.h"
#include "pipeline/pipeline_task.h"
#include "pipeline/pipeline_x/dependency.h"
#include "runtime/task_group/task_group.h"
#include "runtime/workload_group/workload_group.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/core/block.h"

View File

@ -30,7 +30,7 @@
#include "common/status.h"
#include "pipeline_task.h"
#include "runtime/task_group/task_group.h"
#include "runtime/workload_group/workload_group.h"
namespace doris {
namespace pipeline {

View File

@ -30,7 +30,7 @@
#include "common/status.h"
#include "gutil/ref_counted.h"
#include "pipeline_task.h"
#include "runtime/task_group/task_group.h"
#include "runtime/workload_group/workload_group.h"
#include "util/thread.h"
namespace doris {

View File

@ -51,9 +51,7 @@ class TaskScheduler;
class BlockedTaskScheduler;
struct RuntimeFilterTimerQueue;
} // namespace pipeline
namespace taskgroup {
class TaskGroupManager;
}
class WorkloadGroupMgr;
namespace io {
class FileCacheFactory;
} // namespace io
@ -156,7 +154,7 @@ public:
ClientCache<TPaloBrokerServiceClient>* broker_client_cache() { return _broker_client_cache; }
pipeline::TaskScheduler* pipeline_task_scheduler() { return _without_group_task_scheduler; }
taskgroup::TaskGroupManager* task_group_manager() { return _task_group_manager; }
WorkloadGroupMgr* workload_group_mgr() { return _workload_group_manager; }
WorkloadSchedPolicyMgr* workload_sched_policy_mgr() { return _workload_sched_mgr; }
RuntimeQueryStatiticsMgr* runtime_query_statistics_mgr() {
return _runtime_query_statistics_mgr;
@ -338,7 +336,7 @@ private:
FragmentMgr* _fragment_mgr = nullptr;
pipeline::TaskScheduler* _without_group_task_scheduler = nullptr;
taskgroup::TaskGroupManager* _task_group_manager = nullptr;
WorkloadGroupMgr* _workload_group_manager = nullptr;
ResultCache* _result_cache = nullptr;
TMasterInfo* _master_info = nullptr;

View File

@ -75,9 +75,9 @@
#include "runtime/small_file_mgr.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/task_group/task_group_manager.h"
#include "runtime/thread_context.h"
#include "runtime/user_function_cache.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "runtime/workload_management/workload_sched_policy_mgr.h"
#include "service/backend_options.h"
#include "service/backend_service.h"
@ -210,7 +210,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
init_file_cache_factory();
_pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query
RETURN_IF_ERROR(init_pipeline_task_scheduler());
_task_group_manager = new taskgroup::TaskGroupManager();
_workload_group_manager = new WorkloadGroupMgr();
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
_fragment_mgr = new FragmentMgr(this);
_result_cache = new ResultCache(config::query_cache_max_size_mb,
@ -295,7 +295,7 @@ Status ExecEnv::init_pipeline_task_scheduler() {
executors_size = CpuInfo::num_cores();
}
// TODO pipeline task group combie two blocked schedulers.
// TODO pipeline workload group: combine the two blocked schedulers.
auto t_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
_without_group_block_scheduler =
std::make_shared<pipeline::BlockedTaskScheduler>("PipeNoGSchePool");
@ -541,7 +541,7 @@ void ExecEnv::destroy() {
SAFE_STOP(_without_group_task_scheduler);
// stop pipeline step 2, cgroup execution
SAFE_SHUTDOWN(_global_block_scheduler.get());
SAFE_STOP(_task_group_manager);
SAFE_STOP(_workload_group_manager);
SAFE_STOP(_external_scan_context_mgr);
SAFE_STOP(_fragment_mgr);
@ -606,7 +606,7 @@ void ExecEnv::destroy() {
SAFE_DELETE(_result_cache);
SAFE_DELETE(_fragment_mgr);
SAFE_DELETE(_workload_sched_mgr);
SAFE_DELETE(_task_group_manager);
SAFE_DELETE(_workload_group_manager);
SAFE_DELETE(_file_cache_factory);
SAFE_DELETE(_runtime_filter_timer_queue);
// TODO(zhiqiang): Maybe we should call shutdown before release thread pool?

View File

@ -70,10 +70,10 @@
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/task_group/task_group.h"
#include "runtime/task_group/task_group_manager.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "runtime/workload_management/workload_query_info.h"
#include "service/backend_options.h"
#include "util/debug_util.h"
@ -639,15 +639,15 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
if (params.__isset.workload_groups && !params.workload_groups.empty()) {
uint64_t tg_id = params.workload_groups[0].id;
taskgroup::TaskGroupPtr task_group_ptr =
_exec_env->task_group_manager()->get_task_group_by_id(tg_id);
if (task_group_ptr != nullptr) {
RETURN_IF_ERROR(query_ctx->set_task_group(task_group_ptr));
WorkloadGroupPtr workload_group_ptr =
_exec_env->workload_group_mgr()->get_task_group_by_id(tg_id);
if (workload_group_ptr != nullptr) {
RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id),
tg_id);
LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id())
<< ", use task group: " << task_group_ptr->debug_string()
<< ", use workload group: " << workload_group_ptr->debug_string()
<< ", is pipeline: " << ((int)is_pipeline)
<< ", enable cgroup soft limit: "
<< ((int)config::enable_cgroup_cpu_soft_limit);
@ -657,7 +657,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
}
}
// There is some logic in query ctx's dctor, we could not check if exists and delete the
// temp query ctx now. For example, the query id maybe removed from task group's queryset.
// temp query ctx now. For example, the query id may be removed from the workload group's query set.
_query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx));
LOG(INFO) << "Register query/load memory tracker, query/load id: "
<< print_id(query_ctx->query_id())

View File

@ -30,8 +30,8 @@
#include "olap/memtable_memory_limiter.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/task_group/task_group.h"
#include "runtime/thread_context.h"
#include "runtime/workload_group/workload_group.h"
#include "service/backend_options.h"
#include "util/mem_info.h"
#include "util/perf_counters.h"
@ -396,8 +396,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem,
}
int64_t MemTrackerLimiter::tg_free_top_memory_query(
int64_t min_free_mem, Type type,
std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_groups,
int64_t min_free_mem, Type type, std::vector<WgTrackerLimiterGroup>& tracker_groups,
const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
RuntimeProfile* profile, GCType gctype) {
return free_top_memory_query(min_free_mem, type, tracker_groups, cancel_msg, profile, gctype);
@ -531,8 +530,7 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
}
int64_t MemTrackerLimiter::tg_free_top_overcommit_query(
int64_t min_free_mem, Type type,
std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_groups,
int64_t min_free_mem, Type type, std::vector<WgTrackerLimiterGroup>& tracker_groups,
const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
RuntimeProfile* profile, GCType gctype) {
return free_top_overcommit_query(min_free_mem, type, tracker_groups, cancel_msg, profile,

View File

@ -45,11 +45,9 @@ class RuntimeProfile;
constexpr auto MEM_TRACKER_GROUP_NUM = 1000;
namespace taskgroup {
struct TgTrackerLimiterGroup;
class TaskGroup;
using TaskGroupPtr = std::shared_ptr<TaskGroup>;
} // namespace taskgroup
struct WgTrackerLimiterGroup;
class WorkloadGroup;
using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
// Track and limit the memory usage of process and query.
// Contains a limit, arranged into a tree structure.
@ -195,8 +193,7 @@ public:
RuntimeProfile* profile, GCType gctype);
static int64_t tg_free_top_memory_query(
int64_t min_free_mem, Type type,
std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_groups,
int64_t min_free_mem, Type type, std::vector<WgTrackerLimiterGroup>& tracker_groups,
const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
RuntimeProfile* profile, GCType gctype);
@ -219,8 +216,7 @@ public:
RuntimeProfile* profile, GCType gctype);
static int64_t tg_free_top_overcommit_query(
int64_t min_free_mem, Type type,
std::vector<taskgroup::TgTrackerLimiterGroup>& tracker_groups,
int64_t min_free_mem, Type type, std::vector<WgTrackerLimiterGroup>& tracker_groups,
const std::function<std::string(int64_t, const std::string&)>& cancel_msg,
RuntimeProfile* profile, GCType gctype);

View File

@ -23,7 +23,7 @@
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_x/dependency.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/task_group/task_group_manager.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/mem_info.h"
namespace doris {
@ -102,10 +102,10 @@ QueryContext::~QueryContext() {
MemTracker::print_bytes(query_mem_tracker->peak_consumption()));
}
uint64_t group_id = 0;
if (_task_group) {
group_id = _task_group->id(); // before remove
_task_group->remove_mem_tracker_limiter(query_mem_tracker);
_task_group->remove_query(_query_id);
if (_workload_group) {
group_id = _workload_group->id(); // before remove
_workload_group->remove_mem_tracker_limiter(query_mem_tracker);
_workload_group->remove_query(_query_id);
}
_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
@ -205,7 +205,7 @@ void QueryContext::register_cpu_statistics() {
}
doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
if (_task_group) {
if (_workload_group) {
if (_task_scheduler) {
return _task_scheduler;
}
@ -214,21 +214,21 @@ doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
}
ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() {
if (_task_group) {
if (_workload_group) {
return _non_pipe_thread_pool;
} else {
return nullptr;
}
}
Status QueryContext::set_task_group(taskgroup::TaskGroupPtr& tg) {
_task_group = tg;
// Should add query first, then the task group will not be deleted.
// see task_group_manager::delete_task_group_by_ids
RETURN_IF_ERROR(_task_group->add_query(_query_id));
_task_group->add_mem_tracker_limiter(query_mem_tracker);
_task_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler,
&_non_pipe_thread_pool, &_remote_scan_task_scheduler);
// Attaches this query to the workload group `tg`.
//
// Stores `tg` in `_workload_group`, registers `_query_id` with the group, adds
// the query's memory tracker limiter to the group, and copies the group's
// scheduler / thread-pool pointers into this context.
//
// Returns the error from `add_query` if registration fails; in that case the
// memory tracker is not added and the scheduler pointers are not fetched, but
// `_workload_group` has already been assigned. Returns Status::OK() otherwise.
Status QueryContext::set_workload_group(WorkloadGroupPtr& tg) {
_workload_group = tg;
// The query must be added first so that the workload group will not be deleted
// while this query still uses it.
// See WorkloadGroupMgr::delete_workload_group_by_ids.
RETURN_IF_ERROR(_workload_group->add_query(_query_id));
_workload_group->add_mem_tracker_limiter(query_mem_tracker);
// Fetch the group's schedulers and non-pipeline thread pool into this context.
_workload_group->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler,
&_non_pipe_thread_pool, &_remote_scan_task_scheduler);
return Status::OK();
}

View File

@ -32,11 +32,11 @@
#include "runtime/query_statistics.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_predicate.h"
#include "task_group/task_group.h"
#include "util/threadpool.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/shared_hash_table_controller.h"
#include "vec/runtime/shared_scanner_controller.h"
#include "workload_group/workload_group.h"
namespace doris {
@ -163,7 +163,7 @@ public:
}
}
Status set_task_group(taskgroup::TaskGroupPtr& tg);
Status set_workload_group(WorkloadGroupPtr& tg);
int execution_timeout() const {
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
@ -292,7 +292,7 @@ private:
std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller;
std::unordered_map<int, vectorized::RuntimePredicate> _runtime_predicates;
taskgroup::TaskGroupPtr _task_group = nullptr;
WorkloadGroupPtr _workload_group = nullptr;
std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
const TQueryOptions _query_options;

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
#include "task_group.h"
#include "workload_group.h"
#include <fmt/format.h>
#include <gen_cpp/PaloInternalService_types.h>
@ -38,7 +38,6 @@
#include "vec/exec/scan/scanner_scheduler.h"
namespace doris {
namespace taskgroup {
const static uint64_t CPU_SHARE_DEFAULT_VALUE = 1024;
const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
@ -46,7 +45,7 @@ const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024;
TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
WorkloadGroup::WorkloadGroup(const WorkloadGroupInfo& tg_info)
: _id(tg_info.id),
_name(tg_info.name),
_version(tg_info.version),
@ -59,7 +58,7 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
_max_remote_scan_thread_num(tg_info.max_remote_scan_thread_num),
_min_remote_scan_thread_num(tg_info.min_remote_scan_thread_num) {}
std::string TaskGroup::debug_string() const {
std::string WorkloadGroup::debug_string() const {
std::shared_lock<std::shared_mutex> rl {_mutex};
return fmt::format(
"TG[id = {}, name = {}, cpu_share = {}, memory_limit = {}, enable_memory_overcommit = "
@ -70,7 +69,7 @@ std::string TaskGroup::debug_string() const {
_scan_thread_num, _max_remote_scan_thread_num, _min_remote_scan_thread_num);
}
void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
void WorkloadGroup::check_and_update(const WorkloadGroupInfo& tg_info) {
if (UNLIKELY(tg_info.id != _id)) {
return;
}
@ -98,7 +97,7 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
}
}
int64_t TaskGroup::memory_used() {
int64_t WorkloadGroup::memory_used() {
int64_t used_memory = 0;
for (auto& mem_tracker_group : _mem_tracker_limiter_pool) {
std::lock_guard<std::mutex> l(mem_tracker_group.group_lock);
@ -109,19 +108,19 @@ int64_t TaskGroup::memory_used() {
return used_memory;
}
void TaskGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
// Registers `mem_tracker_ptr` with this workload group.
// The tracker is inserted into the pool slot selected by its own group_num(),
// while holding that slot's `group_lock`.
// NOTE(review): assumes group_num() is a valid index into
// _mem_tracker_limiter_pool; no bounds check is done here.
void WorkloadGroup::add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
auto group_num = mem_tracker_ptr->group_num();
std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
_mem_tracker_limiter_pool[group_num].trackers.insert(mem_tracker_ptr);
}
void TaskGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
// Unregisters `mem_tracker_ptr` from this workload group.
// The tracker is erased from the pool slot selected by its own group_num(),
// while holding that slot's `group_lock`.
// NOTE(review): assumes group_num() is a valid index into
// _mem_tracker_limiter_pool; no bounds check is done here.
void WorkloadGroup::remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr) {
auto group_num = mem_tracker_ptr->group_num();
std::lock_guard<std::mutex> l(_mem_tracker_limiter_pool[group_num].group_lock);
_mem_tracker_limiter_pool[group_num].trackers.erase(mem_tracker_ptr);
}
int64_t TaskGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile) {
int64_t WorkloadGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile) {
if (need_free_mem <= 0) {
return 0;
}
@ -202,97 +201,97 @@ int64_t TaskGroup::gc_memory(int64_t need_free_mem, RuntimeProfile* profile) {
return freed_mem;
}
Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_info,
taskgroup::TaskGroupInfo* task_group_info) {
Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo& tworkload_group_info,
WorkloadGroupInfo* workload_group_info) {
// 1 id
int tg_id = 0;
if (workload_group_info.__isset.id) {
tg_id = workload_group_info.id;
if (tworkload_group_info.__isset.id) {
tg_id = tworkload_group_info.id;
} else {
return Status::InternalError<false>("workload group id is required");
}
task_group_info->id = tg_id;
workload_group_info->id = tg_id;
// 2 name
std::string name = "INVALID_NAME";
if (workload_group_info.__isset.name) {
name = workload_group_info.name;
if (tworkload_group_info.__isset.name) {
name = tworkload_group_info.name;
}
task_group_info->name = name;
workload_group_info->name = name;
// 3 version
int version = 0;
if (workload_group_info.__isset.version) {
version = workload_group_info.version;
if (tworkload_group_info.__isset.version) {
version = tworkload_group_info.version;
} else {
return Status::InternalError<false>("workload group version is required");
}
task_group_info->version = version;
workload_group_info->version = version;
// 4 cpu_share
uint64_t cpu_share = CPU_SHARE_DEFAULT_VALUE;
if (workload_group_info.__isset.cpu_share) {
cpu_share = workload_group_info.cpu_share;
if (tworkload_group_info.__isset.cpu_share) {
cpu_share = tworkload_group_info.cpu_share;
}
task_group_info->cpu_share = cpu_share;
workload_group_info->cpu_share = cpu_share;
// 5 cpu hard limit
int cpu_hard_limit = CPU_HARD_LIMIT_DEFAULT_VALUE;
if (workload_group_info.__isset.cpu_hard_limit) {
cpu_hard_limit = workload_group_info.cpu_hard_limit;
if (tworkload_group_info.__isset.cpu_hard_limit) {
cpu_hard_limit = tworkload_group_info.cpu_hard_limit;
}
task_group_info->cpu_hard_limit = cpu_hard_limit;
workload_group_info->cpu_hard_limit = cpu_hard_limit;
// 6 mem_limit
std::string mem_limit_str = MEMORY_LIMIT_DEFAULT_VALUE;
if (workload_group_info.__isset.mem_limit) {
mem_limit_str = workload_group_info.mem_limit;
if (tworkload_group_info.__isset.mem_limit) {
mem_limit_str = tworkload_group_info.mem_limit;
}
bool is_percent = true;
int64_t mem_limit =
ParseUtil::parse_mem_spec(mem_limit_str, -1, MemInfo::mem_limit(), &is_percent);
task_group_info->memory_limit = mem_limit;
workload_group_info->memory_limit = mem_limit;
// 7 mem overcommit
bool enable_memory_overcommit = ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE;
if (workload_group_info.__isset.enable_memory_overcommit) {
enable_memory_overcommit = workload_group_info.enable_memory_overcommit;
if (tworkload_group_info.__isset.enable_memory_overcommit) {
enable_memory_overcommit = tworkload_group_info.enable_memory_overcommit;
}
task_group_info->enable_memory_overcommit = enable_memory_overcommit;
workload_group_info->enable_memory_overcommit = enable_memory_overcommit;
// 8 cpu soft limit or hard limit
bool enable_cpu_hard_limit = false;
if (workload_group_info.__isset.enable_cpu_hard_limit) {
enable_cpu_hard_limit = workload_group_info.enable_cpu_hard_limit;
if (tworkload_group_info.__isset.enable_cpu_hard_limit) {
enable_cpu_hard_limit = tworkload_group_info.enable_cpu_hard_limit;
}
task_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit;
workload_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit;
// 9 scan thread num
task_group_info->scan_thread_num = config::doris_scanner_thread_pool_thread_num;
if (workload_group_info.__isset.scan_thread_num && workload_group_info.scan_thread_num > 0) {
task_group_info->scan_thread_num = workload_group_info.scan_thread_num;
workload_group_info->scan_thread_num = config::doris_scanner_thread_pool_thread_num;
if (tworkload_group_info.__isset.scan_thread_num && tworkload_group_info.scan_thread_num > 0) {
workload_group_info->scan_thread_num = tworkload_group_info.scan_thread_num;
}
// 10 max remote scan thread num
task_group_info->max_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
if (workload_group_info.__isset.max_remote_scan_thread_num &&
workload_group_info.max_remote_scan_thread_num > 0) {
task_group_info->max_remote_scan_thread_num =
workload_group_info.max_remote_scan_thread_num;
workload_group_info->max_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
if (tworkload_group_info.__isset.max_remote_scan_thread_num &&
tworkload_group_info.max_remote_scan_thread_num > 0) {
workload_group_info->max_remote_scan_thread_num =
tworkload_group_info.max_remote_scan_thread_num;
}
// 11 min remote scan thread num
task_group_info->min_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
if (workload_group_info.__isset.min_remote_scan_thread_num &&
workload_group_info.min_remote_scan_thread_num > 0) {
task_group_info->min_remote_scan_thread_num =
workload_group_info.min_remote_scan_thread_num;
workload_group_info->min_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
if (tworkload_group_info.__isset.min_remote_scan_thread_num &&
tworkload_group_info.min_remote_scan_thread_num > 0) {
workload_group_info->min_remote_scan_thread_num =
tworkload_group_info.min_remote_scan_thread_num;
}
return Status::OK();
}
void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env) {
void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env) {
uint64_t tg_id = tg_info->id;
std::string tg_name = tg_info->name;
int cpu_hard_limit = tg_info->cpu_hard_limit;
@ -407,10 +406,10 @@ void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv
}
}
void TaskGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
vectorized::SimplifiedScanScheduler** scan_sched,
ThreadPool** non_pipe_thread_pool,
vectorized::SimplifiedScanScheduler** remote_scan_sched) {
void WorkloadGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
vectorized::SimplifiedScanScheduler** scan_sched,
ThreadPool** non_pipe_thread_pool,
vectorized::SimplifiedScanScheduler** remote_scan_sched) {
std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
*exec_sched = _task_sched.get();
*scan_sched = _scan_task_sched.get();
@ -418,7 +417,7 @@ void TaskGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
*non_pipe_thread_pool = _non_pipe_thread_pool.get();
}
void TaskGroup::try_stop_schedulers() {
void WorkloadGroup::try_stop_schedulers() {
std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
if (_task_sched) {
_task_sched->stop();
@ -435,5 +434,4 @@ void TaskGroup::try_stop_schedulers() {
}
}
} // namespace taskgroup
} // namespace doris

View File

@ -49,18 +49,16 @@ class PipelineTask;
class TaskScheduler;
} // namespace pipeline
namespace taskgroup {
class TaskGroup;
struct TaskGroupInfo;
struct TgTrackerLimiterGroup {
class WorkloadGroup;
struct WorkloadGroupInfo;
// One slot of a workload group's memory-tracker pool: a set of tracker
// limiters plus the mutex that callers lock before touching the set.
struct WgTrackerLimiterGroup {
// Memory tracker limiters currently registered in this slot.
std::unordered_set<std::shared_ptr<MemTrackerLimiter>> trackers;
// Taken by users of this slot before reading or modifying `trackers`.
std::mutex group_lock;
};
class TaskGroup : public std::enable_shared_from_this<TaskGroup> {
class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
public:
explicit TaskGroup(const TaskGroupInfo& tg_info);
explicit WorkloadGroup(const WorkloadGroupInfo& tg_info);
int64_t version() const { return _version; }
@ -86,7 +84,7 @@ public:
std::string debug_string() const;
void check_and_update(const TaskGroupInfo& tg_info);
void check_and_update(const WorkloadGroupInfo& tg_info);
void add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> mem_tracker_ptr);
@ -102,7 +100,7 @@ public:
Status add_query(TUniqueId query_id) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
if (_is_shutdown) {
// If the task group is set shutdown, then should not run any more,
// If the workload group has been shut down, no more queries should run,
// because the scheduler pool and other pointers may have been released.
return Status::InternalError(
"Failed add query to workload group, the workload group is shutdown. host: {}",
@ -129,7 +127,7 @@ public:
int64_t gc_memory(int64_t need_free_mem, RuntimeProfile* profile);
void upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env);
void upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* exec_env);
void get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
vectorized::SimplifiedScanScheduler** scan_sched,
@ -146,13 +144,13 @@ private:
int64_t _memory_limit; // bytes
bool _enable_memory_overcommit;
std::atomic<uint64_t> _cpu_share;
std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
std::vector<WgTrackerLimiterGroup> _mem_tracker_limiter_pool;
std::atomic<int> _cpu_hard_limit;
std::atomic<int> _scan_thread_num;
std::atomic<int> _max_remote_scan_thread_num;
std::atomic<int> _min_remote_scan_thread_num;
// means task group is mark dropped
// Means the workload group is marked as dropped:
// new queries cannot be submitted, and we are
// waiting for running queries to be cancelled or to finish.
bool _is_shutdown = false;
@ -166,9 +164,9 @@ private:
std::unique_ptr<ThreadPool> _non_pipe_thread_pool = nullptr;
};
using TaskGroupPtr = std::shared_ptr<TaskGroup>;
using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
struct TaskGroupInfo {
struct WorkloadGroupInfo {
uint64_t id;
std::string name;
uint64_t cpu_share;
@ -184,9 +182,8 @@ struct TaskGroupInfo {
uint64_t cgroup_cpu_shares = 0;
int cgroup_cpu_hard_limit = 0;
static Status parse_topic_info(const TWorkloadGroupInfo& topic_info,
taskgroup::TaskGroupInfo* task_group_info);
static Status parse_topic_info(const TWorkloadGroupInfo& tworkload_group_info,
WorkloadGroupInfo* workload_group_info);
};
} // namespace taskgroup
} // namespace doris

View File

@ -15,75 +15,76 @@
// specific language governing permissions and limitations
// under the License.
#include "task_group_manager.h"
#include "workload_group_manager.h"
#include <memory>
#include <mutex>
#include "pipeline/task_scheduler.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/task_group/task_group.h"
#include "runtime/workload_group/workload_group.h"
#include "util/threadpool.h"
#include "util/time.h"
#include "vec/exec/scan/scanner_scheduler.h"
namespace doris::taskgroup {
namespace doris {
TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo& task_group_info) {
WorkloadGroupPtr WorkloadGroupMgr::get_or_create_workload_group(
const WorkloadGroupInfo& workload_group_info) {
{
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
if (LIKELY(_task_groups.count(task_group_info.id))) {
auto task_group = _task_groups[task_group_info.id];
task_group->check_and_update(task_group_info);
return task_group;
if (LIKELY(_workload_groups.count(workload_group_info.id))) {
auto workload_group = _workload_groups[workload_group_info.id];
workload_group->check_and_update(workload_group_info);
return workload_group;
}
}
auto new_task_group = std::make_shared<TaskGroup>(task_group_info);
auto new_task_group = std::make_shared<WorkloadGroup>(workload_group_info);
std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
if (_task_groups.count(task_group_info.id)) {
auto task_group = _task_groups[task_group_info.id];
task_group->check_and_update(task_group_info);
return task_group;
if (_workload_groups.count(workload_group_info.id)) {
auto workload_group = _workload_groups[workload_group_info.id];
workload_group->check_and_update(workload_group_info);
return workload_group;
}
_task_groups[task_group_info.id] = new_task_group;
_workload_groups[workload_group_info.id] = new_task_group;
return new_task_group;
}
void TaskGroupManager::get_related_taskgroups(
const std::function<bool(const TaskGroupPtr& ptr)>& pred,
std::vector<TaskGroupPtr>* task_groups) {
void WorkloadGroupMgr::get_related_workload_groups(
const std::function<bool(const WorkloadGroupPtr& ptr)>& pred,
std::vector<WorkloadGroupPtr>* task_groups) {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
for (const auto& [id, task_group] : _task_groups) {
if (pred(task_group)) {
task_groups->push_back(task_group);
for (const auto& [id, workload_group] : _workload_groups) {
if (pred(workload_group)) {
task_groups->push_back(workload_group);
}
}
}
TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
WorkloadGroupPtr WorkloadGroupMgr::get_task_group_by_id(uint64_t tg_id) {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
if (_task_groups.find(tg_id) != _task_groups.end()) {
return _task_groups.at(tg_id);
if (_workload_groups.find(tg_id) != _workload_groups.end()) {
return _workload_groups.at(tg_id);
}
return nullptr;
}
void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
void WorkloadGroupMgr::delete_workload_group_by_ids(std::set<uint64_t> used_wg_id) {
int64_t begin_time = MonotonicMillis();
// 1 get delete group without running queries
std::vector<TaskGroupPtr> deleted_task_groups;
std::vector<WorkloadGroupPtr> deleted_task_groups;
{
std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
for (auto iter = _task_groups.begin(); iter != _task_groups.end(); iter++) {
for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
uint64_t tg_id = iter->first;
auto task_group_ptr = iter->second;
auto workload_group_ptr = iter->second;
if (used_wg_id.find(tg_id) == used_wg_id.end()) {
task_group_ptr->shutdown();
// only when no query running in task group, its resource can be released in BE
if (task_group_ptr->query_num() == 0) {
workload_group_ptr->shutdown();
// only when no query running in workload group, its resource can be released in BE
if (workload_group_ptr->query_num() == 0) {
LOG(INFO) << "There is no query in wg " << tg_id << ", delete it.";
deleted_task_groups.push_back(task_group_ptr);
deleted_task_groups.push_back(workload_group_ptr);
}
}
}
@ -100,7 +101,7 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
{
std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
for (auto& tg : deleted_task_groups) {
_task_groups.erase(tg->id());
_workload_groups.erase(tg->id());
}
}
@ -119,7 +120,7 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
if (ret.ok()) {
_is_init_succ = true;
} else {
LOG(INFO) << "init task group mgr cpu ctl failed, " << ret.to_string();
LOG(INFO) << "init workload group mgr cpu ctl failed, " << ret.to_string();
}
}
if (_is_init_succ) {
@ -130,14 +131,14 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
}
}
int64_t time_cost_ms = MonotonicMillis() - begin_time;
LOG(INFO) << "finish clear unused task group, time cost: " << time_cost_ms
LOG(INFO) << "finish clear unused workload group, time cost: " << time_cost_ms
<< "ms, deleted group size:" << deleted_task_groups.size();
}
void TaskGroupManager::stop() {
for (auto iter = _task_groups.begin(); iter != _task_groups.end(); iter++) {
void WorkloadGroupMgr::stop() {
for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); iter++) {
iter->second->try_stop_schedulers();
}
}
} // namespace doris::taskgroup
} // namespace doris

View File

@ -21,7 +21,7 @@
#include <shared_mutex>
#include <unordered_map>
#include "task_group.h"
#include "workload_group.h"
namespace doris {
@ -32,21 +32,19 @@ class TaskScheduler;
class MultiCoreTaskQueue;
} // namespace pipeline
namespace taskgroup {
class TaskGroupManager {
class WorkloadGroupMgr {
public:
TaskGroupManager() = default;
~TaskGroupManager() = default;
WorkloadGroupMgr() = default;
~WorkloadGroupMgr() = default;
TaskGroupPtr get_or_create_task_group(const TaskGroupInfo& task_group_info);
WorkloadGroupPtr get_or_create_workload_group(const WorkloadGroupInfo& workload_group_info);
void get_related_taskgroups(const std::function<bool(const TaskGroupPtr& ptr)>& pred,
std::vector<TaskGroupPtr>* task_groups);
void get_related_workload_groups(const std::function<bool(const WorkloadGroupPtr& ptr)>& pred,
std::vector<WorkloadGroupPtr>* task_groups);
void delete_task_group_by_ids(std::set<uint64_t> id_set);
void delete_workload_group_by_ids(std::set<uint64_t> id_set);
TaskGroupPtr get_task_group_by_id(uint64_t tg_id);
WorkloadGroupPtr get_task_group_by_id(uint64_t tg_id);
void stop();
@ -58,12 +56,11 @@ public:
private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
std::unordered_map<uint64_t, WorkloadGroupPtr> _workload_groups;
std::shared_mutex _init_cg_ctl_lock;
std::unique_ptr<CgroupCpuCtl> _cg_cpu_ctl;
bool _is_init_succ = false;
};
} // namespace taskgroup
} // namespace doris

View File

@ -42,8 +42,8 @@
#include "runtime/exec_env.h"
#include "runtime/memory/cache_manager.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/task_group/task_group.h"
#include "runtime/task_group/task_group_manager.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/cgroup_util.h"
#include "util/defer_op.h"
#include "util/parse_util.h"
@ -241,23 +241,24 @@ bool MemInfo::process_full_gc() {
int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
MonotonicStopWatch watch;
watch.start();
std::vector<taskgroup::TaskGroupPtr> task_groups;
std::vector<WorkloadGroupPtr> task_groups;
std::unique_ptr<RuntimeProfile> tg_profile = std::make_unique<RuntimeProfile>("WorkloadGroup");
int64_t total_free_memory = 0;
ExecEnv::GetInstance()->task_group_manager()->get_related_taskgroups(
[](const taskgroup::TaskGroupPtr& task_group) {
return task_group->is_mem_limit_valid() && !task_group->enable_memory_overcommit();
ExecEnv::GetInstance()->workload_group_mgr()->get_related_workload_groups(
[](const WorkloadGroupPtr& workload_group) {
return workload_group->is_mem_limit_valid() &&
!workload_group->enable_memory_overcommit();
},
&task_groups);
if (task_groups.empty()) {
return 0;
}
std::vector<taskgroup::TaskGroupPtr> task_groups_overcommit;
for (const auto& task_group : task_groups) {
if (task_group->memory_used() > task_group->memory_limit()) {
task_groups_overcommit.push_back(task_group);
std::vector<WorkloadGroupPtr> task_groups_overcommit;
for (const auto& workload_group : task_groups) {
if (workload_group->memory_used() > workload_group->memory_limit()) {
task_groups_overcommit.push_back(workload_group);
}
}
if (task_groups_overcommit.empty()) {
@ -283,10 +284,10 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
}
}};
for (const auto& task_group : task_groups_overcommit) {
auto used = task_group->memory_used();
for (const auto& workload_group : task_groups_overcommit) {
auto used = workload_group->memory_used();
total_free_memory +=
task_group->gc_memory(used - task_group->memory_limit(), tg_profile.get());
workload_group->gc_memory(used - workload_group->memory_limit(), tg_profile.get());
}
return total_free_memory;
}
@ -295,10 +296,11 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory,
RuntimeProfile* profile) {
MonotonicStopWatch watch;
watch.start();
std::vector<taskgroup::TaskGroupPtr> task_groups;
ExecEnv::GetInstance()->task_group_manager()->get_related_taskgroups(
[](const taskgroup::TaskGroupPtr& task_group) {
return task_group->is_mem_limit_valid() && task_group->enable_memory_overcommit();
std::vector<WorkloadGroupPtr> task_groups;
ExecEnv::GetInstance()->workload_group_mgr()->get_related_workload_groups(
[](const WorkloadGroupPtr& workload_group) {
return workload_group->is_mem_limit_valid() &&
workload_group->enable_memory_overcommit();
},
&task_groups);
if (task_groups.empty()) {
@ -308,9 +310,9 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory,
int64_t total_exceeded_memory = 0;
std::vector<int64_t> used_memorys;
std::vector<int64_t> exceeded_memorys;
for (const auto& task_group : task_groups) {
int64_t used_memory = task_group->memory_used();
int64_t exceeded = used_memory - task_group->memory_limit();
for (const auto& workload_group : task_groups) {
int64_t used_memory = workload_group->memory_used();
int64_t exceeded = used_memory - workload_group->memory_limit();
int64_t exceeded_memory = exceeded > 0 ? exceeded : 0;
total_exceeded_memory += exceeded_memory;
used_memorys.emplace_back(used_memory);
@ -356,8 +358,8 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory,
gc_all_exceeded ? exceeded_memorys[i]
: static_cast<double>(exceeded_memorys[i]) / total_exceeded_memory *
request_free_memory /* exceeded memory as a weight */;
auto task_group = task_groups[i];
total_free_memory += task_group->gc_memory(tg_need_free_memory, profile);
auto workload_group = task_groups[i];
total_free_memory += workload_group->gc_memory(tg_need_free_memory, profile);
}
return total_free_memory;
}

View File

@ -39,16 +39,13 @@ namespace doris {
class ThreadPoolToken;
class RuntimeState;
class TupleDescriptor;
class WorkloadGroup;
namespace pipeline {
class ScanLocalStateBase;
class Dependency;
} // namespace pipeline
namespace taskgroup {
class TaskGroup;
} // namespace taskgroup
namespace vectorized {
class VScanner;