Fix exchange operator can not aware end of file (#25562)

This commit is contained in:
wangbo
2023-10-20 18:56:01 +08:00
committed by GitHub
parent 7f1e3e48cd
commit a6925cc0cf
2 changed files with 4 additions and 6 deletions

View File

@ -148,6 +148,9 @@ Status ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
return Status::OK();
}
TUniqueId ins_id = request.channel->_fragment_instance_id;
if (_is_receiver_eof(ins_id.lo)) {
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
{
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[ins_id.lo]);

View File

@ -295,12 +295,7 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
auto st = _sink->sink(state, in_block, source_state == SourceState::FINISHED);
// TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
if (st.template is<ErrorCode::END_OF_FILE>()) {
return Status::OK();
}
return st;
return _sink->sink(state, in_block, source_state == SourceState::FINISHED);
}
return Status::OK();
}