[fix](sink) fix END_OF_FILE error for pipeline caused by VDataStreamSender eof (#20007)
* [fix](sink) fix END_OF_FILE error for pipeline caused by VDataStreamSender eof
This commit is contained in:
@ -286,7 +286,12 @@ public:
|
||||
Status sink(RuntimeState* state, vectorized::Block* in_block,
|
||||
SourceState source_state) override {
|
||||
if (in_block->rows() > 0) {
|
||||
return _sink->send(state, in_block, source_state == SourceState::FINISHED);
|
||||
auto st = _sink->send(state, in_block, source_state == SourceState::FINISHED);
|
||||
// TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
|
||||
if (st.template is<ErrorCode::END_OF_FILE>()) {
|
||||
return Status::OK();
|
||||
}
|
||||
return st;
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -178,7 +178,10 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
|
||||
state->fragment_instance_id());
|
||||
} else {
|
||||
if (final_status.ok()) {
|
||||
RETURN_IF_ERROR(_stream_sender->send(state, _output_block.get(), true));
|
||||
auto st = _stream_sender->send(state, _output_block.get(), true);
|
||||
if (!st.template is<ErrorCode::END_OF_FILE>()) {
|
||||
RETURN_IF_ERROR(st);
|
||||
}
|
||||
}
|
||||
RETURN_IF_ERROR(_stream_sender->close(state, final_status));
|
||||
_output_block->clear();
|
||||
|
||||
Reference in New Issue
Block a user