diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc index 7fe66524c3..b825390471 100644 --- a/be/src/runtime/data_stream_recvr.cc +++ b/be/src/runtime/data_stream_recvr.cc @@ -200,7 +200,7 @@ Status DataStreamRecvr::SenderQueue::get_batch(RowBatch** next_batch) { void DataStreamRecvr::SenderQueue::add_batch(const PRowBatch& pb_batch, int be_number, int64_t packet_seq, ::google::protobuf::Closure** done) { - unique_lock l(_lock); + lock_guard l(_lock); if (_is_cancelled) { return; } @@ -269,7 +269,7 @@ void DataStreamRecvr::SenderQueue::add_batch(const PRowBatch& pb_batch, int be_n } void DataStreamRecvr::SenderQueue::add_batch(RowBatch* batch, bool use_move) { - unique_lock l(_lock); + lock_guard l(_lock); if (_is_cancelled) { return; } diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h index 23bd239622..89a1d2e0c7 100644 --- a/be/src/runtime/mem_tracker.h +++ b/be/src/runtime/mem_tracker.h @@ -380,8 +380,8 @@ public: // 3. Repeated releases of MemTacker. When the consume is called on the child MemTracker, // after the release is called on the parent MemTracker, // the child ~MemTracker will cause repeated releases. - static void memory_leak_check(MemTracker* tracker) { - tracker->flush_untracked_mem(); + static void memory_leak_check(MemTracker* tracker, bool flush = true) { + if (flush) tracker->flush_untracked_mem(); DCHECK_EQ(tracker->_consumption->current_value(), 0) << std::endl << tracker->log_usage(); } diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 3c019ffc04..eebe11588d 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -90,7 +90,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) { void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number, int64_t packet_seq, ::google::protobuf::Closure** done) { - std::unique_lock l(_lock); + std::lock_guard l(_lock); if (_is_cancelled) { return; } @@ -140,6 +140,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe } void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { + std::lock_guard l(_lock); if (_is_cancelled) { return; } @@ -158,8 +159,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) { } materialize_block_inplace(*nblock); - - std::unique_lock l(_lock); size_t block_size = nblock->bytes(); _block_queue.emplace_back(block_size, nblock); _recvr->_block_mem_tracker->consume(nblock->bytes()); @@ -286,6 +285,7 @@ VDataStreamRecvr::VDataStreamRecvr( VDataStreamRecvr::~VDataStreamRecvr() { DCHECK(_mgr == nullptr) << "Must call close()"; + MemTracker::memory_leak_check(_block_mem_tracker.get(), false); } Status VDataStreamRecvr::create_merger(const std::vector& ordering_expr,