From 784c27deeb49e91f6ee830f12822c11d403d5d2b Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 14 Feb 2023 16:40:13 +0800 Subject: [PATCH] [Bug](shuffle) fix mem leak in data stream sender (#16685) --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 2 +- be/src/pipeline/exec/exchange_sink_buffer.h | 2 +- be/src/vec/sink/vdata_stream_sender.h | 27 ++++++++++++++++--- 3 files changed, 25 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 36f5aab77c..aa3948e0a1 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -240,7 +240,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { if (!q.empty()) { // If we have data to shuffle which is not broadcasted - DO_RPC(q, block, nullptr) + DO_RPC(q, block.get(), nullptr) } else if (!broadcast_q.empty()) { // If we have data to shuffle which is broadcasted DO_RPC(broadcast_q, block_holder->get_block(), request.block_holder) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 7be17706ce..783457c165 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -38,7 +38,7 @@ namespace pipeline { using InstanceLoId = int64_t; struct TransmitInfo { vectorized::PipChannel* channel; - PBlock* block; + std::unique_ptr block; bool eos; }; diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h index 19858c259d..9f25349bd1 100644 --- a/be/src/vec/sink/vdata_stream_sender.h +++ b/be/src/vec/sink/vdata_stream_sender.h @@ -293,7 +293,7 @@ public: bool is_local() const { return _is_local; } - void ch_roll_pb_block(); + virtual void ch_roll_pb_block(); bool can_write() { if (!is_local()) { @@ -392,14 +392,32 @@ Status VDataStreamSender::channel_add_rows(Channels& channels, int num_channels, return Status::OK(); } -class PipChannel : public Channel { +class PipChannel final : public Channel { public: PipChannel(VDataStreamSender* parent, const RowDescriptor& row_desc, const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int buffer_size, bool is_transfer_chain, bool send_query_statistics_with_every_batch) : Channel(parent, row_desc, brpc_dest, fragment_instance_id, dest_node_id, buffer_size, - is_transfer_chain, send_query_statistics_with_every_batch) {} + is_transfer_chain, send_query_statistics_with_every_batch) { + ch_roll_pb_block(); + } + + ~PipChannel() override { + if (_ch_cur_pb_block) { + delete _ch_cur_pb_block; + } + } + + void ch_roll_pb_block() override { + // We have two choices here. + // 1. Use a PBlock pool and fetch an available PBlock if we need one. In this way, we can + // reuse the memory, but we have to use a lock to synchronize. + // 2. Create a new PBlock every time. In this way we don't need a lock but have to allocate + // new memory. + // Now we use the second way. + _ch_cur_pb_block = new PBlock(); + } // Asynchronously sends a block // Returns the status of the most recently finished transmit_data @@ -414,7 +432,8 @@ public: } } if (eos || block->column_metas_size()) { - RETURN_IF_ERROR(_buffer->add_block({this, block, eos})); + RETURN_IF_ERROR(_buffer->add_block( + {this, block ? std::make_unique(*block) : nullptr, eos})); } return Status::OK(); }