From ed48d321d00967b400d3fa19473ead665e82e8c6 Mon Sep 17 00:00:00 2001 From: Mryange <59914473+Mryange@users.noreply.github.com> Date: Sat, 30 Mar 2024 20:05:49 +0800 Subject: [PATCH] [fix](pipelineX) fix error open in scan (#33068) --- .../pipeline/pipeline_x/pipeline_x_task.cpp | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 689c4e672e..da5da2f047 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -189,16 +189,21 @@ Status PipelineXTask::_open() { _dry_run = _sink->should_dry_run(_state); for (auto& o : _operators) { auto* local_state = _state->get_local_state(o->operator_id()); - auto st = local_state->open(_state); - if (st.is()) { - DCHECK(_filter_dependency); - _blocked_dep = _filter_dependency->is_blocked_by(this); - if (_blocked_dep) { - set_state(PipelineTaskState::BLOCKED_FOR_RF); + for (size_t i = 0; i < 2; i++) { + auto st = local_state->open(_state); + if (st.is()) { + DCHECK(_filter_dependency); + _blocked_dep = _filter_dependency->is_blocked_by(this); + if (_blocked_dep) { + set_state(PipelineTaskState::BLOCKED_FOR_RF); + RETURN_IF_ERROR(st); + } else if (i == 1) { + return Status::InternalError("Unknown RF error, task was blocked by RF twice"); + } + } else { RETURN_IF_ERROR(st); + break; } - } else { - RETURN_IF_ERROR(st); } } RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state));