From 7ef1f7e511c74d0d72ddeb0bd854ae4dd2d754e0 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Thu, 16 Nov 2023 09:20:56 +0800 Subject: [PATCH] [Bug](pipeline) try fix the exchange sink buffer result error (#27052) --- be/src/pipeline/task_scheduler.cpp | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index d197878255..aa3891a5a2 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -333,23 +333,28 @@ void TaskScheduler::_do_work(size_t index) { void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) { auto status = task->try_close(exec_status); - if (!status.ok() && state != PipelineTaskState::CANCELED) { - // Call `close` if `try_close` failed to make sure allocated resources are released - static_cast(task->close(exec_status)); + auto cancel = [&]() { task->query_context()->cancel(true, status.to_string(), Status::Cancelled(status.to_string())); state = PipelineTaskState::CANCELED; - } else if (task->is_pending_finish()) { + }; + + auto try_close_failed = !status.ok() && state != PipelineTaskState::CANCELED; + if (try_close_failed) { + cancel(); + // Call `close` if `try_close` failed to make sure allocated resources are released + static_cast(task->close(exec_status)); + } else if (!task->is_pending_finish()) { + status = task->close(exec_status); + if (!status.ok() && state != PipelineTaskState::CANCELED) { + cancel(); + } + } + + if (task->is_pending_finish()) { task->set_state(PipelineTaskState::PENDING_FINISH); static_cast(_blocked_task_scheduler->add_blocked_task(task)); return; - } else { - status = task->close(exec_status); - if (!status.ok() && state != PipelineTaskState::CANCELED) { - task->query_context()->cancel(true, status.to_string(), - Status::Cancelled(status.to_string())); - state = PipelineTaskState::CANCELED; - } } task->set_state(state); task->set_close_pipeline_time();