Files
doris/be/src/runtime/row_batch.cpp
HappenLee e1d7233e9c [feature](vectorization) Support Vectorized Exec Engine In Doris (#7785)
# Proposed changes

Issue Number: close #6238

    Co-authored-by: HappenLee <happenlee@hotmail.com>
    Co-authored-by: stdpain <34912776+stdpain@users.noreply.github.com>
    Co-authored-by: Zhengguo Yang <yangzhgg@gmail.com>
    Co-authored-by: wangbo <506340561@qq.com>
    Co-authored-by: emmymiao87 <522274284@qq.com>
    Co-authored-by: Pxl <952130278@qq.com>
    Co-authored-by: zhangstar333 <87313068+zhangstar333@users.noreply.github.com>
    Co-authored-by: thinker <zchw100@qq.com>
    Co-authored-by: Zeno Yang <1521564989@qq.com>
    Co-authored-by: Wang Shuo <wangshuo128@gmail.com>
    Co-authored-by: zhoubintao <35688959+zbtzbtzbt@users.noreply.github.com>
    Co-authored-by: Gabriel <gabrielleebuaa@gmail.com>
    Co-authored-by: xinghuayu007 <1450306854@qq.com>
    Co-authored-by: weizuo93 <weizuo@apache.org>
    Co-authored-by: yiguolei <guoleiyi@tencent.com>
    Co-authored-by: anneji-dev <85534151+anneji-dev@users.noreply.github.com>
    Co-authored-by: awakeljw <993007281@qq.com>
    Co-authored-by: taberylyang <95272637+taberylyang@users.noreply.github.com>
    Co-authored-by: Cui Kaifeng <48012748+azurenake@users.noreply.github.com>


## Problem Summary:

### 1. Some code from ClickHouse

**ClickHouse is an excellent vectorized execution engine database, so we have referenced and
learned a great deal from its implementation of data structures and functions.
Our work is based on ClickHouse v19.16.2.2, and we would like to thank the ClickHouse community and its developers.**

The following comment has been added to code copied from ClickHouse, e.g.:
// This file is copied from
// https://github.com/ClickHouse/ClickHouse/blob/master/src/Interpreters/AggregationCommon.h
// and modified by Doris

### 2. Supported exec nodes and queries:
* vaggregation_node
* vanalytic_eval_node
* vassert_num_rows_node
* vblocking_join_node
* vcross_join_node
* vempty_set_node
* ves_http_scan_node
* vexcept_node
* vexchange_node
* vintersect_node
* vmysql_scan_node
* vodbc_scan_node
* volap_scan_node
* vrepeat_node
* vschema_scan_node
* vselect_node
* vset_operation_node
* vsort_node
* vunion_node
* vhash_join_node

The vectorized exec engine can run the SSB and TPC-H query sets and about 70% of the TPC-DS standard query set.

### 3. Data Model

The vectorized exec engine supports the **Dup/Agg/Unq** (Duplicate/Aggregate/Unique) data models, and the Block Reader is vectorized.
Vectorized segment reading is still a work in progress.

### 4. How to use

1. Set the session variable `set enable_vectorized_engine = true;` (required)
2. Set the session variable `set batch_size = 4096;` (recommended); see the example session below.
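
A minimal example session (assuming a MySQL client connected to Doris; the final query and the `lineorder` table are only illustrative):

```sql
-- Enable the vectorized exec engine for the current session (required).
set enable_vectorized_engine = true;
-- Use a larger batch size for better vectorized throughput (recommended).
set batch_size = 4096;
-- Check that the switch took effect.
show variables like 'enable_vectorized_engine';
-- Subsequent queries in this session now run on the vectorized engine.
select count(*) from lineorder;
```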

### 5. Some differences from the original exec engine

https://github.com/doris-vectorized/doris-vectorized/issues/294

## Checklist(Required)

1. Does it affect the original behavior: (No)
2. Have unit tests been added: (Yes)
3. Has documentation been added or modified: (No)
4. Does it need to update dependencies: (No)
5. Are there any changes that cannot be rolled back: (Yes)
2022-01-18 10:07:15 +08:00


// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/row_batch.h"
#include <snappy/snappy.h>
#include <stdint.h> // for intptr_t
#include "common/utils.h"
#include "gen_cpp/Data_types.h"
#include "gen_cpp/data.pb.h"
#include "runtime/buffered_tuple_stream2.inline.h"
#include "runtime/collection_value.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "runtime/tuple_row.h"
#include "vec/columns/column_vector.h"
#include "vec/core/block.h"
using std::vector;
namespace doris {
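// Soft limit on how much memory a single RowBatch may accumulate before it reports
// itself "at capacity", and the share of that limit reserved for fixed-length tuple
// data (see resize_and_allocate_tuple_buffer()).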
const int RowBatch::AT_CAPACITY_MEM_USAGE = 8 * 1024 * 1024;
const int RowBatch::FIXED_LEN_BUFFER_LIMIT = AT_CAPACITY_MEM_USAGE / 2;
RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_tracker)
: _mem_tracker(mem_tracker),
_has_in_flight_row(false),
_num_rows(0),
_num_uncommitted_rows(0),
_capacity(capacity),
_flush(FlushMode::NO_FLUSH_RESOURCES),
_needs_deep_copy(false),
_num_tuples_per_row(row_desc.tuple_descriptors().size()),
_row_desc(row_desc),
_auxiliary_mem_usage(0),
_need_to_return(false),
_tuple_data_pool(_mem_tracker) {
DCHECK(_mem_tracker != nullptr);
DCHECK_GT(capacity, 0);
_tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*);
DCHECK_GT(_tuple_ptrs_size, 0);
// TODO: switch to Init() pattern so we can check memory limit and return Status.
if (config::enable_partitioned_aggregation) {
_mem_tracker->Consume(_tuple_ptrs_size);
_tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size));
DCHECK(_tuple_ptrs != nullptr);
} else {
_tuple_ptrs = (Tuple**)(_tuple_data_pool.allocate(_tuple_ptrs_size));
}
}
// TODO: we want our input_batch's tuple_data to come from our (not yet implemented)
// global runtime memory segment; how do we get thrift to allocate it from there?
// maybe change line (in Data_types.cc generated from Data.thrift)
// xfer += iprot->readString(this->tuple_data[_i9]);
// to allocate string data in a special mempool
// (change via python script that runs over Data_types.cc)
RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch, MemTracker* tracker)
: _mem_tracker(tracker),
_has_in_flight_row(false),
_num_rows(input_batch.num_rows()),
_num_uncommitted_rows(0),
_capacity(_num_rows),
_flush(FlushMode::NO_FLUSH_RESOURCES),
_needs_deep_copy(false),
_num_tuples_per_row(input_batch.row_tuples_size()),
_row_desc(row_desc),
_auxiliary_mem_usage(0),
_need_to_return(false),
_tuple_data_pool(_mem_tracker) {
DCHECK(_mem_tracker != nullptr);
_tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*);
DCHECK_GT(_tuple_ptrs_size, 0);
// TODO: switch to Init() pattern so we can check memory limit and return Status.
if (config::enable_partitioned_aggregation) {
_mem_tracker->Consume(_tuple_ptrs_size);
_tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size));
DCHECK(_tuple_ptrs != nullptr);
} else {
_tuple_ptrs = (Tuple**)_tuple_data_pool.allocate(_tuple_ptrs_size);
}
char* tuple_data = nullptr;
if (input_batch.is_compressed()) {
// Decompress tuple data into data pool
const char* compressed_data = input_batch.tuple_data().c_str();
size_t compressed_size = input_batch.tuple_data().size();
size_t uncompressed_size = 0;
bool success =
snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
DCHECK(success) << "snappy::GetUncompressedLength failed";
tuple_data = (char*)_tuple_data_pool.allocate(uncompressed_size);
success = snappy::RawUncompress(compressed_data, compressed_size, tuple_data);
DCHECK(success) << "snappy::RawUncompress failed";
} else {
// Tuple data uncompressed, copy directly into data pool
tuple_data = (char*)_tuple_data_pool.allocate(input_batch.tuple_data().size());
memcpy(tuple_data, input_batch.tuple_data().c_str(), input_batch.tuple_data().size());
}
// convert input_batch.tuple_offsets into pointers
int tuple_idx = 0;
for (auto offset : input_batch.tuple_offsets()) {
if (offset == -1) {
_tuple_ptrs[tuple_idx++] = nullptr;
} else {
_tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
}
}
// Check whether we have slots that require offset-to-pointer conversion.
if (!_row_desc.has_varlen_slots()) {
return;
}
const auto& tuple_descs = _row_desc.tuple_descriptors();
// For every unique tuple, convert string offsets contained in tuple data into
// pointers. Tuples were serialized in the order we are deserializing them in,
// so the first occurrence of a tuple will always have a higher offset than any tuple
// we already converted.
for (int i = 0; i < _num_rows; ++i) {
TupleRow* row = get_row(i);
for (size_t j = 0; j < tuple_descs.size(); ++j) {
auto desc = tuple_descs[j];
if (desc->string_slots().empty() && desc->collection_slots().empty()) {
continue;
}
Tuple* tuple = row->get_tuple(j);
if (tuple == nullptr) {
continue;
}
for (auto slot : desc->string_slots()) {
DCHECK(slot->type().is_string_type());
StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
int offset = convert_to<int>(string_val->ptr);
string_val->ptr = tuple_data + offset;
// Why do we apply this mask? The len field of StringValue changed from int to size_t
// in Doris 0.11. When upgrading, some bits of len sent from a 0.10 node are random;
// that works fine in 0.10, but in 0.11 it leads to an invalid length, so we zero the
// high bits here.
string_val->len &= 0x7FFFFFFFL;
}
// copy collection slots
for (auto slot_collection : desc->collection_slots()) {
DCHECK(slot_collection->type().is_collection_type());
CollectionValue* array_val =
tuple->get_collection_slot(slot_collection->tuple_offset());
// assign the data and null_signs pointer positions within tuple_data
int data_offset = convert_to<int>(array_val->data());
array_val->set_data(tuple_data + data_offset);
int null_offset = convert_to<int>(array_val->null_signs());
array_val->set_null_signs(convert_to<bool*>(tuple_data + null_offset));
const TypeDescriptor& item_type = slot_collection->type().children.at(0);
if (!item_type.is_string_type()) {
continue;
}
// copy every string item
for (size_t k = 0; k < array_val->length(); ++k) {
if (array_val->is_null_at(k)) {
continue;
}
StringValue* dst_item_v = convert_to<StringValue*>(
(uint8_t*)array_val->data() + k * item_type.get_slot_size());
if (dst_item_v->len != 0) {
int offset = convert_to<int>(dst_item_v->ptr);
dst_item_v->ptr = tuple_data + offset;
}
}
}
}
}
}
// TODO: we want our input_batch's tuple_data to come from our (not yet implemented)
// global runtime memory segment; how do we get thrift to allocate it from there?
// maybe change line (in Data_types.cc generated from Data.thrift)
// xfer += iprot->readString(this->tuple_data[_i9]);
// to allocate string data in a special mempool
// (change via python script that runs over Data_types.cc)
RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch, MemTracker* tracker)
: _mem_tracker(tracker),
_has_in_flight_row(false),
_num_rows(input_batch.num_rows),
_num_uncommitted_rows(0),
_capacity(_num_rows),
_flush(FlushMode::NO_FLUSH_RESOURCES),
_needs_deep_copy(false),
_num_tuples_per_row(input_batch.row_tuples.size()),
_row_desc(row_desc),
_auxiliary_mem_usage(0),
_need_to_return(false),
_tuple_data_pool(_mem_tracker) {
DCHECK(_mem_tracker != nullptr);
_tuple_ptrs_size = _num_rows * input_batch.row_tuples.size() * sizeof(Tuple*);
DCHECK_GT(_tuple_ptrs_size, 0);
// TODO: switch to Init() pattern so we can check memory limit and return Status.
if (config::enable_partitioned_aggregation) {
_mem_tracker->Consume(_tuple_ptrs_size);
_tuple_ptrs = (Tuple**)malloc(_tuple_ptrs_size);
DCHECK(_tuple_ptrs != nullptr);
} else {
_tuple_ptrs = (Tuple**)_tuple_data_pool.allocate(_tuple_ptrs_size);
}
char* tuple_data = nullptr;
if (input_batch.is_compressed) {
// Decompress tuple data into data pool
const char* compressed_data = input_batch.tuple_data.c_str();
size_t compressed_size = input_batch.tuple_data.size();
size_t uncompressed_size = 0;
bool success =
snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size);
DCHECK(success) << "snappy::GetUncompressedLength failed";
tuple_data = (char*)_tuple_data_pool.allocate(uncompressed_size);
success = snappy::RawUncompress(compressed_data, compressed_size, tuple_data);
DCHECK(success) << "snappy::RawUncompress failed";
} else {
// Tuple data uncompressed, copy directly into data pool
tuple_data = (char*)_tuple_data_pool.allocate(input_batch.tuple_data.size());
memcpy(tuple_data, input_batch.tuple_data.c_str(), input_batch.tuple_data.size());
}
// convert input_batch.tuple_offsets into pointers
int tuple_idx = 0;
for (auto offset : input_batch.tuple_offsets) {
if (offset == -1) {
_tuple_ptrs[tuple_idx++] = nullptr;
} else {
_tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
}
}
// Check whether we have slots that require offset-to-pointer conversion.
if (!_row_desc.has_varlen_slots()) {
return;
}
const auto& tuple_descs = _row_desc.tuple_descriptors();
// For every unique tuple, convert string offsets contained in tuple data into
// pointers. Tuples were serialized in the order we are deserializing them in,
// so the first occurrence of a tuple will always have a higher offset than any tuple
// we already converted.
for (int i = 0; i < _num_rows; ++i) {
TupleRow* row = get_row(i);
for (size_t j = 0; j < tuple_descs.size(); ++j) {
auto desc = tuple_descs[j];
if (desc->string_slots().empty() && desc->collection_slots().empty()) {
continue;
}
Tuple* tuple = row->get_tuple(j);
if (tuple == nullptr) {
continue;
}
for (auto slot : desc->string_slots()) {
DCHECK(slot->type().is_string_type());
StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
int offset = convert_to<int>(string_val->ptr);
string_val->ptr = tuple_data + offset;
// Why do we apply this mask? The len field of StringValue changed from int to size_t
// in Doris 0.11. When upgrading, some bits of len sent from a 0.10 node are random;
// that works fine in 0.10, but in 0.11 it leads to an invalid length, so we zero the
// high bits here.
string_val->len &= 0x7FFFFFFFL;
}
// copy collection slot
for (auto slot_collection : desc->collection_slots()) {
DCHECK(slot_collection->type().is_collection_type());
CollectionValue* array_val =
tuple->get_collection_slot(slot_collection->tuple_offset());
int offset = convert_to<int>(array_val->data());
array_val->set_data(tuple_data + offset);
int null_offset = convert_to<int>(array_val->null_signs());
array_val->set_null_signs(convert_to<bool*>(tuple_data + null_offset));
const TypeDescriptor& item_type = slot_collection->type().children.at(0);
if (!item_type.is_string_type()) {
continue;
}
// copy string item
for (size_t k = 0; k < array_val->length(); ++k) {
if (array_val->is_null_at(k)) {
continue;
}
StringValue* dst_item_v = convert_to<StringValue*>(
(uint8_t*)array_val->data() + k * item_type.get_slot_size());
if (dst_item_v->len != 0) {
int offset = convert_to<int>(dst_item_v->ptr);
dst_item_v->ptr = tuple_data + offset;
}
}
}
}
}
}
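// Release every resource held by this batch: the tuple data pool, the aggregation
// object pool, attached I/O buffers, buffer-pool buffers, tuple streams and blocks.
// Guarded by _cleared so repeated calls (e.g. from the destructor) are no-ops.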
void RowBatch::clear() {
if (_cleared) {
return;
}
_tuple_data_pool.free_all();
_agg_object_pool.clear();
for (int i = 0; i < _io_buffers.size(); ++i) {
_io_buffers[i]->return_buffer();
}
for (BufferInfo& buffer_info : _buffers) {
ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(buffer_info.client, &buffer_info.buffer);
}
close_tuple_streams();
for (int i = 0; i < _blocks.size(); ++i) {
_blocks[i]->del();
}
if (config::enable_partitioned_aggregation) {
DCHECK(_tuple_ptrs != nullptr);
free(_tuple_ptrs);
_mem_tracker->Release(_tuple_ptrs_size);
_tuple_ptrs = nullptr;
}
_cleared = true;
}
RowBatch::~RowBatch() {
clear();
}
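// Serialize this batch into a Thrift TRowBatch: tuples are deep-copied into
// tuple_data with their pointers rewritten as offsets, and the buffer is
// snappy-compressed when compression actually shrinks it. Returns the batch size
// as if tuple_data had been left uncompressed.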
size_t RowBatch::serialize(TRowBatch* output_batch) {
// why does Thrift not generate a Clear() function?
output_batch->row_tuples.clear();
output_batch->tuple_offsets.clear();
output_batch->is_compressed = false;
output_batch->num_rows = _num_rows;
_row_desc.to_thrift(&output_batch->row_tuples);
output_batch->tuple_offsets.reserve(_num_rows * _num_tuples_per_row);
size_t size = total_byte_size();
output_batch->tuple_data.resize(size);
// Copy tuple data, including strings, into output_batch (converting string
// pointers into offsets in the process)
int offset = 0; // current offset into output_batch->tuple_data
char* tuple_data = output_batch->tuple_data.data();
const auto& tuple_descs = _row_desc.tuple_descriptors();
for (int i = 0; i < _num_rows; ++i) {
TupleRow* row = get_row(i);
for (size_t j = 0; j < tuple_descs.size(); ++j) {
auto desc = tuple_descs[j];
if (row->get_tuple(j) == nullptr) {
// NULLs are encoded as -1
output_batch->tuple_offsets.push_back(-1);
continue;
}
// Record offset before creating copy (which increments offset and tuple_data)
output_batch->tuple_offsets.push_back(offset);
row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
DCHECK_LE(offset, size);
}
}
DCHECK_EQ(offset, size);
if (config::compress_rowbatches && size > 0) {
// Try compressing tuple_data to _compression_scratch, swap if compressed data is
// smaller
size_t max_compressed_size = snappy::MaxCompressedLength(size);
if (_compression_scratch.size() < max_compressed_size) {
_compression_scratch.resize(max_compressed_size);
}
size_t compressed_size = 0;
char* compressed_output = _compression_scratch.data();
snappy::RawCompress(output_batch->tuple_data.c_str(), size, compressed_output,
&compressed_size);
if (LIKELY(compressed_size < size)) {
_compression_scratch.resize(compressed_size);
output_batch->tuple_data.swap(_compression_scratch);
output_batch->is_compressed = true;
}
VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size;
}
// The size output_batch would be if we didn't compress tuple_data (will be equal to
// actual batch size if tuple_data isn't compressed)
return get_batch_size(*output_batch) - output_batch->tuple_data.size() + size;
}
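// Protobuf counterpart of the Thrift serialization above: fills a PRowBatch with
// tuple offsets and deep-copied (optionally snappy-compressed) tuple data and
// returns the uncompressed serialized size.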
size_t RowBatch::serialize(PRowBatch* output_batch) {
// num_rows
output_batch->set_num_rows(_num_rows);
// row_tuples
_row_desc.to_protobuf(output_batch->mutable_row_tuples());
// tuple_offsets: must clear before reserve
output_batch->clear_tuple_offsets();
output_batch->mutable_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row);
// is_compressed
output_batch->set_is_compressed(false);
// tuple data
size_t size = total_byte_size();
auto mutable_tuple_data = output_batch->mutable_tuple_data();
mutable_tuple_data->resize(size);
// Copy tuple data, including strings, into output_batch (converting string
// pointers into offsets in the process)
int offset = 0; // current offset into output_batch->tuple_data
char* tuple_data = mutable_tuple_data->data();
const auto& tuple_descs = _row_desc.tuple_descriptors();
const auto& mutable_tuple_offsets = output_batch->mutable_tuple_offsets();
for (int i = 0; i < _num_rows; ++i) {
TupleRow* row = get_row(i);
for (size_t j = 0; j < tuple_descs.size(); ++j) {
auto desc = tuple_descs[j];
if (row->get_tuple(j) == nullptr) {
// NULLs are encoded as -1
mutable_tuple_offsets->Add(-1);
continue;
}
// Record offset before creating copy (which increments offset and tuple_data)
mutable_tuple_offsets->Add(offset);
row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
DCHECK_LE(offset, size);
}
}
DCHECK_EQ(offset, size);
if (config::compress_rowbatches && size > 0) {
// Try compressing tuple_data to _compression_scratch, swap if compressed data is
// smaller
uint32_t max_compressed_size = snappy::MaxCompressedLength(size);
if (_compression_scratch.size() < max_compressed_size) {
_compression_scratch.resize(max_compressed_size);
}
size_t compressed_size = 0;
char* compressed_output = _compression_scratch.data();
snappy::RawCompress(mutable_tuple_data->data(), size, compressed_output, &compressed_size);
if (LIKELY(compressed_size < size)) {
_compression_scratch.resize(compressed_size);
mutable_tuple_data->swap(_compression_scratch);
output_batch->set_is_compressed(true);
}
VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size;
}
// The size output_batch would be if we didn't compress tuple_data (will be equal to
// actual batch size if tuple_data isn't compressed)
return get_batch_size(*output_batch) - mutable_tuple_data->size() + size;
}
// When a row read from a file cannot fit into a tuple because of schema limitations,
// increase _num_uncommitted_rows in the row batch.
void RowBatch::increase_uncommitted_rows() {
_num_uncommitted_rows++;
}
void RowBatch::add_io_buffer(DiskIoMgr::BufferDescriptor* buffer) {
DCHECK(buffer != nullptr);
_io_buffers.push_back(buffer);
_auxiliary_mem_usage += buffer->buffer_len();
buffer->set_mem_tracker(std::shared_ptr<MemTracker>(_mem_tracker)); // TODO(yingchun): fixme
}
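// Shrink _capacity so that the fixed-length tuple data fits within
// FIXED_LEN_BUFFER_LIMIT, then allocate that buffer from the tuple data pool.
// Returns a mem-limit-exceeded status if the allocation fails.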
Status RowBatch::resize_and_allocate_tuple_buffer(RuntimeState* state, int64_t* tuple_buffer_size,
uint8_t** buffer) {
int64_t row_size = _row_desc.get_row_size();
// Avoid divide-by-zero. Don't need to modify capacity for empty rows anyway.
if (row_size != 0) {
_capacity = std::max(1, std::min<int>(_capacity, FIXED_LEN_BUFFER_LIMIT / row_size));
}
*tuple_buffer_size = row_size * _capacity;
// TODO(dhc): change allocate to try_allocate?
*buffer = _tuple_data_pool.allocate(*tuple_buffer_size);
if (*buffer == nullptr) {
std::stringstream ss;
ss << "Failed to allocate tuple buffer" << *tuple_buffer_size;
LOG(WARNING) << ss.str();
return state->set_mem_limit_exceeded(ss.str());
}
return Status::OK();
}
void RowBatch::add_tuple_stream(BufferedTupleStream2* stream) {
DCHECK(stream != nullptr);
_tuple_streams.push_back(stream);
_auxiliary_mem_usage += stream->byte_size();
}
void RowBatch::add_block(BufferedBlockMgr2::Block* block) {
DCHECK(block != nullptr);
_blocks.push_back(block);
_auxiliary_mem_usage += block->buffer_len();
}
void RowBatch::reset() {
_num_rows = 0;
_capacity = _tuple_ptrs_size / (_num_tuples_per_row * sizeof(Tuple*));
_has_in_flight_row = false;
// TODO: Change this to Clear() and investigate the repercussions.
_tuple_data_pool.free_all();
_agg_object_pool.clear();
for (int i = 0; i < _io_buffers.size(); ++i) {
_io_buffers[i]->return_buffer();
}
_io_buffers.clear();
for (BufferInfo& buffer_info : _buffers) {
ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(buffer_info.client, &buffer_info.buffer);
}
_buffers.clear();
close_tuple_streams();
for (int i = 0; i < _blocks.size(); ++i) {
_blocks[i]->del();
}
_blocks.clear();
_auxiliary_mem_usage = 0;
if (!config::enable_partitioned_aggregation) {
_tuple_ptrs = (Tuple**)(_tuple_data_pool.allocate(_tuple_ptrs_size));
}
_need_to_return = false;
_flush = FlushMode::NO_FLUSH_RESOURCES;
_needs_deep_copy = false;
}
void RowBatch::close_tuple_streams() {
for (int i = 0; i < _tuple_streams.size(); ++i) {
_tuple_streams[i]->close();
delete _tuple_streams[i];
}
_tuple_streams.clear();
}
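// Hand ownership of all attached resources (memory pools, I/O buffers, buffer-pool
// buffers, tuple streams and blocks) over to 'dest', propagate the deep-copy/flush
// flags, and finally reset this batch so the same resources are not released twice.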
void RowBatch::transfer_resource_ownership(RowBatch* dest) {
dest->_auxiliary_mem_usage += _tuple_data_pool.total_allocated_bytes();
dest->_tuple_data_pool.acquire_data(&_tuple_data_pool, false);
dest->_agg_object_pool.acquire_data(&_agg_object_pool);
for (int i = 0; i < _io_buffers.size(); ++i) {
DiskIoMgr::BufferDescriptor* buffer = _io_buffers[i];
dest->_io_buffers.push_back(buffer);
dest->_auxiliary_mem_usage += buffer->buffer_len();
buffer->set_mem_tracker(
std::shared_ptr<MemTracker>(dest->_mem_tracker)); // TODO(yingchun): fixme
}
_io_buffers.clear();
for (BufferInfo& buffer_info : _buffers) {
dest->add_buffer(buffer_info.client, std::move(buffer_info.buffer),
FlushMode::NO_FLUSH_RESOURCES);
}
_buffers.clear();
for (int i = 0; i < _tuple_streams.size(); ++i) {
dest->_tuple_streams.push_back(_tuple_streams[i]);
dest->_auxiliary_mem_usage += _tuple_streams[i]->byte_size();
}
// Resource release must now be done by the dest RowBatch. If we did not clear the
// corresponding members here, reset() on this RowBatch and a later reset() on the
// dest RowBatch would both release them, causing a double-delete crash.
_tuple_streams.clear();
for (int i = 0; i < _blocks.size(); ++i) {
dest->_blocks.push_back(_blocks[i]);
dest->_auxiliary_mem_usage += _blocks[i]->buffer_len();
}
_blocks.clear();
dest->_need_to_return |= _need_to_return;
if (_needs_deep_copy) {
dest->mark_needs_deep_copy();
} else if (_flush == FlushMode::FLUSH_RESOURCES) {
dest->mark_flush_resources();
}
reset();
}
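// Bridge between the row-oriented and the vectorized engine: create one mutable
// column per slot, append every row's slot value to its column (handling NULLs and
// string slots separately), and assemble the columns into a vectorized::Block.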
vectorized::Block RowBatch::convert_to_vec_block() const {
std::vector<vectorized::MutableColumnPtr> columns;
for (const auto tuple_desc : _row_desc.tuple_descriptors()) {
for (const auto slot_desc : tuple_desc->slots()) {
columns.emplace_back(slot_desc->get_empty_mutable_column());
}
}
std::vector<SlotDescriptor*> slot_descs;
std::vector<int> tuple_idx;
int column_numbers = 0;
for (int i = 0; i < _row_desc.tuple_descriptors().size(); ++i) {
auto tuple_desc = _row_desc.tuple_descriptors()[i];
for (int j = 0; j < tuple_desc->slots().size(); ++j) {
slot_descs.push_back(tuple_desc->slots()[j]);
tuple_idx.push_back(i);
}
column_numbers += tuple_desc->slots().size();
}
for (int i = 0; i < column_numbers; ++i) {
auto slot_desc = slot_descs[i];
for (int j = 0; j < _num_rows; ++j) {
TupleRow* src_row = get_row(j);
auto tuple = src_row->get_tuple(tuple_idx[i]);
if (slot_desc->is_nullable() && tuple->is_null(slot_desc->null_indicator_offset())) {
columns[i]->insert_data(nullptr, 0);
} else if (slot_desc->type().is_string_type()) {
auto string_value =
static_cast<const StringValue*>(tuple->get_slot(slot_desc->tuple_offset()));
columns[i]->insert_data(string_value->ptr, string_value->len);
} else {
columns[i]->insert_data(
static_cast<const char*>(tuple->get_slot(slot_desc->tuple_offset())),
slot_desc->slot_size());
}
}
}
doris::vectorized::ColumnsWithTypeAndName columns_with_type_and_name;
auto n_columns = 0;
for (const auto tuple_desc : _row_desc.tuple_descriptors()) {
for (const auto slot_desc : tuple_desc->slots()) {
columns_with_type_and_name.emplace_back(columns[n_columns++]->get_ptr(),
slot_desc->get_data_type_ptr(),
slot_desc->col_name());
}
}
return {columns_with_type_and_name};
}
size_t RowBatch::get_batch_size(const TRowBatch& batch) {
size_t result = batch.tuple_data.size();
result += batch.row_tuples.size() * sizeof(TTupleId);
result += batch.tuple_offsets.size() * sizeof(int32_t);
return result;
}
size_t RowBatch::get_batch_size(const PRowBatch& batch) {
size_t result = batch.tuple_data().size();
result += batch.row_tuples().size() * sizeof(int32_t);
result += batch.tuple_offsets().size() * sizeof(int32_t);
return result;
}
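// Take over the state of 'src': row count, capacity, I/O buffers and the tuple
// pointer array (swapped when it was malloc'd, transferred via the data pool
// otherwise), then let 'src' hand its remaining resources to this batch.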
void RowBatch::acquire_state(RowBatch* src) {
// DCHECK(_row_desc.equals(src->_row_desc));
DCHECK_EQ(_num_tuples_per_row, src->_num_tuples_per_row);
DCHECK_EQ(_tuple_ptrs_size, src->_tuple_ptrs_size);
DCHECK_EQ(_auxiliary_mem_usage, 0);
// The destination row batch should be empty.
DCHECK(!_has_in_flight_row);
DCHECK_EQ(_num_rows, 0);
for (int i = 0; i < src->_io_buffers.size(); ++i) {
DiskIoMgr::BufferDescriptor* buffer = src->_io_buffers[i];
_io_buffers.push_back(buffer);
_auxiliary_mem_usage += buffer->buffer_len();
buffer->set_mem_tracker(std::shared_ptr<MemTracker>(_mem_tracker)); // TODO(yingchun): fixme
}
src->_io_buffers.clear();
src->_auxiliary_mem_usage = 0;
DCHECK(src->_tuple_streams.empty());
DCHECK(src->_blocks.empty());
_has_in_flight_row = src->_has_in_flight_row;
_num_rows = src->_num_rows;
_capacity = src->_capacity;
_need_to_return = src->_need_to_return;
if (!config::enable_partitioned_aggregation) {
// Tuple pointers are allocated from tuple_data_pool_ so are transferred.
_tuple_ptrs = src->_tuple_ptrs;
src->_tuple_ptrs = nullptr;
} else {
// tuple_ptrs_ were allocated with malloc so can be swapped between batches.
std::swap(_tuple_ptrs, src->_tuple_ptrs);
}
src->transfer_resource_ownership(this);
}
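// Deep-copy every row of this batch into 'dst', copying variable-length data into
// dst's tuple data pool. 'dst' must be empty and have capacity for all rows.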
void RowBatch::deep_copy_to(RowBatch* dst) {
DCHECK(dst->_row_desc.equals(_row_desc));
DCHECK_EQ(dst->_num_rows, 0);
DCHECK_GE(dst->_capacity, _num_rows);
dst->add_rows(_num_rows);
for (int i = 0; i < _num_rows; ++i) {
TupleRow* src_row = get_row(i);
TupleRow* dst_row = convert_to<TupleRow*>(dst->_tuple_ptrs + i * _num_tuples_per_row);
src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), &dst->_tuple_data_pool, false);
}
dst->commit_rows(_num_rows);
}
// TODO: consider computing size of batches as they are built up
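// Full serialized byte size of the batch: fixed-length tuple bytes plus all string
// and collection payloads (including nested string items) referenced by non-null slots.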
size_t RowBatch::total_byte_size() const {
size_t result = 0;
// Sum total variable length byte sizes.
for (int i = 0; i < _num_rows; ++i) {
TupleRow* row = get_row(i);
const auto& tuple_descs = _row_desc.tuple_descriptors();
for (size_t j = 0; j < tuple_descs.size(); ++j) {
auto desc = tuple_descs[j];
Tuple* tuple = row->get_tuple(j);
if (tuple == nullptr) {
continue;
}
result += desc->byte_size();
for (auto slot : desc->string_slots()) {
DCHECK(slot->type().is_string_type());
if (tuple->is_null(slot->null_indicator_offset())) {
continue;
}
StringValue* string_val = tuple->get_string_slot(slot->tuple_offset());
result += string_val->len;
}
// compute slot collection size
for (auto slot_collection : desc->collection_slots()) {
DCHECK(slot_collection->type().is_collection_type());
if (tuple->is_null(slot_collection->null_indicator_offset())) {
continue;
}
// compute data null_signs size
CollectionValue* array_val =
tuple->get_collection_slot(slot_collection->tuple_offset());
result += array_val->length() * sizeof(bool);
const TypeDescriptor& item_type = slot_collection->type().children.at(0);
result += array_val->length() * item_type.get_slot_size();
if (!item_type.is_string_type()) {
continue;
}
// compute string type item size
for (int k = 0; k < array_val->length(); ++k) {
if (array_val->is_null_at(k)) {
continue;
}
StringValue* dst_item_v = convert_to<StringValue*>(
(uint8_t*)array_val->data() + k * item_type.get_slot_size());
result += dst_item_v->len;
}
}
}
}
return result;
}
void RowBatch::add_buffer(BufferPool::ClientHandle* client, BufferPool::BufferHandle&& buffer,
FlushMode flush) {
_auxiliary_mem_usage += buffer.len();
BufferInfo buffer_info;
buffer_info.client = client;
buffer_info.buffer = std::move(buffer);
_buffers.push_back(std::move(buffer_info));
if (flush == FlushMode::FLUSH_RESOURCES) mark_flush_resources();
}
std::string RowBatch::to_string() {
std::stringstream out;
for (int i = 0; i < _num_rows; ++i) {
out << get_row(i)->to_string(_row_desc) << "\n";
}
return out.str();
}
} // end namespace doris