From b837b2eb9567f5085bed52115f49a668eb2435ef Mon Sep 17 00:00:00 2001 From: Ashin Gau Date: Tue, 20 Sep 2022 15:55:19 +0800 Subject: [PATCH] [feature-wip](parquet-reader) filter rows by page index (#12664) # Proposed changes [Parquet v1.11+ supports page skipping](https://github.com/apache/parquet-format/blob/master/PageIndex.md), which helps the scanner reduce the amount of data scanned, decompressed, decoded, and inserted. According to the performance FlameGraph, decompression takes up 20% of CPU time. If a page can be filtered as a whole, the page does not need to be decompressed. However, the row numbers between pages are not aligned. Columns containing predicates can be filtered at page granularity, but other columns need to be skipped within pages, so non-predicate columns can only save the decoding and insertion time. An array column needs the repetition level to align with other columns, so the array column can also only save the decoding and insertion time. ## Explore `OffsetIndex` in the column metadata can locate the page position. Theoretically, a page can be completely skipped, including the time of reading from HDFS. However, the average size of a page is around 500KB. Skipping a page requires calling `skip`. The performance of `skip` is low when it is called frequently, and may not be better than continuous reading of large blocks of data (such as 4MB). If multiple consecutive pages are filtered, `skip` reading can be performed according to `OffsetIndex`. However, for the convenience of programming and readability, the data of all pages are loaded and filtered in turn. 
--- .../vec/exec/format/parquet/level_decoder.h | 12 +- .../parquet/vparquet_column_chunk_reader.cpp | 12 + .../parquet/vparquet_column_chunk_reader.h | 22 +- .../format/parquet/vparquet_column_reader.cpp | 314 ++++++++++++------ .../format/parquet/vparquet_column_reader.h | 19 +- .../format/parquet/vparquet_page_reader.cpp | 20 +- .../format/parquet/vparquet_page_reader.h | 5 +- .../vec/exec/format/parquet/vparquet_reader.h | 2 + 8 files changed, 285 insertions(+), 121 deletions(-) diff --git a/be/src/vec/exec/format/parquet/level_decoder.h b/be/src/vec/exec/format/parquet/level_decoder.h index be6c6c6154..da9aefeb4e 100644 --- a/be/src/vec/exec/format/parquet/level_decoder.h +++ b/be/src/vec/exec/format/parquet/level_decoder.h @@ -35,18 +35,12 @@ public: Status init(Slice* slice, tparquet::Encoding::type encoding, level_t max_level, uint32_t num_levels); - bool has_levels() const { return _num_levels > 0; } + inline bool has_levels() const { return _num_levels > 0; } size_t get_levels(level_t* levels, size_t n); - size_t next_repeated_count() { - DCHECK_EQ(_encoding, tparquet::Encoding::RLE); - return _rle_decoder.repeated_count(); - } - - level_t get_repeated_value(size_t count) { - DCHECK_EQ(_encoding, tparquet::Encoding::RLE); - return _rle_decoder.get_repeated_value(count); + inline size_t get_next_run(level_t* val, size_t max_run) { + return _rle_decoder.GetNextRun(val, max_run); } private: diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index ed85e93f5c..0e7d1cd4ec 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -46,10 +46,17 @@ Status ColumnChunkReader::init() { // seek to the first data page _page_reader->seek_to_page(_metadata.data_page_offset); } + _state = INITIALIZED; return Status::OK(); } Status ColumnChunkReader::next_page() { + if (UNLIKELY(_state == 
NOT_INIT)) { + return Status::Corruption("Should initialize chunk reader"); + } + if (UNLIKELY(_remaining_num_values != 0)) { + return Status::Corruption("Should skip current page"); + } RETURN_IF_ERROR(_page_reader->next_page_header()); if (_page_reader->get_page_header()->type == tparquet::PageType::DICTIONARY_PAGE) { // the first page maybe directory page even if _metadata.__isset.dictionary_page_offset == false, @@ -59,11 +66,15 @@ Status ColumnChunkReader::next_page() { return next_page(); } else { _remaining_num_values = _page_reader->get_page_header()->data_page_header.num_values; + _state = HEADER_PARSED; return Status::OK(); } } Status ColumnChunkReader::load_page_data() { + if (UNLIKELY(_state != HEADER_PARSED)) { + return Status::Corruption("Should parse page header"); + } const auto& header = *_page_reader->get_page_header(); // int32_t compressed_size = header.compressed_page_size; int32_t uncompressed_size = header.uncompressed_page_size; @@ -113,6 +124,7 @@ Status ColumnChunkReader::load_page_data() { // Reset page data for each page _page_decoder->set_data(&_page_data); + _state = DATA_LOADED; return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index 2c6b620cce..44f5b56ff2 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -69,14 +69,25 @@ public: // Seek to the specific page, page_header_offset must be the start offset of the page header. void seek_to_page(int64_t page_header_offset) { + _remaining_num_values = 0; _page_reader->seek_to_page(page_header_offset); + _state = INITIALIZED; } // Seek to next page. Only read and parse the page header. Status next_page(); // Skip current page(will not read and parse) if the page is filtered by predicates. 
- Status skip_page() { return _page_reader->skip_page(); } + Status skip_page() { + _remaining_num_values = 0; + if (_state == HEADER_PARSED) { + return _page_reader->skip_page(); + } + if (_state != DATA_LOADED) { + return Status::Corruption("Should parse page header to skip page"); + } + return Status::OK(); + } // Skip some values(will not read and parse) in current page if the values are filtered by predicates. // when skip_data = false, the underlying decoder will not skip data, // only used when maintaining the consistency of _remaining_num_values. @@ -85,6 +96,12 @@ public: // Load page data into the underlying container, // and initialize the repetition and definition level decoder for current page data. Status load_page_data(); + Status load_page_date_idempotent() { + if (_state == DATA_LOADED) { + return Status::OK(); + } + return load_page_data(); + } // The remaining number of values in current page(including null values). Decreased when reading or skipping. uint32_t remaining_num_values() const { return _remaining_num_values; }; // null values are generated from definition levels @@ -115,10 +132,13 @@ public: Decoder* get_page_decoder() { return _page_decoder; } private: + enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED }; + Status _decode_dict_page(); void _reserve_decompress_buf(size_t size); int32_t _get_type_length(); + ColumnChunkReaderState _state = NOT_INIT; FieldSchema* _field_schema; level_t _max_rep_level; level_t _max_def_level; diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 273088dbbe..da433b7cba 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -60,15 +60,29 @@ void ParquetColumnReader::init_column_metadata(const tparquet::ColumnChunk& chun _metadata.reset(new ParquetColumnMetadata(chunk_start, chunk_len, chunk_meta)); } -void 
ParquetColumnReader::_reserve_def_levels_buf(size_t size) { - if (size > _def_levels_buf_size || _def_levels_buf == nullptr) { - _def_levels_buf_size = BitUtil::next_power_of_two(size); - _def_levels_buf.reset(new level_t[_def_levels_buf_size]); +void ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end_index, + std::list& read_ranges) { + if (_row_ranges.size() == 0) { + read_ranges.emplace_back(start_index, end_index); + } + int index = _row_range_index; + while (index < _row_ranges.size()) { + RowRange& read_range = _row_ranges[index]; + if (read_range.last_row <= start_index) { + index++; + _row_range_index++; + continue; + } + if (read_range.first_row >= end_index) { + break; + } + int64_t start = read_range.first_row < start_index ? start_index : read_range.first_row; + int64_t end = read_range.last_row < end_index ? read_range.last_row : end_index; + read_ranges.emplace_back(start, end); + index++; } } -void ParquetColumnReader::_skipped_pages() {} - Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, std::vector& row_ranges) { _stream_reader = @@ -85,6 +99,59 @@ Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet:: return Status::OK(); } +Status ScalarColumnReader::_skip_values(size_t num_values) { + if (_chunk_reader->max_def_level() > 0) { + LevelDecoder& def_decoder = _chunk_reader->def_level_decoder(); + size_t skipped = 0; + while (skipped < num_values) { + level_t def_level; + size_t loop_skip = def_decoder.get_next_run(&def_level, num_values - skipped); + RETURN_IF_ERROR(_chunk_reader->skip_values(loop_skip, def_level == 1)); + skipped += loop_skip; + } + } else { + RETURN_IF_ERROR(_chunk_reader->skip_values(num_values)); + } + return Status::OK(); +} + +Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_column, + DataTypePtr& type) { + CHECK(doris_column->is_nullable()); + auto* nullable_column = reinterpret_cast( + 
(*std::move(doris_column)).mutate().get()); + MutableColumnPtr data_column = nullable_column->get_nested_column_ptr(); + NullMap& map_data = nullable_column->get_null_map_data(); + + if (_chunk_reader->max_def_level() > 0) { + LevelDecoder& def_decoder = _chunk_reader->def_level_decoder(); + size_t has_read = 0; + while (has_read < num_values) { + level_t def_level; + size_t loop_read = def_decoder.get_next_run(&def_level, num_values - has_read); + bool is_null = def_level == 0; + // fill NullMap + for (int i = 0; i < loop_read; ++i) { + map_data.emplace_back(is_null); + } + // decode data + if (is_null) { + // null values + _chunk_reader->insert_null_values(data_column, loop_read); + } else { + RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, loop_read)); + } + has_read += loop_read; + } + } else { + for (int i = 0; i < num_values; ++i) { + map_data.emplace_back(false); + } + RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, num_values)); + } + return Status::OK(); +} + Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size, size_t* read_rows, bool* eof) { if (_chunk_reader->remaining_num_values() == 0) { @@ -94,61 +161,39 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr return Status::OK(); } RETURN_IF_ERROR(_chunk_reader->next_page()); - if (_row_ranges.size() != 0) { - _skipped_pages(); - } - RETURN_IF_ERROR(_chunk_reader->load_page_data()); } - size_t read_values = _chunk_reader->remaining_num_values() < batch_size - ? 
_chunk_reader->remaining_num_values() - : batch_size; - // get definition levels, and generate null values - _reserve_def_levels_buf(read_values); - level_t* definitions = _def_levels_buf.get(); - if (_chunk_reader->max_def_level() == 0) { // required field - std::fill(definitions, definitions + read_values, 1); + + // generate the row ranges that should be read + std::list read_ranges; + _generate_read_ranges(_current_row_index, + _current_row_index + _chunk_reader->remaining_num_values(), read_ranges); + if (read_ranges.size() == 0) { + // skip the whole page + _current_row_index += _chunk_reader->remaining_num_values(); + RETURN_IF_ERROR(_chunk_reader->skip_page()); + *read_rows = 0; } else { - _chunk_reader->get_def_levels(definitions, read_values); - } - // fill NullMap - // TODO cmy why must nullable? - CHECK(doris_column->is_nullable()); - auto* nullable_column = reinterpret_cast( - (*std::move(doris_column)).mutate().get()); - MutableColumnPtr data_column = nullable_column->get_nested_column_ptr(); - NullMap& map_data = nullable_column->get_null_map_data(); - for (int i = 0; i < read_values; ++i) { - map_data.emplace_back(definitions[i] == 0); - } - // decode data - if (_chunk_reader->max_def_level() == 0) { - RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, read_values)); - } else if (read_values > 0) { - // column with null values - level_t level_type = definitions[0]; - int num_values = 1; - for (int i = 1; i < read_values; ++i) { - if (definitions[i] != level_type) { - if (level_type == 0) { - // null values - _chunk_reader->insert_null_values(data_column, num_values); - } else { - RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, num_values)); - } - level_type = definitions[i]; - num_values = 1; - } else { - num_values++; + // load page data to decode or skip values + RETURN_IF_ERROR(_chunk_reader->load_page_date_idempotent()); + size_t has_read = 0; + for (auto& range : read_ranges) { + // generate the skipped values + 
size_t skip_values = range.first_row - _current_row_index; + RETURN_IF_ERROR(_skip_values(skip_values)); + _current_row_index += skip_values; + // generate the read values + size_t read_values = + std::min((size_t)(range.last_row - range.first_row), batch_size - has_read); + RETURN_IF_ERROR(_read_values(read_values, doris_column, type)); + has_read += read_values; + _current_row_index += read_values; + if (has_read == batch_size) { + break; } } - if (level_type == 0) { - // null values - _chunk_reader->insert_null_values(data_column, num_values); - } else { - RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, num_values)); - } + *read_rows = has_read; } - *read_rows = read_values; + if (_chunk_reader->remaining_num_values() == 0 && !_chunk_reader->has_next_page()) { *eof = true; } @@ -157,6 +202,13 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr void ScalarColumnReader::close() {} +void ArrayColumnReader::_reserve_def_levels_buf(size_t size) { + if (size > _def_levels_buf_size || _def_levels_buf == nullptr) { + _def_levels_buf_size = BitUtil::next_power_of_two(size); + _def_levels_buf.reset(new level_t[_def_levels_buf_size]); + } +} + Status ArrayColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, std::vector& row_ranges) { _stream_reader = @@ -178,7 +230,7 @@ Status ArrayColumnReader::init(FileReader* file, FieldSchema* field, tparquet::C } } if (_chunk_reader->max_rep_level() != 1) { - return Status::Corruption("Max repetition level in scalar column should be 1"); + return Status::Corruption("Max repetition level in array column should be 1"); } return Status::OK(); } @@ -192,19 +244,17 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& return Status::OK(); } RETURN_IF_ERROR(_chunk_reader->next_page()); - if (_row_ranges.size() != 0) { - _skipped_pages(); - } + // array should load data to analyze row range 
RETURN_IF_ERROR(_chunk_reader->load_page_data()); _init_rep_levels_buf(); } - // fill NullMap CHECK(doris_column->is_nullable()); auto* nullable_column = reinterpret_cast( (*std::move(doris_column)).mutate().get()); NullMap& map_data = nullable_column->get_null_map_data(); MutableColumnPtr data_column = nullable_column->get_nested_column_ptr(); + // generate array offset size_t real_batch_size = 0; size_t num_values = 0; @@ -214,21 +264,64 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& _reserve_def_levels_buf(num_values); level_t* definitions = _def_levels_buf.get(); _chunk_reader->get_def_levels(definitions, num_values); - // null array, should ignore the last offset in element_offsets - for (int i = 0; i < element_offsets.size() - 1; ++i) { - map_data.emplace_back(definitions[element_offsets[i]] == _NULL_ARRAY); + _def_offset = 0; + + // generate the row ranges that should be read + std::list read_ranges; + _generate_read_ranges(_current_row_index, _current_row_index + real_batch_size, read_ranges); + if (read_ranges.size() == 0) { + // skip the current batch + _current_row_index += real_batch_size; + _skip_values(num_values); + *read_rows = 0; + } else { + size_t has_read = 0; + int offset_index = 0; + for (auto& range : read_ranges) { + // generate the skipped values + size_t skip_rows = range.first_row - _current_row_index; + size_t skip_values = + element_offsets[offset_index + skip_rows] - element_offsets[offset_index]; + RETURN_IF_ERROR(_skip_values(skip_values)); + offset_index += skip_rows; + _current_row_index += skip_rows; + real_batch_size -= skip_rows; + + // generate the read values + size_t scan_rows = range.last_row - range.first_row; + size_t scan_values = + element_offsets[offset_index + scan_rows] - element_offsets[offset_index]; + // null array, should ignore the last offset in element_offsets + for (int i = offset_index; i < offset_index + scan_rows; ++i) { + 
map_data.emplace_back(definitions[element_offsets[i]] == _NULL_ARRAY); + } + // fill array offset, should skip a value when parsing null array + _fill_array_offset(data_column, element_offsets, offset_index, scan_rows); + // fill nested array elements + if (LIKELY(scan_values > 0)) { + RETURN_IF_ERROR(_load_nested_column( + static_cast(*data_column).get_data_ptr(), + const_cast((reinterpret_cast( + remove_nullable(type).get())) + ->get_nested_type()), + scan_values)); + } + offset_index += scan_rows; + has_read += scan_rows; + _current_row_index += scan_rows; + } + if (offset_index != element_offsets.size() - 1) { + size_t skip_rows = element_offsets.size() - 1 - offset_index; + size_t skip_values = element_offsets.back() - element_offsets[offset_index]; + RETURN_IF_ERROR(_skip_values(skip_values)); + offset_index += skip_rows; + _current_row_index += skip_rows; + real_batch_size -= skip_rows; + } + DCHECK_EQ(real_batch_size, has_read); + *read_rows = has_read; } - // fill array offset, should skip a value when parsing null array - _fill_array_offset(data_column, element_offsets); - // fill nested array elements - if (num_values > 0) { - _load_nested_column(static_cast(*data_column).get_data_ptr(), - const_cast((reinterpret_cast( - remove_nullable(type).get())) - ->get_nested_type()), - num_values); - } - *read_rows = real_batch_size; + if (_chunk_reader->remaining_num_values() == 0 && !_chunk_reader->has_next_page()) { *eof = true; } @@ -244,7 +337,7 @@ Status ArrayColumnReader::_load_nested_column(ColumnPtr& doris_column, DataTypeP (*std::move(doris_column)).mutate().get()); NullMap& map_data = nullable_column->get_null_map_data(); MutableColumnPtr data_column = nullable_column->get_nested_column_ptr(); - for (int i = 0; i < read_values; ++i) { + for (int i = _def_offset; i < _def_offset + read_values; ++i) { // should skip _EMPTY_ARRAY and _NULL_ARRAY if (definitions[i] == _CONCRETE_ELEMENT) { map_data.emplace_back(false); @@ -253,7 +346,7 @@ Status 
ArrayColumnReader::_load_nested_column(ColumnPtr& doris_column, DataTypeP } } // column with null values - int start_idx = 0; + int start_idx = _def_offset; while (start_idx < read_values) { if (definitions[start_idx] == _CONCRETE_ELEMENT || definitions[start_idx] == _NULL_ELEMENT) { @@ -264,13 +357,13 @@ Status ArrayColumnReader::_load_nested_column(ColumnPtr& doris_column, DataTypeP start_idx++; } } - if (start_idx == read_values) { + if (start_idx == _def_offset + read_values) { // all values are empty array or null array return Status::OK(); } bool prev_is_null = definitions[start_idx] == _NULL_ELEMENT; int num_values = 1; - for (int i = start_idx + 1; i < read_values; ++i) { + for (int i = start_idx + 1; i < _def_offset + read_values; ++i) { if (definitions[i] == _EMPTY_ARRAY || definitions[i] == _NULL_ARRAY) { // only decrease _remaining_num_values _chunk_reader->skip_values(1, false); @@ -296,15 +389,16 @@ Status ArrayColumnReader::_load_nested_column(ColumnPtr& doris_column, DataTypeP } else { RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, num_values)); } + _def_offset += read_values; return Status::OK(); } void ArrayColumnReader::close() {} void ArrayColumnReader::_init_rep_levels_buf() { - size_t max_buf_size = _chunk_reader->remaining_num_values() < 1024 + size_t max_buf_size = _chunk_reader->remaining_num_values() < 4096 ? _chunk_reader->remaining_num_values() - : 1024; + : 4096; if (_rep_levels_buf_size < max_buf_size || _rep_levels_buf == nullptr) { _rep_levels_buf.reset(new level_t[max_buf_size]); _rep_levels_buf_size = max_buf_size; @@ -314,11 +408,10 @@ void ArrayColumnReader::_init_rep_levels_buf() { } void ArrayColumnReader::_load_rep_levels() { - _rep_size = _chunk_reader->remaining_num_values() < _rep_levels_buf_size - ? 
_chunk_reader->remaining_num_values() - : _rep_levels_buf_size; - _chunk_reader->get_rep_levels(_rep_levels_buf.get(), _rep_size); _start_offset += _rep_size; + _rep_size = _remaining_rep_levels < _rep_levels_buf_size ? _remaining_rep_levels + : _rep_levels_buf_size; + _chunk_reader->get_rep_levels(_rep_levels_buf.get(), _rep_size); _rep_offset = 0; } @@ -367,20 +460,45 @@ Status ArrayColumnReader::_generate_array_offset(std::vector& element_of } void ArrayColumnReader::_fill_array_offset(MutableColumnPtr& doris_column, - std::vector& element_offsets) { - auto& offsets_data = static_cast(*doris_column).get_offsets(); - level_t* definitions = _def_levels_buf.get(); - auto prev_offset = offsets_data.back(); - size_t null_empty_array = 0; - for (int i = 0; i < element_offsets.size(); ++i) { - auto& offset = element_offsets[i]; - if (i != 0) { - offsets_data.emplace_back(prev_offset + offset - null_empty_array); - } - if ((i != element_offsets.size() - 1) && - (definitions[offset] == _EMPTY_ARRAY || definitions[offset] == _NULL_ARRAY)) { - null_empty_array++; + std::vector& element_offsets, int offset_index, + size_t num_rows) { + if (LIKELY(num_rows > 0)) { + auto& offsets_data = static_cast(*doris_column).get_offsets(); + level_t* definitions = _def_levels_buf.get(); + auto prev_offset = offsets_data.back(); + auto base_offset = element_offsets[offset_index]; + size_t null_empty_array = 0; + for (int i = offset_index; i < offset_index + num_rows + 1; ++i) { + auto& offset = element_offsets[i]; + if (i != offset_index) { + offsets_data.emplace_back(prev_offset + offset - base_offset - null_empty_array); + } + if ((i != offset_index + num_rows) && + (definitions[offset] == _EMPTY_ARRAY || definitions[offset] == _NULL_ARRAY)) { + null_empty_array++; + } } } } + +Status ArrayColumnReader::_skip_values(size_t num_values) { + if (LIKELY(num_values > 0)) { + level_t* definitions = _def_levels_buf.get(); + bool prev_is_null = definitions[_def_offset] != _CONCRETE_ELEMENT; + 
int cnt = 1; + for (int i = _def_offset + 1; i < _def_offset + num_values; ++i) { + bool curr_is_null = definitions[i] != _CONCRETE_ELEMENT; + if (prev_is_null ^ curr_is_null) { + RETURN_IF_ERROR(_chunk_reader->skip_values(cnt, !prev_is_null)); + prev_is_null = curr_is_null; + cnt = 1; + } else { + cnt++; + } + } + RETURN_IF_ERROR(_chunk_reader->skip_values(cnt, !prev_is_null)); + _def_offset += num_values; + } + return Status::OK(); +} }; // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index 316410c34e..fd0ce845f4 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -67,8 +67,8 @@ public: virtual void close() = 0; protected: - void _skipped_pages(); - void _reserve_def_levels_buf(size_t size); + void _generate_read_ranges(int64_t start_index, int64_t end_index, + std::list& read_ranges); const ParquetReadColumn& _column; BufferedFileStreamReader* _stream_reader; @@ -76,9 +76,9 @@ protected: std::vector _row_ranges; cctz::time_zone* _ctz; std::unique_ptr _chunk_reader; - std::unique_ptr _def_levels_buf = nullptr; - size_t _def_levels_buf_size = 0; tparquet::OffsetIndex* _offset_index; + int64_t _current_row_index = 0; + int _row_range_index = 0; }; class ScalarColumnReader : public ParquetColumnReader { @@ -90,6 +90,8 @@ public: std::vector& row_ranges); Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size, size_t* read_rows, bool* eof) override; + Status _skip_values(size_t num_values); + Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type); void close() override; }; @@ -105,12 +107,19 @@ public: void close() override; private: + void _reserve_def_levels_buf(size_t size); void _init_rep_levels_buf(); void _load_rep_levels(); Status _load_nested_column(ColumnPtr& doris_column, DataTypePtr& type, size_t read_values); 
Status _generate_array_offset(std::vector& element_offsets, size_t pre_batch_size, size_t* real_batch_size, size_t* num_values); - void _fill_array_offset(MutableColumnPtr& doris_column, std::vector& element_offsets); + void _fill_array_offset(MutableColumnPtr& doris_column, std::vector& element_offsets, + int offset_index, size_t num_rows); + Status _skip_values(size_t num_values); + + std::unique_ptr _def_levels_buf = nullptr; + size_t _def_levels_buf_size = 0; + size_t _def_offset = 0; std::unique_ptr _rep_levels_buf = nullptr; size_t _rep_levels_buf_size = 0; diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp index d2a229d7d8..80c6ff228b 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp @@ -29,11 +29,14 @@ PageReader::PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t l : _reader(reader), _start_offset(offset), _end_offset(offset + length) {} Status PageReader::next_page_header() { - if (_offset < _start_offset || _offset >= _end_offset) { + if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) { return Status::IOError("Out-of-bounds Access"); } - if (_offset != _next_header_offset) { - return Status::IOError("Wrong header position"); + if (UNLIKELY(_offset != _next_header_offset)) { + return Status::IOError("Wrong header position, should seek to a page header first"); + } + if (UNLIKELY(_state != INITIALIZED)) { + return Status::IOError("Should skip or load current page to get next page"); } const uint8_t* page_header_buf = nullptr; @@ -58,25 +61,28 @@ Status PageReader::next_page_header() { _offset += real_header_size; _next_header_offset = _offset + _cur_page_header.compressed_page_size; + _state = HEADER_PARSED; return Status::OK(); } Status PageReader::skip_page() { - if (_offset == _next_header_offset) { - return Status::InternalError("Should call next_page() to generate 
page header"); + if (UNLIKELY(_state != HEADER_PARSED)) { + return Status::IOError("Should generate page header first to skip current page"); } _offset = _next_header_offset; + _state = INITIALIZED; return Status::OK(); } Status PageReader::get_page_data(Slice& slice) { - if (_offset == _next_header_offset) { - return Status::InternalError("Should call next_page() to generate page header"); + if (UNLIKELY(_state != HEADER_PARSED)) { + return Status::IOError("Should generate page header first to load current page data"); } slice.size = _cur_page_header.compressed_page_size; RETURN_IF_ERROR(_reader->read_bytes(slice, _offset)); DCHECK_EQ(slice.size, _cur_page_header.compressed_page_size); _offset += slice.size; + _state = INITIALIZED; return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h index 1795cac52d..0d83c81650 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h @@ -27,7 +27,6 @@ namespace doris::vectorized { * Use to deserialize parquet page header, and get the page data in iterator interface. 
*/ class PageReader { -public: public: PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length); ~PageReader() = default; @@ -45,11 +44,15 @@ public: void seek_to_page(int64_t page_header_offset) { _offset = page_header_offset; _next_header_offset = page_header_offset; + _state = INITIALIZED; } private: + enum PageReaderState { INITIALIZED, HEADER_PARSED }; + BufferedStreamReader* _reader; tparquet::PageHeader _cur_page_header; + PageReaderState _state = INITIALIZED; uint64_t _offset = 0; uint64_t _next_header_offset = 0; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index b0a0ea2d2b..3c2e80dd86 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -47,6 +47,8 @@ class RowGroupReader; class PageIndex; struct RowRange { + RowRange() {} + RowRange(int64_t first, int64_t last) : first_row(first), last_row(last) {} int64_t first_row; int64_t last_row; };