[improvement](executor) Priority Queue support vruntime (#18635)

* 1 rename some class
2 mfqs support vruntime

* fix const

* as suggestion

* fix const
This commit is contained in:
wangbo
2023-04-17 10:17:28 +08:00
committed by GitHub
parent 9c7a69ff09
commit ac0b382fed
4 changed files with 114 additions and 77 deletions

View File

@ -174,14 +174,26 @@ public:
std::string debug_string() const;
uint32_t total_schedule_time() const { return _schedule_time; }
taskgroup::TaskGroup* get_task_group() const;
void set_task_queue(TaskQueue* task_queue);
static constexpr auto THREAD_TIME_SLICE = 100'000'000L;
// 1 used for update priority queue
// note(wb) an ugly implementation, need refactor later
// 1.1 pipeline task
void inc_runtime_ns(uint64_t delta_time) { this->_runtime += delta_time; }
uint64_t get_runtime_ns() const { return this->_runtime; }
// 1.2 priority queue's queue level
void update_queue_level(int queue_level) { this->_queue_level = queue_level; }
int get_queue_level() const { return this->_queue_level; }
// 1.3 priority queue's core id
void set_core_id(int core_id) { this->_core_id = core_id; }
int get_core_id() const { return this->_core_id; }
private:
Status _open();
void _init_profile();
@ -206,6 +218,17 @@ private:
PipelineFragmentContext* _fragment_context;
TaskQueue* _task_queue = nullptr;
// used for priority queue
// it may be visited by different thread but there is no race condition
// so no need to add lock
uint64_t _runtime = 0;
// it's visited in one thread, so no need to thread synchronization
// 1 get task, (set _queue_level/_core_id)
// 2 exe task
// 3 update task statistics(update _queue_level/_core_id)
int _queue_level = 0;
int _core_id = 0;
RuntimeProfile* _parent_profile;
std::unique_ptr<RuntimeProfile> _task_profile;
RuntimeProfile::Counter* _task_cpu_timer;

View File

@ -24,7 +24,7 @@ namespace pipeline {
TaskQueue::~TaskQueue() = default;
PipelineTask* SubWorkTaskQueue::try_take(bool is_steal) {
PipelineTask* SubTaskQueue::try_take(bool is_steal) {
if (_queue.empty()) {
return nullptr;
}
@ -32,81 +32,69 @@ PipelineTask* SubWorkTaskQueue::try_take(bool is_steal) {
if (!task->can_steal() && is_steal) {
return nullptr;
}
++_schedule_time;
_queue.pop();
return task;
}
//////////////////// WorkTaskQueue ////////////////////
//////////////////// PriorityTaskQueue ////////////////////
NormalWorkTaskQueue::NormalWorkTaskQueue() : _closed(false) {
PriorityTaskQueue::PriorityTaskQueue() : _closed(false) {
double factor = 1;
for (int i = 0; i < SUB_QUEUE_LEVEL; ++i) {
_sub_queues[i].set_factor_for_normal(factor);
_sub_queues[i].set_level_factor(factor);
factor *= LEVEL_QUEUE_TIME_FACTOR;
}
int i = 0;
_task_schedule_limit[i] = BASE_LIMIT * (i + 1);
for (i = 1; i < SUB_QUEUE_LEVEL - 1; ++i) {
_task_schedule_limit[i] = _task_schedule_limit[i - 1] + BASE_LIMIT * (i + 1);
}
}
void NormalWorkTaskQueue::close() {
void PriorityTaskQueue::close() {
std::unique_lock<std::mutex> lock(_work_size_mutex);
_closed = true;
_wait_task.notify_all();
}
PipelineTask* NormalWorkTaskQueue::try_take_unprotected(bool is_steal) {
PipelineTask* PriorityTaskQueue::try_take_unprotected(bool is_steal) {
if (_total_task_size == 0 || _closed) {
return nullptr;
}
double normal_schedule_times[SUB_QUEUE_LEVEL];
double min_schedule_time = 0;
int idx = -1;
double min_vruntime = 0;
int level = -1;
for (int i = 0; i < SUB_QUEUE_LEVEL; ++i) {
normal_schedule_times[i] = _sub_queues[i].schedule_time_after_normal();
double cur_queue_vruntime = _sub_queues[i].get_vruntime();
if (!_sub_queues[i].empty()) {
if (idx == -1 || normal_schedule_times[i] < min_schedule_time) {
idx = i;
min_schedule_time = normal_schedule_times[i];
if (level == -1 || cur_queue_vruntime < min_vruntime) {
level = i;
min_vruntime = cur_queue_vruntime;
}
}
}
DCHECK(idx != -1);
// update empty queue's schedule time, to avoid too high priority
for (int i = 0; i < SUB_QUEUE_LEVEL; ++i) {
if (_sub_queues[i].empty() && normal_schedule_times[i] < min_schedule_time) {
_sub_queues[i]._schedule_time = min_schedule_time / _sub_queues[i]._factor_for_normal;
}
}
DCHECK(level != -1);
_queue_level_min_vruntime = min_vruntime;
auto task = _sub_queues[idx].try_take(is_steal);
auto task = _sub_queues[level].try_take(is_steal);
if (task) {
task->update_queue_level(level);
_total_task_size--;
}
return task;
}
int NormalWorkTaskQueue::_compute_level(PipelineTask* task) {
uint32_t schedule_time = task->total_schedule_time();
int PriorityTaskQueue::_compute_level(uint64_t runtime) {
for (int i = 0; i < SUB_QUEUE_LEVEL - 1; ++i) {
if (schedule_time <= _task_schedule_limit[i]) {
if (runtime <= _queue_level_limit[i]) {
return i;
}
}
return SUB_QUEUE_LEVEL - 1;
}
PipelineTask* NormalWorkTaskQueue::try_take(bool is_steal) {
PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
// TODO other efficient lock? e.g. if get lock fail, return null_ptr
std::unique_lock<std::mutex> lock(_work_size_mutex);
return try_take_unprotected(is_steal);
}
PipelineTask* NormalWorkTaskQueue::take(uint32_t timeout_ms) {
PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
std::unique_lock<std::mutex> lock(_work_size_mutex);
auto task = try_take_unprotected(false);
if (task) {
@ -121,44 +109,53 @@ PipelineTask* NormalWorkTaskQueue::take(uint32_t timeout_ms) {
}
}
Status NormalWorkTaskQueue::push(PipelineTask* task) {
Status PriorityTaskQueue::push(PipelineTask* task) {
if (_closed) {
return Status::InternalError("WorkTaskQueue closed");
}
auto level = _compute_level(task);
auto level = _compute_level(task->get_runtime_ns());
std::unique_lock<std::mutex> lock(_work_size_mutex);
// update empty queue's runtime, to avoid too high priority
if (_sub_queues[level].empty() &&
_queue_level_min_vruntime > _sub_queues[level].get_vruntime()) {
_sub_queues[level].adjust_runtime(_queue_level_min_vruntime);
}
_sub_queues[level].push_back(task);
_total_task_size++;
_wait_task.notify_one();
return Status::OK();
}
NormalTaskQueue::~NormalTaskQueue() = default;
MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
NormalTaskQueue::NormalTaskQueue(size_t core_size) : TaskQueue(core_size), _closed(false) {
_async_queue.reset(new NormalWorkTaskQueue[core_size]);
MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) : TaskQueue(core_size), _closed(false) {
_prio_task_queue_list.reset(new PriorityTaskQueue[core_size]);
}
void NormalTaskQueue::close() {
void MultiCoreTaskQueue::close() {
_closed = true;
for (int i = 0; i < _core_size; ++i) {
_async_queue[i].close();
_prio_task_queue_list[i].close();
}
}
PipelineTask* NormalTaskQueue::take(size_t core_id) {
PipelineTask* MultiCoreTaskQueue::take(size_t core_id) {
PipelineTask* task = nullptr;
while (!_closed) {
task = _async_queue[core_id].try_take(false);
task = _prio_task_queue_list[core_id].try_take(false);
if (task) {
task->set_core_id(core_id);
break;
}
task = _steal_take(core_id);
if (task) {
break;
}
task = _async_queue[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
task = _prio_task_queue_list[core_id].take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
if (task) {
task->set_core_id(core_id);
break;
}
}
@ -168,7 +165,7 @@ PipelineTask* NormalTaskQueue::take(size_t core_id) {
return task;
}
PipelineTask* NormalTaskQueue::_steal_take(size_t core_id) {
PipelineTask* MultiCoreTaskQueue::_steal_take(size_t core_id) {
DCHECK(core_id < _core_size);
size_t next_id = core_id;
for (size_t i = 1; i < _core_size; ++i) {
@ -177,15 +174,16 @@ PipelineTask* NormalTaskQueue::_steal_take(size_t core_id) {
next_id = 0;
}
DCHECK(next_id < _core_size);
auto task = _async_queue[next_id].try_take(true);
auto task = _prio_task_queue_list[next_id].try_take(true);
if (task) {
task->set_core_id(next_id);
return task;
}
}
return nullptr;
}
Status NormalTaskQueue::push_back(PipelineTask* task) {
Status MultiCoreTaskQueue::push_back(PipelineTask* task) {
int core_id = task->get_previous_core_id();
if (core_id < 0) {
core_id = _next_core.fetch_add(1) % _core_size;
@ -193,10 +191,10 @@ Status NormalTaskQueue::push_back(PipelineTask* task) {
return push_back(task, core_id);
}
Status NormalTaskQueue::push_back(PipelineTask* task, size_t core_id) {
Status MultiCoreTaskQueue::push_back(PipelineTask* task, size_t core_id) {
DCHECK(core_id < _core_size);
task->put_in_runnable_queue();
return _async_queue[core_id].push(task);
return _prio_task_queue_list[core_id].push(task);
}
bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(

View File

@ -54,34 +54,39 @@ protected:
static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
};
class SubWorkTaskQueue {
friend class WorkTaskQueue;
friend class NormalWorkTaskQueue;
class SubTaskQueue {
friend class PriorityTaskQueue;
public:
void push_back(PipelineTask* task) { _queue.emplace(task); }
PipelineTask* try_take(bool is_steal);
void set_factor_for_normal(double factor_for_normal) { _factor_for_normal = factor_for_normal; }
void set_level_factor(double level_factor) { _level_factor = level_factor; }
double schedule_time_after_normal() { return _schedule_time * _factor_for_normal; }
// note:
// runtime is the time consumed by the actual execution of the task
// vruntime(means virtual runtime) = runtime / _level_factor
double get_vruntime() { return _runtime / _level_factor; }
void inc_runtime(uint64_t delta_time) { _runtime += delta_time; }
void adjust_runtime(uint64_t vruntime) { this->_runtime = vruntime * _level_factor; }
bool empty() { return _queue.empty(); }
private:
std::queue<PipelineTask*> _queue;
// factor for normalization
double _factor_for_normal = 1;
// the value cal the queue task time consume, the WorkTaskQueue
// use it to find the min queue to take task work
std::atomic<uint64_t> _schedule_time = 0;
// depends on LEVEL_QUEUE_TIME_FACTOR
double _level_factor = 1;
std::atomic<uint64_t> _runtime = 0;
};
// Each thread have private MLFQ
class NormalWorkTaskQueue {
// A Multilevel Feedback Queue
class PriorityTaskQueue {
public:
explicit NormalWorkTaskQueue();
explicit PriorityTaskQueue();
void close();
@ -93,27 +98,35 @@ public:
Status push(PipelineTask* task);
void inc_sub_queue_runtime(int level, uint64_t runtime) {
_sub_queues[level].inc_runtime(runtime);
}
private:
static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 1.5;
static constexpr size_t SUB_QUEUE_LEVEL = 5;
// 3, 6, 9, 12
static constexpr uint32_t BASE_LIMIT = 3;
SubWorkTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
uint32_t _task_schedule_limit[SUB_QUEUE_LEVEL - 1];
static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
static constexpr size_t SUB_QUEUE_LEVEL = 6;
SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
// 1s, 3s, 10s, 60s, 300s
uint64_t _queue_level_limit[SUB_QUEUE_LEVEL - 1] = {1000000000, 3000000000, 10000000000,
60000000000, 300000000000};
std::mutex _work_size_mutex;
std::condition_variable _wait_task;
std::atomic<size_t> _total_task_size = 0;
bool _closed;
int _compute_level(PipelineTask* task);
// used to adjust vruntime of a queue when it's not empty
// protected by lock _work_size_mutex
uint64_t _queue_level_min_vruntime = 0;
int _compute_level(uint64_t real_runtime);
};
// Need consider NUMA architecture
class NormalTaskQueue : public TaskQueue {
class MultiCoreTaskQueue : public TaskQueue {
public:
explicit NormalTaskQueue(size_t core_size);
explicit MultiCoreTaskQueue(size_t core_size);
~NormalTaskQueue() override;
~MultiCoreTaskQueue() override;
void close() override;
@ -126,8 +139,11 @@ public:
Status push_back(PipelineTask* task, size_t core_id) override;
// TODO pipeline update NormalWorkTaskQueue by time_spent.
// void update_statistics(PipelineTask* task, int64_t time_spent) override;
void update_statistics(PipelineTask* task, int64_t time_spent) override {
task->inc_runtime_ns(time_spent);
_prio_task_queue_list[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(),
time_spent);
}
void update_task_group(const taskgroup::TaskGroupInfo& task_group_info,
taskgroup::TaskGroupPtr& task_group) override {
@ -137,7 +153,7 @@ public:
private:
PipelineTask* _steal_take(size_t core_id);
std::unique_ptr<NormalWorkTaskQueue[]> _async_queue;
std::unique_ptr<PriorityTaskQueue[]> _prio_task_queue_list;
std::atomic<size_t> _next_core = 0;
std::atomic<bool> _closed;
};

View File

@ -167,7 +167,7 @@ Status ExecEnv::init_pipeline_task_scheduler() {
}
// TODO pipeline task group combine two blocked schedulers.
auto t_queue = std::make_shared<pipeline::NormalTaskQueue>(executors_size);
auto t_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
auto b_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>(t_queue);
_pipeline_task_scheduler = new pipeline::TaskScheduler(this, b_scheduler, t_queue);
RETURN_IF_ERROR(_pipeline_task_scheduler->start());