When the length of `Tuple/Block data` is greater than 2GB, serialize the protobuf request, embed the `Tuple/Block data` in the controller attachment, and transmit it via `http brpc`. This avoids the error raised when a protobuf request exceeds 2GB: `Bad request, error_text=[E1003]Fail to compress request`. In #7164, `Tuple/Block data` was put into the attachment and sent via the default `baidu_std brpc`, but attachments over 2GB are truncated by that protocol; `http brpc` has no 2GB limit. #7921 considered putting `Tuple/Block data` into the attachment by default, since that in theory saves one serialization pass and should improve performance. Testing, however, showed no performance gain, while peak memory rose because of the extra memory copy.
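A minimal sketch of the dispatch described above, not the actual Doris implementation: `send_via_http_brpc` and `send_via_baidu_std` are hypothetical stand-ins for the real send paths, and the size threshold is illustrative; `brpc::Controller::request_attachment()` is real brpc API.

```cpp
#include <brpc/controller.h>

#include <cstddef>
#include <string>

// protobuf cannot serialize a request body of 2GB or more.
constexpr size_t kMaxProtobufBodySize = 2ULL * 1024 * 1024 * 1024;

// Hypothetical send paths; stand-ins for the real transmit logic.
template <typename Request>
void send_via_http_brpc(const Request& request, brpc::Controller* cntl);
template <typename Request>
void send_via_baidu_std(const Request& request, brpc::Controller* cntl);

// Keep small payloads inside the protobuf request; move oversized ones into
// the controller attachment and send them over the http protocol, which does
// not truncate attachments at 2GB the way baidu_std does.
template <typename Request>
void transmit(const Request& request, const std::string& tuple_or_block_data,
              brpc::Controller* cntl) {
    if (tuple_or_block_data.size() >= kMaxProtobufBodySize) {
        // Attachment bytes bypass protobuf serialization entirely.
        cntl->request_attachment().append(tuple_or_block_data);
        send_via_http_brpc(request, cntl); // hypothetical helper
    } else {
        send_via_baidu_std(request, cntl); // hypothetical helper
    }
}
```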
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/row-batch.cc
// and modified by Doris

#include "runtime/row_batch.h"

#include <fmt/format.h>
#include <snappy/snappy.h>
#include <stdint.h> // for intptr_t

#include <limits>

#include "common/utils.h"
#include "gen_cpp/Data_types.h"
#include "gen_cpp/data.pb.h"
#include "runtime/buffered_tuple_stream2.inline.h"
#include "runtime/collection_value.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "runtime/thread_context.h"
#include "runtime/tuple_row.h"
#include "vec/columns/column_vector.h"
#include "vec/core/block.h"

using std::vector;

namespace doris {

const int RowBatch::AT_CAPACITY_MEM_USAGE = 8 * 1024 * 1024;
const int RowBatch::FIXED_LEN_BUFFER_LIMIT = AT_CAPACITY_MEM_USAGE / 2;

RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity)
        : _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()),
          _has_in_flight_row(false),
          _num_rows(0),
          _num_uncommitted_rows(0),
          _capacity(capacity),
          _flush(FlushMode::NO_FLUSH_RESOURCES),
          _needs_deep_copy(false),
          _num_tuples_per_row(row_desc.tuple_descriptors().size()),
          _row_desc(row_desc),
          _auxiliary_mem_usage(0),
          _need_to_return(false),
          _tuple_data_pool() {
    DCHECK_GT(capacity, 0);
    _tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*);
    DCHECK_GT(_tuple_ptrs_size, 0);
    _tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size));
    DCHECK(_tuple_ptrs != nullptr);
}

// TODO: we want our input_batch's tuple_data to come from our (not yet implemented)
// global runtime memory segment; how do we get thrift to allocate it from there?
// maybe change line (in Data_types.cc generated from Data.thrift)
// xfer += iprot->readString(this->tuple_data[_i9]);
// to allocate string data in a special mempool
// (change via python script that runs over Data_types.cc)
RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch)
        : _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()),
          _has_in_flight_row(false),
          _num_rows(input_batch.num_rows()),
          _num_uncommitted_rows(0),
          _capacity(_num_rows),
          _flush(FlushMode::NO_FLUSH_RESOURCES),
          _needs_deep_copy(false),
          _num_tuples_per_row(input_batch.row_tuples_size()),
          _row_desc(row_desc),
          _auxiliary_mem_usage(0),
          _need_to_return(false),
          _tuple_data_pool() {
    _tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*);
    DCHECK_GT(_tuple_ptrs_size, 0);
    _tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size));
    DCHECK(_tuple_ptrs != nullptr);

    char* tuple_data = nullptr;
    if (input_batch.is_compressed()) {
        // Decompress tuple data into data pool
        const char* compressed_data = input_batch.tuple_data().c_str();
        size_t compressed_size = input_batch.tuple_data().size();
        size_t uncompressed_size = 0;
        bool success =
                snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
        DCHECK(success) << "snappy::GetUncompressedLength failed";
        tuple_data = (char*)_tuple_data_pool.allocate(uncompressed_size);
        success = snappy::RawUncompress(compressed_data, compressed_size, tuple_data);
        DCHECK(success) << "snappy::RawUncompress failed";
    } else {
        // Tuple data uncompressed, copy directly into data pool
        tuple_data = (char*)_tuple_data_pool.allocate(input_batch.tuple_data().size());
        memcpy(tuple_data, input_batch.tuple_data().c_str(), input_batch.tuple_data().size());
    }

    // convert input_batch.tuple_offsets into pointers
    int tuple_idx = 0;
    // For historical reasons, the original offsets were stored as int32, so if a
    // RowBatch was larger than 2GB, the transmitted offsets could overflow.
    // The new version therefore adds a new_tuple_offsets field that stores offsets as int64.
    // To stay compatible, both versions of the offsets are populated here, and
    // new_tuple_offsets is preferred when present.
    // TODO(cmy): in the next version, the original tuple_offsets should be removed.
    if (input_batch.new_tuple_offsets_size() > 0) {
        for (int64_t offset : input_batch.new_tuple_offsets()) {
            if (offset == -1) {
                _tuple_ptrs[tuple_idx++] = nullptr;
            } else {
                _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
            }
        }
    } else {
        for (int32_t offset : input_batch.tuple_offsets()) {
            if (offset == -1) {
                _tuple_ptrs[tuple_idx++] = nullptr;
            } else {
                _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
            }
        }
    }

    // Check whether we have slots that require offset-to-pointer conversion.
    if (!_row_desc.has_varlen_slots()) {
        return;
    }

    const auto& tuple_descs = _row_desc.tuple_descriptors();

    // For every unique tuple, convert string offsets contained in tuple data into
    // pointers. Tuples were serialized in the order we are deserializing them in,
    // so the first occurrence of a tuple will always have a higher offset than any tuple
    // we already converted.
    for (int i = 0; i < _num_rows; ++i) {
        TupleRow* row = get_row(i);
        for (size_t j = 0; j < tuple_descs.size(); ++j) {
            auto desc = tuple_descs[j];
            if (desc->string_slots().empty() && desc->collection_slots().empty()) {
                continue;
            }

            Tuple* tuple = row->get_tuple(j);
            if (tuple == nullptr) {
                continue;
            }

            for (auto slot : desc->string_slots()) {
                DCHECK(slot->type().is_string_type());
                StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
                int64_t offset = convert_to<int64_t>(string_val->ptr);
                string_val->ptr = tuple_data + offset;

                // Why do we apply this mask? The len field of StringValue changed from
                // int to size_t in Doris 0.11. During an upgrade, the high bits of len
                // sent from 0.10 hold random values; this was harmless in 0.10, but in
                // 0.11 it produces an invalid length. So we zero the high bits here.
                string_val->len &= 0x7FFFFFFFL;
            }

            // copy collection slots
            for (auto slot_collection : desc->collection_slots()) {
                DCHECK(slot_collection->type().is_collection_type());

                CollectionValue* array_val =
                        tuple->get_collection_slot(slot_collection->tuple_offset());
                const auto& item_type_desc = slot_collection->type().children[0];
                CollectionValue::deserialize_collection(array_val, tuple_data, item_type_desc);
            }
        }
    }
}

void RowBatch::clear() {
    if (_cleared) {
        return;
    }

    _tuple_data_pool.free_all();
    _agg_object_pool.clear();
    for (int i = 0; i < _io_buffers.size(); ++i) {
        _io_buffers[i]->return_buffer();
    }

    for (BufferInfo& buffer_info : _buffers) {
        ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(buffer_info.client, &buffer_info.buffer);
    }

    close_tuple_streams();
    for (int i = 0; i < _blocks.size(); ++i) {
        _blocks[i]->del();
    }
    DCHECK(_tuple_ptrs != nullptr);
    free(_tuple_ptrs);
    _tuple_ptrs = nullptr;
    _cleared = true;
}

RowBatch::~RowBatch() {
    clear();
}
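
// Serializes this batch into output_batch: writes tuple offsets both as int32
// (tuple_offsets, kept for compatibility) and as int64 (new_tuple_offsets),
// snappy-compresses the tuple data when that makes it smaller, and reports the
// uncompressed/compressed sizes through the out-parameters. Fails if the
// serialized batch exceeds protobuf's 2GB limit and allow_transfer_large_data
// is false.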
Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
                           size_t* compressed_size, bool allow_transfer_large_data) {
    // num_rows
    output_batch->set_num_rows(_num_rows);
    // row_tuples
    _row_desc.to_protobuf(output_batch->mutable_row_tuples());
    // tuple_offsets: must clear before reserve
    // TODO(cmy): the tuple_offsets should be removed after v1.1.0, use new_tuple_offsets instead.
    // keeping tuple_offsets here is just for compatibility.
    output_batch->clear_tuple_offsets();
    output_batch->mutable_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row);
    output_batch->clear_new_tuple_offsets();
    output_batch->mutable_new_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row);
    // is_compressed
    output_batch->set_is_compressed(false);
    // tuple data
    size_t tuple_byte_size = total_byte_size();
    std::string* mutable_tuple_data = output_batch->mutable_tuple_data();
    mutable_tuple_data->resize(tuple_byte_size);

    // Copy tuple data, including strings, into output_batch (converting string
    // pointers into offsets in the process)
    int64_t offset = 0; // current offset into output_batch->tuple_data
    char* tuple_data = mutable_tuple_data->data();
    const auto& tuple_descs = _row_desc.tuple_descriptors();
    const auto& mutable_tuple_offsets = output_batch->mutable_tuple_offsets();
    const auto& mutable_new_tuple_offsets = output_batch->mutable_new_tuple_offsets();

    for (int i = 0; i < _num_rows; ++i) {
        TupleRow* row = get_row(i);
        for (size_t j = 0; j < tuple_descs.size(); ++j) {
            auto desc = tuple_descs[j];
            if (row->get_tuple(j) == nullptr) {
                // NULLs are encoded as -1
                mutable_tuple_offsets->Add(-1);
                mutable_new_tuple_offsets->Add(-1);
                continue;
            }
            // Record offset before creating copy (which increments offset and tuple_data)
            mutable_tuple_offsets->Add((int32_t)offset);
            mutable_new_tuple_offsets->Add(offset);
            row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
            CHECK_LE(offset, tuple_byte_size);
        }
    }
    CHECK_EQ(offset, tuple_byte_size)
            << "offset: " << offset << " vs. tuple_byte_size: " << tuple_byte_size;

    size_t max_compressed_size = snappy::MaxCompressedLength(tuple_byte_size);
    bool can_compress = config::compress_rowbatches && tuple_byte_size > 0;
    if (can_compress) {
        try {
            // Allocating a very large contiguous buffer may fail; if it does,
            // skip compression.
            _compression_scratch.resize(max_compressed_size);
        } catch (...) {
            can_compress = false;
            std::exception_ptr p = std::current_exception();
            LOG(WARNING) << "Try to alloc " << max_compressed_size
                         << " bytes for compression scratch failed. "
                         << (p ? p.__cxa_exception_type()->name() : "null");
        }
    }
    if (can_compress) {
        // Try compressing tuple_data to _compression_scratch, swap if compressed data is
        // smaller
        size_t compressed_size = 0;
        char* compressed_output = _compression_scratch.data();
        snappy::RawCompress(mutable_tuple_data->data(), tuple_byte_size, compressed_output,
                            &compressed_size);
        if (LIKELY(compressed_size < tuple_byte_size)) {
            _compression_scratch.resize(compressed_size);
            mutable_tuple_data->swap(_compression_scratch);
            output_batch->set_is_compressed(true);
        }

        VLOG_ROW << "uncompressed tuple_byte_size: " << tuple_byte_size
                 << ", compressed size: " << compressed_size;
    }

    // return compressed and uncompressed size
    size_t pb_size = get_batch_size(*output_batch);
    *uncompressed_size = pb_size - mutable_tuple_data->size() + tuple_byte_size;
    *compressed_size = pb_size;
    if (!allow_transfer_large_data && pb_size > std::numeric_limits<int32_t>::max()) {
        // protobuf has a hard limit of 2GB for serialized data.
        return Status::InternalError(fmt::format(
                "The RowBatch is larger than 2GB ({}), and cannot be sent by protobuf.", pb_size));
    }
    return Status::OK();
}

// When a row read from a file cannot fill a tuple due to schema restrictions,
// increase _num_uncommitted_rows in the row batch.
void RowBatch::increase_uncommitted_rows() {
    _num_uncommitted_rows++;
}

void RowBatch::add_io_buffer(DiskIoMgr::BufferDescriptor* buffer) {
    DCHECK(buffer != nullptr);
    _io_buffers.push_back(buffer);
    _auxiliary_mem_usage += buffer->buffer_len();
    buffer->update_mem_tracker(_mem_tracker.get());
}

Status RowBatch::resize_and_allocate_tuple_buffer(RuntimeState* state, int64_t* tuple_buffer_size,
                                                  uint8_t** buffer) {
    int64_t row_size = _row_desc.get_row_size();
    // Avoid divide-by-zero. Don't need to modify capacity for empty rows anyway.
    if (row_size != 0) {
        _capacity = std::max(1, std::min<int>(_capacity, FIXED_LEN_BUFFER_LIMIT / row_size));
    }
    *tuple_buffer_size = row_size * _capacity;
    // TODO(dhc): change allocate to try_allocate?
    *buffer = _tuple_data_pool.allocate(*tuple_buffer_size);
    if (*buffer == nullptr) {
        std::stringstream ss;
        ss << "Failed to allocate tuple buffer of size " << *tuple_buffer_size;
        LOG(WARNING) << ss.str();
        return state->set_mem_limit_exceeded(ss.str());
    }
    return Status::OK();
}

void RowBatch::add_tuple_stream(BufferedTupleStream2* stream) {
    DCHECK(stream != nullptr);
    _tuple_streams.push_back(stream);
    _auxiliary_mem_usage += stream->byte_size();
}

void RowBatch::add_block(BufferedBlockMgr2::Block* block) {
    DCHECK(block != nullptr);
    _blocks.push_back(block);
    _auxiliary_mem_usage += block->buffer_len();
}
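
// Returns this batch to an empty, reusable state: frees pooled tuple data,
// returns I/O and buffer-pool buffers, and closes tuple streams, while keeping
// the malloc'd tuple pointer array so the batch can be refilled. clear(), in
// contrast, also frees the tuple pointers and tears the batch down permanently.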
void RowBatch::reset() {
    _num_rows = 0;
    _capacity = _tuple_ptrs_size / (_num_tuples_per_row * sizeof(Tuple*));
    _has_in_flight_row = false;

    // TODO: Change this to Clear() and investigate the repercussions.
    _tuple_data_pool.free_all();
    _agg_object_pool.clear();
    for (int i = 0; i < _io_buffers.size(); ++i) {
        _io_buffers[i]->return_buffer();
    }
    _io_buffers.clear();

    for (BufferInfo& buffer_info : _buffers) {
        ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(buffer_info.client, &buffer_info.buffer);
    }
    _buffers.clear();

    close_tuple_streams();
    for (int i = 0; i < _blocks.size(); ++i) {
        _blocks[i]->del();
    }
    _blocks.clear();
    _auxiliary_mem_usage = 0;
    _need_to_return = false;
    _flush = FlushMode::NO_FLUSH_RESOURCES;
    _needs_deep_copy = false;
}

void RowBatch::close_tuple_streams() {
    for (int i = 0; i < _tuple_streams.size(); ++i) {
        _tuple_streams[i]->close();
        delete _tuple_streams[i];
    }
    _tuple_streams.clear();
}
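
// Hands every auxiliary resource (tuple data pool, I/O buffers, buffer-pool
// buffers, tuple streams, blocks) over to dest, then resets this batch. The
// source-side containers are cleared so that only dest releases the resources.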
void RowBatch::transfer_resource_ownership(RowBatch* dest) {
    dest->_auxiliary_mem_usage += _tuple_data_pool.total_allocated_bytes();
    dest->_tuple_data_pool.acquire_data(&_tuple_data_pool, false);
    dest->_agg_object_pool.acquire_data(&_agg_object_pool);
    for (int i = 0; i < _io_buffers.size(); ++i) {
        DiskIoMgr::BufferDescriptor* buffer = _io_buffers[i];
        dest->_io_buffers.push_back(buffer);
        dest->_auxiliary_mem_usage += buffer->buffer_len();
        buffer->update_mem_tracker(dest->_mem_tracker.get());
    }
    _io_buffers.clear();

    for (BufferInfo& buffer_info : _buffers) {
        dest->add_buffer(buffer_info.client, std::move(buffer_info.buffer),
                         FlushMode::NO_FLUSH_RESOURCES);
    }
    _buffers.clear();

    for (int i = 0; i < _tuple_streams.size(); ++i) {
        dest->_tuple_streams.push_back(_tuple_streams[i]);
        dest->_auxiliary_mem_usage += _tuple_streams[i]->byte_size();
    }
    // Resource release must be done by the dest RowBatch. If we did not clear
    // the corresponding resources here, both this RowBatch's reset() and the
    // dest RowBatch's reset() would release them, causing a double-delete crash.
    _tuple_streams.clear();

    for (int i = 0; i < _blocks.size(); ++i) {
        dest->_blocks.push_back(_blocks[i]);
        dest->_auxiliary_mem_usage += _blocks[i]->buffer_len();
    }
    _blocks.clear();

    dest->_need_to_return |= _need_to_return;

    if (_needs_deep_copy) {
        dest->mark_needs_deep_copy();
    } else if (_flush == FlushMode::FLUSH_RESOURCES) {
        dest->mark_flush_resources();
    }
    reset();
}
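
// Converts this row-oriented batch into a columnar vectorized::Block: creates
// one mutable column per slot, then appends each row's slot value (NULL,
// string, or fixed-length) to the matching column.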
vectorized::Block RowBatch::convert_to_vec_block() const {
    std::vector<vectorized::MutableColumnPtr> columns;
    for (const auto tuple_desc : _row_desc.tuple_descriptors()) {
        for (const auto slot_desc : tuple_desc->slots()) {
            columns.emplace_back(slot_desc->get_empty_mutable_column());
        }
    }

    std::vector<SlotDescriptor*> slot_descs;
    std::vector<int> tuple_idx;
    int column_numbers = 0;
    for (int i = 0; i < _row_desc.tuple_descriptors().size(); ++i) {
        auto tuple_desc = _row_desc.tuple_descriptors()[i];
        for (int j = 0; j < tuple_desc->slots().size(); ++j) {
            slot_descs.push_back(tuple_desc->slots()[j]);
            tuple_idx.push_back(i);
        }
        column_numbers += tuple_desc->slots().size();
    }
    for (int i = 0; i < column_numbers; ++i) {
        auto slot_desc = slot_descs[i];
        for (int j = 0; j < _num_rows; ++j) {
            TupleRow* src_row = get_row(j);
            auto tuple = src_row->get_tuple(tuple_idx[i]);
            if (slot_desc->is_nullable() && tuple->is_null(slot_desc->null_indicator_offset())) {
                columns[i]->insert_data(nullptr, 0);
            } else if (slot_desc->type().is_string_type()) {
                auto string_value =
                        static_cast<const StringValue*>(tuple->get_slot(slot_desc->tuple_offset()));
                columns[i]->insert_data(string_value->ptr, string_value->len);
            } else {
                columns[i]->insert_data(
                        static_cast<const char*>(tuple->get_slot(slot_desc->tuple_offset())),
                        slot_desc->slot_size());
            }
        }
    }

    doris::vectorized::ColumnsWithTypeAndName columns_with_type_and_name;
    auto n_columns = 0;
    for (const auto tuple_desc : _row_desc.tuple_descriptors()) {
        for (const auto slot_desc : tuple_desc->slots()) {
            columns_with_type_and_name.emplace_back(columns[n_columns++]->get_ptr(),
                                                    slot_desc->get_data_type_ptr(),
                                                    slot_desc->col_name());
        }
    }

    return {columns_with_type_and_name};
}

size_t RowBatch::get_batch_size(const PRowBatch& batch) {
    size_t result = batch.tuple_data().size();
    result += batch.row_tuples().size() * sizeof(int32_t);
    // TODO(cmy): remove batch.tuple_offsets
    result += batch.tuple_offsets().size() * sizeof(int32_t);
    result += batch.new_tuple_offsets().size() * sizeof(int64_t);
    return result;
}

void RowBatch::acquire_state(RowBatch* src) {
    // DCHECK(_row_desc.equals(src->_row_desc));
    DCHECK_EQ(_num_tuples_per_row, src->_num_tuples_per_row);
    DCHECK_EQ(_tuple_ptrs_size, src->_tuple_ptrs_size);
    DCHECK_EQ(_auxiliary_mem_usage, 0);

    // The destination row batch should be empty.
    DCHECK(!_has_in_flight_row);
    DCHECK_EQ(_num_rows, 0);

    for (int i = 0; i < src->_io_buffers.size(); ++i) {
        DiskIoMgr::BufferDescriptor* buffer = src->_io_buffers[i];
        _io_buffers.push_back(buffer);
        _auxiliary_mem_usage += buffer->buffer_len();
        buffer->update_mem_tracker(_mem_tracker.get());
    }
    src->_io_buffers.clear();
    src->_auxiliary_mem_usage = 0;

    DCHECK(src->_tuple_streams.empty());
    DCHECK(src->_blocks.empty());

    _has_in_flight_row = src->_has_in_flight_row;
    _num_rows = src->_num_rows;
    _capacity = src->_capacity;
    _need_to_return = src->_need_to_return;
    // tuple_ptrs_ were allocated with malloc so can be swapped between batches.
    std::swap(_tuple_ptrs, src->_tuple_ptrs);
    src->transfer_resource_ownership(this);
}

void RowBatch::deep_copy_to(RowBatch* dst) {
    DCHECK(dst->_row_desc.equals(_row_desc));
    DCHECK_EQ(dst->_num_rows, 0);
    DCHECK_GE(dst->_capacity, _num_rows);
    dst->add_rows(_num_rows);
    for (int i = 0; i < _num_rows; ++i) {
        TupleRow* src_row = get_row(i);
        TupleRow* dst_row = convert_to<TupleRow*>(dst->_tuple_ptrs + i * _num_tuples_per_row);
        src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), &dst->_tuple_data_pool, false);
    }
    dst->commit_rows(_num_rows);
}

// TODO: consider computing size of batches as they are built up
size_t RowBatch::total_byte_size() const {
    size_t result = 0;

    // Sum total variable length byte sizes.
    for (int i = 0; i < _num_rows; ++i) {
        TupleRow* row = get_row(i);
        const auto& tuple_descs = _row_desc.tuple_descriptors();
        for (size_t j = 0; j < tuple_descs.size(); ++j) {
            auto desc = tuple_descs[j];
            Tuple* tuple = row->get_tuple(j);
            if (tuple == nullptr) {
                continue;
            }
            result += desc->byte_size();

            for (auto slot : desc->string_slots()) {
                DCHECK(slot->type().is_string_type());
                if (tuple->is_null(slot->null_indicator_offset())) {
                    continue;
                }
                StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
                result += string_val->len;
            }

            // compute slot collection size
            for (auto slot_collection : desc->collection_slots()) {
                DCHECK(slot_collection->type().is_collection_type());
                if (tuple->is_null(slot_collection->null_indicator_offset())) {
                    continue;
                }
                CollectionValue* array_val =
                        tuple->get_collection_slot(slot_collection->tuple_offset());
                const auto& item_type_desc = slot_collection->type().children[0];
                result += array_val->get_byte_size(item_type_desc);
            }
        }
    }

    return result;
}

void RowBatch::add_buffer(BufferPool::ClientHandle* client, BufferPool::BufferHandle&& buffer,
                          FlushMode flush) {
    _auxiliary_mem_usage += buffer.len();
    BufferInfo buffer_info;
    buffer_info.client = client;
    buffer_info.buffer = std::move(buffer);
    _buffers.push_back(std::move(buffer_info));
    if (flush == FlushMode::FLUSH_RESOURCES) mark_flush_resources();
}

std::string RowBatch::to_string() {
    std::stringstream out;
    for (int i = 0; i < _num_rows; ++i) {
        out << get_row(i)->to_string(_row_desc) << "\n";
    }
    return out.str();
}

} // end namespace doris