From 3598518e593288df9a559f2cc90090a83b99b191 Mon Sep 17 00:00:00 2001
From: TengJianPing <18241664+jacktengg@users.noreply.github.com>
Date: Thu, 25 May 2023 16:50:17 +0800
Subject: [PATCH] [fix](revert) data stream sender stop sending data to
 receiver if it returns eos early (#19847)" (#20040)

* Revert "[fix](sink) fix END_OF_FILE error for pipeline caused by VDataStreamSender eof (#20007)"

This reverts commit 2ec1d282c5e27b25d37baf91cacde082cca4ec31.

* [fix](revert) data stream sender stop sending data to receiver if it returns eos early (#19847)"

This reverts commit c73003359567067ea7d44e4a06c1670c9ec37902.
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp |  24 +---
 be/src/pipeline/exec/exchange_sink_buffer.h   |   3 -
 be/src/pipeline/exec/operator.h               |   7 +-
 be/src/vec/runtime/vdata_stream_mgr.cpp       |   6 +-
 be/src/vec/sink/vdata_stream_sender.cpp       | 110 +++++-------------
 be/src/vec/sink/vdata_stream_sender.h         |  47 ++------
 be/src/vec/sink/vresult_file_sink.cpp         |   5 +-
 7 files changed, 44 insertions(+), 158 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 8618a0d445..db7eb31a82 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -152,7 +152,6 @@ void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
     finst_id.set_lo(fragment_instance_id.lo);
     _instance_to_finst_id[low_id] = finst_id;
     _instance_to_sending_by_pipeline[low_id] = true;
-    _instance_to_receiver_eof[low_id] = false;
 }
 
 Status ExchangeSinkBuffer::add_block(TransmitInfo&& request) {
@@ -182,9 +181,6 @@ Status ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
         return Status::OK();
     }
     TUniqueId ins_id = request.channel->_fragment_instance_id;
-    if (_is_receiver_eof(ins_id.lo)) {
-        return Status::EndOfFile("receiver eof");
-    }
     bool send_now = false;
     request.block_holder->ref();
     {
@@ -234,9 +230,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
                                         const PTransmitDataResult& result) {
             Status s = Status(result.status());
-            if (s.is<ErrorCode::END_OF_FILE>()) {
-                _set_receiver_eof(id);
-            } else if (!s.ok()) {
+            if (!s.ok()) {
                 _failed(id,
                         fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
             } else if (eos) {
@@ -279,9 +273,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
         _closure->addSuccessHandler([&](const InstanceLoId& id, const bool& eos,
                                         const PTransmitDataResult& result) {
             Status s = Status(result.status());
-            if (s.is<ErrorCode::END_OF_FILE>()) {
-                _set_receiver_eof(id);
-            } else if (!s.ok()) {
+            if (!s.ok()) {
                 _failed(id,
                         fmt::format("exchange req success but status isn't ok: {}", s.to_string()));
             } else if (eos) {
@@ -331,16 +323,6 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
     _is_finishing = true;
     _context->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, err);
     _ended(id);
-}
-
-void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
-    std::unique_lock<bthread::Mutex> lock(*_instance_to_package_queue_mutex[id]);
-    _instance_to_receiver_eof[id] = true;
-}
-
-bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
-    std::unique_lock<bthread::Mutex> lock(*_instance_to_package_queue_mutex[id]);
-    return _instance_to_receiver_eof[id];
-}
+};
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index e71df9053d..17093bca87 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -85,7 +85,6 @@ private:
     phmap::flat_hash_map<InstanceLoId, PTransmitDataParams*> _instance_to_request;
     phmap::flat_hash_map<InstanceLoId, PUniqueId> _instance_to_finst_id;
     phmap::flat_hash_map<InstanceLoId, bool> _instance_to_sending_by_pipeline;
-    phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
     std::atomic<bool> _is_finishing;
 
     PUniqueId _query_id;
@@ -101,8 +100,6 @@ private:
     void _construct_request(InstanceLoId id);
     inline void _ended(InstanceLoId id);
     inline void _failed(InstanceLoId id, const std::string& err);
-    inline void _set_receiver_eof(InstanceLoId id);
-    inline bool _is_receiver_eof(InstanceLoId id);
 };
 
 } // namespace pipeline
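A minimal sketch of the receiver-EOF bookkeeping the hunks above remove, with
std::mutex and std::unordered_map as simplified stand-ins for the bthread::Mutex
and phmap types in the real buffer (illustrative only, not part of the patch):

    #include <cstdint>
    #include <memory>
    #include <mutex>
    #include <unordered_map>

    using InstanceLoId = int64_t;

    // Before the revert, ExchangeSinkBuffer kept one flag per fragment
    // instance: the RPC success handler set it on END_OF_FILE, and
    // add_block() consulted it to drop further blocks for that receiver.
    class ReceiverEofTracker {
    public:
        void register_sink(InstanceLoId id) {
            _mutex[id] = std::make_unique<std::mutex>();
            _receiver_eof[id] = false;
        }
        void set_receiver_eof(InstanceLoId id) {
            std::unique_lock<std::mutex> lock(*_mutex[id]);
            _receiver_eof[id] = true;
        }
        bool is_receiver_eof(InstanceLoId id) {
            std::unique_lock<std::mutex> lock(*_mutex[id]);
            return _receiver_eof[id];
        }

    private:
        std::unordered_map<InstanceLoId, std::unique_ptr<std::mutex>> _mutex;
        std::unordered_map<InstanceLoId, bool> _receiver_eof;
    };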
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index d2a40ab553..eff683efb3 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -286,12 +286,7 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override {
         if (in_block->rows() > 0) {
-            auto st = _sink->send(state, in_block, source_state == SourceState::FINISHED);
-            // TODO: improvement: if sink returned END_OF_FILE, pipeline task can be finished
-            if (st.template is<ErrorCode::END_OF_FILE>()) {
-                return Status::OK();
-            }
-            return st;
+            return _sink->send(state, in_block, source_state == SourceState::FINISHED);
         }
         return Status::OK();
     }
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp
index eeee072371..e0e2229daf 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -102,13 +102,9 @@ Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
         // As a consequence, find_recvr() may return an innocuous NULL if a thread
         // calling deregister_recvr() beat the thread calling find_recvr()
         // in acquiring _lock.
-        //
-        // e.g. for broadcast join build side, only one instance will build the hash table,
-        // all other instances don't need build side data and will close the data stream receiver.
-        //
         // TODO: Rethink the lifecycle of DataStreamRecvr to distinguish
         // errors from receiver-initiated teardowns.
-        return Status::EndOfFile("data stream receiver closed");
+        return Status::OK();
     }
 
     // request can only be used before calling recvr's add_batch or when request
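A sketch of the status flow the two hunks above revert, using a simplified
stand-in for doris::Status (illustrative only, not part of the patch): before
the revert, a closed receiver surfaced as END_OF_FILE and the pipeline sink
swallowed it into OK; after it, a closed receiver already reports OK at the
source, so the wrapper forwards statuses unchanged.

    enum class ErrorCode { OK, END_OF_FILE, INTERNAL_ERROR };

    struct Status {
        ErrorCode code = ErrorCode::OK;
        bool ok() const { return code == ErrorCode::OK; }
        template <ErrorCode E>
        bool is() const { return code == E; }
        static Status OK() { return {ErrorCode::OK}; }
    };

    // Reverted behaviour: hide receiver EOF from the pipeline task, so the
    // task kept draining its input even though every receiver had closed.
    Status sink_before_revert(Status st) {
        if (st.is<ErrorCode::END_OF_FILE>()) {
            return Status::OK();
        }
        return st;
    }

    // Behaviour restored by this patch: pass the sender's status through.
    Status sink_after_revert(Status st) { return st; }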
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 5e3f6783ba..446716970e 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -135,11 +135,8 @@ Status Channel::send_local_block(bool eos) {
         if (eos) {
             _local_recvr->remove_sender(_parent->_sender_id, _be_number);
         }
-        return Status::OK();
-    } else {
-        _mutable_block.reset();
-        return receiver_status_;
     }
+    return Status::OK();
 }
 
 Status Channel::send_local_block(Block* block) {
@@ -149,10 +146,8 @@ Status Channel::send_local_block(Block* block) {
         COUNTER_UPDATE(_parent->_local_sent_rows, block->rows());
         COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
         _local_recvr->add_block(block, _parent->_sender_id, false);
-        return Status::OK();
-    } else {
-        return receiver_status_;
     }
+    return Status::OK();
 }
 
 Status Channel::send_block(PBlock* block, bool eos) {
@@ -241,9 +236,7 @@ Status Channel::add_rows(Block* block, const std::vector<int>& rows) {
 Status Channel::close_wait(RuntimeState* state) {
     if (_need_close) {
         Status st = _wait_last_brpc();
-        if (st.is<ErrorCode::END_OF_FILE>()) {
-            st = Status::OK();
-        } else if (!st.ok()) {
+        if (!st.ok()) {
             state->log_error(st.to_string());
         }
         _need_close = false;
@@ -259,33 +252,18 @@ Status Channel::close_internal() {
     }
     VLOG_RPC << "Channel::close() instance_id=" << _fragment_instance_id
              << " dest_node=" << _dest_node_id
-             << " #rows= " << ((_mutable_block == nullptr) ? 0 : _mutable_block->rows())
-             << " receiver status: " << receiver_status_;
-    if (receiver_status_.is<ErrorCode::END_OF_FILE>()) {
-        _mutable_block.reset();
-        return Status::OK();
-    }
-    Status status;
+             << " #rows= " << ((_mutable_block == nullptr) ? 0 : _mutable_block->rows());
     if (_mutable_block != nullptr && _mutable_block->rows() > 0) {
-        status = send_current_block(true);
+        RETURN_IF_ERROR(send_current_block(true));
     } else {
         SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
-        status = send_block((PBlock*)nullptr, true);
+        RETURN_IF_ERROR(send_block((PBlock*)nullptr, true));
    }
     // Don't wait for the last packet to finish, left it to close_wait.
-    if (status.is<ErrorCode::END_OF_FILE>()) {
-        return Status::OK();
-    } else {
-        return status;
-    }
+    return Status::OK();
 }
 
 Status Channel::close(RuntimeState* state) {
-    if (_closed) {
-        return Status::OK();
-    }
-    _closed = true;
-
     Status st = close_internal();
     if (!st.ok()) {
         state->log_error(st.to_string());
     }
@@ -516,33 +494,15 @@ Status VDataStreamSender::open(RuntimeState* state) {
     return Status::OK();
 }
 
-template <typename ChannelPtrType>
-void VDataStreamSender::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
-                                            Status st) {
-    channel->set_receiver_eof(st);
-    channel->close(state);
-}
-
 Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
     SCOPED_TIMER(_profile->total_time_counter());
-    for (auto channel : _channels) {
-        if (!channel->is_receiver_eof()) {
-            break;
-        }
-        return Status::EndOfFile("all data stream channels EOF");
-    }
-
     if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
         // 1. serialize depends on it is not local exchange
         // 2. send block
         // 3. rollover block
         if (_only_local_exchange) {
-            Status status;
             for (auto channel : _channels) {
-                if (!channel->is_receiver_eof()) {
-                    status = channel->send_local_block(block);
-                    HANDLE_CHANNEL_STATUS(state, channel, status);
-                }
+                RETURN_IF_ERROR(channel->send_local_block(block));
             }
         } else if (_enable_pipeline_exec) {
             BroadcastPBlockHolder* block_holder = nullptr;
@@ -553,16 +513,12 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                         serialize_block(block, block_holder->get_block(), _channels.size()));
             }
 
-            Status status;
             for (auto channel : _channels) {
-                if (!channel->is_receiver_eof()) {
-                    if (channel->is_local()) {
-                        status = channel->send_local_block(block);
-                    } else {
-                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                        status = channel->send_block(block_holder, eos);
-                    }
-                    HANDLE_CHANNEL_STATUS(state, channel, status);
+                if (channel->is_local()) {
+                    RETURN_IF_ERROR(channel->send_local_block(block));
+                } else {
+                    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                    RETURN_IF_ERROR(channel->send_block(block_holder, eos));
                 }
             }
         } else {
@@ -571,16 +527,12 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                 RETURN_IF_ERROR(serialize_block(block, _cur_pb_block, _channels.size()));
             }
 
-            Status status;
             for (auto channel : _channels) {
-                if (!channel->is_receiver_eof()) {
-                    if (channel->is_local()) {
-                        status = channel->send_local_block(block);
-                    } else {
-                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                        status = channel->send_block(_cur_pb_block, eos);
-                    }
-                    HANDLE_CHANNEL_STATUS(state, channel, status);
+                if (channel->is_local()) {
+                    RETURN_IF_ERROR(channel->send_local_block(block));
+                } else {
+                    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                    RETURN_IF_ERROR(channel->send_block(_cur_pb_block, eos));
                 }
             }
             // rollover
@@ -589,18 +541,14 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
     } else if (_part_type == TPartitionType::RANDOM) {
         // 1. select channel
         Channel* current_channel = _channels[_current_channel_idx];
-        if (!current_channel->is_receiver_eof()) {
-            // 2. serialize, send and rollover block
-            if (current_channel->is_local()) {
-                auto status = current_channel->send_local_block(block);
-                HANDLE_CHANNEL_STATUS(state, current_channel, status);
-            } else {
-                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                RETURN_IF_ERROR(serialize_block(block, current_channel->ch_cur_pb_block()));
-                auto status = current_channel->send_block(current_channel->ch_cur_pb_block(), eos);
-                HANDLE_CHANNEL_STATUS(state, current_channel, status);
-                current_channel->ch_roll_pb_block();
-            }
+        // 2. serialize, send and rollover block
+        if (current_channel->is_local()) {
+            RETURN_IF_ERROR(current_channel->send_local_block(block));
+        } else {
+            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+            RETURN_IF_ERROR(serialize_block(block, current_channel->ch_cur_pb_block()));
+            RETURN_IF_ERROR(current_channel->send_block(current_channel->ch_cur_pb_block(), eos));
+            current_channel->ch_roll_pb_block();
         }
         _current_channel_idx = (_current_channel_idx + 1) % _channels.size();
     } else if (_part_type == TPartitionType::HASH_PARTITIONED ||
@@ -652,7 +600,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                 Block::erase_useless_column(block, column_to_keep);
             }
 
-            RETURN_IF_ERROR(channel_add_rows(state, _channels, element_size, hashes, rows, block));
+            RETURN_IF_ERROR(channel_add_rows(_channels, element_size, hashes, rows, block));
         } else {
             for (int j = 0; j < result_size; ++j) {
                 block->get_by_position(result[j]).column->update_crcs_with_value(
@@ -667,8 +615,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                 Block::erase_useless_column(block, column_to_keep);
             }
-            RETURN_IF_ERROR(channel_add_rows(state, _channel_shared_ptrs, element_size, hashes,
-                                             rows, block));
+            RETURN_IF_ERROR(
+                    channel_add_rows(_channel_shared_ptrs, element_size, hashes, rows, block));
         }
     } else {
         // Range partition
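A container-only sketch of the row bucketing that channel_add_rows() performs
in both the old and the new code (names are illustrative, not part of the
patch): row indices are grouped by precomputed channel id so that each channel
receives all of its rows in a single add_rows() call.

    #include <cstdint>
    #include <vector>

    std::vector<std::vector<int>> bucket_rows_by_channel(
            const std::vector<uint64_t>& channel_ids, int num_channels) {
        std::vector<std::vector<int>> channel2rows(num_channels);
        for (size_t i = 0; i < channel_ids.size(); ++i) {
            // channel_ids[i] has already been reduced modulo the channel
            // count by the hashing code in send().
            channel2rows[channel_ids[i]].emplace_back(static_cast<int>(i));
        }
        return channel2rows;
    }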
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 3ce04915b7..c151a5fa46 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -28,7 +28,6 @@
 #include 
 #include 
-#include 
 #include 
 #include 
 #include 
@@ -158,11 +157,8 @@ protected:
     }
 
     template <typename Channels>
-    Status channel_add_rows(RuntimeState* state, Channels& channels, int num_channels,
-                            const uint64_t* channel_ids, int rows, Block* block);
-
-    template <typename ChannelPtrType>
-    void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st);
+    Status channel_add_rows(Channels& channels, int num_channels, const uint64_t* channel_ids,
+                            int rows, Block* block);
 
     struct hash_128 {
         uint64_t high;
@@ -253,7 +249,6 @@ public:
               _num_data_bytes_sent(0),
               _packet_seq(0),
               _need_close(false),
-              _closed(false),
               _brpc_dest_addr(brpc_dest),
               _is_transfer_chain(is_transfer_chain),
               _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
@@ -329,18 +324,8 @@ public:
         _local_recvr->sender_queue_empty(_parent->_sender_id);
     }
 
-    bool is_receiver_eof() const { return receiver_status_.is<ErrorCode::END_OF_FILE>(); }
-
-    void set_receiver_eof(Status st) { receiver_status_ = st; }
-
 protected:
-    bool _recvr_is_valid() {
-        if (_local_recvr && !_local_recvr->is_closed()) {
-            return true;
-        }
-        receiver_status_ = Status::EndOfFile("local data stream receiver closed");
-        return false;
-    }
+    bool _recvr_is_valid() { return _local_recvr && !_local_recvr->is_closed(); }
 
     Status _wait_last_brpc() {
         SCOPED_TIMER(_parent->_brpc_wait_timer);
@@ -350,7 +335,6 @@ protected:
         auto cntl = &_closure->cntl;
         auto call_id = _closure->cntl.call_id();
         brpc::Join(call_id);
-        receiver_status_ = _closure->result.status();
         if (cntl->Failed()) {
             std::string err = fmt::format(
                     "failed to send brpc batch, error={}, error_text={}, client: {}, "
@@ -360,7 +344,7 @@ protected:
             LOG(WARNING) << err;
             return Status::RpcError(err);
         }
-        return receiver_status_;
+        return Status::OK();
     }
 
     // Serialize _batch into _thrift_batch and send via send_batch().
@@ -382,7 +366,6 @@ protected:
     std::unique_ptr<MutableBlock> _mutable_block;
 
     bool _need_close;
-    bool _closed;
     int _be_number;
 
     TNetworkAddress _brpc_dest_addr;
@@ -393,7 +376,6 @@ protected:
     PTransmitDataParams _brpc_request;
     std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
     RefCountClosure<PTransmitDataResult>* _closure = nullptr;
-    Status receiver_status_;
     int32_t _brpc_timeout_ms = 500;
     // whether the dest can be treated as query statistics transfer chain.
     bool _is_transfer_chain;
@@ -411,30 +393,19 @@ protected:
     PBlock _ch_pb_block2;
 };
 
-#define HANDLE_CHANNEL_STATUS(state, channel, status)    \
-    do {                                                 \
-        if (status.is<ErrorCode::END_OF_FILE>()) {       \
-            _handle_eof_channel(state, channel, status); \
-        } else {                                         \
-            RETURN_IF_ERROR(status);                     \
-        }                                                \
-    } while (0)
-
 template <typename Channels>
-Status VDataStreamSender::channel_add_rows(RuntimeState* state, Channels& channels,
-                                           int num_channels, const uint64_t* __restrict channel_ids,
-                                           int rows, Block* block) {
+Status VDataStreamSender::channel_add_rows(Channels& channels, int num_channels,
+                                           const uint64_t* __restrict channel_ids, int rows,
+                                           Block* block) {
     std::vector<int> channel2rows[num_channels];
 
     for (int i = 0; i < rows; i++) {
         channel2rows[channel_ids[i]].emplace_back(i);
     }
 
-    Status status;
     for (int i = 0; i < num_channels; ++i) {
-        if (!channels[i]->is_receiver_eof() && !channel2rows[i].empty()) {
-            status = channels[i]->add_rows(block, channel2rows[i]);
-            HANDLE_CHANNEL_STATUS(state, channels[i], status);
+        if (!channel2rows[i].empty()) {
+            RETURN_IF_ERROR(channels[i]->add_rows(block, channel2rows[i]));
         }
     }
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
index 92b396d189..a4b15f032a 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -178,10 +178,7 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
                                        state->fragment_instance_id());
     } else {
         if (final_status.ok()) {
-            auto st = _stream_sender->send(state, _output_block.get(), true);
-            if (!st.template is<ErrorCode::END_OF_FILE>()) {
-                RETURN_IF_ERROR(st);
-            }
+            RETURN_IF_ERROR(_stream_sender->send(state, _output_block.get(), true));
         }
         RETURN_IF_ERROR(_stream_sender->close(state, final_status));
         _output_block->clear();
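A sketch of the per-channel error routing that the removed HANDLE_CHANNEL_STATUS
macro implemented, with simplified stand-in types (illustrative only, not part
of the patch): END_OF_FILE from one receiver only marked and closed that
channel, while any other error aborted the whole send; after this revert, every
non-OK status aborts the send.

    enum class Code { OK, END_OF_FILE, RPC_ERROR };

    struct ChannelStatus {
        Code code = Code::OK;
        bool ok() const { return code == Code::OK; }
    };

    struct ChannelStub {
        bool receiver_eof = false;
        void set_receiver_eof() { receiver_eof = true; }
        void close() {}
    };

    // Returns OK for a per-channel EOF so the caller keeps sending to the
    // remaining channels; any other non-OK status is propagated.
    ChannelStatus handle_channel_status(ChannelStub& channel, ChannelStatus st) {
        if (st.code == Code::END_OF_FILE) {
            channel.set_receiver_eof();
            channel.close();
            return {Code::OK};
        }
        return st;
    }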