[fix](pipelineX) fix error open in scan (#33068)
This commit is contained in:
@ -189,16 +189,21 @@ Status PipelineXTask::_open() {
|
||||
_dry_run = _sink->should_dry_run(_state);
|
||||
for (auto& o : _operators) {
|
||||
auto* local_state = _state->get_local_state(o->operator_id());
|
||||
auto st = local_state->open(_state);
|
||||
if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
|
||||
DCHECK(_filter_dependency);
|
||||
_blocked_dep = _filter_dependency->is_blocked_by(this);
|
||||
if (_blocked_dep) {
|
||||
set_state(PipelineTaskState::BLOCKED_FOR_RF);
|
||||
for (size_t i = 0; i < 2; i++) {
|
||||
auto st = local_state->open(_state);
|
||||
if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
|
||||
DCHECK(_filter_dependency);
|
||||
_blocked_dep = _filter_dependency->is_blocked_by(this);
|
||||
if (_blocked_dep) {
|
||||
set_state(PipelineTaskState::BLOCKED_FOR_RF);
|
||||
RETURN_IF_ERROR(st);
|
||||
} else if (i == 1) {
|
||||
return Status::InternalError("Unknown RF error, task was blocked by RF twice");
|
||||
}
|
||||
} else {
|
||||
RETURN_IF_ERROR(st);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
RETURN_IF_ERROR(st);
|
||||
}
|
||||
}
|
||||
RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state));
|
||||
|
||||
Reference in New Issue
Block a user