diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index b5d38748c1..4652df4854 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -50,8 +50,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
           _row_desc(row_desc),
           _auxiliary_mem_usage(0),
           _need_to_return(false),
-          _tuple_data_pool(new MemPool(_mem_tracker)),
-          _agg_object_pool(new ObjectPool()) {
+          _tuple_data_pool(_mem_tracker) {
     DCHECK(_mem_tracker != nullptr);
     DCHECK_GT(capacity, 0);
     _tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*);
@@ -62,7 +61,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
         _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
         DCHECK(_tuple_ptrs != nullptr);
     } else {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
+        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
     }
 }
 
@@ -83,8 +82,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
           _row_desc(row_desc),
           _auxiliary_mem_usage(0),
           _need_to_return(false),
-          _tuple_data_pool(new MemPool(_mem_tracker)),
-          _agg_object_pool(new ObjectPool()) {
+          _tuple_data_pool(_mem_tracker) {
     DCHECK(_mem_tracker != nullptr);
     _tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*);
     DCHECK_GT(_tuple_ptrs_size, 0);
@@ -94,7 +92,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
         _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
         DCHECK(_tuple_ptrs != nullptr);
     } else {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
+        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
     }
 
     uint8_t* tuple_data = nullptr;
@@ -106,13 +104,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
         bool success =
                 snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
         DCHECK(success) << "snappy::GetUncompressedLength failed";
-        tuple_data = reinterpret_cast<uint8_t*>(_tuple_data_pool->allocate(uncompressed_size));
+        tuple_data = reinterpret_cast<uint8_t*>(_tuple_data_pool.allocate(uncompressed_size));
         success = snappy::RawUncompress(compressed_data, compressed_size,
                                         reinterpret_cast<char*>(tuple_data));
         DCHECK(success) << "snappy::RawUncompress failed";
     } else {
         // Tuple data uncompressed, copy directly into data pool
-        tuple_data = _tuple_data_pool->allocate(input_batch.tuple_data().size());
+        tuple_data = _tuple_data_pool.allocate(input_batch.tuple_data().size());
         memcpy(tuple_data, input_batch.tuple_data().c_str(), input_batch.tuple_data().size());
     }
 
@@ -217,8 +215,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
           _row_desc(row_desc),
           _auxiliary_mem_usage(0),
           _need_to_return(false),
-          _tuple_data_pool(new MemPool(_mem_tracker)),
-          _agg_object_pool(new ObjectPool()) {
+          _tuple_data_pool(_mem_tracker) {
     DCHECK(_mem_tracker != nullptr);
     _tuple_ptrs_size = _num_rows * input_batch.row_tuples.size() * sizeof(Tuple*);
     DCHECK_GT(_tuple_ptrs_size, 0);
@@ -228,7 +225,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
         _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
         DCHECK(_tuple_ptrs != nullptr);
     } else {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
+        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
     }
 
     uint8_t* tuple_data = nullptr;
@@ -240,13 +237,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
         bool success =
                 snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
         DCHECK(success) << "snappy::GetUncompressedLength failed";
-        tuple_data = reinterpret_cast<uint8_t*>(_tuple_data_pool->allocate(uncompressed_size));
+        tuple_data = reinterpret_cast<uint8_t*>(_tuple_data_pool.allocate(uncompressed_size));
         success = snappy::RawUncompress(compressed_data, compressed_size,
                                         reinterpret_cast<char*>(tuple_data));
         DCHECK(success) << "snappy::RawUncompress failed";
     } else {
         // Tuple data uncompressed, copy directly into data pool
-        tuple_data = _tuple_data_pool->allocate(input_batch.tuple_data.size());
+        tuple_data = _tuple_data_pool.allocate(input_batch.tuple_data.size());
         memcpy(tuple_data, input_batch.tuple_data.c_str(), input_batch.tuple_data.size());
     }
 
@@ -257,8 +254,6 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
         if (*offset == -1) {
             _tuple_ptrs[tuple_idx++] = nullptr;
         } else {
-            // _tuple_ptrs[tuple_idx++] =
-            //         reinterpret_cast<Tuple*>(_tuple_data_pool->get_data_ptr(*offset));
             _tuple_ptrs[tuple_idx++] = reinterpret_cast<Tuple*>(tuple_data + *offset);
         }
     }
@@ -343,8 +338,8 @@ void RowBatch::clear() {
         return;
     }
 
-    _tuple_data_pool->free_all();
-    _agg_object_pool.reset(new ObjectPool());
+    _tuple_data_pool.free_all();
+    _agg_object_pool.clear();
     for (int i = 0; i < _io_buffers.size(); ++i) {
         _io_buffers[i]->return_buffer();
     }
@@ -518,7 +513,7 @@ Status RowBatch::resize_and_allocate_tuple_buffer(RuntimeState* state, int64_t*
     }
     *tuple_buffer_size = static_cast<int64_t>(row_size) * _capacity;
     // TODO(dhc): change allocate to try_allocate?
-    *buffer = _tuple_data_pool->allocate(*tuple_buffer_size);
+    *buffer = _tuple_data_pool.allocate(*tuple_buffer_size);
     if (*buffer == nullptr) {
         std::stringstream ss;
         ss << "Failed to allocate tuple buffer" << *tuple_buffer_size;
@@ -541,14 +536,13 @@ void RowBatch::add_block(BufferedBlockMgr2::Block* block) {
 }
 
 void RowBatch::reset() {
-    DCHECK(_tuple_data_pool.get() != nullptr);
     _num_rows = 0;
     _capacity = _tuple_ptrs_size / (_num_tuples_per_row * sizeof(Tuple*));
     _has_in_flight_row = false;
 
     // TODO: Change this to Clear() and investigate the repercussions.
-    _tuple_data_pool->free_all();
-    _agg_object_pool.reset(new ObjectPool());
+    _tuple_data_pool.free_all();
+    _agg_object_pool.clear();
     for (int i = 0; i < _io_buffers.size(); ++i) {
         _io_buffers[i]->return_buffer();
     }
@@ -566,7 +560,7 @@ void RowBatch::reset() {
     _blocks.clear();
     _auxiliary_mem_usage = 0;
     if (!config::enable_partitioned_aggregation) {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
+        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
     }
     _need_to_return = false;
     _flush = FlushMode::NO_FLUSH_RESOURCES;
@@ -582,9 +576,9 @@ void RowBatch::close_tuple_streams() {
 }
 
 void RowBatch::transfer_resource_ownership(RowBatch* dest) {
-    dest->_auxiliary_mem_usage += _tuple_data_pool->total_allocated_bytes();
-    dest->_tuple_data_pool->acquire_data(_tuple_data_pool.get(), false);
-    dest->_agg_object_pool->acquire_data(_agg_object_pool.get());
+    dest->_auxiliary_mem_usage += _tuple_data_pool.total_allocated_bytes();
+    dest->_tuple_data_pool.acquire_data(&_tuple_data_pool, false);
+    dest->_agg_object_pool.acquire_data(&_agg_object_pool);
     for (int i = 0; i < _io_buffers.size(); ++i) {
         DiskIoMgr::BufferDescriptor* buffer = _io_buffers[i];
         dest->_io_buffers.push_back(buffer);
@@ -684,7 +678,7 @@ void RowBatch::deep_copy_to(RowBatch* dst) {
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* src_row = get_row(i);
         TupleRow* dst_row = reinterpret_cast<TupleRow*>(dst->_tuple_ptrs + i * _num_tuples_per_row);
-        src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), dst->_tuple_data_pool.get(),
+        src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), &dst->_tuple_data_pool,
                            false);
     }
     dst->commit_rows(_num_rows);
@@ -751,7 +745,7 @@ size_t RowBatch::total_byte_size() {
     return result;
 }
 
-int RowBatch::max_tuple_buffer_size() {
+int RowBatch::max_tuple_buffer_size() const {
     int row_size = _row_desc.get_row_size();
     if (row_size > AT_CAPACITY_MEM_USAGE) {
         return row_size;
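
The row_batch.cpp changes above all follow from one decision: hold the pools by value instead of behind std::unique_ptr, so every "->" becomes "." and acquire_data() takes the member's address instead of unique_ptr::get(). A minimal sketch of that ownership pattern follows; Pool, BatchBefore, and BatchAfter are simplified illustrative stand-ins, not the real Doris MemPool/RowBatch classes.

#include <memory>

// Stand-in for a pool that can hand its allocations to another pool.
struct Pool {
    void free_all() {}
    void acquire_data(Pool* src, bool keep_current) { (void)src; (void)keep_current; }
};

// Before: the pool lives on the heap behind a unique_ptr; callers use "->"
// and pass unique_ptr::get() when transferring ownership.
struct BatchBefore {
    std::unique_ptr<Pool> _tuple_data_pool{new Pool()};
    void transfer_to(BatchBefore* dest) {
        dest->_tuple_data_pool->acquire_data(_tuple_data_pool.get(), false);
    }
};

// After: the pool is embedded by value; callers use "." and pass the
// member's address, saving one heap allocation per batch and one pointer
// indirection on every pool access.
struct BatchAfter {
    Pool _tuple_data_pool;
    void transfer_to(BatchAfter* dest) {
        dest->_tuple_data_pool.acquire_data(&_tuple_data_pool, false);
    }
};

The by-value member also makes the DCHECK(_tuple_data_pool.get() != nullptr) in reset() meaningless, which is why the diff deletes it.
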
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
index 1474f79056..637c4b9f57 100644
--- a/be/src/runtime/row_batch.h
+++ b/be/src/runtime/row_batch.h
@@ -143,7 +143,7 @@ public:
 
     // Returns true if the row batch has filled all the rows or has accumulated
     // enough memory.
-    bool at_capacity() {
+    bool at_capacity() const {
         return _num_rows == _capacity || _auxiliary_mem_usage >= AT_CAPACITY_MEM_USAGE ||
                num_tuple_streams() > 0 || _need_to_return;
     }
@@ -152,13 +152,13 @@ public:
     // enough memory. tuple_pool is an intermediate memory pool containing tuple data
     // that will eventually be attached to this row batch. We need to make sure
    // the tuple pool does not accumulate excessive memory.
-    bool at_capacity(MemPool* tuple_pool) {
+    bool at_capacity(const MemPool* tuple_pool) const {
         DCHECK(tuple_pool != nullptr);
         return at_capacity() || tuple_pool->total_allocated_bytes() > AT_CAPACITY_MEM_USAGE;
     }
 
     // Returns true if row_batch has reached capacity.
-    bool is_full() { return _num_rows == _capacity; }
+    bool is_full() const { return _num_rows == _capacity; }
 
     // Returns true if the row batch has accumulated enough external memory (in MemPools
     // and io buffers). This would be a trigger to compact the row batch or reclaim
@@ -234,9 +234,9 @@ public:
     };
 
     int num_tuples_per_row() const { return _num_tuples_per_row; }
-    int row_byte_size() { return _num_tuples_per_row * sizeof(Tuple*); }
-    MemPool* tuple_data_pool() { return _tuple_data_pool.get(); }
-    ObjectPool* agg_object_pool() { return _agg_object_pool.get(); }
+    int row_byte_size() const { return _num_tuples_per_row * sizeof(Tuple*); }
+    MemPool* tuple_data_pool() { return &_tuple_data_pool; }
+    ObjectPool* agg_object_pool() { return &_agg_object_pool; }
     int num_io_buffers() const { return _io_buffers.size(); }
     int num_tuple_streams() const { return _tuple_streams.size(); }
 
@@ -271,7 +271,7 @@ public:
     // tree.
     void mark_need_to_return() { _need_to_return = true; }
 
-    bool need_to_return() { return _need_to_return; }
+    bool need_to_return() const { return _need_to_return; }
 
     /// Used by an operator to indicate that it cannot produce more rows until the
     /// resources that it has attached to the row batch are freed or acquired by an
@@ -302,7 +302,7 @@ public:
         _needs_deep_copy = true;
     }
 
-    bool needs_deep_copy() { return _needs_deep_copy; }
+    bool needs_deep_copy() const { return _needs_deep_copy; }
 
     // Transfer ownership of resources to dest. This includes tuple data in mem
     // pool and io buffers.
@@ -383,10 +383,10 @@ public:
                                             uint8_t** buffer);
 
     void set_scanner_id(int id) { _scanner_id = id; }
-    int scanner_id() { return _scanner_id; }
+    int scanner_id() const { return _scanner_id; }
 
     // Computes the maximum size needed to store tuple data for this row batch.
-    int max_tuple_buffer_size();
+    int max_tuple_buffer_size() const;
 
     static const int MAX_MEM_POOL_SIZE = 32 * 1024 * 1024;
     std::string to_string();
@@ -444,10 +444,10 @@ private:
     bool _need_to_return;
 
     // holding (some of the) data referenced by rows
-    std::unique_ptr<MemPool> _tuple_data_pool;
+    MemPool _tuple_data_pool;
 
     // holding some complex agg object data (bitmap, hll)
-    std::unique_ptr<ObjectPool> _agg_object_pool;
+    ObjectPool _agg_object_pool;
 
     // IO buffers current owned by this row batch. Ownership of IO buffers transfer
     // between row batches. Any IO buffer will be owned by at most one row batch
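
Beyond the member-type change, the row_batch.h hunks are const-correctness fixes: read-only predicates such as is_full(), need_to_return(), needs_deep_copy(), scanner_id(), and max_tuple_buffer_size() gain a const qualifier so they can be called through const pointers and references. A small illustration of why this matters; RowBatchLike and can_accept_more are hypothetical stand-ins, not the real API.

// Hypothetical stand-in for RowBatch, reduced to two fields and one predicate.
struct RowBatchLike {
    int _num_rows = 0;
    int _capacity = 1024;
    bool is_full() const { return _num_rows == _capacity; }
};

// Taking the batch by const reference only compiles because is_full() is
// const-qualified -- exactly the property the header diff adds.
bool can_accept_more(const RowBatchLike& batch) {
    return !batch.is_full();
}
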