[feature-wip](parquet-reader) filter rows by page index (#12664)

# Proposed changes

[Parquet v1.11+ supports page skipping](https://github.com/apache/parquet-format/blob/master/PageIndex.md),
which helps the scanner reduce the amount of data scanned, decompressed, decoded, and inserted.
According to the performance flame graph, decompression takes up 20% of CPU time.
If a page can be filtered out as a whole, it does not need to be decompressed at all.
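
For illustration, here is a minimal, self-contained sketch (not code from this patch) of how per-page min/max statistics from the page index can rule out whole pages before anything is decompressed. `PageStats`, `filter_pages`, and the already-decoded `int64_t` bounds are simplifications for this example, not Doris or parquet-format APIs:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Per-page statistics, roughly what the parquet ColumnIndex provides
// (min/max values and a null_pages flag), decoded to int64_t for simplicity.
struct PageStats {
    int64_t min_value;
    int64_t max_value;
    bool all_null;
};

// Decide which pages can satisfy a `value >= low && value <= high` predicate.
// Pages whose [min, max] range cannot overlap [low, high] are dropped, so they
// never need to be read, decompressed, or decoded.
std::vector<bool> filter_pages(const std::vector<PageStats>& pages,
                               int64_t low, int64_t high) {
    std::vector<bool> keep(pages.size());
    for (size_t i = 0; i < pages.size(); ++i) {
        keep[i] = !pages[i].all_null &&
                  pages[i].max_value >= low && pages[i].min_value <= high;
    }
    return keep;
}
```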

However, page boundaries are not aligned on the same row numbers across columns. Columns referenced by predicates can be filtered at page granularity,
but the other columns have to skip rows inside their pages, so non-predicate columns only save decoding and insertion time (see the range-intersection sketch below).

An array column needs its repetition levels to stay aligned with the other columns, so it too can only save decoding and insertion time.
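
The range-intersection step mentioned above, as a simplified, self-contained sketch of what `_generate_read_ranges` in this patch does for each page of a non-predicate column. The free-standing `intersect_with_page` function and its container types are illustrative only:

```cpp
#include <algorithm>
#include <cstdint>
#include <list>
#include <vector>

// Row range selected by the predicate columns: [first_row, last_row).
struct RowRange {
    int64_t first_row;
    int64_t last_row;
};

// Given the rows covered by the current page, [page_start, page_end), and the
// sorted, non-overlapping row ranges selected by the predicates, return the
// sub-ranges of the page that still have to be read. Rows outside the result
// are skipped inside the page, saving decoding and insertion time only.
std::list<RowRange> intersect_with_page(const std::vector<RowRange>& selected,
                                        int64_t page_start, int64_t page_end) {
    std::list<RowRange> result;
    for (const RowRange& r : selected) {
        if (r.last_row <= page_start) continue; // range ends before this page
        if (r.first_row >= page_end) break;     // ranges are sorted; no more overlap
        result.push_back({std::max(r.first_row, page_start),
                          std::min(r.last_row, page_end)});
    }
    return result;
}
```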

## Explore
`OffsetIndex` in the column metadata gives the exact position of each page.
Theoretically, a filtered page could be skipped entirely, including the time spent reading it from HDFS.
However, the average page size is around 500KB, and skipping a page requires calling `skip` on the input stream.
`skip` performs poorly when it is called frequently,
and may not be better than sequentially reading large blocks of data (such as 4MB).

If multiple consecutive pages are filtered out, a `skip` read could be issued according to the `OffsetIndex` (sketched below).
However, for the convenience of programming and readability, this patch loads the data of every page and filters the rows in turn.
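
A sketch of the alternative that was not taken: merging the byte ranges of consecutive surviving pages (located via the `OffsetIndex`) so that filtered pages could be skipped with a single seek. The `PageLocation` struct below mirrors the parquet-format thrift fields, and `bytes_to_read` is a hypothetical helper, not part of this patch:

```cpp
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

// Mirrors the parquet-format PageLocation thrift struct.
struct PageLocation {
    int64_t offset;                // byte offset of the page header in the file
    int32_t compressed_page_size;  // header + compressed page data
    int64_t first_row_index;       // first row of the page within the row group
};

// Return the byte ranges [start, end) that still have to be read, given a
// per-page keep/skip decision. Adjacent kept pages are merged into one range,
// so each gap between ranges corresponds to one seek over filtered pages.
std::vector<std::pair<int64_t, int64_t>> bytes_to_read(
        const std::vector<PageLocation>& pages, const std::vector<bool>& keep) {
    std::vector<std::pair<int64_t, int64_t>> ranges;
    for (size_t i = 0; i < pages.size(); ++i) {
        if (!keep[i]) continue;
        int64_t start = pages[i].offset;
        int64_t end = start + pages[i].compressed_page_size;
        if (!ranges.empty() && ranges.back().second == start) {
            ranges.back().second = end; // extend the previous contiguous range
        } else {
            ranges.emplace_back(start, end);
        }
    }
    return ranges;
}
```

With pages of roughly 500KB, whether the extra seeks pay off depends on how many consecutive pages actually get filtered, which is why the simpler sequential read-and-drop approach was chosen here.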
Author: Ashin Gau
Date: 2022-09-20 15:55:19 +08:00
Committed by: GitHub
Parent: 47797ad7e8
Commit: b837b2eb95
8 changed files with 285 additions and 121 deletions

@ -35,18 +35,12 @@ public:
Status init(Slice* slice, tparquet::Encoding::type encoding, level_t max_level,
uint32_t num_levels);
bool has_levels() const { return _num_levels > 0; }
inline bool has_levels() const { return _num_levels > 0; }
size_t get_levels(level_t* levels, size_t n);
size_t next_repeated_count() {
DCHECK_EQ(_encoding, tparquet::Encoding::RLE);
return _rle_decoder.repeated_count();
}
level_t get_repeated_value(size_t count) {
DCHECK_EQ(_encoding, tparquet::Encoding::RLE);
return _rle_decoder.get_repeated_value(count);
inline size_t get_next_run(level_t* val, size_t max_run) {
return _rle_decoder.GetNextRun(val, max_run);
}
private:

@ -46,10 +46,17 @@ Status ColumnChunkReader::init() {
// seek to the first data page
_page_reader->seek_to_page(_metadata.data_page_offset);
}
_state = INITIALIZED;
return Status::OK();
}
Status ColumnChunkReader::next_page() {
if (UNLIKELY(_state == NOT_INIT)) {
return Status::Corruption("Should initialize chunk reader");
}
if (UNLIKELY(_remaining_num_values != 0)) {
return Status::Corruption("Should skip current page");
}
RETURN_IF_ERROR(_page_reader->next_page_header());
if (_page_reader->get_page_header()->type == tparquet::PageType::DICTIONARY_PAGE) {
// the first page may be a dictionary page even if _metadata.__isset.dictionary_page_offset == false,
@ -59,11 +66,15 @@ Status ColumnChunkReader::next_page() {
return next_page();
} else {
_remaining_num_values = _page_reader->get_page_header()->data_page_header.num_values;
_state = HEADER_PARSED;
return Status::OK();
}
}
Status ColumnChunkReader::load_page_data() {
if (UNLIKELY(_state != HEADER_PARSED)) {
return Status::Corruption("Should parse page header");
}
const auto& header = *_page_reader->get_page_header();
// int32_t compressed_size = header.compressed_page_size;
int32_t uncompressed_size = header.uncompressed_page_size;
@ -113,6 +124,7 @@ Status ColumnChunkReader::load_page_data() {
// Reset page data for each page
_page_decoder->set_data(&_page_data);
_state = DATA_LOADED;
return Status::OK();
}

@ -69,14 +69,25 @@ public:
// Seek to the specific page, page_header_offset must be the start offset of the page header.
void seek_to_page(int64_t page_header_offset) {
_remaining_num_values = 0;
_page_reader->seek_to_page(page_header_offset);
_state = INITIALIZED;
}
// Seek to next page. Only read and parse the page header.
Status next_page();
// Skip current page(will not read and parse) if the page is filtered by predicates.
Status skip_page() { return _page_reader->skip_page(); }
Status skip_page() {
_remaining_num_values = 0;
if (_state == HEADER_PARSED) {
return _page_reader->skip_page();
}
if (_state != DATA_LOADED) {
return Status::Corruption("Should parse page header to skip page");
}
return Status::OK();
}
// Skip some values(will not read and parse) in current page if the values are filtered by predicates.
// when skip_data = false, the underlying decoder will not skip data,
// only used when maintaining the consistency of _remaining_num_values.
@ -85,6 +96,12 @@ public:
// Load page data into the underlying container,
// and initialize the repetition and definition level decoder for current page data.
Status load_page_data();
Status load_page_date_idempotent() {
if (_state == DATA_LOADED) {
return Status::OK();
}
return load_page_data();
}
// The remaining number of values in current page(including null values). Decreased when reading or skipping.
uint32_t remaining_num_values() const { return _remaining_num_values; };
// null values are generated from definition levels
@ -115,10 +132,13 @@ public:
Decoder* get_page_decoder() { return _page_decoder; }
private:
enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED };
Status _decode_dict_page();
void _reserve_decompress_buf(size_t size);
int32_t _get_type_length();
ColumnChunkReaderState _state = NOT_INIT;
FieldSchema* _field_schema;
level_t _max_rep_level;
level_t _max_def_level;


@ -60,15 +60,29 @@ void ParquetColumnReader::init_column_metadata(const tparquet::ColumnChunk& chun
_metadata.reset(new ParquetColumnMetadata(chunk_start, chunk_len, chunk_meta));
}
void ParquetColumnReader::_reserve_def_levels_buf(size_t size) {
if (size > _def_levels_buf_size || _def_levels_buf == nullptr) {
_def_levels_buf_size = BitUtil::next_power_of_two(size);
_def_levels_buf.reset(new level_t[_def_levels_buf_size]);
void ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end_index,
std::list<RowRange>& read_ranges) {
if (_row_ranges.size() == 0) {
read_ranges.emplace_back(start_index, end_index);
}
int index = _row_range_index;
while (index < _row_ranges.size()) {
RowRange& read_range = _row_ranges[index];
if (read_range.last_row <= start_index) {
index++;
_row_range_index++;
continue;
}
if (read_range.first_row >= end_index) {
break;
}
int64_t start = read_range.first_row < start_index ? start_index : read_range.first_row;
int64_t end = read_range.last_row < end_index ? read_range.last_row : end_index;
read_ranges.emplace_back(start, end);
index++;
}
}
void ParquetColumnReader::_skipped_pages() {}
Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
std::vector<RowRange>& row_ranges) {
_stream_reader =
@ -85,6 +99,59 @@ Status ScalarColumnReader::init(FileReader* file, FieldSchema* field, tparquet::
return Status::OK();
}
Status ScalarColumnReader::_skip_values(size_t num_values) {
if (_chunk_reader->max_def_level() > 0) {
LevelDecoder& def_decoder = _chunk_reader->def_level_decoder();
size_t skipped = 0;
while (skipped < num_values) {
level_t def_level;
size_t loop_skip = def_decoder.get_next_run(&def_level, num_values - skipped);
RETURN_IF_ERROR(_chunk_reader->skip_values(loop_skip, def_level == 1));
skipped += loop_skip;
}
} else {
RETURN_IF_ERROR(_chunk_reader->skip_values(num_values));
}
return Status::OK();
}
Status ScalarColumnReader::_read_values(size_t num_values, ColumnPtr& doris_column,
DataTypePtr& type) {
CHECK(doris_column->is_nullable());
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(doris_column)).mutate().get());
MutableColumnPtr data_column = nullable_column->get_nested_column_ptr();
NullMap& map_data = nullable_column->get_null_map_data();
if (_chunk_reader->max_def_level() > 0) {
LevelDecoder& def_decoder = _chunk_reader->def_level_decoder();
size_t has_read = 0;
while (has_read < num_values) {
level_t def_level;
size_t loop_read = def_decoder.get_next_run(&def_level, num_values - has_read);
bool is_null = def_level == 0;
// fill NullMap
for (int i = 0; i < loop_read; ++i) {
map_data.emplace_back(is_null);
}
// decode data
if (is_null) {
// null values
_chunk_reader->insert_null_values(data_column, loop_read);
} else {
RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, loop_read));
}
has_read += loop_read;
}
} else {
for (int i = 0; i < num_values; ++i) {
map_data.emplace_back(false);
}
RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, num_values));
}
return Status::OK();
}
Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
size_t batch_size, size_t* read_rows, bool* eof) {
if (_chunk_reader->remaining_num_values() == 0) {
@ -94,61 +161,39 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
return Status::OK();
}
RETURN_IF_ERROR(_chunk_reader->next_page());
if (_row_ranges.size() != 0) {
_skipped_pages();
}
RETURN_IF_ERROR(_chunk_reader->load_page_data());
}
size_t read_values = _chunk_reader->remaining_num_values() < batch_size
? _chunk_reader->remaining_num_values()
: batch_size;
// get definition levels, and generate null values
_reserve_def_levels_buf(read_values);
level_t* definitions = _def_levels_buf.get();
if (_chunk_reader->max_def_level() == 0) { // required field
std::fill(definitions, definitions + read_values, 1);
// generate the row ranges that should be read
std::list<RowRange> read_ranges;
_generate_read_ranges(_current_row_index,
_current_row_index + _chunk_reader->remaining_num_values(), read_ranges);
if (read_ranges.size() == 0) {
// skip the whole page
_current_row_index += _chunk_reader->remaining_num_values();
RETURN_IF_ERROR(_chunk_reader->skip_page());
*read_rows = 0;
} else {
_chunk_reader->get_def_levels(definitions, read_values);
}
// fill NullMap
// TODO cmy why must nullable?
CHECK(doris_column->is_nullable());
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(doris_column)).mutate().get());
MutableColumnPtr data_column = nullable_column->get_nested_column_ptr();
NullMap& map_data = nullable_column->get_null_map_data();
for (int i = 0; i < read_values; ++i) {
map_data.emplace_back(definitions[i] == 0);
}
// decode data
if (_chunk_reader->max_def_level() == 0) {
RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, read_values));
} else if (read_values > 0) {
// column with null values
level_t level_type = definitions[0];
int num_values = 1;
for (int i = 1; i < read_values; ++i) {
if (definitions[i] != level_type) {
if (level_type == 0) {
// null values
_chunk_reader->insert_null_values(data_column, num_values);
} else {
RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, num_values));
}
level_type = definitions[i];
num_values = 1;
} else {
num_values++;
// load page data to decode or skip values
RETURN_IF_ERROR(_chunk_reader->load_page_date_idempotent());
size_t has_read = 0;
for (auto& range : read_ranges) {
// generate the skipped values
size_t skip_values = range.first_row - _current_row_index;
RETURN_IF_ERROR(_skip_values(skip_values));
_current_row_index += skip_values;
// generate the read values
size_t read_values =
std::min((size_t)(range.last_row - range.first_row), batch_size - has_read);
RETURN_IF_ERROR(_read_values(read_values, doris_column, type));
has_read += read_values;
_current_row_index += read_values;
if (has_read == batch_size) {
break;
}
}
if (level_type == 0) {
// null values
_chunk_reader->insert_null_values(data_column, num_values);
} else {
RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, num_values));
}
*read_rows = has_read;
}
*read_rows = read_values;
if (_chunk_reader->remaining_num_values() == 0 && !_chunk_reader->has_next_page()) {
*eof = true;
}
@ -157,6 +202,13 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
void ScalarColumnReader::close() {}
void ArrayColumnReader::_reserve_def_levels_buf(size_t size) {
if (size > _def_levels_buf_size || _def_levels_buf == nullptr) {
_def_levels_buf_size = BitUtil::next_power_of_two(size);
_def_levels_buf.reset(new level_t[_def_levels_buf_size]);
}
}
Status ArrayColumnReader::init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
std::vector<RowRange>& row_ranges) {
_stream_reader =
@ -178,7 +230,7 @@ Status ArrayColumnReader::init(FileReader* file, FieldSchema* field, tparquet::C
}
}
if (_chunk_reader->max_rep_level() != 1) {
return Status::Corruption("Max repetition level in scalar column should be 1");
return Status::Corruption("Max repetition level in array column should be 1");
}
return Status::OK();
}
@ -192,19 +244,17 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr&
return Status::OK();
}
RETURN_IF_ERROR(_chunk_reader->next_page());
if (_row_ranges.size() != 0) {
_skipped_pages();
}
// array should load data to analyze row range
RETURN_IF_ERROR(_chunk_reader->load_page_data());
_init_rep_levels_buf();
}
// fill NullMap
CHECK(doris_column->is_nullable());
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(doris_column)).mutate().get());
NullMap& map_data = nullable_column->get_null_map_data();
MutableColumnPtr data_column = nullable_column->get_nested_column_ptr();
// generate array offset
size_t real_batch_size = 0;
size_t num_values = 0;
@ -214,21 +264,64 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr&
_reserve_def_levels_buf(num_values);
level_t* definitions = _def_levels_buf.get();
_chunk_reader->get_def_levels(definitions, num_values);
// null array, should ignore the last offset in element_offsets
for (int i = 0; i < element_offsets.size() - 1; ++i) {
map_data.emplace_back(definitions[element_offsets[i]] == _NULL_ARRAY);
_def_offset = 0;
// generate the row ranges that should be read
std::list<RowRange> read_ranges;
_generate_read_ranges(_current_row_index, _current_row_index + real_batch_size, read_ranges);
if (read_ranges.size() == 0) {
// skip the current batch
_current_row_index += real_batch_size;
_skip_values(num_values);
*read_rows = 0;
} else {
size_t has_read = 0;
int offset_index = 0;
for (auto& range : read_ranges) {
// generate the skipped values
size_t skip_rows = range.first_row - _current_row_index;
size_t skip_values =
element_offsets[offset_index + skip_rows] - element_offsets[offset_index];
RETURN_IF_ERROR(_skip_values(skip_values));
offset_index += skip_rows;
_current_row_index += skip_rows;
real_batch_size -= skip_rows;
// generate the read values
size_t scan_rows = range.last_row - range.first_row;
size_t scan_values =
element_offsets[offset_index + scan_rows] - element_offsets[offset_index];
// null array, should ignore the last offset in element_offsets
for (int i = offset_index; i < offset_index + scan_rows; ++i) {
map_data.emplace_back(definitions[element_offsets[i]] == _NULL_ARRAY);
}
// fill array offset, should skip a value when parsing null array
_fill_array_offset(data_column, element_offsets, offset_index, scan_rows);
// fill nested array elements
if (LIKELY(scan_values > 0)) {
RETURN_IF_ERROR(_load_nested_column(
static_cast<ColumnArray&>(*data_column).get_data_ptr(),
const_cast<DataTypePtr&>((reinterpret_cast<const DataTypeArray*>(
remove_nullable(type).get()))
->get_nested_type()),
scan_values));
}
offset_index += scan_rows;
has_read += scan_rows;
_current_row_index += scan_rows;
}
if (offset_index != element_offsets.size() - 1) {
size_t skip_rows = element_offsets.size() - 1 - offset_index;
size_t skip_values = element_offsets.back() - element_offsets[offset_index];
RETURN_IF_ERROR(_skip_values(skip_values));
offset_index += skip_rows;
_current_row_index += skip_rows;
real_batch_size -= skip_rows;
}
DCHECK_EQ(real_batch_size, has_read);
*read_rows = has_read;
}
// fill array offset, should skip a value when parsing null array
_fill_array_offset(data_column, element_offsets);
// fill nested array elements
if (num_values > 0) {
_load_nested_column(static_cast<ColumnArray&>(*data_column).get_data_ptr(),
const_cast<DataTypePtr&>((reinterpret_cast<const DataTypeArray*>(
remove_nullable(type).get()))
->get_nested_type()),
num_values);
}
*read_rows = real_batch_size;
if (_chunk_reader->remaining_num_values() == 0 && !_chunk_reader->has_next_page()) {
*eof = true;
}
@ -244,7 +337,7 @@ Status ArrayColumnReader::_load_nested_column(ColumnPtr& doris_column, DataTypeP
(*std::move(doris_column)).mutate().get());
NullMap& map_data = nullable_column->get_null_map_data();
MutableColumnPtr data_column = nullable_column->get_nested_column_ptr();
for (int i = 0; i < read_values; ++i) {
for (int i = _def_offset; i < _def_offset + read_values; ++i) {
// should skip _EMPTY_ARRAY and _NULL_ARRAY
if (definitions[i] == _CONCRETE_ELEMENT) {
map_data.emplace_back(false);
@ -253,7 +346,7 @@ Status ArrayColumnReader::_load_nested_column(ColumnPtr& doris_column, DataTypeP
}
}
// column with null values
int start_idx = 0;
int start_idx = _def_offset;
while (start_idx < read_values) {
if (definitions[start_idx] == _CONCRETE_ELEMENT ||
definitions[start_idx] == _NULL_ELEMENT) {
@ -264,13 +357,13 @@ Status ArrayColumnReader::_load_nested_column(ColumnPtr& doris_column, DataTypeP
start_idx++;
}
}
if (start_idx == read_values) {
if (start_idx == _def_offset + read_values) {
// all values are empty array or null array
return Status::OK();
}
bool prev_is_null = definitions[start_idx] == _NULL_ELEMENT;
int num_values = 1;
for (int i = start_idx + 1; i < read_values; ++i) {
for (int i = start_idx + 1; i < _def_offset + read_values; ++i) {
if (definitions[i] == _EMPTY_ARRAY || definitions[i] == _NULL_ARRAY) {
// only decrease _remaining_num_values
_chunk_reader->skip_values(1, false);
@ -296,15 +389,16 @@ Status ArrayColumnReader::_load_nested_column(ColumnPtr& doris_column, DataTypeP
} else {
RETURN_IF_ERROR(_chunk_reader->decode_values(data_column, type, num_values));
}
_def_offset += read_values;
return Status::OK();
}
void ArrayColumnReader::close() {}
void ArrayColumnReader::_init_rep_levels_buf() {
size_t max_buf_size = _chunk_reader->remaining_num_values() < 1024
size_t max_buf_size = _chunk_reader->remaining_num_values() < 4096
? _chunk_reader->remaining_num_values()
: 1024;
: 4096;
if (_rep_levels_buf_size < max_buf_size || _rep_levels_buf == nullptr) {
_rep_levels_buf.reset(new level_t[max_buf_size]);
_rep_levels_buf_size = max_buf_size;
@ -314,11 +408,10 @@ void ArrayColumnReader::_init_rep_levels_buf() {
}
void ArrayColumnReader::_load_rep_levels() {
_rep_size = _chunk_reader->remaining_num_values() < _rep_levels_buf_size
? _chunk_reader->remaining_num_values()
: _rep_levels_buf_size;
_chunk_reader->get_rep_levels(_rep_levels_buf.get(), _rep_size);
_start_offset += _rep_size;
_rep_size = _remaining_rep_levels < _rep_levels_buf_size ? _remaining_rep_levels
: _rep_levels_buf_size;
_chunk_reader->get_rep_levels(_rep_levels_buf.get(), _rep_size);
_rep_offset = 0;
}
@ -367,20 +460,45 @@ Status ArrayColumnReader::_generate_array_offset(std::vector<size_t>& element_of
}
void ArrayColumnReader::_fill_array_offset(MutableColumnPtr& doris_column,
std::vector<size_t>& element_offsets) {
auto& offsets_data = static_cast<ColumnArray&>(*doris_column).get_offsets();
level_t* definitions = _def_levels_buf.get();
auto prev_offset = offsets_data.back();
size_t null_empty_array = 0;
for (int i = 0; i < element_offsets.size(); ++i) {
auto& offset = element_offsets[i];
if (i != 0) {
offsets_data.emplace_back(prev_offset + offset - null_empty_array);
}
if ((i != element_offsets.size() - 1) &&
(definitions[offset] == _EMPTY_ARRAY || definitions[offset] == _NULL_ARRAY)) {
null_empty_array++;
std::vector<size_t>& element_offsets, int offset_index,
size_t num_rows) {
if (LIKELY(num_rows > 0)) {
auto& offsets_data = static_cast<ColumnArray&>(*doris_column).get_offsets();
level_t* definitions = _def_levels_buf.get();
auto prev_offset = offsets_data.back();
auto base_offset = element_offsets[offset_index];
size_t null_empty_array = 0;
for (int i = offset_index; i < offset_index + num_rows + 1; ++i) {
auto& offset = element_offsets[i];
if (i != offset_index) {
offsets_data.emplace_back(prev_offset + offset - base_offset - null_empty_array);
}
if ((i != offset_index + num_rows) &&
(definitions[offset] == _EMPTY_ARRAY || definitions[offset] == _NULL_ARRAY)) {
null_empty_array++;
}
}
}
}
Status ArrayColumnReader::_skip_values(size_t num_values) {
if (LIKELY(num_values > 0)) {
level_t* definitions = _def_levels_buf.get();
bool prev_is_null = definitions[_def_offset] != _CONCRETE_ELEMENT;
int cnt = 1;
for (int i = _def_offset + 1; i < _def_offset + num_values; ++i) {
bool curr_is_null = definitions[i] != _CONCRETE_ELEMENT;
if (prev_is_null ^ curr_is_null) {
RETURN_IF_ERROR(_chunk_reader->skip_values(cnt, !prev_is_null));
prev_is_null = curr_is_null;
cnt = 1;
} else {
cnt++;
}
}
RETURN_IF_ERROR(_chunk_reader->skip_values(cnt, !prev_is_null));
_def_offset += num_values;
}
return Status::OK();
}
}; // namespace doris::vectorized

@ -67,8 +67,8 @@ public:
virtual void close() = 0;
protected:
void _skipped_pages();
void _reserve_def_levels_buf(size_t size);
void _generate_read_ranges(int64_t start_index, int64_t end_index,
std::list<RowRange>& read_ranges);
const ParquetReadColumn& _column;
BufferedFileStreamReader* _stream_reader;
@ -76,9 +76,9 @@ protected:
std::vector<RowRange> _row_ranges;
cctz::time_zone* _ctz;
std::unique_ptr<ColumnChunkReader> _chunk_reader;
std::unique_ptr<level_t[]> _def_levels_buf = nullptr;
size_t _def_levels_buf_size = 0;
tparquet::OffsetIndex* _offset_index;
int64_t _current_row_index = 0;
int _row_range_index = 0;
};
class ScalarColumnReader : public ParquetColumnReader {
@ -90,6 +90,8 @@ public:
std::vector<RowRange>& row_ranges);
Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type, size_t batch_size,
size_t* read_rows, bool* eof) override;
Status _skip_values(size_t num_values);
Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type);
void close() override;
};
@ -105,12 +107,19 @@ public:
void close() override;
private:
void _reserve_def_levels_buf(size_t size);
void _init_rep_levels_buf();
void _load_rep_levels();
Status _load_nested_column(ColumnPtr& doris_column, DataTypePtr& type, size_t read_values);
Status _generate_array_offset(std::vector<size_t>& element_offsets, size_t pre_batch_size,
size_t* real_batch_size, size_t* num_values);
void _fill_array_offset(MutableColumnPtr& doris_column, std::vector<size_t>& element_offsets);
void _fill_array_offset(MutableColumnPtr& doris_column, std::vector<size_t>& element_offsets,
int offset_index, size_t num_rows);
Status _skip_values(size_t num_values);
std::unique_ptr<level_t[]> _def_levels_buf = nullptr;
size_t _def_levels_buf_size = 0;
size_t _def_offset = 0;
std::unique_ptr<level_t[]> _rep_levels_buf = nullptr;
size_t _rep_levels_buf_size = 0;

@ -29,11 +29,14 @@ PageReader::PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t l
: _reader(reader), _start_offset(offset), _end_offset(offset + length) {}
Status PageReader::next_page_header() {
if (_offset < _start_offset || _offset >= _end_offset) {
if (UNLIKELY(_offset < _start_offset || _offset >= _end_offset)) {
return Status::IOError("Out-of-bounds Access");
}
if (_offset != _next_header_offset) {
return Status::IOError("Wrong header position");
if (UNLIKELY(_offset != _next_header_offset)) {
return Status::IOError("Wrong header position, should seek to a page header first");
}
if (UNLIKELY(_state != INITIALIZED)) {
return Status::IOError("Should skip or load current page to get next page");
}
const uint8_t* page_header_buf = nullptr;
@ -58,25 +61,28 @@ Status PageReader::next_page_header() {
_offset += real_header_size;
_next_header_offset = _offset + _cur_page_header.compressed_page_size;
_state = HEADER_PARSED;
return Status::OK();
}
Status PageReader::skip_page() {
if (_offset == _next_header_offset) {
return Status::InternalError("Should call next_page() to generate page header");
if (UNLIKELY(_state != HEADER_PARSED)) {
return Status::IOError("Should generate page header first to skip current page");
}
_offset = _next_header_offset;
_state = INITIALIZED;
return Status::OK();
}
Status PageReader::get_page_data(Slice& slice) {
if (_offset == _next_header_offset) {
return Status::InternalError("Should call next_page() to generate page header");
if (UNLIKELY(_state != HEADER_PARSED)) {
return Status::IOError("Should generate page header first to load current page data");
}
slice.size = _cur_page_header.compressed_page_size;
RETURN_IF_ERROR(_reader->read_bytes(slice, _offset));
DCHECK_EQ(slice.size, _cur_page_header.compressed_page_size);
_offset += slice.size;
_state = INITIALIZED;
return Status::OK();
}

@ -27,7 +27,6 @@ namespace doris::vectorized {
* Use to deserialize parquet page header, and get the page data in iterator interface.
*/
class PageReader {
public:
public:
PageReader(BufferedStreamReader* reader, uint64_t offset, uint64_t length);
~PageReader() = default;
@ -45,11 +44,15 @@ public:
void seek_to_page(int64_t page_header_offset) {
_offset = page_header_offset;
_next_header_offset = page_header_offset;
_state = INITIALIZED;
}
private:
enum PageReaderState { INITIALIZED, HEADER_PARSED };
BufferedStreamReader* _reader;
tparquet::PageHeader _cur_page_header;
PageReaderState _state = INITIALIZED;
uint64_t _offset = 0;
uint64_t _next_header_offset = 0;

@ -47,6 +47,8 @@ class RowGroupReader;
class PageIndex;
struct RowRange {
RowRange() {}
RowRange(int64_t first, int64_t last) : first_row(first), last_row(last) {}
int64_t first_row;
int64_t last_row;
};