From 2ec1d282c5e27b25d37baf91cacde082cca4ec31 Mon Sep 17 00:00:00 2001 From: TengJianPing <18241664+jacktengg@users.noreply.github.com> Date: Thu, 25 May 2023 10:29:35 +0800 Subject: [PATCH] [fix](sink) fix END_OF_FILE error for pipeline caused by VDataStreamSender eof (#20007) * [fix](sink) fix END_OF_FILE error for pipeline caused by VDataStreamSender eof --- be/src/pipeline/exec/operator.h | 7 ++++++- be/src/vec/sink/vresult_file_sink.cpp | 5 ++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index eff683efb3..d2a40ab553 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -286,7 +286,12 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override { if (in_block->rows() > 0) { - return _sink->send(state, in_block, source_state == SourceState::FINISHED); + auto st = _sink->send(state, in_block, source_state == SourceState::FINISHED); + // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished + if (st.template is()) { + return Status::OK(); + } + return st; } return Status::OK(); } diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp index a4b15f032a..92b396d189 100644 --- a/be/src/vec/sink/vresult_file_sink.cpp +++ b/be/src/vec/sink/vresult_file_sink.cpp @@ -178,7 +178,10 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) { state->fragment_instance_id()); } else { if (final_status.ok()) { - RETURN_IF_ERROR(_stream_sender->send(state, _output_block.get(), true)); + auto st = _stream_sender->send(state, _output_block.get(), true); + if (!st.template is()) { + RETURN_IF_ERROR(st); + } } RETURN_IF_ERROR(_stream_sender->close(state, final_status)); _output_block->clear();