216 lines
6.6 KiB
C++
216 lines
6.6 KiB
C++
// Licensed to the Apache Software Foundation (ASF) under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing,
|
|
// software distributed under the License is distributed on an
|
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
// KIND, either express or implied. See the License for the
|
|
// specific language governing permissions and limitations
|
|
// under the License.
|
|
#pragma once
|
|
|
|
#include <glog/logging.h>
|
|
#include <stddef.h>
|
|
#include <stdint.h>
|
|
|
|
#include <atomic>
|
|
#include <condition_variable>
|
|
#include <memory>
|
|
#include <mutex>
|
|
#include <ostream>
|
|
#include <queue>
|
|
#include <set>
|
|
|
|
#include "common/status.h"
|
|
#include "pipeline_task.h"
|
|
#include "runtime/task_group/task_group.h"
|
|
|
|
namespace doris {
|
|
|
|
namespace pipeline {
|
|
|
|
class TaskQueue {
|
|
public:
|
|
TaskQueue(size_t core_size) : _core_size(core_size) {}
|
|
virtual ~TaskQueue();
|
|
virtual void close() = 0;
|
|
// Get the task by core id.
|
|
// TODO: To think the logic is useful?
|
|
virtual PipelineTask* take(size_t core_id) = 0;
|
|
|
|
// push from scheduler
|
|
virtual Status push_back(PipelineTask* task) = 0;
|
|
|
|
// push from worker
|
|
virtual Status push_back(PipelineTask* task, size_t core_id) = 0;
|
|
|
|
virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
|
|
|
|
virtual void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
|
|
taskgroup::TaskGroupPtr task_group) = 0;
|
|
|
|
int cores() const { return _core_size; }
|
|
|
|
protected:
|
|
size_t _core_size;
|
|
static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
|
|
};
|
|
|
|
class SubTaskQueue {
|
|
friend class PriorityTaskQueue;
|
|
|
|
public:
|
|
void push_back(PipelineTask* task) { _queue.emplace(task); }
|
|
|
|
PipelineTask* try_take(bool is_steal);
|
|
|
|
void set_level_factor(double level_factor) { _level_factor = level_factor; }
|
|
|
|
// note:
|
|
// runtime is the time consumed by the actual execution of the task
|
|
// vruntime(means virtual runtime) = runtime / _level_factor
|
|
double get_vruntime() { return _runtime / _level_factor; }
|
|
|
|
void inc_runtime(uint64_t delta_time) { _runtime += delta_time; }
|
|
|
|
void adjust_runtime(uint64_t vruntime) { this->_runtime = vruntime * _level_factor; }
|
|
|
|
bool empty() { return _queue.empty(); }
|
|
|
|
private:
|
|
std::queue<PipelineTask*> _queue;
|
|
// depends on LEVEL_QUEUE_TIME_FACTOR
|
|
double _level_factor = 1;
|
|
|
|
std::atomic<uint64_t> _runtime = 0;
|
|
};
|
|
|
|
// A Multilevel Feedback Queue
|
|
class PriorityTaskQueue {
|
|
public:
|
|
explicit PriorityTaskQueue();
|
|
|
|
void close();
|
|
|
|
PipelineTask* try_take_unprotected(bool is_steal);
|
|
|
|
PipelineTask* try_take(bool is_steal);
|
|
|
|
PipelineTask* take(uint32_t timeout_ms = 0);
|
|
|
|
Status push(PipelineTask* task);
|
|
|
|
void inc_sub_queue_runtime(int level, uint64_t runtime) {
|
|
_sub_queues[level].inc_runtime(runtime);
|
|
}
|
|
|
|
private:
|
|
static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
|
|
static constexpr size_t SUB_QUEUE_LEVEL = 6;
|
|
SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
|
|
// 1s, 3s, 10s, 60s, 300s
|
|
uint64_t _queue_level_limit[SUB_QUEUE_LEVEL - 1] = {1000000000, 3000000000, 10000000000,
|
|
60000000000, 300000000000};
|
|
std::mutex _work_size_mutex;
|
|
std::condition_variable _wait_task;
|
|
std::atomic<size_t> _total_task_size = 0;
|
|
bool _closed;
|
|
|
|
// used to adjust vruntime of a queue when it's not empty
|
|
// protected by lock _work_size_mutex
|
|
uint64_t _queue_level_min_vruntime = 0;
|
|
|
|
int _compute_level(uint64_t real_runtime);
|
|
};
|
|
|
|
// Need consider NUMA architecture
|
|
class MultiCoreTaskQueue : public TaskQueue {
|
|
public:
|
|
explicit MultiCoreTaskQueue(size_t core_size);
|
|
|
|
~MultiCoreTaskQueue() override;
|
|
|
|
void close() override;
|
|
|
|
// Get the task by core id.
|
|
// TODO: To think the logic is useful?
|
|
PipelineTask* take(size_t core_id) override;
|
|
|
|
// TODO combine these methods to `push_back(task, core_id = -1)`
|
|
Status push_back(PipelineTask* task) override;
|
|
|
|
Status push_back(PipelineTask* task, size_t core_id) override;
|
|
|
|
void update_statistics(PipelineTask* task, int64_t time_spent) override {
|
|
task->inc_runtime_ns(time_spent);
|
|
_prio_task_queue_list[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(),
|
|
time_spent);
|
|
}
|
|
|
|
void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
|
|
taskgroup::TaskGroupPtr task_group) override {
|
|
LOG(FATAL) << "update_tg_cpu_share not implemented";
|
|
}
|
|
|
|
private:
|
|
PipelineTask* _steal_take(size_t core_id);
|
|
|
|
std::unique_ptr<PriorityTaskQueue[]> _prio_task_queue_list;
|
|
std::atomic<size_t> _next_core = 0;
|
|
std::atomic<bool> _closed;
|
|
};
|
|
|
|
class TaskGroupTaskQueue : public TaskQueue {
|
|
public:
|
|
explicit TaskGroupTaskQueue(size_t);
|
|
~TaskGroupTaskQueue() override;
|
|
|
|
void close() override;
|
|
|
|
PipelineTask* take(size_t core_id) override;
|
|
|
|
// from TaskScheduler or BlockedTaskScheduler
|
|
Status push_back(PipelineTask* task) override;
|
|
|
|
// from worker
|
|
Status push_back(PipelineTask* task, size_t core_id) override;
|
|
|
|
void update_statistics(PipelineTask* task, int64_t time_spent) override;
|
|
|
|
void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
|
|
taskgroup::TaskGroupPtr task_group) override;
|
|
|
|
private:
|
|
template <bool from_executor>
|
|
Status _push_back(PipelineTask* task);
|
|
template <bool from_worker>
|
|
void _enqueue_task_group(taskgroup::TGEntityPtr);
|
|
void _dequeue_task_group(taskgroup::TGEntityPtr);
|
|
taskgroup::TGEntityPtr _next_tg_entity();
|
|
int64_t _ideal_runtime_ns(taskgroup::TGEntityPtr tg_entity) const;
|
|
void _update_min_tg();
|
|
|
|
// Like cfs rb tree in sched_entity
|
|
struct TaskGroupSchedEntityComparator {
|
|
bool operator()(const taskgroup::TGEntityPtr&, const taskgroup::TGEntityPtr&) const;
|
|
};
|
|
using ResouceGroupSet = std::set<taskgroup::TGEntityPtr, TaskGroupSchedEntityComparator>;
|
|
ResouceGroupSet _group_entities;
|
|
std::condition_variable _wait_task;
|
|
std::mutex _rs_mutex;
|
|
bool _closed = false;
|
|
int _total_cpu_share = 0;
|
|
std::atomic<taskgroup::TGEntityPtr> _min_tg_entity = nullptr;
|
|
uint64_t _min_tg_v_runtime_ns = 0;
|
|
};
|
|
|
|
} // namespace pipeline
|
|
} // namespace doris
|