From a6925cc0cfb1ddc31ef5a20b0fb9059fbdb2ab15 Mon Sep 17 00:00:00 2001 From: wangbo Date: Fri, 20 Oct 2023 18:56:01 +0800 Subject: [PATCH] Fix exchange operator can not aware end of file (#25562) --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 3 +++ be/src/pipeline/exec/operator.h | 7 +------ 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 31a8a1852a..b3d2222e50 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -148,6 +148,9 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) { return Status::OK(); } TUniqueId ins_id = request.channel->_fragment_instance_id; + if (_is_receiver_eof(ins_id.lo)) { + return Status::EndOfFile("receiver eof"); + } bool send_now = false; { std::unique_lock lock(*_instance_to_package_queue_mutex[ins_id.lo]); diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 4ba2aec977..125f8fd89e 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -295,12 +295,7 @@ public: Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state) override { if (in_block->rows() > 0 || source_state == SourceState::FINISHED) { - auto st = _sink->sink(state, in_block, source_state == SourceState::FINISHED); - // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished - if (st.template is()) { - return Status::OK(); - } - return st; + return _sink->sink(state, in_block, source_state == SourceState::FINISHED); } return Status::OK(); }