[refactor](taskqueue) remove old task scheduler based wg (#30832)

---------

Co-authored-by: yiguolei <yiguolei@gmail.com>
Author: yiguolei
Date: 2024-02-05 18:22:01 +08:00
Committed by: yiguolei
Parent: cffe79feba
Commit: 2c99c53812
15 changed files with 8 additions and 372 deletions

View File

@@ -131,9 +131,6 @@ PipelineFragmentContext::PipelineFragmentContext(
_is_report_on_cancel(true),
_report_status_cb(report_status_cb),
_create_time(MonotonicNanos()) {
if (_query_ctx->get_task_group()) {
_task_group_entity = _query_ctx->get_task_group()->task_entity();
}
_fragment_watcher.start();
}

View File

@@ -132,9 +132,6 @@ public:
return _query_ctx->exec_status();
}
[[nodiscard]] taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const {
return _task_group_entity;
}
void trigger_report_if_necessary();
virtual void instance_ids(std::vector<TUniqueId>& ins_ids) const {
ins_ids.resize(1);
@@ -198,8 +195,6 @@ protected:
std::shared_ptr<QueryContext> _query_ctx;
taskgroup::TaskGroupPipelineTaskEntity* _task_group_entity = nullptr;
std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
MonotonicStopWatch _fragment_watcher;

View File

@@ -426,8 +426,4 @@ std::string PipelineTask::debug_string() {
return fmt::to_string(debug_string_buffer);
}
taskgroup::TaskGroupPipelineTaskEntity* PipelineTask::get_task_group_entity() const {
return _fragment_context->get_task_group_entity();
}
} // namespace doris::pipeline

View File

@@ -196,8 +196,6 @@ public:
virtual std::string debug_string();
taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const;
void set_task_queue(TaskQueue* task_queue);
TaskQueue* get_task_queue() { return _task_queue; }

View File

@@ -204,175 +204,5 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task, size_t core_id) {
return _prio_task_queue_list[core_id].push(task);
}
bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(
const taskgroup::TGPTEntityPtr& lhs_ptr, const taskgroup::TGPTEntityPtr& rhs_ptr) const {
auto lhs_val = lhs_ptr->vruntime_ns();
auto rhs_val = rhs_ptr->vruntime_ns();
if (lhs_val != rhs_val) {
return lhs_val < rhs_val;
} else {
auto l_share = lhs_ptr->cpu_share();
auto r_share = rhs_ptr->cpu_share();
if (l_share != r_share) {
return l_share < r_share;
} else {
return lhs_ptr->task_group_id() < rhs_ptr->task_group_id();
}
}
}
TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size)
: TaskQueue(core_size), _min_tg_entity(nullptr) {}
TaskGroupTaskQueue::~TaskGroupTaskQueue() = default;
void TaskGroupTaskQueue::close() {
std::unique_lock<std::mutex> lock(_rs_mutex);
_closed = true;
_wait_task.notify_all();
}
Status TaskGroupTaskQueue::push_back(PipelineTask* task) {
return _push_back<false>(task);
}
Status TaskGroupTaskQueue::push_back(PipelineTask* task, size_t core_id) {
return _push_back<true>(task);
}
template <bool from_executor>
Status TaskGroupTaskQueue::_push_back(PipelineTask* task) {
task->put_in_runnable_queue();
auto* entity = task->get_task_group_entity();
std::unique_lock<std::mutex> lock(_rs_mutex);
entity->task_queue()->emplace(task);
if (_group_entities.find(entity) == _group_entities.end()) {
_enqueue_task_group<from_executor>(entity);
}
_wait_task.notify_one();
return Status::OK();
}
// TODO pipeline support steal
PipelineTask* TaskGroupTaskQueue::take(size_t core_id) {
std::unique_lock<std::mutex> lock(_rs_mutex);
taskgroup::TGPTEntityPtr entity = nullptr;
while (entity == nullptr) {
if (_closed) {
return nullptr;
}
if (_group_entities.empty()) {
_wait_task.wait(lock);
} else {
entity = _next_tg_entity();
if (!entity) {
_wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS));
}
}
}
DCHECK(entity->task_size() > 0);
if (entity->task_size() == 1) {
_dequeue_task_group(entity);
}
auto task = entity->task_queue()->front();
if (task) {
entity->task_queue()->pop();
task->pop_out_runnable_queue();
}
return task;
}
template <bool from_worker>
void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGPTEntityPtr tg_entity) {
_total_cpu_share += tg_entity->cpu_share();
if constexpr (!from_worker) {
/**
* If a task group entity leaves task queue for a long time, its v runtime will be very
* small. This can cause it to preempt too many execution time. So, in order to avoid this
* situation, it is necessary to adjust the task group's v runtime.
* */
auto old_v_ns = tg_entity->vruntime_ns();
auto* min_entity = _min_tg_entity.load();
if (min_entity) {
auto min_tg_v = min_entity->vruntime_ns();
auto ideal_r = _ideal_runtime_ns(tg_entity) / 2;
uint64_t new_vruntime_ns = min_tg_v > ideal_r ? min_tg_v - ideal_r : min_tg_v;
if (new_vruntime_ns > old_v_ns) {
tg_entity->adjust_vruntime_ns(new_vruntime_ns);
}
} else if (old_v_ns < _min_tg_v_runtime_ns) {
tg_entity->adjust_vruntime_ns(_min_tg_v_runtime_ns);
}
}
_group_entities.emplace(tg_entity);
VLOG_DEBUG << "enqueue tg " << tg_entity->debug_string()
<< ", group entity size: " << _group_entities.size();
_update_min_tg();
}
void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGPTEntityPtr tg_entity) {
_total_cpu_share -= tg_entity->cpu_share();
_group_entities.erase(tg_entity);
VLOG_DEBUG << "dequeue tg " << tg_entity->debug_string()
<< ", group entity size: " << _group_entities.size();
_update_min_tg();
}
void TaskGroupTaskQueue::_update_min_tg() {
auto* min_entity = _next_tg_entity();
_min_tg_entity = min_entity;
if (min_entity) {
auto min_v_runtime = min_entity->vruntime_ns();
if (min_v_runtime > _min_tg_v_runtime_ns) {
_min_tg_v_runtime_ns = min_v_runtime;
}
}
}
// like sched_fair.c calc_delta_fair, THREAD_TIME_SLICE maybe a dynamic value.
uint64_t TaskGroupTaskQueue::_ideal_runtime_ns(taskgroup::TGPTEntityPtr tg_entity) const {
return PipelineTask::THREAD_TIME_SLICE * _core_size * tg_entity->cpu_share() / _total_cpu_share;
}
taskgroup::TGPTEntityPtr TaskGroupTaskQueue::_next_tg_entity() {
taskgroup::TGPTEntityPtr res = nullptr;
for (auto* entity : _group_entities) {
res = entity;
break;
}
return res;
}
void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
std::unique_lock<std::mutex> lock(_rs_mutex);
auto* entity = task->get_task_group_entity();
auto find_entity = _group_entities.find(entity);
bool is_in_queue = find_entity != _group_entities.end();
VLOG_DEBUG << "update_statistics " << entity->debug_string() << ", in queue:" << is_in_queue;
if (is_in_queue) {
_group_entities.erase(entity);
}
entity->incr_runtime_ns(time_spent);
if (is_in_queue) {
_group_entities.emplace(entity);
_update_min_tg();
}
}
void TaskGroupTaskQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
taskgroup::TGPTEntityPtr entity) {
std::unique_lock<std::mutex> lock(_rs_mutex);
bool is_in_queue = _group_entities.find(entity) != _group_entities.end();
if (is_in_queue) {
_group_entities.erase(entity);
_total_cpu_share -= entity->cpu_share();
}
entity->check_and_update_cpu_share(task_group_info);
if (is_in_queue) {
_group_entities.emplace(entity);
_total_cpu_share += entity->cpu_share();
}
}
} // namespace pipeline
} // namespace doris
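
The TaskGroupTaskQueue deleted above implemented CFS-style fair scheduling across workload groups: each group entity accumulates virtual runtime in proportion to real runtime divided by its cpu_share, the entities are kept in a set ordered by vruntime (ties broken by cpu_share, then group id), and the front entity runs next, so higher-share groups get scheduled more often. Below is a minimal, self-contained sketch of that ordering and accounting rule; the names (GroupEntity, ByVruntime, charge) are illustrative stand-ins, not the Doris classes removed by this commit.

#include <cstdint>
#include <iostream>
#include <set>

struct GroupEntity {
    uint64_t id = 0;
    uint64_t cpu_share = 1;   // scheduling weight; a larger share means slower vruntime growth
    uint64_t vruntime_ns = 0; // virtual runtime, the fairness key

    // Mirrors TaskGroupEntity::incr_runtime_ns: charge real time scaled down by the share.
    void charge(uint64_t real_runtime_ns) { vruntime_ns += real_runtime_ns / cpu_share; }
};

// Mirrors TaskGroupSchedEntityComparator: smallest vruntime first, ties broken by
// cpu_share and then by group id so the ordering is strict.
struct ByVruntime {
    bool operator()(const GroupEntity* a, const GroupEntity* b) const {
        if (a->vruntime_ns != b->vruntime_ns) return a->vruntime_ns < b->vruntime_ns;
        if (a->cpu_share != b->cpu_share) return a->cpu_share < b->cpu_share;
        return a->id < b->id;
    }
};

int main() {
    GroupEntity high_share{1, 10}; // should receive roughly 10x the CPU of the low-share group
    GroupEntity low_share{2, 1};
    std::set<GroupEntity*, ByVruntime> ready{&high_share, &low_share};

    // A few scheduling rounds: take the front entity, charge it a 1 ms slice, and
    // re-insert it so the set re-sorts on the new vruntime (the removed code did the
    // same erase / incr_runtime_ns / emplace dance in update_statistics).
    for (int round = 0; round < 5; ++round) {
        GroupEntity* next = *ready.begin();
        std::cout << "round " << round << ": run group " << next->id << '\n';
        ready.erase(ready.begin());
        next->charge(1'000'000);
        ready.insert(next);
    }
    return 0;
}

In this toy run the share-1 group wins only the initial vruntime tie; afterwards the share-10 group is picked round after round until its vruntime catches up, which is the behaviour the removed _enqueue_task_group/_update_min_tg code maintained (with extra clamping of stale vruntimes).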

View File

@@ -52,9 +52,6 @@ public:
virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
virtual void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
taskgroup::TGPTEntityPtr entity) = 0;
int cores() const { return _core_size; }
protected:
@@ -154,11 +151,6 @@ public:
time_spent);
}
void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
taskgroup::TGPTEntityPtr entity) override {
LOG(FATAL) << "update_tg_cpu_share not implemented";
}
private:
PipelineTask* _steal_take(size_t core_id);
@@ -167,49 +159,5 @@ private:
std::atomic<bool> _closed;
};
class TaskGroupTaskQueue : public TaskQueue {
public:
explicit TaskGroupTaskQueue(size_t);
~TaskGroupTaskQueue() override;
void close() override;
PipelineTask* take(size_t core_id) override;
// from TaskScheduler or BlockedTaskScheduler
Status push_back(PipelineTask* task) override;
// from worker
Status push_back(PipelineTask* task, size_t core_id) override;
void update_statistics(PipelineTask* task, int64_t time_spent) override;
void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
taskgroup::TGPTEntityPtr entity) override;
private:
template <bool from_executor>
Status _push_back(PipelineTask* task);
template <bool from_worker>
void _enqueue_task_group(taskgroup::TGPTEntityPtr);
void _dequeue_task_group(taskgroup::TGPTEntityPtr);
taskgroup::TGPTEntityPtr _next_tg_entity();
uint64_t _ideal_runtime_ns(taskgroup::TGPTEntityPtr tg_entity) const;
void _update_min_tg();
// Like cfs rb tree in sched_entity
struct TaskGroupSchedEntityComparator {
bool operator()(const taskgroup::TGPTEntityPtr&, const taskgroup::TGPTEntityPtr&) const;
};
using ResouceGroupSet = std::set<taskgroup::TGPTEntityPtr, TaskGroupSchedEntityComparator>;
ResouceGroupSet _group_entities;
std::condition_variable _wait_task;
std::mutex _rs_mutex;
bool _closed = false;
int _total_cpu_share = 0;
std::atomic<taskgroup::TGPTEntityPtr> _min_tg_entity = nullptr;
uint64_t _min_tg_v_runtime_ns = 0;
};
} // namespace pipeline
} // namespace doris

View File

@@ -152,7 +152,6 @@ public:
ClientCache<TPaloBrokerServiceClient>* broker_client_cache() { return _broker_client_cache; }
pipeline::TaskScheduler* pipeline_task_scheduler() { return _without_group_task_scheduler; }
pipeline::TaskScheduler* pipeline_task_group_scheduler() { return _with_group_task_scheduler; }
taskgroup::TaskGroupManager* task_group_manager() { return _task_group_manager; }
WorkloadSchedPolicyMgr* workload_sched_policy_mgr() { return _workload_sched_mgr; }
RuntimeQueryStatiticsMgr* runtime_query_statistics_mgr() {
@@ -327,7 +326,6 @@ private:
FragmentMgr* _fragment_mgr = nullptr;
pipeline::TaskScheduler* _without_group_task_scheduler = nullptr;
pipeline::TaskScheduler* _with_group_task_scheduler = nullptr;
taskgroup::TaskGroupManager* _task_group_manager = nullptr;
ResultCache* _result_cache = nullptr;
@@ -382,8 +380,6 @@ private:
std::shared_ptr<doris::pipeline::BlockedTaskScheduler> _global_block_scheduler;
// used for query without workload group
std::shared_ptr<doris::pipeline::BlockedTaskScheduler> _without_group_block_scheduler;
// used for query with workload group cpu soft limit
std::shared_ptr<doris::pipeline::BlockedTaskScheduler> _with_group_block_scheduler;
doris::pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr;

View File

@@ -298,13 +298,6 @@ Status ExecEnv::init_pipeline_task_scheduler() {
RETURN_IF_ERROR(_without_group_task_scheduler->start());
RETURN_IF_ERROR(_without_group_block_scheduler->start());
auto tg_queue = std::make_shared<pipeline::TaskGroupTaskQueue>(executors_size);
_with_group_block_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>("PipeGSchePool");
_with_group_task_scheduler = new pipeline::TaskScheduler(this, _with_group_block_scheduler,
tg_queue, "PipeGSchePool", nullptr);
RETURN_IF_ERROR(_with_group_task_scheduler->start());
RETURN_IF_ERROR(_with_group_block_scheduler->start());
_global_block_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>("PipeGBlockSche");
RETURN_IF_ERROR(_global_block_scheduler->start());
_runtime_filter_timer_queue = new doris::pipeline::RuntimeFilterTimerQueue();
@@ -539,8 +532,6 @@ void ExecEnv::destroy() {
// stop pipline step 1, non-cgroup execution
SAFE_SHUTDOWN(_without_group_block_scheduler.get());
SAFE_STOP(_without_group_task_scheduler);
SAFE_SHUTDOWN(_with_group_block_scheduler.get());
SAFE_STOP(_with_group_task_scheduler);
// stop pipline step 2, cgroup execution
SAFE_SHUTDOWN(_global_block_scheduler.get());
SAFE_STOP(_task_group_manager);
@@ -608,7 +599,6 @@ void ExecEnv::destroy() {
SAFE_DELETE(_fragment_mgr);
SAFE_DELETE(_workload_sched_mgr);
SAFE_DELETE(_task_group_manager);
SAFE_DELETE(_with_group_task_scheduler);
SAFE_DELETE(_without_group_task_scheduler);
SAFE_DELETE(_file_cache_factory);
SAFE_DELETE(_runtime_filter_timer_queue);

View File

@@ -162,9 +162,7 @@ void QueryContext::set_query_scheduler(uint64_t tg_id) {
doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
if (_task_group) {
if (!config::enable_cgroup_cpu_soft_limit) {
return _exec_env->pipeline_task_group_scheduler();
} else if (_task_scheduler) {
if (_task_scheduler) {
return _task_scheduler;
}
}
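
With the soft-limit branch gone, a query that belongs to a workload group now only uses the per-group _task_scheduler installed by set_query_scheduler(); everything else falls through to the default pipeline scheduler. A rough sketch of the resulting method (not part of the diff above), assuming the untouched tail of the function returns the ungrouped scheduler via ExecEnv::pipeline_task_scheduler():

// Post-change shape of the selector (sketch, not the verbatim file contents).
doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
    if (_task_group) {
        if (_task_scheduler) {
            return _task_scheduler; // per-workload-group scheduler
        }
    }
    // Assumed fallback for queries without a per-group scheduler.
    return _exec_env->pipeline_task_scheduler();
}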

View File

@@ -152,10 +152,6 @@ public:
void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; }
taskgroup::TaskGroup* get_task_group() const {
return _task_group == nullptr ? nullptr : _task_group.get();
}
int execution_timeout() const {
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
: _query_options.query_timeout;

View File

@@ -43,65 +43,6 @@ const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
template <typename QueueType>
TaskGroupEntity<QueueType>::TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type)
: _tg(tg), _type(type), _version(tg->version()), _cpu_share(tg->cpu_share()) {
_task_queue = new QueueType();
}
template <typename QueueType>
TaskGroupEntity<QueueType>::~TaskGroupEntity() {
delete _task_queue;
}
template <typename QueueType>
QueueType* TaskGroupEntity<QueueType>::task_queue() {
return _task_queue;
}
template <typename QueueType>
void TaskGroupEntity<QueueType>::incr_runtime_ns(uint64_t runtime_ns) {
auto v_time = runtime_ns / _cpu_share;
_vruntime_ns += v_time;
}
template <typename QueueType>
void TaskGroupEntity<QueueType>::adjust_vruntime_ns(uint64_t vruntime_ns) {
VLOG_DEBUG << "adjust " << debug_string() << "vtime to " << vruntime_ns;
_vruntime_ns = vruntime_ns;
}
template <typename QueueType>
size_t TaskGroupEntity<QueueType>::task_size() const {
return _task_queue->size();
}
template <typename QueueType>
uint64_t TaskGroupEntity<QueueType>::cpu_share() const {
return _cpu_share;
}
template <typename QueueType>
uint64_t TaskGroupEntity<QueueType>::task_group_id() const {
return _tg->id();
}
template <typename QueueType>
void TaskGroupEntity<QueueType>::check_and_update_cpu_share(const TaskGroupInfo& tg_info) {
if (tg_info.version > _version) {
_cpu_share = tg_info.cpu_share;
_version = tg_info.version;
}
}
template <typename QueueType>
std::string TaskGroupEntity<QueueType>::debug_string() const {
return fmt::format("TGE[id = {}, name = {}-{}, cpu_share = {}, task size: {}, v_time:{} ns]",
_tg->id(), _tg->name(), _type, cpu_share(), task_size(), _vruntime_ns);
}
template class TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
: _id(tg_info.id),
_name(tg_info.name),
@@ -109,7 +50,6 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
_memory_limit(tg_info.memory_limit),
_enable_memory_overcommit(tg_info.enable_memory_overcommit),
_cpu_share(tg_info.cpu_share),
_task_entity(this, "pipeline task entity"),
_mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM),
_cpu_hard_limit(tg_info.cpu_hard_limit) {}
@@ -145,8 +85,6 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
return;
}
}
ExecEnv::GetInstance()->pipeline_task_group_scheduler()->task_queue()->update_tg_cpu_share(
tg_info, &_task_entity);
}
int64_t TaskGroup::memory_used() {

View File

@@ -44,50 +44,6 @@ namespace taskgroup {
class TaskGroup;
struct TaskGroupInfo;
template <typename QueueType>
class TaskGroupEntity {
public:
explicit TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type);
~TaskGroupEntity();
uint64_t vruntime_ns() const { return _vruntime_ns; }
QueueType* task_queue();
void incr_runtime_ns(uint64_t runtime_ns);
void adjust_vruntime_ns(uint64_t vruntime_ns);
size_t task_size() const;
uint64_t cpu_share() const;
std::string debug_string() const;
uint64_t task_group_id() const;
void check_and_update_cpu_share(const TaskGroupInfo& tg_info);
private:
QueueType* _task_queue = nullptr;
uint64_t _vruntime_ns = 0;
taskgroup::TaskGroup* _tg = nullptr;
std::string _type;
// Because updating cpu share of entity requires locking the task queue(pipeline task queue or
// scan task queue) contains that entity, we kept version and cpu share in entity for
// independent updates.
int64_t _version;
uint64_t _cpu_share;
};
// TODO llj tg use PriorityTaskQueue to replace std::queue
using TaskGroupPipelineTaskEntity = TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
using TGPTEntityPtr = TaskGroupPipelineTaskEntity*;
struct TgTrackerLimiterGroup {
std::unordered_set<std::shared_ptr<MemTrackerLimiter>> trackers;
std::mutex group_lock;
@@ -97,8 +53,6 @@ class TaskGroup : public std::enable_shared_from_this<TaskGroup> {
public:
explicit TaskGroup(const TaskGroupInfo& tg_info);
TaskGroupPipelineTaskEntity* task_entity() { return &_task_entity; }
int64_t version() const { return _version; }
uint64_t cpu_share() const { return _cpu_share.load(); }
@@ -160,7 +114,6 @@ private:
int64_t _memory_limit; // bytes
bool _enable_memory_overcommit;
std::atomic<uint64_t> _cpu_share;
TaskGroupPipelineTaskEntity _task_entity;
std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
std::atomic<int> _cpu_hard_limit;

View File

@@ -49,8 +49,9 @@ TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo& tas
return new_task_group;
}
void TaskGroupManager::get_resource_groups(const std::function<bool(const TaskGroupPtr& ptr)>& pred,
std::vector<TaskGroupPtr>* task_groups) {
void TaskGroupManager::get_related_taskgroups(
const std::function<bool(const TaskGroupPtr& ptr)>& pred,
std::vector<TaskGroupPtr>* task_groups) {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
for (const auto& [id, task_group] : _task_groups) {
if (pred(task_group)) {

View File

@@ -48,8 +48,8 @@ public:
TaskGroupPtr get_or_create_task_group(const TaskGroupInfo& task_group_info);
void get_resource_groups(const std::function<bool(const TaskGroupPtr& ptr)>& pred,
std::vector<TaskGroupPtr>* task_groups);
void get_related_taskgroups(const std::function<bool(const TaskGroupPtr& ptr)>& pred,
std::vector<TaskGroupPtr>* task_groups);
Status upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env);

View File

@@ -245,7 +245,7 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
std::unique_ptr<RuntimeProfile> tg_profile = std::make_unique<RuntimeProfile>("WorkloadGroup");
int64_t total_free_memory = 0;
ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
ExecEnv::GetInstance()->task_group_manager()->get_related_taskgroups(
[](const taskgroup::TaskGroupPtr& task_group) {
return task_group->is_mem_limit_valid() && !task_group->enable_memory_overcommit();
},
@@ -301,7 +301,7 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory,
MonotonicStopWatch watch;
watch.start();
std::vector<taskgroup::TaskGroupPtr> task_groups;
ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
ExecEnv::GetInstance()->task_group_manager()->get_related_taskgroups(
[](const taskgroup::TaskGroupPtr& task_group) {
return task_group->is_mem_limit_valid() && task_group->enable_memory_overcommit();
},