query cpu hard limit based on doris scheduler (#24844)
This commit is contained in:
@ -53,8 +53,16 @@ bool ScanTaskQueue::try_get(ScanTask* scan_task, uint32_t timeout_ms) {
|
||||
return r;
|
||||
}
|
||||
|
||||
ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) : _core_size(core_size) {}
|
||||
ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() = default;
|
||||
ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) : _core_size(core_size) {
|
||||
_empty_scan_task->scan_entity = _empty_group_entity;
|
||||
_empty_scan_task->is_empty_task = true;
|
||||
_empty_group_entity->set_empty_group_entity(true);
|
||||
}
|
||||
|
||||
ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() {
|
||||
delete _empty_group_entity;
|
||||
delete _empty_scan_task;
|
||||
}
|
||||
|
||||
void ScanTaskTaskGroupQueue::close() {
|
||||
std::unique_lock<std::mutex> lock(_rs_mutex);
|
||||
@ -78,9 +86,16 @@ bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if (entity->is_empty_group_entity()) {
|
||||
*scan_task = *_empty_scan_task;
|
||||
return true;
|
||||
}
|
||||
DCHECK(entity->task_size() > 0);
|
||||
if (entity->task_size() == 1) {
|
||||
_dequeue_task_group(entity);
|
||||
if (_enable_cpu_hard_limit) {
|
||||
reset_empty_group_entity();
|
||||
}
|
||||
}
|
||||
return entity->task_queue()->try_get(scan_task, WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
|
||||
}
|
||||
@ -95,6 +110,9 @@ bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) {
|
||||
}
|
||||
if (_group_entities.find(entity) == _group_entities.end()) {
|
||||
_enqueue_task_group(entity);
|
||||
if (_enable_cpu_hard_limit) {
|
||||
reset_empty_group_entity();
|
||||
}
|
||||
}
|
||||
_wait_task.notify_one();
|
||||
return true;
|
||||
@ -132,6 +150,31 @@ void ScanTaskTaskGroupQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo&
|
||||
}
|
||||
}
|
||||
|
||||
void ScanTaskTaskGroupQueue::reset_empty_group_entity() {
|
||||
int user_g_cpu_hard_limit = 0;
|
||||
bool contains_empty_group = false;
|
||||
for (auto* entity : _group_entities) {
|
||||
if (!entity->is_empty_group_entity()) {
|
||||
user_g_cpu_hard_limit += entity->cpu_share();
|
||||
} else {
|
||||
contains_empty_group = true;
|
||||
}
|
||||
}
|
||||
|
||||
// 0 <= user_g_cpu_hard_limit <= 100, bound by FE
|
||||
// user_g_cpu_hard_limit = 0 means no group exists
|
||||
int empty_group_cpu_share = 100 - user_g_cpu_hard_limit;
|
||||
if (empty_group_cpu_share > 0 && empty_group_cpu_share < 100 && !contains_empty_group) {
|
||||
_empty_group_entity->update_empty_cpu_share(empty_group_cpu_share);
|
||||
_enqueue_task_group(_empty_group_entity);
|
||||
} else if ((empty_group_cpu_share == 0 || empty_group_cpu_share == 100) &&
|
||||
contains_empty_group) {
|
||||
// no need to update empty group here
|
||||
// only update empty group's cpu share when exec enqueue
|
||||
_dequeue_task_group(_empty_group_entity);
|
||||
}
|
||||
}
|
||||
|
||||
void ScanTaskTaskGroupQueue::_enqueue_task_group(TGSTEntityPtr tg_entity) {
|
||||
_total_cpu_share += tg_entity->cpu_share();
|
||||
// TODO llj tg If submitted back to this queue from the scanner thread, `adjust_vruntime_ns`
|
||||
|
||||
Reference in New Issue
Block a user