[improvement](pipeline) task group scan entity (#19924)
This commit is contained in:
@ -27,11 +27,14 @@
|
||||
#include <utility>
|
||||
|
||||
#include "common/logging.h"
|
||||
#include "pipeline/task_queue.h"
|
||||
#include "pipeline/task_scheduler.h"
|
||||
#include "runtime/exec_env.h"
|
||||
#include "runtime/memory/mem_tracker_limiter.h"
|
||||
#include "util/mem_info.h"
|
||||
#include "util/parse_util.h"
|
||||
#include "vec/exec/scan/scan_task_queue.h"
|
||||
#include "vec/exec/scan/scanner_scheduler.h"
|
||||
|
||||
namespace doris {
|
||||
namespace taskgroup {
|
||||
@ -40,50 +43,75 @@ const static std::string CPU_SHARE = "cpu_share";
|
||||
const static std::string MEMORY_LIMIT = "memory_limit";
|
||||
const static std::string ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit";
|
||||
|
||||
pipeline::PipelineTask* TaskGroupEntity::take() {
|
||||
if (_queue.empty()) {
|
||||
return nullptr;
|
||||
}
|
||||
auto task = _queue.front();
|
||||
_queue.pop();
|
||||
return task;
|
||||
template <typename QueueType>
|
||||
TaskGroupEntity<QueueType>::TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type)
|
||||
: _tg(tg), _type(type), _version(tg->version()), _cpu_share(tg->cpu_share()) {
|
||||
_task_queue = new QueueType();
|
||||
}
|
||||
|
||||
void TaskGroupEntity::incr_runtime_ns(uint64_t runtime_ns) {
|
||||
auto v_time = runtime_ns / _tg->cpu_share();
|
||||
template <typename QueueType>
|
||||
TaskGroupEntity<QueueType>::~TaskGroupEntity() {
|
||||
delete _task_queue;
|
||||
}
|
||||
|
||||
template <typename QueueType>
|
||||
QueueType* TaskGroupEntity<QueueType>::task_queue() {
|
||||
return _task_queue;
|
||||
}
|
||||
|
||||
template <typename QueueType>
|
||||
void TaskGroupEntity<QueueType>::incr_runtime_ns(uint64_t runtime_ns) {
|
||||
auto v_time = runtime_ns / _cpu_share;
|
||||
_vruntime_ns += v_time;
|
||||
}
|
||||
|
||||
void TaskGroupEntity::adjust_vruntime_ns(uint64_t vruntime_ns) {
|
||||
template <typename QueueType>
|
||||
void TaskGroupEntity<QueueType>::adjust_vruntime_ns(uint64_t vruntime_ns) {
|
||||
VLOG_DEBUG << "adjust " << debug_string() << "vtime to " << vruntime_ns;
|
||||
_vruntime_ns = vruntime_ns;
|
||||
}
|
||||
|
||||
void TaskGroupEntity::push_back(pipeline::PipelineTask* task) {
|
||||
_queue.emplace(task);
|
||||
template <typename QueueType>
|
||||
size_t TaskGroupEntity<QueueType>::task_size() const {
|
||||
return _task_queue->size();
|
||||
}
|
||||
|
||||
uint64_t TaskGroupEntity::cpu_share() const {
|
||||
return _tg->cpu_share();
|
||||
template <typename QueueType>
|
||||
uint64_t TaskGroupEntity<QueueType>::cpu_share() const {
|
||||
return _cpu_share;
|
||||
}
|
||||
|
||||
uint64_t TaskGroupEntity::task_group_id() const {
|
||||
template <typename QueueType>
|
||||
uint64_t TaskGroupEntity<QueueType>::task_group_id() const {
|
||||
return _tg->id();
|
||||
}
|
||||
|
||||
std::string TaskGroupEntity::debug_string() const {
|
||||
return fmt::format("TGE[id = {}, cpu_share = {}, task size: {}, v_time:{}ns]", _tg->id(),
|
||||
cpu_share(), _queue.size(), _vruntime_ns);
|
||||
template <typename QueueType>
|
||||
void TaskGroupEntity<QueueType>::check_and_update_cpu_share(const TaskGroupInfo& tg_info) {
|
||||
if (tg_info.version > _version) {
|
||||
_cpu_share = tg_info.cpu_share;
|
||||
_version = tg_info.version;
|
||||
}
|
||||
}
|
||||
|
||||
template <typename QueueType>
|
||||
std::string TaskGroupEntity<QueueType>::debug_string() const {
|
||||
return fmt::format("TGE[id = {}, name = {}-{}, cpu_share = {}, task size: {}, v_time:{} ns]",
|
||||
_tg->id(), _tg->name(), _type, cpu_share(), task_size(), _vruntime_ns);
|
||||
}
|
||||
|
||||
template class TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
|
||||
template class TaskGroupEntity<ScanTaskQueue>;
|
||||
|
||||
TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
|
||||
: _id(tg_info.id),
|
||||
_name(tg_info.name),
|
||||
_cpu_share(tg_info.cpu_share),
|
||||
_version(tg_info.version),
|
||||
_memory_limit(tg_info.memory_limit),
|
||||
_enable_memory_overcommit(tg_info.enable_memory_overcommit),
|
||||
_version(tg_info.version),
|
||||
_task_entity(this),
|
||||
_cpu_share(tg_info.cpu_share),
|
||||
_task_entity(this, "pipeline task entity"),
|
||||
_local_scan_entity(this, "local scan entity"),
|
||||
_mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {}
|
||||
|
||||
std::string TaskGroup::debug_string() const {
|
||||
@ -105,22 +133,22 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
std::lock_guard<std::shared_mutex> wl {_mutex};
|
||||
if (tg_info.version > _version) {
|
||||
_name = tg_info.name;
|
||||
_version = tg_info.version;
|
||||
_memory_limit = tg_info.memory_limit;
|
||||
_enable_memory_overcommit = tg_info.enable_memory_overcommit;
|
||||
if (_cpu_share != tg_info.cpu_share) {
|
||||
ExecEnv::GetInstance()->pipeline_task_group_scheduler()->update_tg_cpu_share(
|
||||
tg_info, shared_from_this());
|
||||
{
|
||||
std::lock_guard<std::shared_mutex> wl {_mutex};
|
||||
if (tg_info.version > _version) {
|
||||
_name = tg_info.name;
|
||||
_version = tg_info.version;
|
||||
_memory_limit = tg_info.memory_limit;
|
||||
_enable_memory_overcommit = tg_info.enable_memory_overcommit;
|
||||
_cpu_share = tg_info.cpu_share;
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void TaskGroup::update_cpu_share_unlock(const TaskGroupInfo& tg_info) {
|
||||
_cpu_share = tg_info.cpu_share;
|
||||
ExecEnv::GetInstance()->pipeline_task_group_scheduler()->task_queue()->update_tg_cpu_share(
|
||||
tg_info, &_task_entity);
|
||||
ExecEnv::GetInstance()->scanner_scheduler()->local_scan_task_queue()->update_tg_cpu_share(
|
||||
tg_info, &_local_scan_entity);
|
||||
}
|
||||
|
||||
int64_t TaskGroup::memory_used() {
|
||||
|
||||
Reference in New Issue
Block a user