From d857b4af1b42209c2715f3db33c88a6ee12ade1c Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Wed, 11 Jan 2023 09:37:35 +0800 Subject: [PATCH] [refactor](remove row batch) remove impala rowbatch structure (#15767) * [refactor](remove row batch) remove impala rowbatch structure Co-authored-by: yiguolei --- be/src/exec/exec_node.cpp | 1 - be/src/exec/table_connector.h | 1 - be/src/olap/delta_writer.cpp | 1 - be/src/runtime/CMakeLists.txt | 3 +- be/src/runtime/cache/result_cache.h | 1 - be/src/runtime/cache/result_node.h | 1 - be/src/runtime/plan_fragment_executor.cpp | 1 - be/src/runtime/row_batch.cpp | 507 ------------------ be/src/runtime/row_batch.h | 478 ----------------- be/src/runtime/tablets_channel.cpp | 1 - be/src/util/arrow/block_convertor.h | 5 +- be/src/util/arrow/row_batch.cpp | 344 ------------ be/src/util/arrow/row_batch.h | 17 - be/src/vec/core/block.cpp | 1 - .../exec/data_gen_functions/vnumbers_tvf.cpp | 1 - .../vec/exec/join/vnested_loop_join_node.cpp | 1 - be/src/vec/exec/vaggregation_node.cpp | 1 - be/src/vec/exec/vanalytic_eval_node.cpp | 1 - be/src/vec/exec/vdata_gen_scan_node.cpp | 1 - be/src/vec/exec/vmysql_scan_node.cpp | 1 - be/src/vec/exec/vschema_scan_node.cpp | 1 - be/src/vec/exec/vset_operation_node.h | 1 - be/src/vec/exec/vsort_node.cpp | 1 - be/src/vec/runtime/vsorted_run_merger.cpp | 1 - be/src/vec/sink/vresult_file_sink.cpp | 1 - be/src/vec/sink/vtablet_sink.cpp | 1 - be/src/vec/sink/vtablet_sink.h | 1 - be/test/exprs/binary_predicate_test.cpp | 1 - be/test/exprs/in_op_test.cpp | 1 - be/test/runtime/data_spliter_test.cpp | 1 - be/test/runtime/fragment_mgr_test.cpp | 1 - be/test/vec/core/block_test.cpp | 1 - be/test/vec/exec/vjson_scanner_test.cpp | 1 - be/test/vec/exec/vorc_scanner_test.cpp | 1 - be/test/vec/exec/vparquet_scanner_test.cpp | 1 - be/test/vec/exprs/vexpr_test.cpp | 27 - 36 files changed, 3 insertions(+), 1407 deletions(-) delete mode 100644 be/src/runtime/row_batch.cpp delete mode 100644 be/src/runtime/row_batch.h diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index e29f5535f9..3a2d713403 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -31,7 +31,6 @@ #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "util/debug_util.h" #include "util/runtime_profile.h" diff --git a/be/src/exec/table_connector.h b/be/src/exec/table_connector.h index 9f5330a486..6ba0c26b8b 100644 --- a/be/src/exec/table_connector.h +++ b/be/src/exec/table_connector.h @@ -28,7 +28,6 @@ #include "common/status.h" #include "exprs/expr_context.h" #include "runtime/descriptors.h" -#include "runtime/row_batch.h" #include "vec/exprs/vexpr_context.h" namespace doris { diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 22b7f568d1..ee17286b6b 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -24,7 +24,6 @@ #include "olap/schema.h" #include "olap/storage_engine.h" #include "runtime/load_channel_mgr.h" -#include "runtime/row_batch.h" #include "runtime/tuple_row.h" #include "service/backend_options.h" #include "util/brpc_client_cache.h" diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 864dad3099..b2930ffb8a 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -37,8 +37,7 @@ set(RUNTIME_FILES primitive_type.cpp raw_value.cpp result_buffer_mgr.cpp - result_writer.cpp - row_batch.cpp + result_writer.cpp runtime_state.cpp runtime_filter_mgr.cpp runtime_predicate.cpp diff --git a/be/src/runtime/cache/result_cache.h b/be/src/runtime/cache/result_cache.h index de1a3711a6..0148bad786 100644 --- a/be/src/runtime/cache/result_cache.h +++ b/be/src/runtime/cache/result_cache.h @@ -32,7 +32,6 @@ #include "runtime/cache/cache_utils.h" #include "runtime/cache/result_node.h" #include "runtime/mem_pool.h" -#include "runtime/row_batch.h" #include "runtime/tuple_row.h" namespace doris { diff --git a/be/src/runtime/cache/result_node.h b/be/src/runtime/cache/result_node.h index 9b7b8a17a3..ca1b344d9f 100644 --- a/be/src/runtime/cache/result_node.h +++ b/be/src/runtime/cache/result_node.h @@ -34,7 +34,6 @@ #include "olap/olap_define.h" #include "runtime/cache/cache_utils.h" #include "runtime/mem_pool.h" -#include "runtime/row_batch.h" #include "runtime/tuple_row.h" #include "util/uid_util.h" diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 4ff1e99c32..f36cefaf51 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -32,7 +32,6 @@ #include "runtime/memory/mem_tracker.h" #include "runtime/result_buffer_mgr.h" #include "runtime/result_queue_mgr.h" -#include "runtime/row_batch.h" #include "runtime/runtime_filter_mgr.h" #include "runtime/thread_context.h" #include "util/container_util.hpp" diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp deleted file mode 100644 index 7a86f922e9..0000000000 --- a/be/src/runtime/row_batch.cpp +++ /dev/null @@ -1,507 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// This file is copied from -// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/row-batch.cc -// and modified by Doris - -#include "runtime/row_batch.h" - -#include -#include // for intptr_t - -#include "common/utils.h" -#include "gen_cpp/Data_types.h" -#include "gen_cpp/data.pb.h" -#include "runtime/collection_value.h" -#include "runtime/exec_env.h" -#include "runtime/runtime_state.h" -#include "runtime/string_value.h" -#include "runtime/thread_context.h" -#include "runtime/tuple_row.h" -#include "util/exception.h" -#include "vec/columns/column_vector.h" -#include "vec/core/block.h" - -using std::vector; - -namespace doris { - -const int RowBatch::AT_CAPACITY_MEM_USAGE = 8 * 1024 * 1024; -const int RowBatch::FIXED_LEN_BUFFER_LIMIT = AT_CAPACITY_MEM_USAGE / 2; - -RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity) - : _has_in_flight_row(false), - _num_rows(0), - _num_uncommitted_rows(0), - _capacity(capacity), - _flush(FlushMode::NO_FLUSH_RESOURCES), - _needs_deep_copy(false), - _num_tuples_per_row(row_desc.tuple_descriptors().size()), - _row_desc(row_desc), - _auxiliary_mem_usage(0), - _need_to_return(false), - _tuple_data_pool() { - DCHECK_GT(capacity, 0); - _tuple_ptrs_size = _capacity * _num_tuples_per_row * sizeof(Tuple*); - DCHECK_GT(_tuple_ptrs_size, 0); - _tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size)); - DCHECK(_tuple_ptrs != nullptr); -} - -// TODO: we want our input_batch's tuple_data to come from our (not yet implemented) -// global runtime memory segment; how do we get thrift to allocate it from there? -// maybe change line (in Data_types.cc generated from Data.thrift) -// xfer += iprot->readString(this->tuple_data[_i9]); -// to allocated string data in special mempool -// (change via python script that runs over Data_types.cc) -RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch) - : _has_in_flight_row(false), - _num_rows(input_batch.num_rows()), - _num_uncommitted_rows(0), - _capacity(_num_rows), - _flush(FlushMode::NO_FLUSH_RESOURCES), - _needs_deep_copy(false), - _num_tuples_per_row(input_batch.row_tuples_size()), - _row_desc(row_desc), - _auxiliary_mem_usage(0), - _need_to_return(false) { - _tuple_ptrs_size = _num_rows * _num_tuples_per_row * sizeof(Tuple*); - DCHECK_GT(_tuple_ptrs_size, 0); - _tuple_ptrs = (Tuple**)(malloc(_tuple_ptrs_size)); - DCHECK(_tuple_ptrs != nullptr); - - char* tuple_data = nullptr; - if (input_batch.is_compressed()) { - // Decompress tuple data into data pool - const char* compressed_data = input_batch.tuple_data().c_str(); - size_t compressed_size = input_batch.tuple_data().size(); - size_t uncompressed_size = 0; - bool success = - snappy::GetUncompressedLength(compressed_data, compressed_size, &uncompressed_size); - DCHECK(success) << "snappy::GetUncompressedLength failed"; - tuple_data = (char*)_tuple_data_pool.allocate(uncompressed_size); - success = snappy::RawUncompress(compressed_data, compressed_size, tuple_data); - DCHECK(success) << "snappy::RawUncompress failed"; - } else { - // Tuple data uncompressed, copy directly into data pool - tuple_data = (char*)_tuple_data_pool.allocate(input_batch.tuple_data().size()); - memcpy(tuple_data, input_batch.tuple_data().c_str(), input_batch.tuple_data().size()); - } - - // convert input_batch.tuple_offsets into pointers - int tuple_idx = 0; - // For historical reasons, the original offset was stored using int32, - // so that if a rowbatch is larger than 2GB, the passed offset may generate an error due to value overflow. - // So in the new version, a new_tuple_offsets structure is added to store offsets using int64. - // Here, to maintain compatibility, both versions of offsets are used, with preference given to new_tuple_offsets. - // TODO(cmy): in the next version, the original tuple_offsets should be removed. - if (input_batch.new_tuple_offsets_size() > 0) { - for (int64_t offset : input_batch.new_tuple_offsets()) { - if (offset == -1) { - _tuple_ptrs[tuple_idx++] = nullptr; - } else { - _tuple_ptrs[tuple_idx++] = convert_to(tuple_data + offset); - } - } - } else { - for (int32_t offset : input_batch.tuple_offsets()) { - if (offset == -1) { - _tuple_ptrs[tuple_idx++] = nullptr; - } else { - _tuple_ptrs[tuple_idx++] = convert_to(tuple_data + offset); - } - } - } - - // Check whether we have slots that require offset-to-pointer conversion. - if (!_row_desc.has_varlen_slots()) { - return; - } - - const auto& tuple_descs = _row_desc.tuple_descriptors(); - - // For every unique tuple, convert string offsets contained in tuple data into - // pointers. Tuples were serialized in the order we are deserializing them in, - // so the first occurrence of a tuple will always have a higher offset than any tuple - // we already converted. - for (int i = 0; i < _num_rows; ++i) { - TupleRow* row = get_row(i); - for (size_t j = 0; j < tuple_descs.size(); ++j) { - auto desc = tuple_descs[j]; - if (desc->string_slots().empty() && desc->collection_slots().empty()) { - continue; - } - - Tuple* tuple = row->get_tuple(j); - if (tuple == nullptr) { - continue; - } - - for (auto slot : desc->string_slots()) { - DCHECK(slot->type().is_string_type()); - if (tuple->is_null(slot->null_indicator_offset())) { - continue; - } - - StringValue* string_val = tuple->get_string_slot(slot->tuple_offset()); - int64_t offset = convert_to(string_val->ptr); - string_val->ptr = tuple_data + offset; - - // Why we do this mask? Field len of StringValue is changed from int to size_t in - // Doris 0.11. When upgrading, some bits of len sent from 0.10 is random value, - // this works fine in version 0.10, however in 0.11 this will lead to an invalid - // length. So we make the high bits zero here. - string_val->len &= 0x7FFFFFFFL; - } - - // copy collection slots - for (auto slot_collection : desc->collection_slots()) { - DCHECK(slot_collection->type().is_collection_type()); - if (tuple->is_null(slot_collection->null_indicator_offset())) { - continue; - } - - CollectionValue* array_val = - tuple->get_collection_slot(slot_collection->tuple_offset()); - const auto& item_type_desc = slot_collection->type().children[0]; - CollectionValue::deserialize_collection(array_val, tuple_data, item_type_desc); - } - } - } -} - -void RowBatch::clear() { - if (_cleared) { - return; - } - - _tuple_data_pool.free_all(); - _agg_object_pool.clear(); - for (int i = 0; i < _io_buffers.size(); ++i) { - _io_buffers[i]->return_buffer(); - } - - for (BufferInfo& buffer_info : _buffers) { - ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(buffer_info.client, &buffer_info.buffer); - } - - DCHECK(_tuple_ptrs != nullptr); - free(_tuple_ptrs); - _tuple_ptrs = nullptr; - _cleared = true; -} - -RowBatch::~RowBatch() { - clear(); -} - -static inline size_t align_tuple_offset(size_t offset) { - if (config::rowbatch_align_tuple_offset) { - return (offset + alignof(std::max_align_t) - 1) & (~(alignof(std::max_align_t) - 1)); - } - - return offset; -} - -Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size, - size_t* compressed_size, bool allow_transfer_large_data) { - // num_rows - output_batch->set_num_rows(_num_rows); - // row_tuples - _row_desc.to_protobuf(output_batch->mutable_row_tuples()); - // tuple_offsets: must clear before reserve - // TODO(cmy): the tuple_offsets should be removed after v1.1.0, use new_tuple_offsets instead. - // keep tuple_offsets here is just for compatibility. - output_batch->clear_tuple_offsets(); - output_batch->mutable_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row); - output_batch->clear_new_tuple_offsets(); - output_batch->mutable_new_tuple_offsets()->Reserve(_num_rows * _num_tuples_per_row); - // is_compressed - output_batch->set_is_compressed(false); - // tuple data - size_t tuple_byte_size = total_byte_size(); - std::string* mutable_tuple_data = output_batch->mutable_tuple_data(); - mutable_tuple_data->resize(tuple_byte_size); - - // Copy tuple data, including strings, into output_batch (converting string - // pointers into offsets in the process) - int64_t offset = 0; // current offset into output_batch->tuple_data - char* tuple_data = mutable_tuple_data->data(); - const auto& tuple_descs = _row_desc.tuple_descriptors(); - const auto& mutable_tuple_offsets = output_batch->mutable_tuple_offsets(); - const auto& mutable_new_tuple_offsets = output_batch->mutable_new_tuple_offsets(); - - for (int i = 0; i < _num_rows; ++i) { - TupleRow* row = get_row(i); - for (size_t j = 0; j < tuple_descs.size(); ++j) { - auto desc = tuple_descs[j]; - if (row->get_tuple(j) == nullptr) { - // NULLs are encoded as -1 - mutable_tuple_offsets->Add(-1); - mutable_new_tuple_offsets->Add(-1); - continue; - } - - int64_t old_offset = offset; - offset = align_tuple_offset(offset); - tuple_data += offset - old_offset; - - // Record offset before creating copy (which increments offset and tuple_data) - mutable_tuple_offsets->Add((int32_t)offset); - mutable_new_tuple_offsets->Add(offset); - row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true); - CHECK_GE(offset, 0); - } - } - CHECK_EQ(offset, tuple_byte_size) - << "offset: " << offset << " vs. tuple_byte_size: " << tuple_byte_size; - - size_t max_compressed_size = snappy::MaxCompressedLength(tuple_byte_size); - bool can_compress = config::compress_rowbatches && tuple_byte_size > 0; - if (can_compress) { - try { - // Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails - _compression_scratch.resize(max_compressed_size); - } catch (...) { - can_compress = false; - std::exception_ptr p = std::current_exception(); - LOG(WARNING) << "Try to alloc " << max_compressed_size - << " bytes for compression scratch failed. " - << get_current_exception_type_name(p); - } - } - if (can_compress) { - // Try compressing tuple_data to _compression_scratch, swap if compressed data is - // smaller - size_t compressed_size = 0; - char* compressed_output = _compression_scratch.data(); - snappy::RawCompress(mutable_tuple_data->data(), tuple_byte_size, compressed_output, - &compressed_size); - if (LIKELY(compressed_size < tuple_byte_size)) { - _compression_scratch.resize(compressed_size); - mutable_tuple_data->swap(_compression_scratch); - output_batch->set_is_compressed(true); - } - - VLOG_ROW << "uncompressed tuple_byte_size: " << tuple_byte_size - << ", compressed size: " << compressed_size; - } - - // return compressed and uncompressed size - size_t pb_size = get_batch_size(*output_batch); - *uncompressed_size = pb_size - mutable_tuple_data->size() + tuple_byte_size; - *compressed_size = pb_size; - if (!allow_transfer_large_data && pb_size > std::numeric_limits::max()) { - // the protobuf has a hard limit of 2GB for serialized data. - return Status::InternalError( - "The rowbatch is large than 2GB({}), can not send by Protobuf.", pb_size); - } - return Status::OK(); -} - -// when row from files can't fill into tuple with schema limitation, increase the _num_uncommitted_rows in row batch, -void RowBatch::increase_uncommitted_rows() { - _num_uncommitted_rows++; -} - -void RowBatch::add_io_buffer(DiskIoMgr::BufferDescriptor* buffer) { - DCHECK(buffer != nullptr); - _io_buffers.push_back(buffer); - _auxiliary_mem_usage += buffer->buffer_len(); -} - -Status RowBatch::resize_and_allocate_tuple_buffer(RuntimeState* state, int64_t* tuple_buffer_size, - uint8_t** buffer) { - int64_t row_size = _row_desc.get_row_size(); - // Avoid divide-by-zero. Don't need to modify capacity for empty rows anyway. - if (row_size != 0) { - _capacity = std::max(1, std::min(_capacity, FIXED_LEN_BUFFER_LIMIT / row_size)); - } - *tuple_buffer_size = row_size * _capacity; - // TODO(dhc): change allocate to try_allocate? - *buffer = _tuple_data_pool.allocate(*tuple_buffer_size); - if (*buffer == nullptr) { - std::stringstream ss; - ss << "Failed to allocate tuple buffer" << *tuple_buffer_size; - LOG(WARNING) << ss.str(); - return state->set_mem_limit_exceeded(ss.str()); - } - return Status::OK(); -} - -void RowBatch::reset() { - _num_rows = 0; - _capacity = _tuple_ptrs_size / (_num_tuples_per_row * sizeof(Tuple*)); - _has_in_flight_row = false; - - // TODO: Change this to Clear() and investigate the repercussions. - _tuple_data_pool.free_all(); - _agg_object_pool.clear(); - for (int i = 0; i < _io_buffers.size(); ++i) { - _io_buffers[i]->return_buffer(); - } - _io_buffers.clear(); - - for (BufferInfo& buffer_info : _buffers) { - ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(buffer_info.client, &buffer_info.buffer); - } - _buffers.clear(); - - _auxiliary_mem_usage = 0; - _need_to_return = false; - _flush = FlushMode::NO_FLUSH_RESOURCES; - _needs_deep_copy = false; -} - -void RowBatch::transfer_resource_ownership(RowBatch* dest) { - dest->_auxiliary_mem_usage += _tuple_data_pool.total_allocated_bytes(); - dest->_tuple_data_pool.acquire_data(&_tuple_data_pool, false); - dest->_agg_object_pool.acquire_data(&_agg_object_pool); - for (int i = 0; i < _io_buffers.size(); ++i) { - DiskIoMgr::BufferDescriptor* buffer = _io_buffers[i]; - dest->_io_buffers.push_back(buffer); - dest->_auxiliary_mem_usage += buffer->buffer_len(); - } - _io_buffers.clear(); - - for (BufferInfo& buffer_info : _buffers) { - dest->add_buffer(buffer_info.client, std::move(buffer_info.buffer), - FlushMode::NO_FLUSH_RESOURCES); - } - _buffers.clear(); - - dest->_need_to_return |= _need_to_return; - - if (_needs_deep_copy) { - dest->mark_needs_deep_copy(); - } else if (_flush == FlushMode::FLUSH_RESOURCES) { - dest->mark_flush_resources(); - } - reset(); -} - -size_t RowBatch::get_batch_size(const PRowBatch& batch) { - size_t result = batch.tuple_data().size(); - result += batch.row_tuples().size() * sizeof(int32_t); - // TODO(cmy): remove batch.tuple_offsets - result += batch.tuple_offsets().size() * sizeof(int32_t); - result += batch.new_tuple_offsets().size() * sizeof(int64_t); - return result; -} - -void RowBatch::acquire_state(RowBatch* src) { - // DCHECK(_row_desc.equals(src->_row_desc)); - DCHECK_EQ(_num_tuples_per_row, src->_num_tuples_per_row); - // DCHECK_EQ(_tuple_ptrs_size, src->_tuple_ptrs_size); - DCHECK_EQ(_auxiliary_mem_usage, 0); - - // The destination row batch should be empty. - DCHECK(!_has_in_flight_row); - DCHECK_EQ(_num_rows, 0); - - for (int i = 0; i < src->_io_buffers.size(); ++i) { - DiskIoMgr::BufferDescriptor* buffer = src->_io_buffers[i]; - _io_buffers.push_back(buffer); - _auxiliary_mem_usage += buffer->buffer_len(); - } - src->_io_buffers.clear(); - src->_auxiliary_mem_usage = 0; - - _has_in_flight_row = src->_has_in_flight_row; - _num_rows = src->_num_rows; - _capacity = src->_capacity; - _need_to_return = src->_need_to_return; - // tuple_ptrs_ were allocated with malloc so can be swapped between batches. - std::swap(_tuple_ptrs, src->_tuple_ptrs); - src->transfer_resource_ownership(this); -} - -void RowBatch::deep_copy_to(RowBatch* dst) { - DCHECK(dst->_row_desc.equals(_row_desc)); - DCHECK_EQ(dst->_num_rows, 0); - DCHECK_GE(dst->_capacity, _num_rows); - dst->add_rows(_num_rows); - for (int i = 0; i < _num_rows; ++i) { - TupleRow* src_row = get_row(i); - TupleRow* dst_row = convert_to(dst->_tuple_ptrs + i * _num_tuples_per_row); - src_row->deep_copy(dst_row, _row_desc.tuple_descriptors(), &dst->_tuple_data_pool, false); - } - dst->commit_rows(_num_rows); -} - -// TODO: consider computing size of batches as they are built up -size_t RowBatch::total_byte_size() const { - size_t result = 0; - - // Sum total variable length byte sizes. - for (int i = 0; i < _num_rows; ++i) { - TupleRow* row = get_row(i); - const auto& tuple_descs = _row_desc.tuple_descriptors(); - for (size_t j = 0; j < tuple_descs.size(); ++j) { - auto desc = tuple_descs[j]; - Tuple* tuple = row->get_tuple(j); - if (tuple == nullptr) { - continue; - } - result = align_tuple_offset(result); - result += desc->byte_size(); - - for (auto slot : desc->string_slots()) { - DCHECK(slot->type().is_string_type()); - if (tuple->is_null(slot->null_indicator_offset())) { - continue; - } - StringValue* string_val = tuple->get_string_slot(slot->tuple_offset()); - result += string_val->len; - } - - // compute slot collection size - for (auto slot_collection : desc->collection_slots()) { - DCHECK(slot_collection->type().is_collection_type()); - if (tuple->is_null(slot_collection->null_indicator_offset())) { - continue; - } - CollectionValue* array_val = - tuple->get_collection_slot(slot_collection->tuple_offset()); - const auto& item_type_desc = slot_collection->type().children[0]; - result += array_val->get_byte_size(item_type_desc); - } - } - } - - return result; -} - -void RowBatch::add_buffer(BufferPool::ClientHandle* client, BufferPool::BufferHandle&& buffer, - FlushMode flush) { - _auxiliary_mem_usage += buffer.len(); - BufferInfo buffer_info; - buffer_info.client = client; - buffer_info.buffer = std::move(buffer); - _buffers.push_back(std::move(buffer_info)); - if (flush == FlushMode::FLUSH_RESOURCES) mark_flush_resources(); -} - -std::string RowBatch::to_string() { - std::stringstream out; - for (int i = 0; i < _num_rows; ++i) { - out << get_row(i)->to_string(_row_desc) << "\n"; - } - return out.str(); -} - -} // end namespace doris diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h deleted file mode 100644 index bf920e1905..0000000000 --- a/be/src/runtime/row_batch.h +++ /dev/null @@ -1,478 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. -// This file is copied from -// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/row-batch.h -// and modified by Doris - -#pragma once - -#include -#include - -#include "common/logging.h" -#include "runtime/bufferpool/buffer_pool.h" -#include "runtime/descriptors.h" -#include "runtime/disk_io_mgr.h" -#include "runtime/mem_pool.h" - -namespace doris::vectorized { -class Block; -} - -namespace doris { - -class BufferedTupleStream2; -class Tuple; -class TupleRow; -class TupleDescriptor; -class PRowBatch; - -// A RowBatch encapsulates a batch of rows, each composed of a number of tuples. -// The maximum number of rows is fixed at the time of construction, and the caller -// can add rows up to that capacity. -// The row batch reference a few different sources of memory. -// 1. TupleRow ptrs - this is always owned and managed by the row batch. -// 2. Tuple memory - this is allocated (or transferred to) the row batches tuple pool. -// 3. Auxiliary tuple memory (e.g. string data) - this can either be stored externally -// (don't copy strings) or from the tuple pool (strings are copied). If external, -// the data is in an io buffer that may not be attached to this row batch. The -// creator of that row batch has to make sure that the io buffer is not recycled -// until all batches that reference the memory have been consumed. -// In order to minimize memory allocations, RowBatches and PRowBatches that have been -// serialized and sent over the wire should be reused (this prevents _compression_scratch -// from being needlessly reallocated). -// -// Row batches and memory usage: We attempt to stream row batches through the plan -// tree without copying the data. This means that row batches are often not-compact -// and reference memory outside of the row batch. This results in most row batches -// having a very small memory footprint and in some row batches having a very large -// one (it contains all the memory that other row batches are referencing). An example -// is IoBuffers which are only attached to one row batch. Only when the row batch reaches -// a blocking operator or the root of the fragment is the row batch memory freed. -// This means that in some cases (e.g. very selective queries), we still need to -// pass the row batch through the exec nodes (even if they have no rows) to trigger -// memory deletion. at_capacity() encapsulates the check that we are not accumulating -// excessive memory. -// -// A row batch is considered at capacity if all the rows are full or it has accumulated -// auxiliary memory up to a soft cap. (See _at_capacity_mem_usage comment). -// TODO: stick _tuple_ptrs into a pool? -class RowBatch { -public: - /// Flag indicating whether the resources attached to a RowBatch need to be flushed. - /// Defined here as a convenience for other modules that need to communicate flushing - /// modes. - enum class FlushMode { - FLUSH_RESOURCES, - NO_FLUSH_RESOURCES, - }; - - // Create RowBatch for a maximum of 'capacity' rows of tuples specified - // by 'row_desc'. - RowBatch(const RowDescriptor& row_desc, int capacity); - - // Populate a row batch from input_batch by copying input_batch's - // tuple_data into the row batch's mempool and converting all offsets - // in the data back into pointers. - // TODO: figure out how to transfer the data from input_batch to this RowBatch - // (so that we don't need to make yet another copy) - RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch); - - // Releases all resources accumulated at this row batch. This includes - // - tuple_ptrs - // - tuple mem pool data - // - buffer handles from the io mgr - virtual ~RowBatch(); - - // used to c - void clear(); - - static const int INVALID_ROW_INDEX = -1; - - // Add n rows of tuple pointers after the last committed row and return its index. - // The rows are uninitialized and each tuple of the row must be set. - // Returns INVALID_ROW_INDEX if the row batch cannot fit n rows. - // Two consecutive add_row() calls without a commit_last_row() between them - // have the same effect as a single call. - int add_rows(int n) { - if (_num_rows + n > _capacity) { - return INVALID_ROW_INDEX; - } - - _has_in_flight_row = true; - return _num_rows; - } - - int add_row() { return add_rows(1); } - - void commit_rows(int n) { - DCHECK_GE(n, 0); - DCHECK_LE(_num_rows + n, _capacity); - _num_rows += n; - _has_in_flight_row = false; - } - - void commit_last_row() { commit_rows(1); } - - bool in_flight() const { return _has_in_flight_row; } - - // Set function can be used to reduce the number of rows in the batch. This is only - // used in the limit case where more rows were added than necessary. - void set_num_rows(int num_rows) { - DCHECK_LE(num_rows, _num_rows); - _num_rows = num_rows; - } - - // Returns true if the row batch has filled all the rows or has accumulated - // enough memory. - bool at_capacity() const { - return _num_rows == _capacity || _auxiliary_mem_usage >= AT_CAPACITY_MEM_USAGE || - _need_to_return; - } - - // Returns true if the row batch has filled all the rows or has accumulated - // enough memory. tuple_pool is an intermediate memory pool containing tuple data - // that will eventually be attached to this row batch. We need to make sure - // the tuple pool does not accumulate excessive memory. - bool at_capacity(const MemPool* tuple_pool) const { - DCHECK(tuple_pool != nullptr); - return at_capacity() || tuple_pool->total_allocated_bytes() > AT_CAPACITY_MEM_USAGE; - } - - // Returns true if row_batch has reached capacity. - bool is_full() const { return _num_rows == _capacity; } - - // Returns true if uncommitted rows has reached capacity. - bool is_full_uncommitted() { return _num_uncommitted_rows == _capacity; } - - // Returns true if the row batch has accumulated enough external memory (in MemPools - // and io buffers). This would be a trigger to compact the row batch or reclaim - // the memory in some way. - bool at_resource_limit() { - return tuple_data_pool()->total_allocated_bytes() > MAX_MEM_POOL_SIZE; - } - - // The total size of all data represented in this row batch (tuples and referenced - // string data). - size_t total_byte_size() const; - - TupleRow* get_row(int row_idx) const { - DCHECK(_tuple_ptrs != nullptr); - DCHECK_GE(row_idx, 0); - //DCHECK_LT(row_idx, _num_rows + (_has_in_flight_row ? 1 : 0)); - return reinterpret_cast(_tuple_ptrs + row_idx * _num_tuples_per_row); - } - - /// An iterator for going through a row batch, starting at 'row_idx'. - /// If 'limit' is specified, it will iterate up to row number 'row_idx + limit' - /// or the last row, whichever comes first. Otherwise, it will iterate till the last - /// row in the batch. This is more efficient than using GetRow() as it avoids loading - /// the row batch state and doing multiplication on each loop with GetRow(). - class Iterator { - public: - Iterator(RowBatch* parent, int row_idx, int limit = -1) - : _num_tuples_per_row(parent->_num_tuples_per_row), - _row(parent->_tuple_ptrs + _num_tuples_per_row * row_idx), - _row_batch_end(parent->_tuple_ptrs + - _num_tuples_per_row * - (limit == -1 ? parent->_num_rows - : std::min(row_idx + limit, - parent->_num_rows))), - _parent(parent) { - DCHECK_GE(row_idx, 0); - DCHECK_GT(_num_tuples_per_row, 0); - /// We allow empty row batches with _num_rows == capacity_ == 0. - /// That's why we cannot call GetRow() above to initialize '_row'. - DCHECK_LE(row_idx, parent->_capacity); - } - - /// Return the current row pointed to by the row pointer. - TupleRow* get() { return reinterpret_cast(_row); } - - /// Increment the row pointer and return the next row. - TupleRow* next() { - _row += _num_tuples_per_row; - DCHECK_LE((_row - _parent->_tuple_ptrs) / _num_tuples_per_row, _parent->_capacity); - return get(); - } - - /// Returns true if the iterator is beyond the last row for read iterators. - /// Useful for read iterators to determine the limit. Write iterators should use - /// RowBatch::AtCapacity() instead. - bool at_end() const { return _row >= _row_batch_end; } - - /// Returns the row batch which this iterator is iterating through. - RowBatch* parent() const { return _parent; } - - private: - /// Number of tuples per row. - const int _num_tuples_per_row; - - /// Pointer to the current row. - Tuple** _row; - - /// Pointer to the row after the last row for read iterators. - Tuple** const _row_batch_end; - - /// The row batch being iterated on. - RowBatch* const _parent; - }; - - int num_tuples_per_row() const { return _num_tuples_per_row; } - int row_byte_size() const { return _num_tuples_per_row * sizeof(Tuple*); } - MemPool* tuple_data_pool() { return &_tuple_data_pool; } - ObjectPool* agg_object_pool() { return &_agg_object_pool; } - int num_io_buffers() const { return _io_buffers.size(); } - - // increase # of uncommitted rows - void increase_uncommitted_rows(); - - // Resets the row batch, returning all resources it has accumulated. - void reset(); - - // Add io buffer to this row batch. - void add_io_buffer(DiskIoMgr::BufferDescriptor* buffer); - - // Add tuple stream to this row batch. The row batch takes ownership of the stream - // and will call Close() on the stream and delete it when freeing resources. - void add_tuple_stream(BufferedTupleStream2* stream); - - /// Adds a buffer to this row batch. The buffer is deleted when freeing resources. - /// The buffer's memory remains accounted against the original owner, even when the - /// ownership of batches is transferred. If the original owner wants the memory to be - /// released, it should call this with 'mode' FLUSH_RESOURCES (see MarkFlushResources() - /// for further explanation). - /// TODO: IMPALA-4179: after IMPALA-3200, simplify the ownership transfer model and - /// make it consistent between buffers and I/O buffers. - void add_buffer(BufferPool::ClientHandle* client, BufferPool::BufferHandle&& buffer, - FlushMode flush); - - // Called to indicate this row batch must be returned up the operator tree. - // This is used to control memory management for streaming rows. - // TODO: consider using this mechanism instead of add_io_buffer/add_tuple_stream. This is - // the property we need rather than meticulously passing resources up so the operator - // tree. - void mark_need_to_return() { _need_to_return = true; } - - bool need_to_return() const { return _need_to_return; } - - /// Used by an operator to indicate that it cannot produce more rows until the - /// resources that it has attached to the row batch are freed or acquired by an - /// ancestor operator. After this is called, the batch is at capacity and no more rows - /// can be added. The "flush" mark is transferred by TransferResourceOwnership(). This - /// ensures that batches are flushed by streaming operators all the way up the operator - /// tree. Blocking operators can still accumulate batches with this flag. - /// TODO: IMPALA-3200: blocking operators should acquire all memory resources including - /// attached blocks/buffers, so that MarkFlushResources() can guarantee that the - /// resources will not be accounted against the original operator (this is currently - /// not true for Blocks, which can't be transferred). - void mark_flush_resources() { - DCHECK_LE(_num_rows, _capacity); - _capacity = _num_rows; - _flush = FlushMode::FLUSH_RESOURCES; - } - - /// Called to indicate that some resources backing this batch were not attached and - /// will be cleaned up after the next GetNext() call. This means that the batch must - /// be returned up the operator tree. Blocking operators must deep-copy any rows from - /// this batch or preceding batches. - /// - /// This is a stronger version of MarkFlushResources(), because blocking operators - /// are not allowed to accumulate batches with the 'needs_deep_copy' flag. - /// TODO: IMPALA-4179: always attach backing resources and remove this flag. - void mark_needs_deep_copy() { - mark_flush_resources(); // No more rows should be added to the batch. - _needs_deep_copy = true; - } - - bool needs_deep_copy() const { return _needs_deep_copy; } - - // Transfer ownership of resources to dest. This includes tuple data in mem - // pool and io buffers. - // we firstly update dest resource, and then reset current resource - void transfer_resource_ownership(RowBatch* dest); - - void copy_row(TupleRow* src, TupleRow* dest) { - memcpy(dest, src, _num_tuples_per_row * sizeof(Tuple*)); - } - - // Copy 'num_rows' rows from 'src' to 'dest' within the batch. Useful for exec - // nodes that skip an offset and copied more than necessary. - void copy_rows(int dest, int src, int num_rows) { - DCHECK_LE(dest, src); - DCHECK_LE(src + num_rows, _capacity); - memmove(_tuple_ptrs + _num_tuples_per_row * dest, _tuple_ptrs + _num_tuples_per_row * src, - num_rows * _num_tuples_per_row * sizeof(Tuple*)); - } - - void clear_row(TupleRow* row) { memset(row, 0, _num_tuples_per_row * sizeof(Tuple*)); } - - // Acquires state from the 'src' row batch into this row batch. This includes all IO - // buffers and tuple data. - // This row batch must be empty and have the same row descriptor as the src batch. - // This is used for scan nodes which produce RowBatches asynchronously. Typically, - // an ExecNode is handed a row batch to populate (pull model) but ScanNodes have - // multiple threads which push row batches. - // TODO: this is wasteful and makes a copy that's unnecessary. Think about cleaning - // this up. - // TODO: rename this or unify with TransferResourceOwnership() - void acquire_state(RowBatch* src); - - // Deep copy all rows this row batch into dst, using memory allocated from - // dst's _tuple_data_pool. Only valid when dst is empty. - // TODO: the current implementation of deep copy can produce an oversized - // row batch if there are duplicate tuples in this row batch. - void deep_copy_to(RowBatch* dst); - - // Create a serialized version of this row batch in output_batch, attaching all of the - // data it references to output_batch.tuple_data. output_batch.tuple_data will be - // snappy-compressed unless the compressed data is larger than the uncompressed - // data. Use output_batch.is_compressed to determine whether tuple_data is compressed. - // If an in-flight row is present in this row batch, it is ignored. - // This function does not reset(). - // Returns the uncompressed serialized size (this will be the true size of output_batch - // if tuple_data is actually uncompressed). - Status serialize(PRowBatch* output_batch, size_t* uncompressed_size, size_t* compressed_size, - bool allow_transfer_large_data = false); - - // Utility function: returns total size of batch. - static size_t get_batch_size(const PRowBatch& batch); - - int num_rows() const { return _num_rows; } - int capacity() const { return _capacity; } - - int num_buffers() const { return _buffers.size(); } - - const RowDescriptor& row_desc() const { return _row_desc; } - - // Max memory that this row batch can accumulate in _tuple_data_pool before it - // is considered at capacity. - /// This is a soft capacity: row batches may exceed the capacity, preferably only by a - /// row's worth of data. - static const int AT_CAPACITY_MEM_USAGE; - - // Max memory out of AT_CAPACITY_MEM_USAGE that should be used for fixed-length data, - // in order to leave room for variable-length data. - static const int FIXED_LEN_BUFFER_LIMIT; - - /// Allocates a buffer large enough for the fixed-length portion of 'capacity_' rows in - /// this batch from 'tuple_data_pool_'. 'capacity_' is reduced if the allocation would - /// exceed FIXED_LEN_BUFFER_LIMIT. Always returns enough space for at least one row. - /// Returns Status::MemoryLimitExceeded("Memory limit exceeded") and sets 'buffer' to nullptr if a memory limit would - /// have been exceeded. 'state' is used to log the error. - /// On success, sets 'buffer_size' to the size in bytes and 'buffer' to the buffer. - Status resize_and_allocate_tuple_buffer(RuntimeState* state, int64_t* buffer_size, - uint8_t** buffer); - - void set_scanner_id(int id) { _scanner_id = id; } - int scanner_id() const { return _scanner_id; } - - static const int MAX_MEM_POOL_SIZE = 32 * 1024 * 1024; - std::string to_string(); - -private: - // All members need to be handled in RowBatch::swap() - - bool _has_in_flight_row; // if true, last row hasn't been committed yet - int _num_rows; // # of committed rows - int _num_uncommitted_rows; // # of uncommited rows in row batch mem pool - int _capacity; // maximum # of rows - - /// If FLUSH_RESOURCES, the resources attached to this batch should be freed or - /// acquired by a new owner as soon as possible. See MarkFlushResources(). If - /// FLUSH_RESOURCES, AtCapacity() is also true. - FlushMode _flush; - - /// If true, this batch references unowned memory that will be cleaned up soon. - /// See MarkNeedsDeepCopy(). If true, 'flush_' is FLUSH_RESOURCES and - /// AtCapacity() is true. - bool _needs_deep_copy; - - int _num_tuples_per_row; - RowDescriptor _row_desc; - - // Array of pointers with _capacity * _num_tuples_per_row elements. - // The memory ownership depends on whether legacy joins and aggs are enabled. - // - // Memory is malloc'd and owned by RowBatch: - // If enable_partitioned_hash_join=true and enable_partitioned_aggregation=true - // then the memory is owned by this RowBatch and is freed upon its destruction. - // This mode is more performant especially with SubplanNodes in the ExecNode tree - // because the tuple pointers are not transferred and do not have to be re-created - // in every Reset(). - // - // Memory is allocated from MemPool: - // Otherwise, the memory is allocated from _tuple_data_pool. As a result, the - // pointer memory is transferred just like tuple data, and must be re-created - // in Reset(). This mode is required for the legacy join and agg which rely on - // the tuple pointers being allocated from the _tuple_data_pool, so they can - // acquire ownership of the tuple pointers. - Tuple** _tuple_ptrs; - int _tuple_ptrs_size; - - // Sum of all auxiliary bytes. This includes IoBuffers and memory from - // TransferResourceOwnership(). - int64_t _auxiliary_mem_usage; - - // If true, this batch is considered at capacity. This is explicitly set by streaming - // components that return rows via row batches. - bool _need_to_return; - - // holding (some of the) data referenced by rows - MemPool _tuple_data_pool; - - // holding some complex agg object data (bitmap, hll) - ObjectPool _agg_object_pool; - - // IO buffers current owned by this row batch. Ownership of IO buffers transfer - // between row batches. Any IO buffer will be owned by at most one row batch - // (i.e. they are not ref counted) so most row batches don't own any. - std::vector _io_buffers; - - struct BufferInfo { - BufferPool::ClientHandle* client; - BufferPool::BufferHandle buffer; - }; - /// Pages attached to this row batch. See AddBuffer() for ownership semantics. - std::vector _buffers; - - // String to write compressed tuple data to in serialize(). - // This is a string so we can swap() with the string in the PRowBatch we're serializing - // to (we don't compress directly into the PRowBatch in case the compressed data is - // longer than the uncompressed data). Swapping avoids copying data to the PRowBatch and - // avoids excess memory allocations: since we reuse RowBatches and PRowBatchs, and - // assuming all row batches are roughly the same size, all strings will eventually be - // allocated to the right size. - std::string _compression_scratch; - - int _scanner_id; - bool _cleared = false; -}; - -/// Macros for iterating through '_row_batch', starting at '_start_row_idx'. -/// '_row_batch' is the row batch to iterate through. -/// '_start_row_idx' is the starting row index. -/// '_iter' is the iterator. -/// '_limit' is the max number of rows to iterate over. -#define FOREACH_ROW(_row_batch, _start_row_idx, _iter) \ - for (RowBatch::Iterator _iter(_row_batch, _start_row_idx); !_iter.at_end(); _iter.next()) - -#define FOREACH_ROW_LIMIT(_row_batch, _start_row_idx, _limit, _iter) \ - for (RowBatch::Iterator _iter(_row_batch, _start_row_idx, _limit); !_iter.at_end(); \ - _iter.next()) - -} // namespace doris diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 089b5ba4be..23045b997a 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -22,7 +22,6 @@ #include "olap/memtable.h" #include "olap/storage_engine.h" #include "runtime/load_channel.h" -#include "runtime/row_batch.h" #include "util/doris_metrics.h" namespace doris { diff --git a/be/src/util/arrow/block_convertor.h b/be/src/util/arrow/block_convertor.h index bc4aae037e..c76ae73c9f 100644 --- a/be/src/util/arrow/block_convertor.h +++ b/be/src/util/arrow/block_convertor.h @@ -22,8 +22,8 @@ #include "common/status.h" #include "vec/core/block.h" -// This file will convert Doris RowBatch to/from Arrow's RecordBatch -// RowBatch is used by Doris query engine to exchange data between +// This file will convert Doris Block to/from Arrow's RecordBatch +// Block is used by Doris query engine to exchange data between // each execute node. namespace arrow { @@ -37,7 +37,6 @@ class Schema; namespace doris { class ObjectPool; -class RowBatch; class RowDescriptor; Status convert_to_arrow_batch(const vectorized::Block& block, diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp index 46eca71b6a..01b930b52e 100644 --- a/be/src/util/arrow/row_batch.cpp +++ b/be/src/util/arrow/row_batch.cpp @@ -40,7 +40,6 @@ #include "runtime/descriptor_helper.h" #include "runtime/descriptors.h" #include "runtime/large_int_value.h" -#include "runtime/row_batch.h" #include "util/arrow/utils.h" #include "util/types.h" @@ -120,349 +119,6 @@ Status convert_to_arrow_schema(const RowDescriptor& row_desc, return Status::OK(); } -Status convert_to_doris_type(const arrow::DataType& type, TSlotDescriptorBuilder* builder) { - switch (type.id()) { - case arrow::Type::INT8: - builder->type(TYPE_TINYINT); - break; - case arrow::Type::INT16: - builder->type(TYPE_SMALLINT); - break; - case arrow::Type::INT32: - builder->type(TYPE_INT); - break; - case arrow::Type::INT64: - builder->type(TYPE_BIGINT); - break; - case arrow::Type::FLOAT: - builder->type(TYPE_FLOAT); - break; - case arrow::Type::DOUBLE: - builder->type(TYPE_DOUBLE); - break; - case arrow::Type::BOOL: - builder->type(TYPE_BOOLEAN); - break; - default: - return Status::InvalidArgument("Unknown arrow type id({})", type.id()); - } - return Status::OK(); -} - -Status convert_to_slot_desc(const arrow::Field& field, int column_pos, - TSlotDescriptorBuilder* builder) { - RETURN_IF_ERROR(convert_to_doris_type(*field.type(), builder)); - builder->column_name(field.name()).nullable(field.nullable()).column_pos(column_pos); - return Status::OK(); -} - -Status convert_to_row_desc(ObjectPool* pool, const arrow::Schema& schema, - RowDescriptor** row_desc) { - TDescriptorTableBuilder builder; - TTupleDescriptorBuilder tuple_builder; - for (int i = 0; i < schema.num_fields(); ++i) { - auto field = schema.field(i); - TSlotDescriptorBuilder slot_builder; - RETURN_IF_ERROR(convert_to_slot_desc(*field, i, &slot_builder)); - tuple_builder.add_slot(slot_builder.build()); - } - tuple_builder.build(&builder); - DescriptorTbl* tbl = nullptr; - RETURN_IF_ERROR(DescriptorTbl::create(pool, builder.desc_tbl(), &tbl)); - auto tuple_desc = tbl->get_tuple_descriptor(0); - *row_desc = pool->add(new RowDescriptor(tuple_desc, false)); - return Status::OK(); -} - -// Convert RowBatch to an Arrow::Array -// We should keep this function to keep compatible with arrow's type visitor -// Now we inherit TypeVisitor to use default Visit implementation -class FromRowBatchConverter : public arrow::TypeVisitor { -public: - FromRowBatchConverter(const RowBatch& batch, const std::shared_ptr& schema, - arrow::MemoryPool* pool) - : _batch(batch), _schema(schema), _pool(pool), _cur_field_idx(-1) { - // obtain local time zone - time_t ts = 0; - struct tm t; - char buf[16]; - localtime_r(&ts, &t); - strftime(buf, sizeof(buf), "%Z", &t); - _time_zone = buf; - } - - ~FromRowBatchConverter() override {} - - // Use base class function - using arrow::TypeVisitor::Visit; - -#define PRIMITIVE_VISIT(TYPE) \ - arrow::Status Visit(const arrow::TYPE& type) override { return _visit(type); } - - PRIMITIVE_VISIT(Int8Type); - PRIMITIVE_VISIT(Int16Type); - PRIMITIVE_VISIT(Int32Type); - PRIMITIVE_VISIT(Int64Type); - PRIMITIVE_VISIT(FloatType); - PRIMITIVE_VISIT(DoubleType); - -#undef PRIMITIVE_VISIT - - // process string-transformable field - arrow::Status Visit(const arrow::StringType& type) override { - arrow::StringBuilder builder(_pool); - size_t num_rows = _batch.num_rows(); - ARROW_RETURN_NOT_OK(builder.Reserve(num_rows)); - for (size_t i = 0; i < num_rows; ++i) { - bool is_null = _cur_slot_ref->is_null_bit_set(_batch.get_row(i)); - if (is_null) { - ARROW_RETURN_NOT_OK(builder.AppendNull()); - continue; - } - auto cell_ptr = _cur_slot_ref->get_slot(_batch.get_row(i)); - PrimitiveType primitive_type = _cur_slot_ref->type().type; - switch (primitive_type) { - case TYPE_VARCHAR: - case TYPE_CHAR: - case TYPE_HLL: - case TYPE_STRING: { - const StringValue* string_val = (const StringValue*)(cell_ptr); - if (string_val->len == 0) { - // 0x01 is a magic num, not useful actually, just for present "" - //char* tmp_val = reinterpret_cast(0x01); - ARROW_RETURN_NOT_OK(builder.Append("")); - } else { - ARROW_RETURN_NOT_OK(builder.Append(string_val->ptr, string_val->len)); - } - break; - } - case TYPE_DATE: - case TYPE_DATETIME: { - char buf[64]; - const DateTimeValue* time_val = (const DateTimeValue*)(cell_ptr); - int len = time_val->to_buffer(buf); - ARROW_RETURN_NOT_OK(builder.Append(buf, len)); - break; - } - case TYPE_LARGEINT: { - auto string_temp = LargeIntValue::to_string( - reinterpret_cast(cell_ptr)->value); - ARROW_RETURN_NOT_OK(builder.Append(string_temp.data(), string_temp.size())); - break; - } - default: { - LOG(WARNING) << "can't convert this type = " << primitive_type << "to arrow type"; - return arrow::Status::TypeError("unsupported column type"); - } - } - } - return builder.Finish(&_arrays[_cur_field_idx]); - } - - // process doris DecimalV2 - arrow::Status Visit(const arrow::Decimal128Type& type) override { - std::shared_ptr s_decimal_ptr = - std::make_shared(27, 9); - arrow::Decimal128Builder builder(s_decimal_ptr, _pool); - size_t num_rows = _batch.num_rows(); - ARROW_RETURN_NOT_OK(builder.Reserve(num_rows)); - for (size_t i = 0; i < num_rows; ++i) { - bool is_null = _cur_slot_ref->is_null_bit_set(_batch.get_row(i)); - if (is_null) { - ARROW_RETURN_NOT_OK(builder.AppendNull()); - continue; - } - auto cell_ptr = _cur_slot_ref->get_slot(_batch.get_row(i)); - PackedInt128* p_value = reinterpret_cast(cell_ptr); - int64_t high = (p_value->value) >> 64; - uint64 low = p_value->value; - arrow::Decimal128 value(high, low); - ARROW_RETURN_NOT_OK(builder.Append(value)); - } - return builder.Finish(&_arrays[_cur_field_idx]); - } - // process boolean - arrow::Status Visit(const arrow::BooleanType& type) override { - arrow::BooleanBuilder builder(_pool); - size_t num_rows = _batch.num_rows(); - ARROW_RETURN_NOT_OK(builder.Reserve(num_rows)); - for (size_t i = 0; i < num_rows; ++i) { - bool is_null = _cur_slot_ref->is_null_bit_set(_batch.get_row(i)); - if (is_null) { - ARROW_RETURN_NOT_OK(builder.AppendNull()); - continue; - } - auto cell_ptr = _cur_slot_ref->get_slot(_batch.get_row(i)); - ARROW_RETURN_NOT_OK(builder.Append(*(bool*)cell_ptr)); - } - return builder.Finish(&_arrays[_cur_field_idx]); - } - - Status convert(std::shared_ptr* out); - -private: - template - typename std::enable_if::value, arrow::Status>::type - _visit(const T& type) { - arrow::NumericBuilder builder(_pool); - - size_t num_rows = _batch.num_rows(); - ARROW_RETURN_NOT_OK(builder.Reserve(num_rows)); - for (size_t i = 0; i < num_rows; ++i) { - bool is_null = _cur_slot_ref->is_null_bit_set(_batch.get_row(i)); - if (is_null) { - ARROW_RETURN_NOT_OK(builder.AppendNull()); - continue; - } - auto cell_ptr = _cur_slot_ref->get_slot(_batch.get_row(i)); - ARROW_RETURN_NOT_OK(builder.Append(*(typename T::c_type*)cell_ptr)); - } - return builder.Finish(&_arrays[_cur_field_idx]); - } - -private: - const RowBatch& _batch; - const std::shared_ptr& _schema; - arrow::MemoryPool* _pool; - - size_t _cur_field_idx; - std::unique_ptr _cur_slot_ref; - - std::string _time_zone; - - std::vector> _arrays; -}; - -Status FromRowBatchConverter::convert(std::shared_ptr* out) { - std::vector slot_descs; - for (auto tuple_desc : _batch.row_desc().tuple_descriptors()) { - for (auto desc : tuple_desc->slots()) { - slot_descs.push_back(desc); - } - } - size_t num_fields = _schema->num_fields(); - if (slot_descs.size() != num_fields) { - return Status::InvalidArgument("number fields not match"); - } - - _arrays.resize(num_fields); - - for (size_t idx = 0; idx < num_fields; ++idx) { - _cur_field_idx = idx; - _cur_slot_ref.reset(new SlotRef(slot_descs[idx])); - RETURN_IF_ERROR(_cur_slot_ref->prepare(slot_descs[idx], _batch.row_desc())); - auto arrow_st = arrow::VisitTypeInline(*_schema->field(idx)->type(), this); - if (!arrow_st.ok()) { - return to_status(arrow_st); - } - } - *out = arrow::RecordBatch::Make(_schema, _batch.num_rows(), std::move(_arrays)); - return Status::OK(); -} - -Status convert_to_arrow_batch(const RowBatch& batch, const std::shared_ptr& schema, - arrow::MemoryPool* pool, - std::shared_ptr* result) { - FromRowBatchConverter converter(batch, schema, pool); - return converter.convert(result); -} - -// Convert Arrow Array to RowBatch -class ToRowBatchConverter : public arrow::ArrayVisitor { -public: - using arrow::ArrayVisitor::Visit; - - ToRowBatchConverter(const arrow::RecordBatch& batch, const RowDescriptor& row_desc) - : _batch(batch), _row_desc(row_desc) {} - -#define PRIMITIVE_VISIT(TYPE) \ - arrow::Status Visit(const arrow::TYPE& array) override { return _visit(array); } - - PRIMITIVE_VISIT(Int8Array); - PRIMITIVE_VISIT(Int16Array); - PRIMITIVE_VISIT(Int32Array); - PRIMITIVE_VISIT(Int64Array); - PRIMITIVE_VISIT(FloatArray); - PRIMITIVE_VISIT(DoubleArray); - -#undef PRIMITIVE_VISIT - - // Convert to a RowBatch - Status convert(std::shared_ptr* result); - -private: - template - typename std::enable_if::value, - arrow::Status>::type - _visit(const T& array) { - auto raw_values = array.raw_values(); - for (size_t i = 0; i < array.length(); ++i) { - auto row = _output->get_row(i); - auto tuple = _cur_slot_ref->get_tuple(row); - if (array.IsValid(i)) { - tuple->set_not_null(_cur_slot_ref->null_indicator_offset()); - auto slot = _cur_slot_ref->get_slot(row); - *(typename T::TypeClass::c_type*)slot = raw_values[i]; - } else { - tuple->set_null(_cur_slot_ref->null_indicator_offset()); - } - } - return arrow::Status::OK(); - } - -private: - const arrow::RecordBatch& _batch; - const RowDescriptor& _row_desc; - - std::unique_ptr _cur_slot_ref; - std::shared_ptr _output; -}; - -Status ToRowBatchConverter::convert(std::shared_ptr* result) { - std::vector slot_descs; - for (auto tuple_desc : _row_desc.tuple_descriptors()) { - for (auto desc : tuple_desc->slots()) { - slot_descs.push_back(desc); - } - } - size_t num_fields = slot_descs.size(); - if (num_fields != _batch.schema()->num_fields()) { - return Status::InvalidArgument("Schema not match"); - } - // TODO(zc): check if field type match - - size_t num_rows = _batch.num_rows(); - _output.reset(new RowBatch(_row_desc, num_rows)); - _output->commit_rows(num_rows); - auto pool = _output->tuple_data_pool(); - for (size_t row_id = 0; row_id < num_rows; ++row_id) { - auto row = _output->get_row(row_id); - for (int tuple_id = 0; tuple_id < _row_desc.tuple_descriptors().size(); ++tuple_id) { - auto tuple_desc = _row_desc.tuple_descriptors()[tuple_id]; - auto tuple = pool->allocate(tuple_desc->byte_size()); - row->set_tuple(tuple_id, (Tuple*)tuple); - } - } - for (size_t idx = 0; idx < num_fields; ++idx) { - _cur_slot_ref.reset(new SlotRef(slot_descs[idx])); - RETURN_IF_ERROR(_cur_slot_ref->prepare(slot_descs[idx], _row_desc)); - auto arrow_st = arrow::VisitArrayInline(*_batch.column(idx), this); - if (!arrow_st.ok()) { - return to_status(arrow_st); - } - } - - *result = std::move(_output); - - return Status::OK(); -} - -Status convert_to_row_batch(const arrow::RecordBatch& batch, const RowDescriptor& row_desc, - std::shared_ptr* result) { - ToRowBatchConverter converter(batch, row_desc); - return converter.convert(result); -} - Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result) { // create sink memory buffer outputstream with the computed capacity int64_t capacity; diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h index f75b060502..25203ec170 100644 --- a/be/src/util/arrow/row_batch.h +++ b/be/src/util/arrow/row_batch.h @@ -36,29 +36,12 @@ class Schema; namespace doris { class ObjectPool; -class RowBatch; class RowDescriptor; // Convert Doris RowDescriptor to Arrow Schema. Status convert_to_arrow_schema(const RowDescriptor& row_desc, std::shared_ptr* result); -// Convert an Arrow Schema to a Doris RowDescriptor which will be add to -// input pool. -// Why we should -Status convert_to_row_desc(ObjectPool* pool, const arrow::Schema& schema, RowDescriptor** row_desc); - -// Convert a Doris RowBatch to an Arrow RecordBatch. A valid Arrow Schema -// who should match RowBatch's schema is given. Memory used by result RecordBatch -// will be allocated from input pool. -Status convert_to_arrow_batch(const RowBatch& batch, const std::shared_ptr& schema, - arrow::MemoryPool* pool, std::shared_ptr* result); - -// Convert an Arrow RecordBatch to a Doris RowBatch. A valid RowDescriptor -// whose schema is the same with RecordBatch's should be given. -Status convert_to_row_batch(const arrow::RecordBatch& batch, const RowDescriptor& row_desc, - std::shared_ptr* result); - Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result); } // namespace doris diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 817a2ac643..ebe46fdd0c 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -26,7 +26,6 @@ #include "agent/be_exec_version_manager.h" #include "common/status.h" #include "runtime/descriptors.h" -#include "runtime/row_batch.h" #include "runtime/tuple.h" #include "runtime/tuple_row.h" #include "udf/udf.h" diff --git a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp b/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp index 33a179c3ae..d8107ba925 100644 --- a/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp +++ b/be/src/vec/exec/data_gen_functions/vnumbers_tvf.cpp @@ -21,7 +21,6 @@ #include "exec/exec_node.h" #include "gen_cpp/PlanNodes_types.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/string_value.h" #include "runtime/tuple_row.h" diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp index 7125149494..39ae99a36e 100644 --- a/be/src/vec/exec/join/vnested_loop_join_node.cpp +++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp @@ -25,7 +25,6 @@ #include "exprs/expr.h" #include "exprs/runtime_filter_slots_cross.h" #include "gen_cpp/PlanNodes_types.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "util/runtime_profile.h" #include "util/simd/bits.h" diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 2e3330f0a5..ecf4d87081 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -21,7 +21,6 @@ #include "exec/exec_node.h" #include "runtime/mem_pool.h" -#include "runtime/row_batch.h" #include "vec/core/block.h" #include "vec/data_types/data_type_nullable.h" #include "vec/data_types/data_type_string.h" diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index acd8fafe53..b2235de9ab 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -18,7 +18,6 @@ #include "vec/exec/vanalytic_eval_node.h" #include "runtime/descriptors.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "vec/exprs/vexpr.h" diff --git a/be/src/vec/exec/vdata_gen_scan_node.cpp b/be/src/vec/exec/vdata_gen_scan_node.cpp index ae3d7d0986..e91626d49c 100644 --- a/be/src/vec/exec/vdata_gen_scan_node.cpp +++ b/be/src/vec/exec/vdata_gen_scan_node.cpp @@ -21,7 +21,6 @@ #include "common/status.h" #include "gen_cpp/PlanNodes_types.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/string_value.h" #include "runtime/tuple_row.h" diff --git a/be/src/vec/exec/vmysql_scan_node.cpp b/be/src/vec/exec/vmysql_scan_node.cpp index e0f520f6ae..3fb03bd19c 100644 --- a/be/src/vec/exec/vmysql_scan_node.cpp +++ b/be/src/vec/exec/vmysql_scan_node.cpp @@ -20,7 +20,6 @@ #include "exec/text_converter.h" #include "exec/text_converter.hpp" #include "gen_cpp/PlanNodes_types.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/string_value.h" #include "runtime/tuple_row.h" diff --git a/be/src/vec/exec/vschema_scan_node.cpp b/be/src/vec/exec/vschema_scan_node.cpp index 1bdd5c8563..9e3e6a1c42 100644 --- a/be/src/vec/exec/vschema_scan_node.cpp +++ b/be/src/vec/exec/vschema_scan_node.cpp @@ -20,7 +20,6 @@ #include "exec/text_converter.h" #include "exec/text_converter.hpp" #include "gen_cpp/PlanNodes_types.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/string_value.h" #include "runtime/tuple_row.h" diff --git a/be/src/vec/exec/vset_operation_node.h b/be/src/vec/exec/vset_operation_node.h index 1e339e3a80..ad9cbab7e7 100644 --- a/be/src/vec/exec/vset_operation_node.h +++ b/be/src/vec/exec/vset_operation_node.h @@ -18,7 +18,6 @@ #pragma once #include "exec/exec_node.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "vec/core/materialize_block.h" #include "vec/exec/join/join_op.h" diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 97916632a9..4ebdd05506 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -19,7 +19,6 @@ #include "common/config.h" #include "pipeline/pipeline.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "util/debug_util.h" #include "vec/common/sort/heap_sorter.h" diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index 21b7a0f521..17f87aac2e 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -20,7 +20,6 @@ #include #include "runtime/descriptors.h" -#include "runtime/row_batch.h" #include "util/debug_util.h" #include "util/defer_op.h" #include "util/runtime_profile.h" diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp index a25683e6da..2c8f79de48 100644 --- a/be/src/vec/sink/vresult_file_sink.cpp +++ b/be/src/vec/sink/vresult_file_sink.cpp @@ -21,7 +21,6 @@ #include "runtime/buffer_control_block.h" #include "runtime/exec_env.h" #include "runtime/result_buffer_mgr.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "util/uid_util.h" #include "vec/exprs/vexpr.h" diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp index 03a0181e55..7f756aff83 100644 --- a/be/src/vec/sink/vtablet_sink.cpp +++ b/be/src/vec/sink/vtablet_sink.cpp @@ -27,7 +27,6 @@ #include "exprs/expr_context.h" #include "olap/hll.h" #include "runtime/exec_env.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" #include "runtime/tuple_row.h" diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h index c299fdca55..b62544cb7c 100644 --- a/be/src/vec/sink/vtablet_sink.h +++ b/be/src/vec/sink/vtablet_sink.h @@ -33,7 +33,6 @@ #include "exprs/expr_context.h" #include "gen_cpp/Types_types.h" #include "gen_cpp/internal_service.pb.h" -#include "runtime/row_batch.h" #include "runtime/thread_context.h" #include "util/bitmap.h" #include "util/countdown_latch.h" diff --git a/be/test/exprs/binary_predicate_test.cpp b/be/test/exprs/binary_predicate_test.cpp index a53d3754fc..af96e332cb 100644 --- a/be/test/exprs/binary_predicate_test.cpp +++ b/be/test/exprs/binary_predicate_test.cpp @@ -24,7 +24,6 @@ #include "exprs/expr.h" #include "exprs/int_literal.h" #include "gen_cpp/Exprs_types.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "util/debug_util.h" diff --git a/be/test/exprs/in_op_test.cpp b/be/test/exprs/in_op_test.cpp index c1de71f4e9..fc49b4dd5b 100644 --- a/be/test/exprs/in_op_test.cpp +++ b/be/test/exprs/in_op_test.cpp @@ -23,7 +23,6 @@ #include "exprs/in_predicate.h" #include "exprs/int_literal.h" #include "gen_cpp/Exprs_types.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "util/debug_util.h" diff --git a/be/test/runtime/data_spliter_test.cpp b/be/test/runtime/data_spliter_test.cpp index 1f0ab50c5c..b497978e3b 100644 --- a/be/test/runtime/data_spliter_test.cpp +++ b/be/test/runtime/data_spliter_test.cpp @@ -26,7 +26,6 @@ #include "olap/olap_main.cpp" #include "runtime/descriptors.h" #include "runtime/dpp_sink_internal.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/tuple.h" #include "runtime/tuple_row.h" diff --git a/be/test/runtime/fragment_mgr_test.cpp b/be/test/runtime/fragment_mgr_test.cpp index 55939cc04f..3dfc8b0a37 100644 --- a/be/test/runtime/fragment_mgr_test.cpp +++ b/be/test/runtime/fragment_mgr_test.cpp @@ -22,7 +22,6 @@ #include "common/config.h" #include "exec/data_sink.h" #include "runtime/plan_fragment_executor.h" -#include "runtime/row_batch.h" namespace doris { diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp index 51a743e738..a4de5e9129 100644 --- a/be/test/vec/core/block_test.cpp +++ b/be/test/vec/core/block_test.cpp @@ -26,7 +26,6 @@ #include "agent/be_exec_version_manager.h" #include "exec/schema_scanner.h" #include "gen_cpp/data.pb.h" -#include "runtime/row_batch.h" #include "runtime/string_value.h" #include "runtime/tuple_row.h" #include "vec/columns/column_array.h" diff --git a/be/test/vec/exec/vjson_scanner_test.cpp b/be/test/vec/exec/vjson_scanner_test.cpp index 9149fb3dd7..a4d7a6c5d5 100644 --- a/be/test/vec/exec/vjson_scanner_test.cpp +++ b/be/test/vec/exec/vjson_scanner_test.cpp @@ -32,7 +32,6 @@ #include "io/local_file_reader.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/tuple.h" #include "runtime/user_function_cache.h" diff --git a/be/test/vec/exec/vorc_scanner_test.cpp b/be/test/vec/exec/vorc_scanner_test.cpp index 39e3bb56ed..d47052efce 100644 --- a/be/test/vec/exec/vorc_scanner_test.cpp +++ b/be/test/vec/exec/vorc_scanner_test.cpp @@ -32,7 +32,6 @@ #include "gen_cpp/PlanNodes_types.h" #include "io/local_file_reader.h" #include "runtime/descriptors.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/tuple.h" #include "runtime/user_function_cache.h" diff --git a/be/test/vec/exec/vparquet_scanner_test.cpp b/be/test/vec/exec/vparquet_scanner_test.cpp index c08a69a005..49b3ba3c56 100644 --- a/be/test/vec/exec/vparquet_scanner_test.cpp +++ b/be/test/vec/exec/vparquet_scanner_test.cpp @@ -28,7 +28,6 @@ #include "gen_cpp/PlanNodes_types.h" #include "io/local_file_reader.h" #include "runtime/descriptors.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/tuple.h" #include "runtime/user_function_cache.h" diff --git a/be/test/vec/exprs/vexpr_test.cpp b/be/test/vec/exprs/vexpr_test.cpp index edea60aa12..8eb195535e 100644 --- a/be/test/vec/exprs/vexpr_test.cpp +++ b/be/test/vec/exprs/vexpr_test.cpp @@ -30,7 +30,6 @@ #include "runtime/large_int_value.h" #include "runtime/memory/chunk_allocator.h" #include "runtime/primitive_type.h" -#include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/tuple.h" #include "runtime/tuple_row.h" @@ -48,25 +47,12 @@ TEST(TEST_VEXPR, ABSTEST) { auto tuple_desc = const_cast(desc_tbl->get_tuple_descriptor(0)); doris::RowDescriptor row_desc(tuple_desc, false); - doris::RowBatch row_batch(row_desc, 1024); std::string expr_json = R"|({"1":{"lst":["rec",2,{"1":{"i32":20},"2":{"rec":{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":6}}}}]}}},"4":{"i32":1},"20":{"i32":-1},"26":{"rec":{"1":{"rec":{"2":{"str":"abs"}}},"2":{"i32":0},"3":{"lst":["rec",1,{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":5}}}}]}}]},"4":{"rec":{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":6}}}}]}}},"5":{"tf":0},"7":{"str":"abs(INT)"},"9":{"rec":{"1":{"str":"_ZN5doris13MathFunctions3absEPN9doris_udf15FunctionContextERKNS1_6IntValE"}}},"11":{"i64":0}}}},{"1":{"i32":16},"2":{"rec":{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":5}}}}]}}},"4":{"i32":0},"15":{"rec":{"1":{"i32":0},"2":{"i32":0}}},"20":{"i32":-1},"23":{"i32":-1}}]}})|"; doris::TExpr exprx = apache::thrift::from_json_string(expr_json); doris::vectorized::VExprContext* context = nullptr; doris::vectorized::VExpr::create_expr_tree(&object_pool, exprx, &context); - int32_t k1 = -100; - for (int i = 0; i < 1024; ++i, k1++) { - auto idx = row_batch.add_row(); - doris::TupleRow* tuple_row = row_batch.get_row(idx); - auto tuple = - (doris::Tuple*)(row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size())); - auto slot_desc = tuple_desc->slots()[0]; - memcpy(tuple->get_slot(slot_desc->tuple_offset()), &k1, slot_desc->slot_size()); - tuple_row->set_tuple(0, tuple); - row_batch.commit_last_row(); - } - doris::RuntimeState runtime_stat(doris::TUniqueId(), doris::TQueryOptions(), doris::TQueryGlobals(), nullptr); runtime_stat.init_mem_trackers(); @@ -85,7 +71,6 @@ TEST(TEST_VEXPR, ABSTEST2) { schema_scanner.init(¶m, &object_pool); auto tuple_desc = const_cast(schema_scanner.tuple_desc()); RowDescriptor row_desc(tuple_desc, false); - RowBatch row_batch(row_desc, 1024); std::string expr_json = R"|({"1":{"lst":["rec",2,{"1":{"i32":20},"2":{"rec":{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":6}}}}]}}},"4":{"i32":1},"20":{"i32":-1},"26":{"rec":{"1":{"rec":{"2":{"str":"abs"}}},"2":{"i32":0},"3":{"lst":["rec",1,{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":5}}}}]}}]},"4":{"rec":{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":6}}}}]}}},"5":{"tf":0},"7":{"str":"abs(INT)"},"9":{"rec":{"1":{"str":"_ZN5doris13MathFunctions3absEPN9doris_udf15FunctionContextERKNS1_6IntValE"}}},"11":{"i64":0}}}},{"1":{"i32":16},"2":{"rec":{"1":{"lst":["rec",1,{"1":{"i32":0},"2":{"rec":{"1":{"i32":5}}}}]}}},"4":{"i32":0},"15":{"rec":{"1":{"i32":0},"2":{"i32":0}}},"20":{"i32":-1},"23":{"i32":-1}}]}})|"; TExpr exprx = apache::thrift::from_json_string(expr_json); @@ -93,18 +78,6 @@ TEST(TEST_VEXPR, ABSTEST2) { doris::vectorized::VExprContext* context = nullptr; doris::vectorized::VExpr::create_expr_tree(&object_pool, exprx, &context); - int32_t k1 = -100; - for (int i = 0; i < 1024; ++i, k1++) { - auto idx = row_batch.add_row(); - doris::TupleRow* tuple_row = row_batch.get_row(idx); - auto tuple = - (doris::Tuple*)(row_batch.tuple_data_pool()->allocate(tuple_desc->byte_size())); - auto slot_desc = tuple_desc->slots()[0]; - memcpy(tuple->get_slot(slot_desc->tuple_offset()), &k1, slot_desc->slot_size()); - tuple_row->set_tuple(0, tuple); - row_batch.commit_last_row(); - } - doris::RuntimeState runtime_stat(doris::TUniqueId(), doris::TQueryOptions(), doris::TQueryGlobals(), nullptr); runtime_stat.init_mem_trackers();