diff --git a/be/src/exec/arrow/arrow_reader.cpp b/be/src/exec/arrow/arrow_reader.cpp index 83f3e191f6..d26efd32aa 100644 --- a/be/src/exec/arrow/arrow_reader.cpp +++ b/be/src/exec/arrow/arrow_reader.cpp @@ -29,17 +29,21 @@ #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/mem_pool.h" +#include "runtime/runtime_state.h" #include "runtime/tuple.h" #include "util/string_util.h" #include "util/thrift_util.h" +#include "vec/core/block.h" +#include "vec/utils/arrow_column_to_doris_column.h" namespace doris { -// Broker - -ArrowReaderWrap::ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file, bool case_sensitive) - : _batch_size(batch_size), +ArrowReaderWrap::ArrowReaderWrap(RuntimeState* state, + const std::vector& file_slot_descs, + FileReader* file_reader, int32_t num_of_columns_from_file, + bool case_sensitive) + : _state(state), + _file_slot_descs(file_slot_descs), _num_of_columns_from_file(num_of_columns_from_file), _case_sensitive(case_sensitive) { _arrow_file = std::shared_ptr(new ArrowFile(file_reader)); @@ -65,11 +69,11 @@ void ArrowReaderWrap::close() { } } -Status ArrowReaderWrap::column_indices(const std::vector& tuple_slot_descs) { - DCHECK(_num_of_columns_from_file <= tuple_slot_descs.size()); +Status ArrowReaderWrap::column_indices() { + DCHECK(_num_of_columns_from_file <= _file_slot_descs.size()); _include_column_ids.clear(); for (int i = 0; i < _num_of_columns_from_file; i++) { - auto slot_desc = tuple_slot_descs.at(i); + auto slot_desc = _file_slot_descs.at(i); // Get the Column Reader for the boolean column auto iter = _map_column.find(slot_desc->col_name()); if (iter != _map_column.end()) { @@ -84,7 +88,7 @@ Status ArrowReaderWrap::column_indices(const std::vector& tuple return Status::OK(); } -int ArrowReaderWrap::get_cloumn_index(std::string column_name) { +int ArrowReaderWrap::get_column_index(std::string column_name) { std::string real_column_name = _case_sensitive ? 
column_name : to_lower(column_name); auto iter = _map_column.find(real_column_name); if (iter != _map_column.end()) { @@ -97,6 +101,37 @@ int ArrowReaderWrap::get_cloumn_index(std::string column_name) { } } +Status ArrowReaderWrap::get_next_block(vectorized::Block* block, bool* eof) { + size_t rows = 0; + do { + if (_batch == nullptr || _arrow_batch_cur_idx >= _batch->num_rows()) { + RETURN_IF_ERROR(next_batch(&_batch, eof)); + if (*eof) { + return Status::OK(); + } + } + + size_t num_elements = std::min((_state->batch_size() - block->rows()), + (_batch->num_rows() - _arrow_batch_cur_idx)); + for (auto i = 0; i < _file_slot_descs.size(); ++i) { + SlotDescriptor* slot_desc = _file_slot_descs[i]; + if (slot_desc == nullptr) { + continue; + } + std::string real_column_name = + is_case_sensitive() ? slot_desc->col_name() : slot_desc->col_name_lower_case(); + auto* array = _batch->GetColumnByName(real_column_name).get(); + auto& column_with_type_and_name = block->get_by_name(slot_desc->col_name()); + RETURN_IF_ERROR(arrow_column_to_doris_column( + array, _arrow_batch_cur_idx, column_with_type_and_name.column, + column_with_type_and_name.type, num_elements, _state->timezone_obj())); + } + rows += num_elements; + _arrow_batch_cur_idx += num_elements; + } while (!(*eof) && rows < _state->batch_size()); + return Status::OK(); +} + Status ArrowReaderWrap::next_batch(std::shared_ptr* batch, bool* eof) { std::unique_lock lock(_mtx); while (!_closed && _queue.empty()) { @@ -114,6 +149,7 @@ Status ArrowReaderWrap::next_batch(std::shared_ptr* batch, b *batch = _queue.front(); _queue.pop_front(); _queue_writer_cond.notify_one(); + _arrow_batch_cur_idx = 0; return Status::OK(); } diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h index 24c381316e..35703e4bbd 100644 --- a/be/src/exec/arrow/arrow_reader.h +++ b/be/src/exec/arrow/arrow_reader.h @@ -37,6 +37,7 @@ #include "gen_cpp/PaloBrokerService_types.h" #include "gen_cpp/PlanNodes_types.h" #include 
"gen_cpp/Types_types.h" +#include "vec/exec/format/generic_reader.h" namespace doris { @@ -77,38 +78,42 @@ private: }; // base of arrow reader -class ArrowReaderWrap { +class ArrowReaderWrap : public vectorized::GenericReader { public: - ArrowReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, - bool caseSensitive); + ArrowReaderWrap(RuntimeState* state, const std::vector& file_slot_descs, + FileReader* file_reader, int32_t num_of_columns_from_file, bool caseSensitive); virtual ~ArrowReaderWrap(); virtual Status init_reader(const TupleDescriptor* tuple_desc, - const std::vector& tuple_slot_descs, const std::vector& conjunct_ctxs, const std::string& timezone) = 0; // for row - virtual Status read(Tuple* tuple, const std::vector& tuple_slot_descs, - MemPool* mem_pool, bool* eof) { + virtual Status read(Tuple* tuple, MemPool* mem_pool, bool* eof) { return Status::NotSupported("Not Implemented read"); } // for vec + Status get_next_block(vectorized::Block* block, bool* eof) override; + // This method should be deprecated once the old scanner is removed. + // And user should use "get_next_block" instead. 
Status next_batch(std::shared_ptr* batch, bool* eof); + std::shared_ptr& statistics() { return _statistics; } void close(); virtual Status size(int64_t* size) { return Status::NotSupported("Not Implemented size"); } - int get_cloumn_index(std::string column_name); + int get_column_index(std::string column_name); void prefetch_batch(); bool is_case_sensitive() { return _case_sensitive; } protected: - virtual Status column_indices(const std::vector& tuple_slot_descs); + virtual Status column_indices(); virtual void read_batches(arrow::RecordBatchVector& batches, int current_group) = 0; virtual bool filter_row_group(int current_group) = 0; protected: - const int64_t _batch_size; + RuntimeState* _state; + std::vector _file_slot_descs; + const int32_t _num_of_columns_from_file; std::shared_ptr _arrow_file; std::shared_ptr<::arrow::RecordBatchReader> _rb_reader; @@ -128,6 +133,10 @@ protected: const size_t _max_queue_size = config::parquet_reader_max_buffer_size; std::thread _thread; bool _case_sensitive; + + // The following fields are only valid when using the "get_next_block()" interface. 
+ std::shared_ptr _batch; + size_t _arrow_batch_cur_idx = 0; }; } // namespace doris diff --git a/be/src/exec/arrow/orc_reader.cpp b/be/src/exec/arrow/orc_reader.cpp index 3223c64b6a..65a67909ba 100644 --- a/be/src/exec/arrow/orc_reader.cpp +++ b/be/src/exec/arrow/orc_reader.cpp @@ -23,15 +23,18 @@ #include "common/logging.h" #include "io/file_reader.h" #include "runtime/mem_pool.h" +#include "runtime/runtime_state.h" #include "runtime/tuple.h" #include "util/string_util.h" namespace doris { -ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file, int64_t range_start_offset, - int64_t range_size, bool case_sensitive) - : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file, case_sensitive), +ORCReaderWrap::ORCReaderWrap(RuntimeState* state, + const std::vector& file_slot_descs, + FileReader* file_reader, int32_t num_of_columns_from_file, + int64_t range_start_offset, int64_t range_size, bool case_sensitive) + : ArrowReaderWrap(state, file_slot_descs, file_reader, num_of_columns_from_file, + case_sensitive), _range_start_offset(range_start_offset), _range_size(range_size) { _reader = nullptr; @@ -39,7 +42,6 @@ ORCReaderWrap::ORCReaderWrap(FileReader* file_reader, int64_t batch_size, } Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc, - const std::vector& tuple_slot_descs, const std::vector& conjunct_ctxs, const std::string& timezone) { // Open ORC file reader @@ -73,7 +75,7 @@ Status ORCReaderWrap::init_reader(const TupleDescriptor* tuple_desc, _map_column.emplace(schemaName, i + 1); } - RETURN_IF_ERROR(column_indices(tuple_slot_descs)); + RETURN_IF_ERROR(column_indices()); _thread = std::thread(&ArrowReaderWrap::prefetch_batch, this); @@ -133,7 +135,7 @@ Status ORCReaderWrap::_next_stripe_reader(bool* eof) { // which may cause OOM issues by loading the whole stripe into memory. // Note this will only read rows for the current stripe, not the entire file. 
arrow::Result> maybe_rb_reader = - _reader->NextStripeReader(_batch_size, _include_column_ids); + _reader->NextStripeReader(_state->batch_size(), _include_column_ids); if (!maybe_rb_reader.ok()) { LOG(WARNING) << "Get RecordBatch Failed. " << maybe_rb_reader.status(); return Status::InternalError(maybe_rb_reader.status().ToString()); @@ -162,4 +164,4 @@ bool ORCReaderWrap::filter_row_group(int current_group) { return false; } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/exec/arrow/orc_reader.h b/be/src/exec/arrow/orc_reader.h index c166196d83..a6455e8400 100644 --- a/be/src/exec/arrow/orc_reader.h +++ b/be/src/exec/arrow/orc_reader.h @@ -32,12 +32,12 @@ namespace doris { // Reader of ORC file class ORCReaderWrap final : public ArrowReaderWrap { public: - ORCReaderWrap(FileReader* file_reader, int64_t batch_size, int32_t num_of_columns_from_file, + ORCReaderWrap(RuntimeState* state, const std::vector& file_slot_descs, + FileReader* file_reader, int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size, bool case_sensitive = true); ~ORCReaderWrap() override = default; Status init_reader(const TupleDescriptor* tuple_desc, - const std::vector& tuple_slot_descs, const std::vector& conjunct_ctxs, const std::string& timezone) override; diff --git a/be/src/exec/arrow/parquet_reader.cpp b/be/src/exec/arrow/parquet_reader.cpp index 97b033b4f7..fc8c1ca5f5 100644 --- a/be/src/exec/arrow/parquet_reader.cpp +++ b/be/src/exec/arrow/parquet_reader.cpp @@ -37,10 +37,13 @@ namespace doris { // Broker -ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file, int64_t range_start_offset, - int64_t range_size, bool case_sensitive) - : ArrowReaderWrap(file_reader, batch_size, num_of_columns_from_file, case_sensitive), +ParquetReaderWrap::ParquetReaderWrap(RuntimeState* state, + const std::vector& file_slot_descs, + FileReader* file_reader, int32_t 
num_of_columns_from_file, + int64_t range_start_offset, int64_t range_size, + bool case_sensitive) + : ArrowReaderWrap(state, file_slot_descs, file_reader, num_of_columns_from_file, + case_sensitive), _rows_of_group(0), _current_line_of_group(0), _current_line_of_batch(0), @@ -48,7 +51,6 @@ ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int64_t batch_size _range_size(range_size) {} Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc, - const std::vector& tuple_slot_descs, const std::vector& conjunct_ctxs, const std::string& timezone) { try { @@ -97,7 +99,7 @@ Status ParquetReaderWrap::init_reader(const TupleDescriptor* tuple_desc, _timezone = timezone; - RETURN_IF_ERROR(column_indices(tuple_slot_descs)); + RETURN_IF_ERROR(column_indices()); if (config::parquet_predicate_push_down) { int64_t file_size = 0; size(&file_size); @@ -238,8 +240,7 @@ Status ParquetReaderWrap::init_parquet_type() { return Status::OK(); } -Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& tuple_slot_descs, - MemPool* mem_pool, bool* eof) { +Status ParquetReaderWrap::read(Tuple* tuple, MemPool* mem_pool, bool* eof) { if (_batch == nullptr) { _current_line_of_group += _rows_of_group; return read_record_batch(eof); @@ -251,7 +252,7 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector& try { size_t slots = _include_column_ids.size(); for (size_t i = 0; i < slots; ++i) { - auto slot_desc = tuple_slot_descs[i]; + auto slot_desc = _file_slot_descs[i]; column_index = i; // column index in batch record switch (_parquet_column_type[i]) { case arrow::Type::type::STRING: { diff --git a/be/src/exec/arrow/parquet_reader.h b/be/src/exec/arrow/parquet_reader.h index d5d0165665..a2a80a1966 100644 --- a/be/src/exec/arrow/parquet_reader.h +++ b/be/src/exec/arrow/parquet_reader.h @@ -62,16 +62,15 @@ class RowGroupReader; class ParquetReaderWrap final : public ArrowReaderWrap { public: // batch_size is not use here - ParquetReaderWrap(FileReader* 
file_reader, int64_t batch_size, int32_t num_of_columns_from_file, + ParquetReaderWrap(RuntimeState* state, const std::vector& file_slot_descs, + FileReader* file_reader, int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size, bool case_sensitive = true); ~ParquetReaderWrap() override = default; // Read - Status read(Tuple* tuple, const std::vector& tuple_slot_descs, - MemPool* mem_pool, bool* eof) override; + Status read(Tuple* tuple, MemPool* mem_pool, bool* eof) override; Status size(int64_t* size) override; Status init_reader(const TupleDescriptor* tuple_desc, - const std::vector& tuple_slot_descs, const std::vector& conjunct_ctxs, const std::string& timezone) override; Status init_parquet_type(); diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index d7aa4041e2..e6da51f71d 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -55,8 +55,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo } _cur_file_eof = false; } - RETURN_IF_ERROR( - _cur_file_reader->read(_src_tuple, _src_slot_descs, tuple_pool, &_cur_file_eof)); + RETURN_IF_ERROR(_cur_file_reader->read(_src_tuple, tuple_pool, &_cur_file_eof)); // range of current file const TBrokerRangeDesc& range = _ranges.at(_next_range - 1); if (range.__isset.num_of_columns_from_file) { @@ -106,11 +105,11 @@ Status ParquetScanner::open_next_reader() { if (range.__isset.num_of_columns_from_file) { num_of_columns_from_file = range.num_of_columns_from_file; } - _cur_file_reader = new ParquetReaderWrap(file_reader.release(), _state->batch_size(), + _cur_file_reader = new ParquetReaderWrap(_state, _src_slot_descs, file_reader.release(), num_of_columns_from_file, 0, 0); auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); - Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs, - _state->timezone()); + Status status = + 
_cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, _state->timezone()); if (status.is_end_of_file()) { continue; } else { diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index d8fc97e6b8..36cadb284f 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -244,7 +244,7 @@ set(VEC_FILES exec/scan/new_file_scan_node.cpp exec/scan/new_file_scanner.cpp exec/scan/new_file_text_scanner.cpp -) + ) add_library(Vec STATIC ${VEC_FILES} diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp index 998d40ea6f..62d75a4e14 100644 --- a/be/src/vec/exec/file_arrow_scanner.cpp +++ b/be/src/vec/exec/file_arrow_scanner.cpp @@ -71,12 +71,12 @@ Status FileArrowScanner::_open_next_reader() { int32_t num_of_columns_from_file = _file_slot_descs.size(); _cur_file_reader = - _new_arrow_reader(file_reader.release(), _state->batch_size(), - num_of_columns_from_file, range.start_offset, range.size); + _new_arrow_reader(_file_slot_descs, file_reader.release(), num_of_columns_from_file, + range.start_offset, range.size); auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); - Status status = _cur_file_reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs, - _state->timezone()); + Status status = + _cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, _state->timezone()); if (status.is_end_of_file()) { continue; } else { @@ -226,11 +226,10 @@ VFileParquetScanner::VFileParquetScanner(RuntimeState* state, RuntimeProfile* pr _init_profiles(profile); } -ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file, - int64_t range_start_offset, - int64_t range_size) { - return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file, +ArrowReaderWrap* VFileParquetScanner::_new_arrow_reader( + const std::vector& file_slot_descs, FileReader* file_reader, + int32_t num_of_columns_from_file, int64_t 
range_start_offset, int64_t range_size) { + return new ParquetReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file, range_start_offset, range_size, false); } @@ -250,12 +249,11 @@ VFileORCScanner::VFileORCScanner(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter) : FileArrowScanner(state, profile, params, ranges, pre_filter_texprs, counter) {} -ArrowReaderWrap* VFileORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file, - int64_t range_start_offset, - int64_t range_size) { - return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file, range_start_offset, - range_size, false); +ArrowReaderWrap* VFileORCScanner::_new_arrow_reader( + const std::vector& file_slot_descs, FileReader* file_reader, + int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) { + return new ORCReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file, + range_start_offset, range_size, false); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/file_arrow_scanner.h b/be/src/vec/exec/file_arrow_scanner.h index 93e391af92..113bd54d6e 100644 --- a/be/src/vec/exec/file_arrow_scanner.h +++ b/be/src/vec/exec/file_arrow_scanner.h @@ -52,7 +52,8 @@ public: void close() override; protected: - virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, + virtual ArrowReaderWrap* _new_arrow_reader(const std::vector& file_slot_descs, + FileReader* file_reader, int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) = 0; virtual void _update_profile(std::shared_ptr& statistics) {} @@ -83,9 +84,9 @@ public: ~VFileParquetScanner() override = default; protected: - ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file, int64_t range_start_offset, - int64_t range_size) override; + ArrowReaderWrap* _new_arrow_reader(const std::vector& 
file_slot_descs, + FileReader* file_reader, int32_t num_of_columns_from_file, + int64_t range_start_offset, int64_t range_size) override; void _init_profiles(RuntimeProfile* profile) override; void _update_profile(std::shared_ptr& statistics) override; @@ -108,9 +109,9 @@ public: ~VFileORCScanner() override = default; protected: - ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file, int64_t range_start_offset, - int64_t range_size) override; + ArrowReaderWrap* _new_arrow_reader(const std::vector& file_slot_descs, + FileReader* file_reader, int32_t num_of_columns_from_file, + int64_t range_start_offset, int64_t range_size) override; void _init_profiles(RuntimeProfile* profile) override {}; }; diff --git a/be/src/vec/exec/file_hdfs_scanner.cpp b/be/src/vec/exec/file_hdfs_scanner.cpp index 6ba8fe1b27..8484b8d754 100644 --- a/be/src/vec/exec/file_hdfs_scanner.cpp +++ b/be/src/vec/exec/file_hdfs_scanner.cpp @@ -50,7 +50,7 @@ Status ParquetFileHdfsScanner::get_next(vectorized::Block* block, bool* eof) { } RETURN_IF_ERROR(init_block(block)); bool range_eof = false; - RETURN_IF_ERROR(_reader->read_next_batch(block, &range_eof)); + RETURN_IF_ERROR(_reader->get_next_block(block, &range_eof)); if (block->rows() > 0) { _fill_columns_from_path(block, block->rows()); } @@ -89,4 +89,4 @@ void ParquetFileHdfsScanner::close() { FileScanner::close(); } -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h new file mode 100644 index 0000000000..d830a8c0b8 --- /dev/null +++ b/be/src/vec/exec/format/generic_reader.h @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/status.h" + +namespace doris::vectorized { + +class Block; +// This is a reader interface for all file readers. +// A GenericReader is responsible for reading a file and returning +// a set of blocks with the specified schema. +class GenericReader { +public: + virtual Status get_next_block(Block* block, bool* eof) = 0; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index bda676e1c3..1319755355 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -99,7 +99,7 @@ Status ParquetReader::_init_read_columns(const std::vector& tup return Status::OK(); } -Status ParquetReader::read_next_batch(Block* block, bool* eof) { +Status ParquetReader::get_next_block(Block* block, bool* eof) { int32_t num_of_readers = _row_group_readers.size(); DCHECK(num_of_readers <= _read_row_groups.size()); if (_read_row_groups.empty()) { @@ -322,4 +322,4 @@ int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData& } return column.data_page_offset; } -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 9facffa623..95ffa10bd6 100644 
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -28,6 +28,7 @@ #include "gen_cpp/parquet_types.h" #include "io/file_reader.h" #include "vec/core/block.h" +#include "vec/exec/format/generic_reader.h" #include "vparquet_file_metadata.h" #include "vparquet_group_reader.h" #include "vparquet_page_index.h" @@ -63,18 +64,18 @@ private: // int64_t chunk_size; }; -class ParquetReader { +class ParquetReader : public GenericReader { public: ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file, size_t batch_size, int64_t range_start_offset, int64_t range_size, cctz::time_zone* ctz); - ~ParquetReader(); + virtual ~ParquetReader(); Status init_reader(const TupleDescriptor* tuple_desc, const std::vector& tuple_slot_descs, std::vector& conjunct_ctxs, const std::string& timezone); - Status read_next_batch(Block* block, bool* eof); + Status get_next_block(Block* block, bool* eof) override; // std::shared_ptr& statistics() { return _statistics; } void close(); diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp index b204cd4174..9355e36a4c 100644 --- a/be/src/vec/exec/scan/new_file_arrow_scanner.cpp +++ b/be/src/vec/exec/scan/new_file_arrow_scanner.cpp @@ -215,13 +215,13 @@ Status NewFileArrowScanner::_open_next_reader() { int32_t num_of_columns_from_file = _file_slot_descs.size(); _cur_file_reader = - _new_arrow_reader(file_reader.release(), _state->batch_size(), - num_of_columns_from_file, range.start_offset, range.size); + _new_arrow_reader(_file_slot_descs, file_reader.release(), num_of_columns_from_file, + range.start_offset, range.size); auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_parent->output_tuple_id()); // TODO _conjunct_ctxs is empty for now. _conjunct_ctxs is not empty. 
- Status status = _cur_file_reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs, - _state->timezone()); + Status status = + _cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, _state->timezone()); if (status.is_end_of_file()) { continue; } else { @@ -246,12 +246,10 @@ NewFileParquetScanner::NewFileParquetScanner(RuntimeState* state, NewFileScanNod // _init_profiles(profile); } -ArrowReaderWrap* NewFileParquetScanner::_new_arrow_reader(FileReader* file_reader, - int64_t batch_size, - int32_t num_of_columns_from_file, - int64_t range_start_offset, - int64_t range_size) { - return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file, +ArrowReaderWrap* NewFileParquetScanner::_new_arrow_reader( + const std::vector& file_slot_descs, FileReader* file_reader, + int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) { + return new ParquetReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file, range_start_offset, range_size, false); } @@ -262,11 +260,10 @@ NewFileORCScanner::NewFileORCScanner(RuntimeState* state, NewFileScanNode* paren : NewFileArrowScanner(state, parent, limit, scan_range, tracker, profile, pre_filter_texprs) {} -ArrowReaderWrap* NewFileORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file, - int64_t range_start_offset, - int64_t range_size) { - return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file, range_start_offset, - range_size, false); +ArrowReaderWrap* NewFileORCScanner::_new_arrow_reader( + const std::vector& file_slot_descs, FileReader* file_reader, + int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) { + return new ORCReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file, + range_start_offset, range_size, false); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/new_file_arrow_scanner.h 
b/be/src/vec/exec/scan/new_file_arrow_scanner.h index d1447fbf0c..89e76f6623 100644 --- a/be/src/vec/exec/scan/new_file_arrow_scanner.h +++ b/be/src/vec/exec/scan/new_file_arrow_scanner.h @@ -34,7 +34,8 @@ public: protected: Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override; - virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, + virtual ArrowReaderWrap* _new_arrow_reader(const std::vector& file_slot_descs, + FileReader* file_reader, int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) = 0; // Convert input block to output block, if needed. @@ -64,9 +65,9 @@ public: ~NewFileParquetScanner() override = default; protected: - ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file, int64_t range_start_offset, - int64_t range_size) override; + ArrowReaderWrap* _new_arrow_reader(const std::vector& file_slot_descs, + FileReader* file_reader, int32_t num_of_columns_from_file, + int64_t range_start_offset, int64_t range_size) override; void _init_profiles(RuntimeProfile* profile) override {}; }; @@ -80,9 +81,9 @@ public: ~NewFileORCScanner() override = default; protected: - ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file, int64_t range_start_offset, - int64_t range_size) override; + ArrowReaderWrap* _new_arrow_reader(const std::vector& file_slot_descs, + FileReader* file_reader, int32_t num_of_columns_from_file, + int64_t range_start_offset, int64_t range_size) override; void _init_profiles(RuntimeProfile* profile) override {}; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp index 1e5597f9a0..eb120470a5 100644 --- a/be/src/vec/exec/varrow_scanner.cpp +++ b/be/src/vec/exec/varrow_scanner.cpp @@ -77,11 +77,11 @@ Status VArrowScanner::_open_next_reader() { num_of_columns_from_file = 
range.num_of_columns_from_file; } _cur_file_reader = - _new_arrow_reader(file_reader.release(), _state->batch_size(), - num_of_columns_from_file, range.start_offset, range.size); + _new_arrow_reader(_src_slot_descs, file_reader.release(), num_of_columns_from_file, + range.start_offset, range.size); auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); - Status status = _cur_file_reader->init_reader(tuple_desc, _src_slot_descs, _conjunct_ctxs, - _state->timezone()); + Status status = + _cur_file_reader->init_reader(tuple_desc, _conjunct_ctxs, _state->timezone()); if (status.is_end_of_file()) { continue; @@ -287,4 +287,4 @@ void VArrowScanner::close() { } } -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/src/vec/exec/varrow_scanner.h b/be/src/vec/exec/varrow_scanner.h index 7eff7ab329..e67300332d 100644 --- a/be/src/vec/exec/varrow_scanner.h +++ b/be/src/vec/exec/varrow_scanner.h @@ -63,7 +63,8 @@ public: virtual void close() override; protected: - virtual ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, + virtual ArrowReaderWrap* _new_arrow_reader(const std::vector& file_slot_descs, + FileReader* file_reader, int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) = 0; diff --git a/be/src/vec/exec/vorc_scanner.cpp b/be/src/vec/exec/vorc_scanner.cpp index 81db28f2e6..4c7c80d731 100644 --- a/be/src/vec/exec/vorc_scanner.cpp +++ b/be/src/vec/exec/vorc_scanner.cpp @@ -29,11 +29,12 @@ VORCScanner::VORCScanner(RuntimeState* state, RuntimeProfile* profile, : VArrowScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter) {} -ArrowReaderWrap* VORCScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, +ArrowReaderWrap* VORCScanner::_new_arrow_reader(const std::vector& file_slot_descs, + FileReader* file_reader, int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) { - 
return new ORCReaderWrap(file_reader, batch_size, num_of_columns_from_file, range_start_offset, - range_size); + return new ORCReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file, + range_start_offset, range_size); } } // namespace doris::vectorized diff --git a/be/src/vec/exec/vorc_scanner.h b/be/src/vec/exec/vorc_scanner.h index b7bd1fdf67..78002e8a43 100644 --- a/be/src/vec/exec/vorc_scanner.h +++ b/be/src/vec/exec/vorc_scanner.h @@ -46,9 +46,9 @@ public: ~VORCScanner() override = default; protected: - ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file, int64_t range_start_offset, - int64_t range_size) override; + ArrowReaderWrap* _new_arrow_reader(const std::vector& file_slot_descs, + FileReader* file_reader, int32_t num_of_columns_from_file, + int64_t range_start_offset, int64_t range_size) override; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp index f4d74a6207..4176ba5899 100644 --- a/be/src/vec/exec/vparquet_scanner.cpp +++ b/be/src/vec/exec/vparquet_scanner.cpp @@ -30,11 +30,10 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile, : VArrowScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter) {} -ArrowReaderWrap* VParquetScanner::_new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file, - int64_t range_start_offset, - int64_t range_size) { - return new ParquetReaderWrap(file_reader, batch_size, num_of_columns_from_file, +ArrowReaderWrap* VParquetScanner::_new_arrow_reader( + const std::vector& file_slot_descs, FileReader* file_reader, + int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) { + return new ParquetReaderWrap(_state, file_slot_descs, file_reader, num_of_columns_from_file, range_start_offset, range_size); } diff --git a/be/src/vec/exec/vparquet_scanner.h 
b/be/src/vec/exec/vparquet_scanner.h index d8cf597dbe..7af00a1c38 100644 --- a/be/src/vec/exec/vparquet_scanner.h +++ b/be/src/vec/exec/vparquet_scanner.h @@ -47,9 +47,9 @@ public: ~VParquetScanner() override = default; protected: - ArrowReaderWrap* _new_arrow_reader(FileReader* file_reader, int64_t batch_size, - int32_t num_of_columns_from_file, int64_t range_start_offset, - int64_t range_size) override; + ArrowReaderWrap* _new_arrow_reader(const std::vector& file_slot_descs, + FileReader* file_reader, int32_t num_of_columns_from_file, + int64_t range_start_offset, int64_t range_size) override; }; } // namespace doris::vectorized diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp index 895fbbd4ac..f763a6bd5a 100644 --- a/be/test/vec/exec/parquet/parquet_reader_test.cpp +++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp @@ -112,7 +112,7 @@ TEST_F(ParquetReaderTest, normal) { ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name())); } bool eof = false; - p_reader->read_next_batch(block, &eof); + p_reader->get_next_block(block, &eof); for (auto& col : block->get_columns_with_type_and_name()) { ASSERT_EQ(col.column->size(), 10); } @@ -236,4 +236,4 @@ TEST_F(ParquetReaderTest, scanner) { } } // namespace vectorized -} // namespace doris \ No newline at end of file +} // namespace doris