[Pipeline](Exec) disable work steal of hash join build (#15652)

This commit is contained in:
HappenLee
2023-01-06 15:08:10 +08:00
committed by GitHub
parent f24659c003
commit 1038093c29
5 changed files with 42 additions and 18 deletions

View File

@ -20,12 +20,15 @@
namespace doris {
namespace pipeline {
PipelineTask* SubWorkTaskQueue::try_take() {
PipelineTask* SubWorkTaskQueue::try_take(bool is_steal) {
if (_queue.empty()) {
return nullptr;
}
++_schedule_time;
auto task = _queue.front();
if (!task->can_steal() && is_steal) {
return nullptr;
}
++_schedule_time;
_queue.pop();
return task;
}
@ -52,7 +55,7 @@ void WorkTaskQueue::close() {
_wait_task.notify_all();
}
PipelineTask* WorkTaskQueue::try_take_unprotected() {
PipelineTask* WorkTaskQueue::try_take_unprotected(bool is_steal) {
if (_total_task_size == 0 || _closed) {
return nullptr;
}
@ -76,7 +79,7 @@ PipelineTask* WorkTaskQueue::try_take_unprotected() {
}
}
auto task = _sub_queues[idx].try_take();
auto task = _sub_queues[idx].try_take(is_steal);
if (task) {
_total_task_size--;
}
@ -93,15 +96,15 @@ int WorkTaskQueue::_compute_level(PipelineTask* task) {
return SUB_QUEUE_LEVEL - 1;
}
PipelineTask* WorkTaskQueue::try_take() {
PipelineTask* WorkTaskQueue::try_take(bool is_steal) {
// TODO other efficient lock? e.g. if get lock fail, return null_ptr
std::unique_lock<std::mutex> lock(_work_size_mutex);
return try_take_unprotected();
return try_take_unprotected(is_steal);
}
PipelineTask* WorkTaskQueue::take(uint32_t timeout_ms) {
std::unique_lock<std::mutex> lock(_work_size_mutex);
auto task = try_take_unprotected();
auto task = try_take_unprotected(false);
if (task) {
return task;
} else {
@ -110,7 +113,7 @@ PipelineTask* WorkTaskQueue::take(uint32_t timeout_ms) {
} else {
_wait_task.wait(lock);
}
return try_take_unprotected();
return try_take_unprotected(false);
}
}
@ -138,7 +141,7 @@ void TaskQueue::close() {
PipelineTask* TaskQueue::try_take(size_t core_id) {
PipelineTask* task;
while (!_closed) {
task = _async_queue[core_id].try_take();
task = _async_queue[core_id].try_take(false);
if (task) {
break;
}
@ -166,7 +169,7 @@ PipelineTask* TaskQueue::steal_take(size_t core_id) {
next_id = 0;
}
DCHECK(next_id < _core_size);
auto task = _async_queue[next_id].try_take();
auto task = _async_queue[next_id].try_take(true);
if (task) {
return task;
}