diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp index 9f7c34a6c3..1f0e0059e4 100644 --- a/be/src/vec/sink/vdata_stream_sender.cpp +++ b/be/src/vec/sink/vdata_stream_sender.cpp @@ -95,6 +95,8 @@ Status Channel::init(RuntimeState* state) { _fragment_instance_id, _dest_node_id); } + _serializer.set_is_local(_is_local); + // In bucket shuffle join will set fragment_instance_id (-1, -1) // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0" // so the empty channel not need call function close_internal() @@ -526,7 +528,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { for (auto channel : _channels) { if (!channel->is_receiver_eof()) { if (channel->is_local()) { - status = channel->send_local_block(block); + status = channel->send_local_block(&cur_block); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); status = channel->send_block(block_holder, eos); @@ -544,11 +546,15 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { RETURN_IF_ERROR(_serializer.next_serialized_block( block, _cur_pb_block, _channels.size(), &serialized, false)); if (serialized) { + auto cur_block = _serializer.get_block()->to_block(); + if (!cur_block.empty()) { + _serializer.serialize_block(&cur_block, _cur_pb_block, _channels.size()); + } Status status; for (auto channel : _channels) { if (!channel->is_receiver_eof()) { if (channel->is_local()) { - status = channel->send_local_block(block); + status = channel->send_local_block(&cur_block); } else { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); status = channel->send_block(_cur_pb_block, false); @@ -556,6 +562,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) { HANDLE_CHANNEL_STATUS(state, channel, status); } } + cur_block.clear_column_data(); + _serializer.get_block()->set_muatable_columns(cur_block.mutate_columns()); _roll_pb_block(); } } @@ -757,7 +765,7 @@ Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) { return Status::OK(); } -Status BlockSerializer::serialize_block(Block* src, PBlock* dest, int num_receivers) { +Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, int num_receivers) { { SCOPED_TIMER(_parent->_serialize_batch_timer); dest->Clear(); diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 503a73798e..474e1fba45 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -75,12 +75,14 @@ public: Status next_serialized_block(Block* src, PBlock* dest, int num_receivers, bool* serialized, bool eos, const std::vector* rows = nullptr); Status serialize_block(PBlock* dest, int num_receivers = 1); - Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1); + Status serialize_block(const Block* src, PBlock* dest, int num_receivers = 1); MutableBlock* get_block() const { return _mutable_block.get(); } void reset_block() { _mutable_block.reset(); } + void set_is_local(bool is_local) { _is_local = is_local; } + private: VDataStreamSender* _parent; std::unique_ptr _mutable_block; @@ -501,10 +503,6 @@ public: return send_local_block(eos); } SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get()); - if (eos) { - _pblock = std::make_unique(); - RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1)); - } RETURN_IF_ERROR(send_block(_pblock.release(), eos)); return Status::OK(); }