[refactor](rowbatch) make RowBatch better (#7286)

1. Add the const qualifier to RowBatch's read-only member functions.
2. Hold members as objects rather than as pointers to heap-allocated objects wherever possible: _tuple_data_pool and _agg_object_pool become plain MemPool/ObjectPool members instead of std::unique_ptr (a minimal before/after sketch follows).
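
A minimal, illustrative before/after sketch of the two points above, using hypothetical Batch/Pool names rather than the real RowBatch/MemPool code:

```cpp
#include <memory>

struct Pool {};  // stand-in for MemPool / ObjectPool

// Before: pool held behind a heap pointer, read-only accessor not const.
class BatchBefore {
public:
    bool is_full() { return _num_rows == _capacity; }  // read-only, yet not callable on a const object
    Pool* pool() { return _pool.get(); }

private:
    int _num_rows = 0;
    int _capacity = 1024;
    std::unique_ptr<Pool> _pool = std::make_unique<Pool>();  // extra heap allocation + pointer indirection
};

// After: pool held by value, read-only accessor marked const.
class BatchAfter {
public:
    bool is_full() const { return _num_rows == _capacity; }  // callable through const BatchAfter&
    Pool* pool() { return &_pool; }                           // call sites still receive a Pool*

private:
    int _num_rows = 0;
    int _capacity = 1024;
    Pool _pool;  // lives and dies with the batch; no separate new/delete
};
```

Keeping the accessor pointer-returning means existing call sites do not change; the batch simply stops paying for a separate allocation per pool.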
Author: thinker
Date: 2021-12-06 10:31:43 +08:00
Committed by: GitHub
Parent: e080afa186
Commit: f9be31d4bc
2 changed files with 33 additions and 39 deletions

Changed file 1 of 2: RowBatch implementation (.cpp)

@@ -50,8 +50,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
         _row_desc(row_desc),
         _auxiliary_mem_usage(0),
         _need_to_return(false),
-        _tuple_data_pool(new MemPool(_mem_tracker)),
-        _agg_object_pool(new ObjectPool()) {
+        _tuple_data_pool(_mem_tracker) {
     DCHECK(_mem_tracker != nullptr);
     DCHECK_GT(capacity, 0);
     _tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*);
@@ -62,7 +61,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
         _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
         DCHECK(_tuple_ptrs != nullptr);
     } else {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
+        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
     }
 }
@@ -83,8 +82,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
         _row_desc(row_desc),
         _auxiliary_mem_usage(0),
         _need_to_return(false),
-        _tuple_data_pool(new MemPool(_mem_tracker)),
-        _agg_object_pool(new ObjectPool()) {
+        _tuple_data_pool(_mem_tracker) {
     DCHECK(_mem_tracker != nullptr);
     _tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*);
     DCHECK_GT(_tuple_ptrs_size, 0);
@@ -94,7 +92,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
         _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
         DCHECK(_tuple_ptrs != nullptr);
     } else {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
+        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
     }
     uint8_t* tuple_data = nullptr;
@@ -106,13 +104,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
         bool success =
                 snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
         DCHECK(success) << "snappy::GetUncompressedLength failed";
-        tuple_data = reinterpret_cast<uint8_t*>(_tuple_data_pool->allocate(uncompressed_size));
+        tuple_data = reinterpret_cast<uint8_t*>(_tuple_data_pool.allocate(uncompressed_size));
         success = snappy::RawUncompress(compressed_data, compressed_size,
                                         reinterpret_cast<char*>(tuple_data));
         DCHECK(success) << "snappy::RawUncompress failed";
     } else {
         // Tuple data uncompressed, copy directly into data pool
-        tuple_data = _tuple_data_pool->allocate(input_batch.tuple_data().size());
+        tuple_data = _tuple_data_pool.allocate(input_batch.tuple_data().size());
         memcpy(tuple_data, input_batch.tuple_data().c_str(), input_batch.tuple_data().size());
     }
@@ -217,8 +215,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
         _row_desc(row_desc),
         _auxiliary_mem_usage(0),
         _need_to_return(false),
-        _tuple_data_pool(new MemPool(_mem_tracker)),
-        _agg_object_pool(new ObjectPool()) {
+        _tuple_data_pool(_mem_tracker) {
     DCHECK(_mem_tracker != nullptr);
     _tuple_ptrs_size = _num_rows * input_batch.row_tuples.size() * sizeof(Tuple*);
     DCHECK_GT(_tuple_ptrs_size, 0);
@@ -228,7 +225,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
         _tuple_ptrs = reinterpret_cast<Tuple**>(malloc(_tuple_ptrs_size));
         DCHECK(_tuple_ptrs != nullptr);
     } else {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
+        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
     }
     uint8_t* tuple_data = nullptr;
@@ -240,13 +237,13 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
         bool success =
                 snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
         DCHECK(success) << "snappy::GetUncompressedLength failed";
-        tuple_data = reinterpret_cast<uint8_t*>(_tuple_data_pool->allocate(uncompressed_size));
+        tuple_data = reinterpret_cast<uint8_t*>(_tuple_data_pool.allocate(uncompressed_size));
         success = snappy::RawUncompress(compressed_data, compressed_size,
                                         reinterpret_cast<char*>(tuple_data));
         DCHECK(success) << "snappy::RawUncompress failed";
     } else {
         // Tuple data uncompressed, copy directly into data pool
-        tuple_data = _tuple_data_pool->allocate(input_batch.tuple_data.size());
+        tuple_data = _tuple_data_pool.allocate(input_batch.tuple_data.size());
         memcpy(tuple_data, input_batch.tuple_data.c_str(), input_batch.tuple_data.size());
     }
@@ -257,8 +254,6 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
         if (*offset == -1) {
             _tuple_ptrs[tuple_idx++] = nullptr;
         } else {
-            // _tuple_ptrs[tuple_idx++] =
-            //         reinterpret_cast<Tuple*>(_tuple_data_pool->get_data_ptr(*offset));
             _tuple_ptrs[tuple_idx++] = reinterpret_cast<Tuple*>(tuple_data + *offset);
         }
     }
@@ -343,8 +338,8 @@ void RowBatch::clear() {
         return;
     }
-    _tuple_data_pool->free_all();
-    _agg_object_pool.reset(new ObjectPool());
+    _tuple_data_pool.free_all();
+    _agg_object_pool.clear();
     for (int i = 0; i < _io_buffers.size(); ++i) {
         _io_buffers[i]->return_buffer();
     }
@@ -518,7 +513,7 @@ Status RowBatch::resize_and_allocate_tuple_buffer(RuntimeState* state, int64_t*
     }
     *tuple_buffer_size = static_cast<int64_t>(row_size) * _capacity;
     // TODO(dhc): change allocate to try_allocate?
-    *buffer = _tuple_data_pool->allocate(*tuple_buffer_size);
+    *buffer = _tuple_data_pool.allocate(*tuple_buffer_size);
     if (*buffer == nullptr) {
         std::stringstream ss;
         ss << "Failed to allocate tuple buffer" << *tuple_buffer_size;
@@ -541,14 +536,13 @@ void RowBatch::add_block(BufferedBlockMgr2::Block* block) {
 }

 void RowBatch::reset() {
-    DCHECK(_tuple_data_pool.get() != nullptr);
     _num_rows = 0;
     _capacity = _tuple_ptrs_size / (_num_tuples_per_row * sizeof(Tuple*));
     _has_in_flight_row = false;

     // TODO: Change this to Clear() and investigate the repercussions.
-    _tuple_data_pool->free_all();
-    _agg_object_pool.reset(new ObjectPool());
+    _tuple_data_pool.free_all();
+    _agg_object_pool.clear();
     for (int i = 0; i < _io_buffers.size(); ++i) {
         _io_buffers[i]->return_buffer();
     }
@@ -566,7 +560,7 @@ void RowBatch::reset() {
     _blocks.clear();
     _auxiliary_mem_usage = 0;
     if (!config::enable_partitioned_aggregation) {
-        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool->allocate(_tuple_ptrs_size));
+        _tuple_ptrs = reinterpret_cast<Tuple**>(_tuple_data_pool.allocate(_tuple_ptrs_size));
     }
     _need_to_return = false;
     _flush = FlushMode::NO_FLUSH_RESOURCES;
@@ -582,9 +576,9 @@ void RowBatch::close_tuple_streams() {
 }

 void RowBatch::transfer_resource_ownership(RowBatch* dest) {
-    dest->_auxiliary_mem_usage += _tuple_data_pool->total_allocated_bytes();
-    dest->_tuple_data_pool->acquire_data(_tuple_data_pool.get(), false);
-    dest->_agg_object_pool->acquire_data(_agg_object_pool.get());
+    dest->_auxiliary_mem_usage += _tuple_data_pool.total_allocated_bytes();
+    dest->_tuple_data_pool.acquire_data(&_tuple_data_pool, false);
+    dest->_agg_object_pool.acquire_data(&_agg_object_pool);
     for (int i = 0; i < _io_buffers.size(); ++i) {
         DiskIoMgr::BufferDescriptor* buffer = _io_buffers[i];
         dest->_io_buffers.push_back(buffer);
@@ -684,7 +678,7 @@ void RowBatch::deep_copy_to(RowBatch* dst) {
     for (int i = 0; i < _num_rows; ++i) {
         TupleRow* src_row = get_row(i);
         TupleRow* dst_row = reinterpret_cast<TupleRow*>(dst->_tuple_ptrs + i * _num_tuples_per_row);
-        src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), dst->_tuple_data_pool.get(),
+        src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), &dst->_tuple_data_pool,
                            false);
     }
     dst->commit_rows(_num_rows);
@@ -751,7 +745,7 @@ size_t RowBatch::total_byte_size() {
     return result;
 }

-int RowBatch::max_tuple_buffer_size() {
+int RowBatch::max_tuple_buffer_size() const {
     int row_size = _row_desc.get_row_size();
     if (row_size > AT_CAPACITY_MEM_USAGE) {
         return row_size;
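
For context, a hypothetical call site (not part of this patch) that goes through the public accessor. Such callers are unaffected by switching _tuple_data_pool from std::unique_ptr<MemPool> to a plain MemPool member, because tuple_data_pool() still returns a MemPool*:

```cpp
// Hypothetical helper: allocate row storage from a batch's tuple data pool.
// tuple_data_pool() still returns MemPool*, and MemPool::allocate() returns
// uint8_t* (as the constructors above already rely on), so nothing changes here.
uint8_t* alloc_row_storage(RowBatch* batch, int64_t bytes) {
    return batch->tuple_data_pool()->allocate(bytes);
}
```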

Changed file 2 of 2: RowBatch class declaration (header)

@@ -143,7 +143,7 @@ public:
     // Returns true if the row batch has filled all the rows or has accumulated
     // enough memory.
-    bool at_capacity() {
+    bool at_capacity() const {
         return _num_rows == _capacity || _auxiliary_mem_usage >= AT_CAPACITY_MEM_USAGE ||
                num_tuple_streams() > 0 || _need_to_return;
     }
@@ -152,13 +152,13 @@ public:
     // enough memory. tuple_pool is an intermediate memory pool containing tuple data
     // that will eventually be attached to this row batch. We need to make sure
     // the tuple pool does not accumulate excessive memory.
-    bool at_capacity(MemPool* tuple_pool) {
+    bool at_capacity(const MemPool* tuple_pool) const {
         DCHECK(tuple_pool != nullptr);
         return at_capacity() || tuple_pool->total_allocated_bytes() > AT_CAPACITY_MEM_USAGE;
     }

     // Returns true if row_batch has reached capacity.
-    bool is_full() { return _num_rows == _capacity; }
+    bool is_full() const { return _num_rows == _capacity; }

     // Returns true if the row batch has accumulated enough external memory (in MemPools
     // and io buffers). This would be a trigger to compact the row batch or reclaim
@@ -234,9 +234,9 @@ public:
     };

     int num_tuples_per_row() const { return _num_tuples_per_row; }
-    int row_byte_size() { return _num_tuples_per_row * sizeof(Tuple*); }
-    MemPool* tuple_data_pool() { return _tuple_data_pool.get(); }
-    ObjectPool* agg_object_pool() { return _agg_object_pool.get(); }
+    int row_byte_size() const { return _num_tuples_per_row * sizeof(Tuple*); }
+    MemPool* tuple_data_pool() { return &_tuple_data_pool; }
+    ObjectPool* agg_object_pool() { return &_agg_object_pool; }

     int num_io_buffers() const { return _io_buffers.size(); }
     int num_tuple_streams() const { return _tuple_streams.size(); }
@@ -271,7 +271,7 @@ public:
     // tree.
     void mark_need_to_return() { _need_to_return = true; }
-    bool need_to_return() { return _need_to_return; }
+    bool need_to_return() const { return _need_to_return; }

     /// Used by an operator to indicate that it cannot produce more rows until the
     /// resources that it has attached to the row batch are freed or acquired by an
@@ -302,7 +302,7 @@ public:
         _needs_deep_copy = true;
     }

-    bool needs_deep_copy() { return _needs_deep_copy; }
+    bool needs_deep_copy() const { return _needs_deep_copy; }

     // Transfer ownership of resources to dest. This includes tuple data in mem
     // pool and io buffers.
@@ -383,10 +383,10 @@ public:
                                            uint8_t** buffer);

     void set_scanner_id(int id) { _scanner_id = id; }
-    int scanner_id() { return _scanner_id; }
+    int scanner_id() const { return _scanner_id; }

     // Computes the maximum size needed to store tuple data for this row batch.
-    int max_tuple_buffer_size();
+    int max_tuple_buffer_size() const;

     static const int MAX_MEM_POOL_SIZE = 32 * 1024 * 1024;
     std::string to_string();
@@ -444,10 +444,10 @@ private:
     bool _need_to_return;

     // holding (some of the) data referenced by rows
-    std::unique_ptr<MemPool> _tuple_data_pool;
+    MemPool _tuple_data_pool;

     // holding some complex agg object data (bitmap, hll)
-    std::unique_ptr<ObjectPool> _agg_object_pool;
+    ObjectPool _agg_object_pool;

     // IO buffers current owned by this row batch. Ownership of IO buffers transfer
     // between row batches. Any IO buffer will be owned by at most one row batch
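
A small illustration of what the const qualification in this header buys callers; the function below is hypothetical and not part of the patch:

```cpp
// Read-only inspection can now take the batch by const reference; before this
// commit, at_capacity(), is_full() and need_to_return() were not const, so
// these calls would not compile against a const RowBatch&.
bool should_hand_off(const RowBatch& batch) {
    return batch.is_full() || batch.at_capacity() || batch.need_to_return();
}
```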