From e8a344b683d8145f0da3f55ddeda25f8b2d85312 Mon Sep 17 00:00:00 2001 From: slothever <18522955+wsjz@users.noreply.github.com> Date: Mon, 8 Aug 2022 10:21:24 +0800 Subject: [PATCH] [feature-wip](parquet-reader) add predicate filter and column reader (#11488) --- be/src/vec/exec/file_hdfs_scanner.cpp | 28 ++- be/src/vec/exec/file_hdfs_scanner.h | 14 +- .../format/parquet/vparquet_column_reader.cpp | 51 +++- .../format/parquet/vparquet_column_reader.h | 36 ++- .../format/parquet/vparquet_group_reader.cpp | 225 ++++++++++++++++-- .../format/parquet/vparquet_group_reader.h | 55 ++++- .../exec/format/parquet/vparquet_reader.cpp | 53 +++-- .../vec/exec/format/parquet/vparquet_reader.h | 29 ++- be/test/CMakeLists.txt | 1 + .../vec/exec/parquet/parquet_reader_test.cpp | 48 ++++ 10 files changed, 468 insertions(+), 72 deletions(-) create mode 100644 be/test/vec/exec/parquet/parquet_reader_test.cpp diff --git a/be/src/vec/exec/file_hdfs_scanner.cpp b/be/src/vec/exec/file_hdfs_scanner.cpp index e45a78d316..08c5084b6d 100644 --- a/be/src/vec/exec/file_hdfs_scanner.cpp +++ b/be/src/vec/exec/file_hdfs_scanner.cpp @@ -17,6 +17,8 @@ #include "file_hdfs_scanner.h" +#include "io/file_factory.h" + namespace doris::vectorized { Status ParquetFileHdfsScanner::open() { @@ -24,7 +26,31 @@ Status ParquetFileHdfsScanner::open() { } Status ParquetFileHdfsScanner::get_next(vectorized::Block* block, bool* eof) { - return Status(); + // todo: get block from queue + auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId); + if (_next_range >= _ranges.size()) { + _scanner_eof = true; + return Status::OK(); + } + const TFileRangeDesc& range = _ranges[_next_range++]; + std::unique_ptr file_reader; + RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, range, + file_reader)); + _reader.reset(new ParquetReader(file_reader.release(), _file_slot_descs.size(), + range.start_offset, range.size)); + Status status = + _reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs, _state->timezone()); + if (!status.ok()) { + _scanner_eof = true; + return Status::OK(); + } + while (_reader->has_next()) { + Status st = _reader->read_next_batch(block); + if (st.is_end_of_file()) { + break; + } + } + return Status::OK(); } void ParquetFileHdfsScanner::close() {} diff --git a/be/src/vec/exec/file_hdfs_scanner.h b/be/src/vec/exec/file_hdfs_scanner.h index 266315f4d1..3db196a83c 100644 --- a/be/src/vec/exec/file_hdfs_scanner.h +++ b/be/src/vec/exec/file_hdfs_scanner.h @@ -18,18 +18,13 @@ #pragma once #include "common/status.h" +#include "file_scanner.h" #include "vec/core/block.h" +#include "vec/exec/format/parquet/vparquet_reader.h" namespace doris::vectorized { -class HdfsFileScanner { -public: - virtual Status open() = 0; - - virtual Status get_next(vectorized::Block* block, bool* eof) = 0; - - virtual void close() = 0; -}; +class HdfsFileScanner : public FileScanner {}; class ParquetFileHdfsScanner : public HdfsFileScanner { public: @@ -41,6 +36,9 @@ public: private: void _prefetch_batch(); + +private: + std::shared_ptr _reader; }; } // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index ae48305f5f..547dfba3bd 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -17,4 +17,53 @@ #include "vparquet_column_reader.h" -namespace doris::vectorized {} \ No newline at end of file +#include +#include + +#include "schema_desc.h" +#include "vparquet_column_chunk_reader.h" + +namespace doris::vectorized { + +Status ScalarColumnReader::init(const FileReader* file, const FieldSchema* field, + const tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type, + int64_t chunk_size) { + // todo1: init column chunk reader + // BufferedFileStreamReader stream_reader(reader, 0, chunk_size); + // _chunk_reader(&stream_reader, chunk, field); + // _chunk_reader.init(); + return Status(); +} + +Status ParquetColumnReader::create(const FileReader* file, int64_t chunk_size, + const FieldSchema* field, const ParquetReadColumn& column, + const TypeDescriptor& col_type, + const tparquet::RowGroup& row_group, + const ParquetColumnReader* reader) { + if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) { + return Status::Corruption("not supported type"); + } + if (field->type.type == TYPE_ARRAY) { + return Status::Corruption("not supported array type yet"); + } else { + ScalarColumnReader* scalar_reader = new ScalarColumnReader(column); + RETURN_IF_ERROR(scalar_reader->init(file, field, + &row_group.columns[field->physical_column_index], + col_type, chunk_size)); + reader = scalar_reader; + } + return Status::OK(); +} + +Status ScalarColumnReader::read_column_data(const tparquet::RowGroup& row_group_meta, + ColumnPtr* data) { + // todo2: read data with chunk reader to load page data + // while (_chunk_reader.has_next) { + // _chunk_reader.next_page(); + // _chunk_reader.load_page_data(); + // } + return Status(); +} + +void ScalarColumnReader::close() {} +}; // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index a16ee4d9f3..68a38607e6 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -17,5 +17,39 @@ #pragma once #include +#include -namespace doris::vectorized {} \ No newline at end of file +#include "schema_desc.h" +#include "vparquet_reader.h" +//#include "vparquet_column_chunk_reader.h" + +namespace doris::vectorized { + +class ParquetReadColumn; + +class ParquetColumnReader { +public: + ParquetColumnReader(const ParquetReadColumn& column) : _column(column) {}; + virtual ~ParquetColumnReader() = 0; + virtual Status read_column_data(const tparquet::RowGroup& row_group_meta, ColumnPtr* data) = 0; + static Status create(const FileReader* file, int64_t chunk_size, const FieldSchema* field, + const ParquetReadColumn& column, const TypeDescriptor& col_type, + const tparquet::RowGroup& row_group, const ParquetColumnReader* reader); + virtual void close() = 0; + +protected: + const ParquetReadColumn& _column; + // const ColumnChunkReader& _chunk_reader; +}; + +class ScalarColumnReader : public ParquetColumnReader { +public: + ScalarColumnReader(const ParquetReadColumn& column) : ParquetColumnReader(column) {}; + ~ScalarColumnReader() override = default; + Status init(const FileReader* file, const FieldSchema* field, + const tparquet::ColumnChunk* chunk, const TypeDescriptor& col_type, + int64_t chunk_size); + Status read_column_data(const tparquet::RowGroup& row_group_meta, ColumnPtr* data) override; + void close() override; +}; +}; // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index ca98c08b2a..25b7819e8f 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -17,55 +17,226 @@ #include "vparquet_group_reader.h" +#include "parquet_pred_cmp.h" +#include "schema_desc.h" +#include "vparquet_column_reader.h" + namespace doris::vectorized { RowGroupReader::RowGroupReader(doris::FileReader* file_reader, - std::shared_ptr file_metadata, - const std::vector& column_ids) + const std::shared_ptr& file_metadata, + const std::vector& read_columns, + const std::map& map_column, + const std::vector& conjunct_ctxs) : _file_reader(file_reader), _file_metadata(file_metadata), - _column_ids(column_ids), - _current_row_group(0) { - DCHECK_LE(column_ids.size(), file_metadata->num_columns()); - _init_column_readers(column_ids); + _read_columns(read_columns), + _map_column(map_column), + _conjunct_ctxs(conjunct_ctxs), + _current_row_group(-1) {} + +RowGroupReader::~RowGroupReader() { + for (auto& column_reader : _column_readers) { + auto reader = column_reader.second; + reader->close(); + delete reader; + reader = nullptr; + } + _column_readers.clear(); } -void RowGroupReader::_init_column_readers(const std::vector& column_ids) { - // for (int col_id: column_ids) { - // - // } +Status RowGroupReader::init(const TupleDescriptor* tuple_desc, int64_t split_start_offset, + int64_t split_size) { + _tuple_desc = tuple_desc; + _split_start_offset = split_start_offset; + _split_size = split_size; + _init_conjuncts(tuple_desc, _conjunct_ctxs); + RETURN_IF_ERROR(_init_column_readers()); + return Status::OK(); } -Status RowGroupReader::read_next_row_group(const int32_t* group_id) { - int32_t total_group = _file_metadata->num_row_groups(); - while (_current_row_group < total_group) { - bool filter_group = false; - _process_row_group_filter(&filter_group); - if (!filter_group) { - group_id = &_current_row_group; +void RowGroupReader::_init_conjuncts(const TupleDescriptor* tuple_desc, + const std::vector& conjunct_ctxs) { + if (tuple_desc->slots().empty()) { + return; + } + for (auto& read_col : _read_columns) { + _parquet_column_ids.emplace(read_col.parquet_column_id); + } + + for (int i = 0; i < tuple_desc->slots().size(); i++) { + auto col_iter = _map_column.find(tuple_desc->slots()[i]->col_name()); + if (col_iter == _map_column.end()) { + continue; } - _current_row_group++; + int parquet_col_id = col_iter->second; + if (_parquet_column_ids.end() == _parquet_column_ids.find(parquet_col_id)) { + continue; + } + for (int conj_idx = 0; conj_idx < conjunct_ctxs.size(); conj_idx++) { + Expr* conjunct = conjunct_ctxs[conj_idx]->root(); + if (conjunct->get_num_children() == 0) { + continue; + } + Expr* raw_slot = conjunct->get_child(0); + if (TExprNodeType::SLOT_REF != raw_slot->node_type()) { + continue; + } + SlotRef* slot_ref = (SlotRef*)raw_slot; + SlotId conjunct_slot_id = slot_ref->slot_id(); + if (conjunct_slot_id == tuple_desc->slots()[i]->id()) { + // Get conjuncts by conjunct_slot_id + auto iter = _slot_conjuncts.find(conjunct_slot_id); + if (_slot_conjuncts.end() == iter) { + std::vector conjuncts; + conjuncts.emplace_back(conjunct_ctxs[conj_idx]); + _slot_conjuncts.emplace(std::make_pair(conjunct_slot_id, conjuncts)); + } else { + std::vector conjuncts = iter->second; + conjuncts.emplace_back(conjunct_ctxs[conj_idx]); + } + } + } + } +} + +Status RowGroupReader::_init_column_readers() { + for (auto& read_col : _read_columns) { + SlotDescriptor* slot_desc = read_col.slot_desc; + FieldDescriptor schema = _file_metadata->schema(); + TypeDescriptor col_type = slot_desc->type(); + const auto& field = schema.get_column(slot_desc->col_name()); + const tparquet::RowGroup row_group = + _file_metadata->to_thrift_metadata().row_groups[_current_row_group]; + ParquetColumnReader* reader = nullptr; + RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, MAX_PARQUET_BLOCK_SIZE, field, + read_col, slot_desc->type(), row_group, + reader)); + if (reader == nullptr) { + return Status::Corruption("Init row group reader failed"); + } + _column_readers[slot_desc->id()] = reader; } return Status::OK(); } -Status RowGroupReader::_process_row_group_filter(bool* filter_group) { +Status RowGroupReader::fill_columns_data(Block* block, const int32_t group_id) { + // get ColumnWithTypeAndName from src_block + for (auto& read_col : _read_columns) { + const tparquet::RowGroup row_group = + _file_metadata->to_thrift_metadata().row_groups[_current_row_group]; + auto& column_with_type_and_name = block->get_by_name(read_col.slot_desc->col_name()); + RETURN_IF_ERROR(_column_readers[read_col.slot_desc->id()]->read_column_data( + row_group, &column_with_type_and_name.column)); + VLOG_DEBUG << column_with_type_and_name.name; + } + // use data fill utils read column data to column ptr + return Status::OK(); +} + +Status RowGroupReader::get_next_row_group(const int32_t* group_id) { + int32_t total_group = _file_metadata->num_row_groups(); + if (total_group == 0 || _file_metadata->num_rows() == 0 || _split_size < 0) { + return Status::EndOfFile("No row group need read"); + } + while (_current_row_group < total_group) { + _current_row_group++; + const tparquet::RowGroup& row_group = + _file_metadata->to_thrift_metadata().row_groups[_current_row_group]; + if (!_is_misaligned_range_group(row_group)) { + continue; + } + bool filter_group = false; + RETURN_IF_ERROR(_process_row_group_filter(row_group, _conjunct_ctxs, &filter_group)); + if (!filter_group) { + group_id = &_current_row_group; + break; + } + } + return Status::OK(); +} + +bool RowGroupReader::_is_misaligned_range_group(const tparquet::RowGroup& row_group) { + int64_t start_offset = _get_column_start_offset(row_group.columns[0].meta_data); + + auto last_column = row_group.columns[row_group.columns.size() - 1].meta_data; + int64_t end_offset = _get_column_start_offset(last_column) + last_column.total_compressed_size; + + int64_t row_group_mid = start_offset + (end_offset - start_offset) / 2; + if (!(row_group_mid >= _split_start_offset && + row_group_mid < _split_start_offset + _split_size)) { + return true; + } + return false; +} + +Status RowGroupReader::_process_row_group_filter(const tparquet::RowGroup& row_group, + const std::vector& conjunct_ctxs, + bool* filter_group) { + _process_column_stat_filter(row_group, conjunct_ctxs, filter_group); _init_chunk_dicts(); - RETURN_IF_ERROR(_process_dict_filter()); + RETURN_IF_ERROR(_process_dict_filter(filter_group)); _init_bloom_filter(); - RETURN_IF_ERROR(_process_bloom_filter()); + RETURN_IF_ERROR(_process_bloom_filter(filter_group)); + return Status::OK(); +} + +Status RowGroupReader::_process_column_stat_filter(const tparquet::RowGroup& row_group, + const std::vector& conjunct_ctxs, + bool* filter_group) { + int total_group = _file_metadata->num_row_groups(); + // It will not filter if head_group_offset equals tail_group_offset + int64_t total_rows = 0; + int64_t total_bytes = 0; + for (int row_group_id = 0; row_group_id < total_group; row_group_id++) { + total_rows += row_group.num_rows; + total_bytes += row_group.total_byte_size; + for (SlotId slot_id = 0; slot_id < _tuple_desc->slots().size(); slot_id++) { + const std::string& col_name = _tuple_desc->slots()[slot_id]->col_name(); + auto col_iter = _map_column.find(col_name); + if (col_iter == _map_column.end()) { + continue; + } + int parquet_col_id = col_iter->second; + if (_parquet_column_ids.end() == _parquet_column_ids.find(parquet_col_id)) { + // Column not exist in parquet file + continue; + } + auto slot_iter = _slot_conjuncts.find(slot_id); + if (slot_iter == _slot_conjuncts.end()) { + continue; + } + auto statistic = row_group.columns[parquet_col_id].meta_data.statistics; + if (!statistic.__isset.max || !statistic.__isset.min) { + continue; + } + // Min-max of statistic is plain-encoded value + *filter_group = + _determine_filter_row_group(slot_iter->second, statistic.min, statistic.max); + if (*filter_group) { + _filtered_num_row_groups++; + VLOG_DEBUG << "Filter row group id: " << row_group_id; + break; + } + } + } + VLOG_DEBUG << "DEBUG total_rows: " << total_rows; + VLOG_DEBUG << "DEBUG total_bytes: " << total_bytes; + VLOG_DEBUG << "Parquet file: " << _file_metadata->schema().debug_string() + << ", Num of read row group: " << total_group + << ", and num of skip row group: " << _filtered_num_row_groups; return Status::OK(); } void RowGroupReader::_init_chunk_dicts() {} -Status RowGroupReader::_process_dict_filter() { +Status RowGroupReader::_process_dict_filter(bool* filter_group) { return Status(); } void RowGroupReader::_init_bloom_filter() {} -Status RowGroupReader::_process_bloom_filter() { +Status RowGroupReader::_process_bloom_filter(bool* filter_group) { RETURN_IF_ERROR(_file_reader->seek(0)); return Status(); } @@ -76,4 +247,12 @@ int64_t RowGroupReader::_get_row_group_start_offset(const tparquet::RowGroup& ro } return row_group.columns[0].meta_data.data_page_offset; } + +int64_t RowGroupReader::_get_column_start_offset(const tparquet::ColumnMetaData& column) { + if (column.__isset.dictionary_page_offset) { + DCHECK_LT(column.dictionary_page_offset, column.data_page_offset); + return column.dictionary_page_offset; + } + return column.data_page_offset; +} } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index f2b05b25e1..b69852f124 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -19,32 +19,54 @@ #include "exprs/expr_context.h" #include "io/file_reader.h" +#include "vec/core/block.h" +#include "vparquet_column_reader.h" #include "vparquet_file_metadata.h" +#include "vparquet_reader.h" + +#define MAX_PARQUET_BLOCK_SIZE 1024 namespace doris::vectorized { +class ParquetReadColumn; +class ParquetColumnReader; class RowGroupReader { public: - RowGroupReader(doris::FileReader* file_reader, std::shared_ptr file_metadata, - const std::vector& column_ids); - - ~RowGroupReader() = default; - - Status read_next_row_group(const int32_t* group_id); + RowGroupReader(doris::FileReader* file_reader, + const std::shared_ptr& file_metadata, + const std::vector& read_columns, + const std::map& map_column, + const std::vector& conjunct_ctxs); + ~RowGroupReader(); + Status init(const TupleDescriptor* tuple_desc, int64_t split_start_offset, int64_t split_size); + Status get_next_row_group(const int32_t* group_id); + Status fill_columns_data(Block* block, const int32_t group_id); private: - void _init_column_readers(const std::vector& column_ids); + bool _is_misaligned_range_group(const tparquet::RowGroup& row_group); - Status _process_row_group_filter(bool* filter_group); + Status _process_column_stat_filter(const tparquet::RowGroup& row_group, + const std::vector& conjunct_ctxs, + bool* filter_group); + + void _init_conjuncts(const TupleDescriptor* tuple_desc, + const std::vector& conjunct_ctxs); + + Status _init_column_readers(); + + Status _process_row_group_filter(const tparquet::RowGroup& row_group, + const std::vector& conjunct_ctxs, + bool* filter_group); void _init_chunk_dicts(); - Status _process_dict_filter(); + Status _process_dict_filter(bool* filter_group); void _init_bloom_filter(); - Status _process_bloom_filter(); + Status _process_bloom_filter(bool* filter_group); int64_t _get_row_group_start_offset(const tparquet::RowGroup& row_group); + int64_t _get_column_start_offset(const tparquet::ColumnMetaData& column_init_column_readers); bool _determine_filter_row_group(const std::vector& conjuncts, const std::string& encoded_min, @@ -72,8 +94,17 @@ private: private: doris::FileReader* _file_reader; - std::shared_ptr _file_metadata; - std::vector _column_ids; + const std::shared_ptr& _file_metadata; + std::unordered_map _column_readers; + const TupleDescriptor* _tuple_desc; // get all slot info + const std::vector& _read_columns; + const std::map& _map_column; + std::unordered_set _parquet_column_ids; + const std::vector& _conjunct_ctxs; + std::unordered_map> _slot_conjuncts; + int64_t _split_start_offset; + int64_t _split_size; int32_t _current_row_group; + int32_t _filtered_num_row_groups = 0; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index a849d9b681..815cba3c15 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -22,7 +22,9 @@ namespace doris::vectorized { ParquetReader::ParquetReader(FileReader* file_reader, int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t range_size) - : _num_of_columns_from_file(num_of_columns_from_file) { + : _num_of_columns_from_file(num_of_columns_from_file), + _range_start_offset(range_start_offset), + _range_size(range_size) { _file_reader = file_reader; _total_groups = 0; // _current_group = 0; @@ -53,44 +55,48 @@ Status ParquetReader::init_reader(const TupleDescriptor* tuple_desc, if (_total_groups == 0) { return Status::EndOfFile("Empty Parquet File"); } - auto schemaDescriptor = _file_metadata->schema(); + auto schema_desc = _file_metadata->schema(); for (int i = 0; i < _file_metadata->num_columns(); ++i) { - LOG(WARNING) << schemaDescriptor.debug_string(); - // // Get the Column Reader for the boolean column - // if (schemaDescriptor->(i)->max_definition_level() > 1) { - // _map_column.emplace(schemaDescriptor->(i)->path()->ToDotVector()[0], i); - // } else { - // _map_column.emplace(schemaDescriptor->(i)->name(), i); - // } + LOG(WARNING) << schema_desc.debug_string(); + // Get the Column Reader for the boolean column + _map_column.emplace(schema_desc.get_column(i)->name, i); } LOG(WARNING) << ""; - RETURN_IF_ERROR(_column_indices(tuple_slot_descs)); - _init_row_group_reader(); + RETURN_IF_ERROR(_init_read_columns(tuple_slot_descs)); + RETURN_IF_ERROR( + _init_row_group_reader(tuple_desc, _range_start_offset, _range_size, conjunct_ctxs)); return Status::OK(); } -Status ParquetReader::_column_indices(const std::vector& tuple_slot_descs) { +Status ParquetReader::_init_read_columns(const std::vector& tuple_slot_descs) { DCHECK(_num_of_columns_from_file <= tuple_slot_descs.size()); _include_column_ids.clear(); for (int i = 0; i < _num_of_columns_from_file; i++) { auto slot_desc = tuple_slot_descs.at(i); // Get the Column Reader for the boolean column auto iter = _map_column.find(slot_desc->col_name()); + auto parquet_col_id = iter->second; if (iter != _map_column.end()) { - _include_column_ids.emplace_back(iter->second); + _include_column_ids.emplace_back(parquet_col_id); } else { std::stringstream str_error; str_error << "Invalid Column Name:" << slot_desc->col_name(); LOG(WARNING) << str_error.str(); return Status::InvalidArgument(str_error.str()); } + ParquetReadColumn column; + column.slot_desc = slot_desc; + column.parquet_column_id = parquet_col_id; + auto physical_type = _file_metadata->schema().get_column(parquet_col_id)->physical_type; + column.parquet_type = physical_type; + _read_columns.emplace_back(column); } return Status::OK(); } Status ParquetReader::read_next_batch(Block* block) { int32_t group_id = 0; - RETURN_IF_ERROR(_row_group_reader->read_next_row_group(&group_id)); + RETURN_IF_ERROR(_row_group_reader->get_next_row_group(&group_id)); auto metadata = _file_metadata->to_thrift_metadata(); auto column_chunks = metadata.row_groups[group_id].columns; if (_has_page_index(column_chunks)) { @@ -104,19 +110,26 @@ Status ParquetReader::read_next_batch(Block* block) { } } // metadata has been processed, fill parquet data to block - _fill_block_data(column_chunks); - block = _batch; + // block is the batch data of a row group. a row group has N batch // push to scanner queue + _fill_block_data(block, group_id); return Status::OK(); } -void ParquetReader::_fill_block_data(std::vector columns) { +void ParquetReader::_fill_block_data(Block* block, int group_id) { + // make and init src block here // read column chunk - // _batch = + _row_group_reader->fill_columns_data(block, group_id); } -void ParquetReader::_init_row_group_reader() { - _row_group_reader.reset(new RowGroupReader(_file_reader, _file_metadata, _include_column_ids)); +Status ParquetReader::_init_row_group_reader(const TupleDescriptor* tuple_desc, + int64_t range_start_offset, int64_t range_size, + const std::vector& conjunct_ctxs) { + // todo: extract as create() + _row_group_reader.reset(new RowGroupReader(_file_reader, _file_metadata, _read_columns, + _map_column, conjunct_ctxs)); + RETURN_IF_ERROR(_row_group_reader->init(tuple_desc, range_start_offset, range_size)); + return Status::OK(); } bool ParquetReader::_has_page_index(std::vector columns) { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 42ecdb54bf..a4cf4e70e2 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -41,6 +41,20 @@ namespace doris::vectorized { // int64_t filtered_total_bytes = 0; // int64_t total_bytes = 0; // }; +class RowGroupReader; + +class ParquetReadColumn { +public: + friend class ParquetReader; + friend class RowGroupReader; + +private: + SlotDescriptor* slot_desc; + int parquet_column_id; + tparquet::Type::type parquet_type; + // int64_t start_offset; + // int64_t chunk_size; +}; class ParquetReader { public: @@ -63,9 +77,11 @@ public: int64_t size() const { return _file_reader->size(); } private: - Status _column_indices(const std::vector& tuple_slot_descs); - void _init_row_group_reader(); - void _fill_block_data(std::vector columns); + Status _init_read_columns(const std::vector& tuple_slot_descs); + Status _init_row_group_reader(const TupleDescriptor* tuple_desc, int64_t range_start_offset, + int64_t range_size, + const std::vector& conjunct_ctxs); + void _fill_block_data(Block* block, int group_id); bool _has_page_index(std::vector columns); Status _process_page_index(std::vector columns); @@ -78,12 +94,13 @@ private: // int _current_group; // current group(stripe) // std::shared_ptr _statistics; const int32_t _num_of_columns_from_file; + std::map _map_column; // column-name <---> column-index std::vector _include_column_ids; // columns that need to get from file + std::vector _read_columns; // parquet file reader object - Block* _batch; bool* _batch_eof; - // int64_t _range_start_offset; - // int64_t _range_size; + int64_t _range_start_offset; + int64_t _range_size; }; } // namespace doris::vectorized diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index f503e3b4dc..a775256f84 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -60,6 +60,7 @@ set(EXEC_TEST_FILES exec/multi_bytes_separator_test.cpp exec/hdfs_file_reader_test.cpp vec/exec/parquet/parquet_thrift_test.cpp + vec/exec/parquet/parquet_reader_test.cpp # exec/new_olap_scan_node_test.cpp # exec/pre_aggregation_node_test.cpp # exec/partitioned_hash_table_test.cpp diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp new file mode 100644 index 0000000000..63d783f613 --- /dev/null +++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +//#include "io/buffered_reader.h" +//#include "io/file_reader.h" +//#include "io/local_file_reader.h" +//#include "util/runtime_profile.h" +//#include "vec/exec/format/parquet/vparquet_file_metadata.h" + +namespace doris { +namespace vectorized { + +class ParquetReaderTest : public testing::Test { +public: + ParquetReaderTest() {} +}; + +//TEST_F(ParquetReaderTest, normal) { +// LocalFileReader reader("./be/test/exec/test_data/parquet_scanner/localfile.parquet", 0); +// auto st = reader.open(); +// EXPECT_TRUE(st.ok()); +// std::shared_ptr metaData; +// parse_thrift_footer(&reader, metaData); +// tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata(); +// for (auto value : t_metadata.row_groups) { +// LOG(WARNING) << "row group num_rows: " << value.num_rows; +// } +//} + +} // namespace vectorized +} // namespace doris \ No newline at end of file