[BUG][Timeout][QueryLeak] Fixed memory not released in time (#6221)

* Revert "[Optimize] Put _Tuple_ptrs into mempool when RowBatch is initialized (#6036)"

This reverts commit f254870aeb18752a786586ef5d7ccf952b97f895.

* [BUG][Timeout][QueryLeak] Fixed memory not released in time, Fix Core dump in bloomfilter
This commit is contained in:
stdpain
2021-07-16 12:32:10 +08:00
committed by GitHub
parent c2695e9716
commit bf5db6eefe
4 changed files with 68 additions and 12 deletions

View File

@ -56,7 +56,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
_tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*);
DCHECK_GT(_tuple_ptrs_size, 0);
// TODO: switch to Init() pattern so we can check memory limit and return Status.
_tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
if (config::enable_partitioned_aggregation) {
_mem_tracker->Consume(_tuple_ptrs_size);
_tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
DCHECK(_tuple_ptrs != NULL);
} else {
_tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
}
}
// TODO: we want our input_batch's tuple_data to come from our (not yet implemented)
@ -82,7 +88,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
_tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*);
DCHECK_GT(_tuple_ptrs_size, 0);
// TODO: switch to Init() pattern so we can check memory limit and return Status.
_tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
if (config::enable_partitioned_aggregation) {
_mem_tracker->Consume(_tuple_ptrs_size);
_tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
DCHECK(_tuple_ptrs != nullptr);
} else {
_tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
}
uint8_t* tuple_data = nullptr;
if (input_batch.is_compressed()) {
@ -210,7 +222,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
_tuple_ptrs_size = _num_rows * input_batch.row_tuples.size() * sizeof(Tuple*);
DCHECK_GT(_tuple_ptrs_size, 0);
// TODO: switch to Init() pattern so we can check memory limit and return Status.
_tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
if (config::enable_partitioned_aggregation) {
_mem_tracker->Consume(_tuple_ptrs_size);
_tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
DCHECK(_tuple_ptrs != NULL);
} else {
_tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
}
uint8_t* tuple_data = NULL;
if (input_batch.is_compressed) {
@ -338,6 +356,12 @@ void RowBatch::clear() {
for (int i = 0; i < _blocks.size(); ++i) {
_blocks[i]->del();
}
if (config::enable_partitioned_aggregation) {
DCHECK(_tuple_ptrs != NULL);
free(_tuple_ptrs);
_mem_tracker->Release(_tuple_ptrs_size);
_tuple_ptrs = NULL;
}
_cleared = true;
}
@ -521,6 +545,8 @@ void RowBatch::reset() {
_capacity = _tuple_ptrs_size / (_num_tuples_per_row * sizeof(Tuple*));
_has_in_flight_row = false;
// TODO: Change this to Clear() and investigate the repercussions.
_tuple_data_pool->free_all();
_agg_object_pool.reset(new ObjectPool());
for (int i = 0; i < _io_buffers.size(); ++i) {
_io_buffers[i]->return_buffer();
@ -538,6 +564,9 @@ void RowBatch::reset() {
}
_blocks.clear();
_auxiliary_mem_usage = 0;
if (!config::enable_partitioned_aggregation) {
_tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
}
_need_to_return = false;
_flush = FlushMode::NO_FLUSH_RESOURCES;
_needs_deep_copy = false;
@ -635,7 +664,14 @@ void RowBatch::acquire_state(RowBatch* src) {
_num_rows = src->_num_rows;
_capacity = src->_capacity;
_need_to_return = src->_need_to_return;
std::swap(_tuple_ptrs, src->_tuple_ptrs);
if (!config::enable_partitioned_aggregation) {
// Tuple pointers are allocated from tuple_data_pool_ so are transferred.
_tuple_ptrs = src->_tuple_ptrs;
src->_tuple_ptrs = NULL;
} else {
// tuple_ptrs_ were allocated with malloc so can be swapped between batches.
std::swap(_tuple_ptrs, src->_tuple_ptrs);
}
src->transfer_resource_ownership(this);
}