[Bug](pipeline) try fix the exchange sink buffer result error (#27052)

This commit is contained in:
HappenLee
2023-11-16 09:20:56 +08:00
committed by GitHub
parent 02f3762ab3
commit 7ef1f7e511

View File

@ -333,23 +333,28 @@ void TaskScheduler::_do_work(size_t index) {
void TaskScheduler::_try_close_task(PipelineTask* task, PipelineTaskState state,
Status exec_status) {
auto status = task->try_close(exec_status);
if (!status.ok() && state != PipelineTaskState::CANCELED) {
// Call `close` if `try_close` failed to make sure allocated resources are released
static_cast<void>(task->close(exec_status));
auto cancel = [&]() {
task->query_context()->cancel(true, status.to_string(),
Status::Cancelled(status.to_string()));
state = PipelineTaskState::CANCELED;
} else if (task->is_pending_finish()) {
};
auto try_close_failed = !status.ok() && state != PipelineTaskState::CANCELED;
if (try_close_failed) {
cancel();
// Call `close` if `try_close` failed to make sure allocated resources are released
static_cast<void>(task->close(exec_status));
} else if (!task->is_pending_finish()) {
status = task->close(exec_status);
if (!status.ok() && state != PipelineTaskState::CANCELED) {
cancel();
}
}
if (task->is_pending_finish()) {
task->set_state(PipelineTaskState::PENDING_FINISH);
static_cast<void>(_blocked_task_scheduler->add_blocked_task(task));
return;
} else {
status = task->close(exec_status);
if (!status.ok() && state != PipelineTaskState::CANCELED) {
task->query_context()->cancel(true, status.to_string(),
Status::Cancelled(status.to_string()));
state = PipelineTaskState::CANCELED;
}
}
task->set_state(state);
task->set_close_pipeline_time();