diff --git a/be/src/vec/exec/format/parquet/level_decoder.h b/be/src/vec/exec/format/parquet/level_decoder.h index be6c6c6154..da9aefeb4e 100644 --- a/be/src/vec/exec/format/parquet/level_decoder.h +++ b/be/src/vec/exec/format/parquet/level_decoder.h @@ -35,18 +35,12 @@ public: Status init(Slice* slice, tparquet::Encoding::type encoding, level_t max_level, uint32_t num_levels); - bool has_levels() const { return _num_levels > 0; } + inline bool has_levels() const { return _num_levels > 0; } size_t get_levels(level_t* levels, size_t n); - size_t next_repeated_count() { - DCHECK_EQ(_encoding, tparquet::Encoding::RLE); - return _rle_decoder.repeated_count(); - } - - level_t get_repeated_value(size_t count) { - DCHECK_EQ(_encoding, tparquet::Encoding::RLE); - return _rle_decoder.get_repeated_value(count); + inline size_t get_next_run(level_t* val, size_t max_run) { + return _rle_decoder.GetNextRun(val, max_run); } private: diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index ed85e93f5c..0e7d1cd4ec 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -46,10 +46,17 @@ Status ColumnChunkReader::init() { // seek to the first data page _page_reader->seek_to_page(_metadata.data_page_offset); } + _state = INITIALIZED; return Status::OK(); } Status ColumnChunkReader::next_page() { + if (UNLIKELY(_state == NOT_INIT)) { + return Status::Corruption("Should initialize chunk reader"); + } + if (UNLIKELY(_remaining_num_values != 0)) { + return Status::Corruption("Should skip current page"); + } RETURN_IF_ERROR(_page_reader->next_page_header()); if (_page_reader->get_page_header()->type == tparquet::PageType::DICTIONARY_PAGE) { // the first page maybe directory page even if _metadata.__isset.dictionary_page_offset == false, @@ -59,11 +66,15 @@ Status ColumnChunkReader::next_page() { return next_page(); } else { _remaining_num_values = _page_reader->get_page_header()->data_page_header.num_values; + _state = HEADER_PARSED; return Status::OK(); } } Status ColumnChunkReader::load_page_data() { + if (UNLIKELY(_state != HEADER_PARSED)) { + return Status::Corruption("Should parse page header"); + } const auto& header = *_page_reader->get_page_header(); // int32_t compressed_size = header.compressed_page_size; int32_t uncompressed_size = header.uncompressed_page_size; @@ -113,6 +124,7 @@ Status ColumnChunkReader::load_page_data() { // Reset page data for each page _page_decoder->set_data(&_page_data); + _state = DATA_LOADED; return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index 2c6b620cce..44f5b56ff2 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -69,14 +69,25 @@ public: // Seek to the specific page, page_header_offset must be the start offset of the page header. void seek_to_page(int64_t page_header_offset) { + _remaining_num_values = 0; _page_reader->seek_to_page(page_header_offset); + _state = INITIALIZED; } // Seek to next page. Only read and parse the page header. Status next_page(); // Skip current page(will not read and parse) if the page is filtered by predicates. - Status skip_page() { return _page_reader->skip_page(); } + Status skip_page() { + _remaining_num_values = 0; + if (_state == HEADER_PARSED) { + return _page_reader->skip_page(); + } + if (_state != DATA_LOADED) { + return Status::Corruption("Should parse page header to skip page"); + } + return Status::OK(); + } // Skip some values(will not read and parse) in current page if the values are filtered by predicates. // when skip_data = false, the underlying decoder will not skip data, // only used when maintaining the consistency of _remaining_num_values. @@ -85,6 +96,12 @@ public: // Load page data into the underlying container, // and initialize the repetition and definition level decoder for current page data. Status load_page_data(); + Status load_page_date_idempotent() { + if (_state == DATA_LOADED) { + return Status::OK(); + } + return load_page_data(); + } // The remaining number of values in current page(including null values). Decreased when reading or skipping. uint32_t remaining_num_values() const { return _remaining_num_values; }; // null values are generated from definition levels @@ -115,10 +132,13 @@ public: Decoder* get_page_decoder() { return _page_decoder; } private: + enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED }; + Status _decode_dict_page(); void _reserve_decompress_buf(size_t size); int32_t _get_type_length(); + ColumnChunkReaderState _state = NOT_INIT; FieldSchema* _field_schema; level_t _max_rep_level; level_t _max_def_level; diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index 273088dbbe..da433b7cba 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -60,15 +60,29 @@ void ParquetColumnReader::init_column_metadata(const tparquet::ColumnChunk& chun _metadata.reset(new ParquetColumnMetadata(chunk_start, chunk_len, chunk_meta)); } -void ParquetColumnReader::_reserve_def_levels_buf(size_t size) { - if (size > _def_levels_buf_size || _def_levels_buf == nullptr) { - _def_levels_buf_size = BitUtil::next_power_of_two(size); - _def_levels_buf.reset(new level_t[_def_levels_buf_size]); +void ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end_index, + std::list& read_ranges) { + if (_row_ranges.size() == 0) { + read_ranges.emplace_back(start_index, end_index); + } + int index = _row_range_index; + while (index < _row_ranges.size()) { + RowRange& read_range = _row_ranges[index]; + if (read_range.last_row <= start_index) { + index++; + _row_range_index++; + continue; + } + if (read_range.first_row >= end_index) { + break; + } + int64_t start = read_range.first_row < start_index ? start_index : read_range.first_row; + int64_t end = read_range.last_row < end_index ? read_range.last_row : end_index; + read_ranges.emplace_back(start, end); + index++; } } -void ParquetColumnReader::_skipped_pages() {} - Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, std::vector& row_ranges) { _stream_reader = @@ -85,6 +99,59 @@ Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet:: return Status::OK(); } +Status ScalarColumnReader::_skip_values(size_t num_values) { + if (_chunk_reader->max_def_level() > 0) { + LevelDecoder& def_decoder = _chunk_reader->def_level_decoder(); + size_t skipped = 0; + while (skipped < num_values) { + level_t def_level; + size_t loop_skip = def_decoder.get_next_run(&def_level, num_values - skipped); + RETURN_IF_ERROR(_chunk_reader->skip_values(loop_skip, def_level == 1)); + skipped += loop_skip; + } + } else { + RETURN_IF_ERROR(_chunk_reader->skip_values(num_values)); + } + return Status::OK(); +} + +Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_column, + DataTypePtr& type) { + CHECK(doris_column->is_nullable()); + auto* nullable_column = reinterpret_cast( + (*std::move(doris_column)).mutate().get()); + MutableColumnPtr data_column = nullable_column->get_nested_column_ptr(); + NullMap& map_data = nullable_column->get_null_map_data(); + + if (_chunk_reader->max_def_level() > 0) { + LevelDecoder& def_decoder = _chunk_reader->def_level_decoder(); + size_t has_read = 0; + while (has_read < num_values) { + level_t def_level; + size_t loop_read = def_decoder.get_next_run(&def_level, num_values - has_read); + bool is_null = def_level == 0; + // fill NullMap + for (int i = 0; i < loop_read; ++i) { + map_data.emplace_back(is_null); + } + // decode data + if (is_null) { + // null values + _chunk_reader->insert_null_values(data_column, loop_read); + } else { + RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, loop_read)); + } + has_read += loop_read; + } + } else { + for (int i = 0; i < num_values; ++i) { + map_data.emplace_back(false); + } + RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, num_values)); + } + return Status::OK(); +} + Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size, size_t* read_rows, bool* eof) { if (_chunk_reader->remaining_num_values() == 0) { @@ -94,61 +161,39 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr return Status::OK(); } RETURN_IF_ERROR(_chunk_reader->next_page()); - if (_row_ranges.size() != 0) { - _skipped_pages(); - } - RETURN_IF_ERROR(_chunk_reader->load_page_data()); } - size_t read_values = _chunk_reader->remaining_num_values() < batch_size - ? _chunk_reader->remaining_num_values() - : batch_size; - // get definition levels, and generate null values - _reserve_def_levels_buf(read_values); - level_t* definitions = _def_levels_buf.get(); - if (_chunk_reader->max_def_level() == 0) { // required field - std::fill(definitions, definitions + read_values, 1); + + // generate the row ranges that should be read + std::list read_ranges; + _generate_read_ranges(_current_row_index, + _current_row_index + _chunk_reader->remaining_num_values(), read_ranges); + if (read_ranges.size() == 0) { + // skip the whole page + _current_row_index += _chunk_reader->remaining_num_values(); + RETURN_IF_ERROR(_chunk_reader->skip_page()); + *read_rows = 0; } else { - _chunk_reader->get_def_levels(definitions, read_values); - } - // fill NullMap - // TODO cmy why must nullable? - CHECK(doris_column->is_nullable()); - auto* nullable_column = reinterpret_cast( - (*std::move(doris_column)).mutate().get()); - MutableColumnPtr data_column = nullable_column->get_nested_column_ptr(); - NullMap& map_data = nullable_column->get_null_map_data(); - for (int i = 0; i < read_values; ++i) { - map_data.emplace_back(definitions[i] == 0); - } - // decode data - if (_chunk_reader->max_def_level() == 0) { - RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, read_values)); - } else if (read_values > 0) { - // column with null values - level_t level_type = definitions[0]; - int num_values = 1; - for (int i = 1; i < read_values; ++i) { - if (definitions[i] != level_type) { - if (level_type == 0) { - // null values - _chunk_reader->insert_null_values(data_column, num_values); - } else { - RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, num_values)); - } - level_type = definitions[i]; - num_values = 1; - } else { - num_values++; + // load page data to decode or skip values + RETURN_IF_ERROR(_chunk_reader->load_page_date_idempotent()); + size_t has_read = 0; + for (auto& range : read_ranges) { + // generate the skipped values + size_t skip_values = range.first_row - _current_row_index; + RETURN_IF_ERROR(_skip_values(skip_values)); + _current_row_index += skip_values; + // generate the read values + size_t read_values = + std::min((size_t)(range.last_row - range.first_row), batch_size - has_read); + RETURN_IF_ERROR(_read_values(read_values, doris_column, type)); + has_read += read_values; + _current_row_index += read_values; + if (has_read == batch_size) { + break; } } - if (level_type == 0) { - // null values - _chunk_reader->insert_null_values(data_column, num_values); - } else { - RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, num_values)); - } + *read_rows = has_read; } - *read_rows = read_values; + if (_chunk_reader->remaining_num_values() == 0 && !_chunk_reader->has_next_page()) { *eof = true; } @@ -157,6 +202,13 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr void ScalarColumnReader::close() {} +void ArrayColumnReader::_reserve_def_levels_buf(size_t size) { + if (size > _def_levels_buf_size || _def_levels_buf == nullptr) { + _def_levels_buf_size = BitUtil::next_power_of_two(size); + _def_levels_buf.reset(new level_t[_def_levels_buf_size]); + } +} + Status ArrayColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk, std::vector& row_ranges) { _stream_reader = @@ -178,7 +230,7 @@ Status ArrayColumnReader::init(FileReader* file, FieldSchema* field, tparquet::C } } if (_chunk_reader->max_rep_level() != 1) { - return Status::Corruption("Max repetition level in scalar column should be 1"); + return Status::Corruption("Max repetition level in array column should be 1"); } return Status::OK(); } @@ -192,19 +244,17 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& return Status::OK(); } RETURN_IF_ERROR(_chunk_reader->next_page()); - if (_row_ranges.size() != 0) { - _skipped_pages(); - } + // array should load data to analyze row range RETURN_IF_ERROR(_chunk_reader->load_page_data()); _init_rep_levels_buf(); } - // fill NullMap CHECK(doris_column->is_nullable()); auto* nullable_column = reinterpret_cast( (*std::move(doris_column)).mutate().get()); NullMap& map_data = nullable_column->get_null_map_data(); MutableColumnPtr data_column = nullable_column->get_nested_column_ptr(); + // generate array offset size_t real_batch_size = 0; size_t num_values = 0; @@ -214,21 +264,64 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& _reserve_def_levels_buf(num_values); level_t* definitions = _def_levels_buf.get(); _chunk_reader->get_def_levels(definitions, num_values); - // null array, should ignore the last offset in element_offsets - for (int i = 0; i < element_offsets.size() - 1; ++i) { - map_data.emplace_back(definitions[element_offsets[i]] == _NULL_ARRAY); + _def_offset = 0; + + // generate the row ranges that should be read + std::list read_ranges; + _generate_read_ranges(_current_row_index, _current_row_index + real_batch_size, read_ranges); + if (read_ranges.size() == 0) { + // skip the current batch + _current_row_index += real_batch_size; + _skip_values(num_values); + *read_rows = 0; + } else { + size_t has_read = 0; + int offset_index = 0; + for (auto& range : read_ranges) { + // generate the skipped values + size_t skip_rows = range.first_row - _current_row_index; + size_t skip_values = + element_offsets[offset_index + skip_rows] - element_offsets[offset_index]; + RETURN_IF_ERROR(_skip_values(skip_values)); + offset_index += skip_rows; + _current_row_index += skip_rows; + real_batch_size -= skip_rows; + + // generate the read values + size_t scan_rows = range.last_row - range.first_row; + size_t scan_values = + element_offsets[offset_index + scan_rows] - element_offsets[offset_index]; + // null array, should ignore the last offset in element_offsets + for (int i = offset_index; i < offset_index + scan_rows; ++i) { + map_data.emplace_back(definitions[element_offsets[i]] == _NULL_ARRAY); + } + // fill array offset, should skip a value when parsing null array + _fill_array_offset(data_column, element_offsets, offset_index, scan_rows); + // fill nested array elements + if (LIKELY(scan_values > 0)) { + RETURN_IF_ERROR(_load_nested_column( + static_cast(*data_column).get_data_ptr(), + const_cast((reinterpret_cast( + remove_nullable(type).get())) + ->get_nested_type()), + scan_values)); + } + offset_index += scan_rows; + has_read += scan_rows; + _current_row_index += scan_rows; + } + if (offset_index != element_offsets.size() - 1) { + size_t skip_rows = element_offsets.size() - 1 - offset_index; + size_t skip_values = element_offsets.back() - element_offsets[offset_index]; + RETURN_IF_ERROR(_skip_values(skip_values)); + offset_index += skip_rows; + _current_row_index += skip_rows; + real_batch_size -= skip_rows; + } + DCHECK_EQ(real_batch_size, has_read); + *read_rows = has_read; } - // fill array offset, should skip a value when parsing null array - _fill_array_offset(data_column, element_offsets); - // fill nested array elements - if (num_values > 0) { - _load_nested_column(static_cast(*data_column).get_data_ptr(), - const_cast((reinterpret_cast( - remove_nullable(type).get())) - ->get_nested_type()), - num_values); - } - *read_rows = real_batch_size; + if (_chunk_reader->remaining_num_values() == 0 && !_chunk_reader->has_next_page()) { *eof = true; } @@ -244,7 +337,7 @@ Status ArrayColumnReader::_load_nested_column(ColumnPtr& doris_column, DataTypeP (*std::move(doris_column)).mutate().get()); NullMap& map_data = nullable_column->get_null_map_data(); MutableColumnPtr data_column = nullable_column->get_nested_column_ptr(); - for (int i = 0; i < read_values; ++i) { + for (int i = _def_offset; i < _def_offset + read_values; ++i) { // should skip _EMPTY_ARRAY and _NULL_ARRAY if (definitions[i] == _CONCRETE_ELEMENT) { map_data.emplace_back(false); @@ -253,7 +346,7 @@ Status ArrayColumnReader::_load_nested_column(ColumnPtr& doris_column, DataTypeP } } // column with null values - int start_idx = 0; + int start_idx = _def_offset; while (start_idx < read_values) { if (definitions[start_idx] == _CONCRETE_ELEMENT || definitions[start_idx] == _NULL_ELEMENT) { @@ -264,13 +357,13 @@ Status ArrayColumnReader::_load_nested_column(ColumnPtr& doris_column, DataTypeP start_idx++; } } - if (start_idx == read_values) { + if (start_idx == _def_offset + read_values) { // all values are empty array or null array return Status::OK(); } bool prev_is_null = definitions[start_idx] == _NULL_ELEMENT; int num_values = 1; - for (int i = start_idx + 1; i < read_values; ++i) { + for (int i = start_idx + 1; i < _def_offset + read_values; ++i) { if (definitions[i] == _EMPTY_ARRAY || definitions[i] == _NULL_ARRAY) { // only decrease _remaining_num_values _chunk_reader->skip_values(1, false); @@ -296,15 +389,16 @@ Status ArrayColumnReader::_load_nested_column(ColumnPtr& doris_column, DataTypeP } else { RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, num_values)); } + _def_offset += read_values; return Status::OK(); } void ArrayColumnReader::close() {} void ArrayColumnReader::_init_rep_levels_buf() { - size_t max_buf_size = _chunk_reader->remaining_num_values() < 1024 + size_t max_buf_size = _chunk_reader->remaining_num_values() < 4096 ? _chunk_reader->remaining_num_values() - : 1024; + : 4096; if (_rep_levels_buf_size < max_buf_size || _rep_levels_buf == nullptr) { _rep_levels_buf.reset(new level_t[max_buf_size]); _rep_levels_buf_size = max_buf_size; @@ -314,11 +408,10 @@ void ArrayColumnReader::_init_rep_levels_buf() { } void ArrayColumnReader::_load_rep_levels() { - _rep_size = _chunk_reader->remaining_num_values() < _rep_levels_buf_size - ? _chunk_reader->remaining_num_values() - : _rep_levels_buf_size; - _chunk_reader->get_rep_levels(_rep_levels_buf.get(), _rep_size); _start_offset += _rep_size; + _rep_size = _remaining_rep_levels < _rep_levels_buf_size ? _remaining_rep_levels + : _rep_levels_buf_size; + _chunk_reader->get_rep_levels(_rep_levels_buf.get(), _rep_size); _rep_offset = 0; } @@ -367,20 +460,45 @@ Status ArrayColumnReader::_generate_array_offset(std::vector& element_of } void ArrayColumnReader::_fill_array_offset(MutableColumnPtr& doris_column, - std::vector& element_offsets) { - auto& offsets_data = static_cast(*doris_column).get_offsets(); - level_t* definitions = _def_levels_buf.get(); - auto prev_offset = offsets_data.back(); - size_t null_empty_array = 0; - for (int i = 0; i < element_offsets.size(); ++i) { - auto& offset = element_offsets[i]; - if (i != 0) { - offsets_data.emplace_back(prev_offset + offset - null_empty_array); - } - if ((i != element_offsets.size() - 1) && - (definitions[offset] == _EMPTY_ARRAY || definitions[offset] == _NULL_ARRAY)) { - null_empty_array++; + std::vector& element_offsets, int offset_index, + size_t num_rows) { + if (LIKELY(num_rows > 0)) { + auto& offsets_data = static_cast(*doris_column).get_offsets(); + level_t* definitions = _def_levels_buf.get(); + auto prev_offset = offsets_data.back(); + auto base_offset = element_offsets[offset_index]; + size_t null_empty_array = 0; + for (int i = offset_index; i < offset_index + num_rows + 1; ++i) { + auto& offset = element_offsets[i]; + if (i != offset_index) { + offsets_data.emplace_back(prev_offset + offset - base_offset - null_empty_array); + } + if ((i != offset_index + num_rows) && + (definitions[offset] == _EMPTY_ARRAY || definitions[offset] == _NULL_ARRAY)) { + null_empty_array++; + } } } } + +Status ArrayColumnReader::_skip_values(size_t num_values) { + if (LIKELY(num_values > 0)) { + level_t* definitions = _def_levels_buf.get(); + bool prev_is_null = definitions[_def_offset] != _CONCRETE_ELEMENT; + int cnt = 1; + for (int i = _def_offset + 1; i < _def_offset + num_values; ++i) { + bool curr_is_null = definitions[i] != _CONCRETE_ELEMENT; + if (prev_is_null ^ curr_is_null) { + RETURN_IF_ERROR(_chunk_reader->skip_values(cnt, !prev_is_null)); + prev_is_null = curr_is_null; + cnt = 1; + } else { + cnt++; + } + } + RETURN_IF_ERROR(_chunk_reader->skip_values(cnt, !prev_is_null)); + _def_offset += num_values; + } + return Status::OK(); +} }; // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h index 316410c34e..fd0ce845f4 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h @@ -67,8 +67,8 @@ public: virtual void close() = 0; protected: - void _skipped_pages(); - void _reserve_def_levels_buf(size_t size); + void _generate_read_ranges(int64_t start_index, int64_t end_index, + std::list& read_ranges); const ParquetReadColumn& _column; BufferedFileStreamReader* _stream_reader; @@ -76,9 +76,9 @@ protected: std::vector _row_ranges; cctz::time_zone* _ctz; std::unique_ptr _chunk_reader; - std::unique_ptr _def_levels_buf = nullptr; - size_t _def_levels_buf_size = 0; tparquet::OffsetIndex* _offset_index; + int64_t _current_row_index = 0; + int _row_range_index = 0; }; class ScalarColumnReader : public ParquetColumnReader { @@ -90,6 +90,8 @@ public: std::vector& row_ranges); Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size, size_t* read_rows, bool* eof) override; + Status _skip_values(size_t num_values); + Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type); void close() override; }; @@ -105,12 +107,19 @@ public: void close() override; private: + void _reserve_def_levels_buf(size_t size); void _init_rep_levels_buf(); void _load_rep_levels(); Status _load_nested_column(ColumnPtr& doris_column, DataTypePtr& type, size_t read_values); Status _generate_array_offset(std::vector& element_offsets, size_t pre_batch_size, size_t* real_batch_size, size_t* num_values); - void _fill_array_offset(MutableColumnPtr& doris_column, std::vector& element_offsets); + void _fill_array_offset(MutableColumnPtr& doris_column, std::vector& element_offsets, + int offset_index, size_t num_rows); + Status _skip_values(size_t num_values); + + std::unique_ptr _def_levels_buf = nullptr; + size_t _def_levels_buf_size = 0; + size_t _def_offset = 0; std::unique_ptr _rep_levels_buf = nullptr; size_t _rep_levels_buf_size = 0; diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp index d2a229d7d8..80c6ff228b 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp @@ -29,11 +29,14 @@ PageReader::PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t l : _reader(reader), _start_offset(offset), _end_offset(offset + length) {} Status PageReader::next_page_header() { - if (_offset < _start_offset || _offset >= _end_offset) { + if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) { return Status::IOError("Out-of-bounds Access"); } - if (_offset != _next_header_offset) { - return Status::IOError("Wrong header position"); + if (UNLIKELY(_offset != _next_header_offset)) { + return Status::IOError("Wrong header position, should seek to a page header first"); + } + if (UNLIKELY(_state != INITIALIZED)) { + return Status::IOError("Should skip or load current page to get next page"); } const uint8_t* page_header_buf = nullptr; @@ -58,25 +61,28 @@ Status PageReader::next_page_header() { _offset += real_header_size; _next_header_offset = _offset + _cur_page_header.compressed_page_size; + _state = HEADER_PARSED; return Status::OK(); } Status PageReader::skip_page() { - if (_offset == _next_header_offset) { - return Status::InternalError("Should call next_page() to generate page header"); + if (UNLIKELY(_state != HEADER_PARSED)) { + return Status::IOError("Should generate page header first to skip current page"); } _offset = _next_header_offset; + _state = INITIALIZED; return Status::OK(); } Status PageReader::get_page_data(Slice& slice) { - if (_offset == _next_header_offset) { - return Status::InternalError("Should call next_page() to generate page header"); + if (UNLIKELY(_state != HEADER_PARSED)) { + return Status::IOError("Should generate page header first to load current page data"); } slice.size = _cur_page_header.compressed_page_size; RETURN_IF_ERROR(_reader->read_bytes(slice, _offset)); DCHECK_EQ(slice.size, _cur_page_header.compressed_page_size); _offset += slice.size; + _state = INITIALIZED; return Status::OK(); } diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h index 1795cac52d..0d83c81650 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h @@ -27,7 +27,6 @@ namespace doris::vectorized { * Use to deserialize parquet page header, and get the page data in iterator interface. */ class PageReader { -public: public: PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length); ~PageReader() = default; @@ -45,11 +44,15 @@ public: void seek_to_page(int64_t page_header_offset) { _offset = page_header_offset; _next_header_offset = page_header_offset; + _state = INITIALIZED; } private: + enum PageReaderState { INITIALIZED, HEADER_PARSED }; + BufferedStreamReader* _reader; tparquet::PageHeader _cur_page_header; + PageReaderState _state = INITIALIZED; uint64_t _offset = 0; uint64_t _next_header_offset = 0; diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index b0a0ea2d2b..3c2e80dd86 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -47,6 +47,8 @@ class RowGroupReader; class PageIndex; struct RowRange { + RowRange() {} + RowRange(int64_t first, int64_t last) : first_row(first), last_row(last) {} int64_t first_row; int64_t last_row; };