[fix] fix memory leak in VDataStreamRecvr::SenderQueue (#8643)

After `VDataStreamRecvr::SenderQueue::close` clears `_block_queue`, calling 
`VDataStreamRecvr::SenderQueue::add_block` again will cause a memory leak.

So, change the lock position, like the other add_block and add_batch.
This commit is contained in:
Xinyi Zou
2022-03-28 10:19:22 +08:00
committed by GitHub
parent cdf0a016c3
commit ea45940ef0
3 changed files with 7 additions and 7 deletions

View File

@ -200,7 +200,7 @@ Status DataStreamRecvr::SenderQueue::get_batch(RowBatch** next_batch) {
void DataStreamRecvr::SenderQueue::add_batch(const PRowBatch& pb_batch, int be_number,
int64_t packet_seq,
::google::protobuf::Closure** done) {
unique_lock<mutex> l(_lock);
lock_guard<mutex> l(_lock);
if (_is_cancelled) {
return;
}
@ -269,7 +269,7 @@ void DataStreamRecvr::SenderQueue::add_batch(const PRowBatch& pb_batch, int be_n
}
void DataStreamRecvr::SenderQueue::add_batch(RowBatch* batch, bool use_move) {
unique_lock<mutex> l(_lock);
lock_guard<mutex> l(_lock);
if (_is_cancelled) {
return;
}

View File

@ -380,8 +380,8 @@ public:
// 3. Repeated releases of MemTacker. When the consume is called on the child MemTracker,
// after the release is called on the parent MemTracker,
// the child ~MemTracker will cause repeated releases.
static void memory_leak_check(MemTracker* tracker) {
tracker->flush_untracked_mem();
static void memory_leak_check(MemTracker* tracker, bool flush = true) {
if (flush) tracker->flush_untracked_mem();
DCHECK_EQ(tracker->_consumption->current_value(), 0) << std::endl << tracker->log_usage();
}

View File

@ -90,7 +90,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) {
void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_number,
int64_t packet_seq,
::google::protobuf::Closure** done) {
std::unique_lock<std::mutex> l(_lock);
std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
return;
}
@ -140,6 +140,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int be_numbe
}
void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
return;
}
@ -158,8 +159,6 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
}
materialize_block_inplace(*nblock);
std::unique_lock<std::mutex> l(_lock);
size_t block_size = nblock->bytes();
_block_queue.emplace_back(block_size, nblock);
_recvr->_block_mem_tracker->consume(nblock->bytes());
@ -286,6 +285,7 @@ VDataStreamRecvr::VDataStreamRecvr(
VDataStreamRecvr::~VDataStreamRecvr() {
DCHECK(_mgr == nullptr) << "Must call close()";
MemTracker::memory_leak_check(_block_mem_tracker.get(), false);
}
Status VDataStreamRecvr::create_merger(const std::vector<VExprContext*>& ordering_expr,