diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp index 293769162f..601db7014b 100644 --- a/be/src/pipeline/task_queue.cpp +++ b/be/src/pipeline/task_queue.cpp @@ -154,22 +154,20 @@ void MultiCoreTaskQueue::close() { PipelineTask* MultiCoreTaskQueue::take(int core_id) { PipelineTask* task = nullptr; - auto prio_task_queue_list = - std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed); while (!_closed) { - DCHECK(prio_task_queue_list->size() > core_id) - << " list size: " << prio_task_queue_list->size() << " core_id: " << core_id + DCHECK(_prio_task_queue_list->size() > core_id) + << " list size: " << _prio_task_queue_list->size() << " core_id: " << core_id << " _core_size: " << _core_size << " _next_core: " << _next_core.load(); - task = (*prio_task_queue_list)[core_id]->try_take(false); + task = (*_prio_task_queue_list)[core_id]->try_take(false); if (task) { task->set_core_id(core_id); break; } - task = _steal_take(core_id, *prio_task_queue_list); + task = _steal_take(core_id, *_prio_task_queue_list); if (task) { break; } - task = (*prio_task_queue_list)[core_id]->take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */); + task = (*_prio_task_queue_list)[core_id]->take(WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */); if (task) { task->set_core_id(core_id); break; @@ -211,9 +209,7 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task) { Status MultiCoreTaskQueue::push_back(PipelineTask* task, int core_id) { DCHECK(core_id < _core_size); task->put_in_runnable_queue(); - auto prio_task_queue_list = - std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed); - return (*prio_task_queue_list)[core_id]->push(task); + return (*_prio_task_queue_list)[core_id]->push(task); } } // namespace pipeline diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h index e93d58ed07..01282be7be 100644 --- a/be/src/pipeline/task_queue.h +++ b/be/src/pipeline/task_queue.h @@ -144,10 +144,8 @@ public: void update_statistics(PipelineTask* task, int64_t time_spent) override { task->inc_runtime_ns(time_spent); - auto prio_task_queue_list = - std::atomic_load_explicit(&_prio_task_queue_list, std::memory_order_relaxed); - (*prio_task_queue_list)[task->get_core_id()]->inc_sub_queue_runtime(task->get_queue_level(), - time_spent); + (*_prio_task_queue_list)[task->get_core_id()]->inc_sub_queue_runtime( + task->get_queue_level(), time_spent); } private: