diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index d846d8940d..043da50d80 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -174,14 +174,26 @@ public:
 
     std::string debug_string() const;
 
-    uint32_t total_schedule_time() const { return _schedule_time; }
-
     taskgroup::TaskGroup* get_task_group() const;
 
     void set_task_queue(TaskQueue* task_queue);
 
     static constexpr auto THREAD_TIME_SLICE = 100'000'000L;
 
+    // 1 used to update the priority queue
+    // note(wb): an ugly implementation, needs refactoring later
+    // 1.1 pipeline task
+    void inc_runtime_ns(uint64_t delta_time) { this->_runtime += delta_time; }
+    uint64_t get_runtime_ns() const { return this->_runtime; }
+
+    // 1.2 priority queue's queue level
+    void update_queue_level(int queue_level) { this->_queue_level = queue_level; }
+    int get_queue_level() const { return this->_queue_level; }
+
+    // 1.3 priority queue's core id
+    void set_core_id(int core_id) { this->_core_id = core_id; }
+    int get_core_id() const { return this->_core_id; }
+
 private:
     Status _open();
     void _init_profile();
@@ -206,6 +218,17 @@ private:
     PipelineFragmentContext* _fragment_context;
     TaskQueue* _task_queue = nullptr;
 
+    // used for priority queue
+    // it may be visited by different threads, but there is no race condition,
+    // so there is no need to add a lock
+    uint64_t _runtime = 0;
+    // visited by a single thread, so no synchronization is needed:
+    // 1 get task (set _queue_level/_core_id)
+    // 2 execute task
+    // 3 update task statistics (update _queue_level/_core_id)
+    int _queue_level = 0;
+    int _core_id = 0;
+
     RuntimeProfile* _parent_profile;
     std::unique_ptr<RuntimeProfile> _task_profile;
     RuntimeProfile::Counter* _task_cpu_timer;
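The new `_runtime`, `_queue_level`, and `_core_id` fields close a feedback loop between task execution and queue placement: the queue stamps level/core when a task is handed out, the worker measures how long the task actually ran, and the measurement flows back via `inc_runtime_ns()`. A minimal runnable sketch of that three-step cycle, using a hypothetical `Task` struct in place of the real `PipelineTask`:

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>
#include <thread>

// Hypothetical, simplified stand-in for PipelineTask's new bookkeeping.
struct Task {
    uint64_t runtime_ns = 0; // total execution time so far, mirrors _runtime
    int queue_level = 0;     // sub-queue it was taken from, mirrors _queue_level
    int core_id = 0;         // local queue it was taken from, mirrors _core_id
};

int main() {
    Task task;
    // 1 get task: the queue stamps queue_level/core_id when handing it out
    task.queue_level = 0;
    task.core_id = 3;

    // 2 execute task: measure how long the task actually runs
    auto start = std::chrono::steady_clock::now();
    std::this_thread::sleep_for(std::chrono::milliseconds(5)); // stands in for real work
    auto spent = std::chrono::duration_cast<std::chrono::nanoseconds>(
                         std::chrono::steady_clock::now() - start)
                         .count();

    // 3 update task statistics: feed the measured time back, as
    //   MultiCoreTaskQueue::update_statistics() does via inc_runtime_ns()
    task.runtime_ns += spent;
    std::cout << "task ran " << task.runtime_ns << " ns on core " << task.core_id << "\n";
}
```

In the patch itself, step 3 is `MultiCoreTaskQueue::update_statistics()` in the `task_queue.h` hunk further down.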
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 23a601d5c0..dac6ca69ab 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -24,7 +24,7 @@ namespace pipeline {
 
 TaskQueue::~TaskQueue() = default;
 
-PipelineTask* SubWorkTaskQueue::try_take(bool is_steal) {
+PipelineTask* SubTaskQueue::try_take(bool is_steal) {
     if (_queue.empty()) {
         return nullptr;
     }
@@ -32,81 +32,69 @@ PipelineTask* SubWorkTaskQueue::try_take(bool is_steal) {
     if (!task->can_steal() && is_steal) {
         return nullptr;
     }
-    ++_schedule_time;
     _queue.pop();
     return task;
 }
 
-//////////////////// WorkTaskQueue ////////////////////
+//////////////////// PriorityTaskQueue ////////////////////
 
-NormalWorkTaskQueue::NormalWorkTaskQueue() : _closed(false) {
+PriorityTaskQueue::PriorityTaskQueue() : _closed(false) {
     double factor = 1;
     for (int i = 0; i < SUB_QUEUE_LEVEL; ++i) {
-        _sub_queues[i].set_factor_for_normal(factor);
+        _sub_queues[i].set_level_factor(factor);
         factor *= LEVEL_QUEUE_TIME_FACTOR;
     }
-
-    int i = 0;
-    _task_schedule_limit[i] = BASE_LIMIT * (i + 1);
-    for (i = 1; i < SUB_QUEUE_LEVEL - 1; ++i) {
-        _task_schedule_limit[i] = _task_schedule_limit[i - 1] + BASE_LIMIT * (i + 1);
-    }
 }
 
-void NormalWorkTaskQueue::close() {
+void PriorityTaskQueue::close() {
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     _closed = true;
    _wait_task.notify_all();
 }
 
-PipelineTask* NormalWorkTaskQueue::try_take_unprotected(bool is_steal) {
+PipelineTask* PriorityTaskQueue::try_take_unprotected(bool is_steal) {
     if (_total_task_size == 0 || _closed) {
         return nullptr;
     }
-    double normal_schedule_times[SUB_QUEUE_LEVEL];
-    double min_schedule_time = 0;
-    int idx = -1;
+
+    double min_vruntime = 0;
+    int level = -1;
     for (int i = 0; i < SUB_QUEUE_LEVEL; ++i) {
-        normal_schedule_times[i] = _sub_queues[i].schedule_time_after_normal();
+        double cur_queue_vruntime = _sub_queues[i].get_vruntime();
         if (!_sub_queues[i].empty()) {
-            if (idx == -1 || normal_schedule_times[i] < min_schedule_time) {
-                idx = i;
-                min_schedule_time = normal_schedule_times[i];
+            if (level == -1 || cur_queue_vruntime < min_vruntime) {
+                level = i;
+                min_vruntime = cur_queue_vruntime;
             }
         }
     }
-    DCHECK(idx != -1);
-    // update empty queue's schedule time, to avoid too high priority
-    for (int i = 0; i < SUB_QUEUE_LEVEL; ++i) {
-        if (_sub_queues[i].empty() && normal_schedule_times[i] < min_schedule_time) {
-            _sub_queues[i]._schedule_time = min_schedule_time / _sub_queues[i]._factor_for_normal;
-        }
-    }
+    DCHECK(level != -1);
+    _queue_level_min_vruntime = min_vruntime;
 
-    auto task = _sub_queues[idx].try_take(is_steal);
+    auto task = _sub_queues[level].try_take(is_steal);
     if (task) {
+        task->update_queue_level(level);
         _total_task_size--;
     }
     return task;
 }
 
-int NormalWorkTaskQueue::_compute_level(PipelineTask* task) {
-    uint32_t schedule_time = task->total_schedule_time();
+int PriorityTaskQueue::_compute_level(uint64_t runtime) {
     for (int i = 0; i < SUB_QUEUE_LEVEL - 1; ++i) {
-        if (schedule_time <= _task_schedule_limit[i]) {
+        if (runtime <= _queue_level_limit[i]) {
             return i;
         }
     }
     return SUB_QUEUE_LEVEL - 1;
 }
 
-PipelineTask* NormalWorkTaskQueue::try_take(bool is_steal) {
+PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
     // TODO other efficient lock? e.g. if get lock fail, return null_ptr
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     return try_take_unprotected(is_steal);
 }
 
-PipelineTask* NormalWorkTaskQueue::take(uint32_t timeout_ms) {
+PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
     std::unique_lock<std::mutex> lock(_work_size_mutex);
     auto task = try_take_unprotected(false);
     if (task) {
@@ -121,44 +109,53 @@
     }
 }
 
-Status NormalWorkTaskQueue::push(PipelineTask* task) {
+Status PriorityTaskQueue::push(PipelineTask* task) {
     if (_closed) {
         return Status::InternalError("WorkTaskQueue closed");
     }
-    auto level = _compute_level(task);
+    auto level = _compute_level(task->get_runtime_ns());
     std::unique_lock<std::mutex> lock(_work_size_mutex);
+
+    // update an empty queue's runtime to avoid giving it too high a priority
+    if (_sub_queues[level].empty() &&
+        _queue_level_min_vruntime > _sub_queues[level].get_vruntime()) {
+        _sub_queues[level].adjust_runtime(_queue_level_min_vruntime);
+    }
+
     _sub_queues[level].push_back(task);
     _total_task_size++;
     _wait_task.notify_one();
     return Status::OK();
 }
 
-NormalTaskQueue::~NormalTaskQueue() = default;
+MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
 
-NormalTaskQueue::NormalTaskQueue(size_t core_size) : TaskQueue(core_size), _closed(false) {
-    _async_queue.reset(new NormalWorkTaskQueue[core_size]);
+MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) : TaskQueue(core_size), _closed(false) {
+    _prio_task_queue_list.reset(new PriorityTaskQueue[core_size]);
 }
 
-void NormalTaskQueue::close() {
+void MultiCoreTaskQueue::close() {
     _closed = true;
     for (int i = 0; i < _core_size; ++i) {
-        _async_queue[i].close();
+        _prio_task_queue_list[i].close();
     }
 }
 
-PipelineTask* NormalTaskQueue::take(size_t core_id) {
+PipelineTask* MultiCoreTaskQueue::take(size_t core_id) {
     PipelineTask* task = nullptr;
     while (!_closed) {
-        task = _async_queue[core_id].try_take(false);
+        task = _prio_task_queue_list[core_id].try_take(false);
         if (task) {
+            task->set_core_id(core_id);
             break;
         }
        task = _steal_take(core_id);
         if (task) {
             break;
         }
-        task = _async_queue[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
+        task = _prio_task_queue_list[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
         if (task) {
+            task->set_core_id(core_id);
             break;
         }
     }
@@ -168,7 +165,7 @@ PipelineTask* NormalTaskQueue::take(size_t core_id) {
     return task;
 }
 
-PipelineTask* NormalTaskQueue::_steal_take(size_t core_id) {
+PipelineTask* MultiCoreTaskQueue::_steal_take(size_t core_id) {
     DCHECK(core_id < _core_size);
     size_t next_id = core_id;
     for (size_t i = 1; i < _core_size; ++i) {
@@ -177,15 +174,16 @@ PipelineTask* NormalTaskQueue::_steal_take(size_t core_id) {
             next_id = 0;
         }
         DCHECK(next_id < _core_size);
-        auto task = _async_queue[next_id].try_take(true);
+        auto task = _prio_task_queue_list[next_id].try_take(true);
         if (task) {
+            task->set_core_id(next_id);
             return task;
         }
     }
     return nullptr;
 }
 
-Status NormalTaskQueue::push_back(PipelineTask* task) {
+Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
     int core_id = task->get_previous_core_id();
     if (core_id < 0) {
         core_id = _next_core.fetch_add(1) % _core_size;
@@ -193,10 +191,10 @@
     return push_back(task, core_id);
 }
 
-Status NormalTaskQueue::push_back(PipelineTask* task, size_t core_id) {
+Status MultiCoreTaskQueue::push_back(PipelineTask* task, size_t core_id) {
     DCHECK(core_id < _core_size);
     task->put_in_runnable_queue();
-    return _async_queue[core_id].push(task);
+    return _prio_task_queue_list[core_id].push(task);
 }
 
 bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(
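The selection logic above is easiest to see with numbers. Below is a self-contained sketch of `try_take_unprotected()`'s min-vruntime scan, using the constants this patch introduces (`LEVEL_QUEUE_TIME_FACTOR = 2`, six levels); the per-queue runtimes are made up for illustration, and the empty-queue skip and locking are omitted:

```cpp
#include <cstdint>
#include <cstdio>

int main() {
    constexpr int SUB_QUEUE_LEVEL = 6;
    constexpr double LEVEL_QUEUE_TIME_FACTOR = 2;

    // level factors 1, 2, 4, 8, 16, 32, as built in the PriorityTaskQueue ctor
    double level_factor[SUB_QUEUE_LEVEL];
    double factor = 1;
    for (int i = 0; i < SUB_QUEUE_LEVEL; ++i) {
        level_factor[i] = factor;
        factor *= LEVEL_QUEUE_TIME_FACTOR;
    }

    // illustrative accumulated runtimes (ns) per sub-queue
    uint64_t runtime[SUB_QUEUE_LEVEL] = {2'000'000'000,  6'000'000'000,  12'000'000'000,
                                         24'000'000'000, 48'000'000'000, 32'000'000'000};

    // pick the sub-queue with the minimum vruntime = runtime / level_factor
    int level = -1;
    double min_vruntime = 0;
    for (int i = 0; i < SUB_QUEUE_LEVEL; ++i) {
        double v = runtime[i] / level_factor[i];
        if (level == -1 || v < min_vruntime) {
            level = i;
            min_vruntime = v;
        }
    }
    // prints level 5: 32s of real time / 32 = 1s virtual, beating level 0's
    // 2s virtual -- dividing by a larger factor is how deep (long-running)
    // levels still get scheduled instead of starving
    std::printf("take from level %d (vruntime %.0f ns)\n", level, min_vruntime);
}
```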
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 39b57815ad..2aa4881028 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -54,34 +54,39 @@ protected:
     static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
 };
 
-class SubWorkTaskQueue {
-    friend class WorkTaskQueue;
-    friend class NormalWorkTaskQueue;
+class SubTaskQueue {
+    friend class PriorityTaskQueue;
 
 public:
     void push_back(PipelineTask* task) { _queue.emplace(task); }
 
     PipelineTask* try_take(bool is_steal);
 
-    void set_factor_for_normal(double factor_for_normal) { _factor_for_normal = factor_for_normal; }
+    void set_level_factor(double level_factor) { _level_factor = level_factor; }
 
-    double schedule_time_after_normal() { return _schedule_time * _factor_for_normal; }
+    // note:
+    // runtime is the time consumed by the actual execution of the task
+    // vruntime (i.e. virtual runtime) = runtime / _level_factor
+    double get_vruntime() { return _runtime / _level_factor; }
+
+    void inc_runtime(uint64_t delta_time) { _runtime += delta_time; }
+
+    void adjust_runtime(uint64_t vruntime) { this->_runtime = vruntime * _level_factor; }
 
     bool empty() { return _queue.empty(); }
 
 private:
     std::queue<PipelineTask*> _queue;
-    // factor for normalization
-    double _factor_for_normal = 1;
-    // the value cal the queue task time consume, the WorkTaskQueue
-    // use it to find the min queue to take task work
-    std::atomic<uint32_t> _schedule_time = 0;
+    // depends on LEVEL_QUEUE_TIME_FACTOR
+    double _level_factor = 1;
+
+    std::atomic<uint64_t> _runtime = 0;
 };
 
-// Each thread have private MLFQ
-class NormalWorkTaskQueue {
+// A Multilevel Feedback Queue
+class PriorityTaskQueue {
 public:
-    explicit NormalWorkTaskQueue();
+    explicit PriorityTaskQueue();
 
     void close();
 
@@ -93,27 +98,35 @@
 
     Status push(PipelineTask* task);
 
+    void inc_sub_queue_runtime(int level, uint64_t runtime) {
+        _sub_queues[level].inc_runtime(runtime);
+    }
+
 private:
-    static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 1.5;
-    static constexpr size_t SUB_QUEUE_LEVEL = 5;
-    // 3, 6, 9, 12
-    static constexpr uint32_t BASE_LIMIT = 3;
-    SubWorkTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
-    uint32_t _task_schedule_limit[SUB_QUEUE_LEVEL - 1];
+    static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
+    static constexpr size_t SUB_QUEUE_LEVEL = 6;
+    SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
+    // 1s, 3s, 10s, 60s, 300s
+    uint64_t _queue_level_limit[SUB_QUEUE_LEVEL - 1] = {1000000000, 3000000000, 10000000000,
+                                                        60000000000, 300000000000};
 
     std::mutex _work_size_mutex;
     std::condition_variable _wait_task;
     std::atomic<size_t> _total_task_size = 0;
     bool _closed;
 
-    int _compute_level(PipelineTask* task);
+    // used to adjust the vruntime of an empty sub-queue when a task is pushed into it
+    // protected by lock _work_size_mutex
+    uint64_t _queue_level_min_vruntime = 0;
+
+    int _compute_level(uint64_t real_runtime);
 };
 
 // Need consider NUMA architecture
-class NormalTaskQueue : public TaskQueue {
+class MultiCoreTaskQueue : public TaskQueue {
 public:
-    explicit NormalTaskQueue(size_t core_size);
+    explicit MultiCoreTaskQueue(size_t core_size);
 
-    ~NormalTaskQueue() override;
+    ~MultiCoreTaskQueue() override;
 
     void close() override;
 
@@ -126,8 +139,11 @@ public:
 
     Status push_back(PipelineTask* task, size_t core_id) override;
 
-    // TODO pipeline update NormalWorkTaskQueue by time_spent.
-    // void update_statistics(PipelineTask* task, int64_t time_spent) override;
+    void update_statistics(PipelineTask* task, int64_t time_spent) override {
+        task->inc_runtime_ns(time_spent);
+        _prio_task_queue_list[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(),
+                                                                         time_spent);
+    }
 
     void update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
                            taskgroup::TaskGroupPtr& task_group) override {
@@ -137,7 +153,7 @@
 private:
     PipelineTask* _steal_take(size_t core_id);
 
-    std::unique_ptr<NormalWorkTaskQueue[]> _async_queue;
+    std::unique_ptr<PriorityTaskQueue[]> _prio_task_queue_list;
     std::atomic<size_t> _next_core = 0;
     std::atomic<bool> _closed;
 };
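`_queue_level_limit` replaces the old `BASE_LIMIT` arithmetic with absolute cumulative-runtime thresholds: a task is re-enqueued at the first level whose limit its total runtime has not yet exceeded. A standalone sketch of the same placement rule as `_compute_level()`, written as a hypothetical free function `compute_level()` so it runs on its own:

```cpp
#include <cstdint>
#include <cstdio>

constexpr size_t SUB_QUEUE_LEVEL = 6;
// 1s, 3s, 10s, 60s, 300s in nanoseconds, matching _queue_level_limit
constexpr uint64_t kQueueLevelLimit[SUB_QUEUE_LEVEL - 1] = {
        1'000'000'000, 3'000'000'000, 10'000'000'000, 60'000'000'000, 300'000'000'000};

// same rule as PriorityTaskQueue::_compute_level(): place the task at the
// first level whose cumulative-runtime limit it has not yet exceeded
int compute_level(uint64_t runtime_ns) {
    for (size_t i = 0; i < SUB_QUEUE_LEVEL - 1; ++i) {
        if (runtime_ns <= kQueueLevelLimit[i]) {
            return static_cast<int>(i);
        }
    }
    return static_cast<int>(SUB_QUEUE_LEVEL) - 1;
}

int main() {
    std::printf("%d\n", compute_level(500'000'000));     // 0: has run less than 1s
    std::printf("%d\n", compute_level(5'000'000'000));   // 2: between 3s and 10s
    std::printf("%d\n", compute_level(400'000'000'000)); // 5: beyond every limit
}
```

Long-running tasks therefore sink to deeper levels, where the larger `_level_factor` in turn discounts their accumulated time when queues compete on vruntime.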
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 22a2aaae23..e1c7cf39a9 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -167,7 +167,7 @@ Status ExecEnv::init_pipeline_task_scheduler() {
     }
 
     // TODO pipeline task group combine two blocked schedulers.
-    auto t_queue = std::make_shared<pipeline::NormalTaskQueue>(executors_size);
+    auto t_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
     auto b_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>(t_queue);
     _pipeline_task_scheduler = new pipeline::TaskScheduler(this, b_scheduler, t_queue);
     RETURN_IF_ERROR(_pipeline_task_scheduler->start());
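One subtlety in the `PriorityTaskQueue::push()` path is worth a worked example: a sub-queue that has sat empty keeps a stale (low) runtime, so without correction the next task pushed into it would win every vruntime comparison. `adjust_runtime()` therefore lifts the queue up to the recorded `_queue_level_min_vruntime`. A minimal sketch with a hypothetical `SubQueue` stand-in:

```cpp
#include <cstdint>
#include <cstdio>

// Hypothetical, simplified stand-in for SubTaskQueue's bookkeeping.
struct SubQueue {
    double level_factor = 1;
    uint64_t runtime = 0;
    double vruntime() const { return runtime / level_factor; }
    // mirrors SubTaskQueue::adjust_runtime(): rebase the queue so it resumes
    // from the current minimum vruntime instead of from its stale value
    void adjust_runtime(uint64_t vruntime_floor) {
        runtime = static_cast<uint64_t>(vruntime_floor * level_factor);
    }
};

int main() {
    SubQueue q{/*level_factor=*/4.0, /*runtime=*/0};   // long idle: vruntime 0
    uint64_t queue_level_min_vruntime = 2'000'000'000; // 2s, saved by the last take

    // as in push(): only an empty queue that fell behind gets rebased
    if (queue_level_min_vruntime > q.vruntime()) {
        q.adjust_runtime(queue_level_min_vruntime);
    }
    // prints runtime=8000000000 vruntime=2000000000: the queue rejoins the
    // race at the floor instead of monopolizing workers to catch up
    std::printf("runtime=%llu vruntime=%.0f\n",
                static_cast<unsigned long long>(q.runtime), q.vruntime());
}
```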