[improvement](parquet-reader)support parquet data page v2 (#17054)

Support parquet data page v2.
Parquet data written by AWS Glue now uses data page v2, which this reader did not support before.
Authored by slothever on 2023-02-28 14:23:45 +08:00, committed by GitHub.
parent 89542b3e50
commit 598038e674
5 changed files with 86 additions and 21 deletions
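
For context: in a Parquet data page v2, the repetition and definition levels sit at the front of the page body, are always RLE-encoded, are never compressed, and their byte lengths are carried in the tparquet::DataPageHeaderV2 fields repetition_levels_byte_length and definition_levels_byte_length; only the value section that follows may be compressed (per-page is_compressed flag). A minimal standalone sketch of that layout, using hypothetical names (PageSlices, split_v2_page) that are not part of this patch:

#include <cstddef>
#include <cstdint>

// Hypothetical illustration of a v2 page body:
//   [repetition levels][definition levels][values (possibly compressed)]
struct PageSlices {
    const uint8_t* rep_levels; size_t rep_len;
    const uint8_t* def_levels; size_t def_len;
    const uint8_t* values;     size_t values_len;
};

static PageSlices split_v2_page(const uint8_t* body, size_t body_len,
                                size_t rep_levels_byte_length,
                                size_t def_levels_byte_length) {
    PageSlices s;
    s.rep_levels = body;
    s.rep_len    = rep_levels_byte_length;
    s.def_levels = body + rep_levels_byte_length;
    s.def_len    = def_levels_byte_length;
    s.values     = body + rep_levels_byte_length + def_levels_byte_length;
    s.values_len = body_len - rep_levels_byte_length - def_levels_byte_length;
    return s;
}

The changes below implement exactly this split (_get_uncompressed_levels), plus a prefix-free level decoder entry point (LevelDecoder::init_v2) and v2-aware page sizing in PageReader.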

View File

@@ -20,6 +20,8 @@
#include "util/bit_util.h"
#include "util/coding.h"
static constexpr size_t V1_LEVEL_SIZE = 4;
doris::Status doris::vectorized::LevelDecoder::init(doris::Slice* slice,
tparquet::Encoding::type encoding,
doris::vectorized::level_t max_level,
@@ -30,19 +32,19 @@ doris::Status doris::vectorized::LevelDecoder::init(doris::Slice* slice,
_num_levels = num_levels;
switch (encoding) {
case tparquet::Encoding::RLE: {
if (slice->size < 4) {
if (slice->size < V1_LEVEL_SIZE) {
return Status::Corruption("Wrong parquet level format");
}
uint8_t* data = (uint8_t*)slice->data;
uint32_t num_bytes = decode_fixed32_le(data);
if (num_bytes > slice->size - 4) {
if (num_bytes > slice->size - V1_LEVEL_SIZE) {
return Status::Corruption("Wrong parquet level format");
}
_rle_decoder = RleDecoder<level_t>(data + 4, num_bytes, _bit_width);
_rle_decoder = RleDecoder<level_t>(data + V1_LEVEL_SIZE, num_bytes, _bit_width);
slice->data += 4 + num_bytes;
slice->size -= 4 + num_bytes;
slice->data += V1_LEVEL_SIZE + num_bytes;
slice->size -= V1_LEVEL_SIZE + num_bytes;
break;
}
case tparquet::Encoding::BIT_PACKED: {
@@ -63,6 +65,18 @@ doris::Status doris::vectorized::LevelDecoder::init(doris::Slice* slice,
return Status::OK();
}
doris::Status doris::vectorized::LevelDecoder::init_v2(const doris::Slice& levels,
doris::vectorized::level_t max_level,
uint32_t num_levels) {
_encoding = tparquet::Encoding::RLE;
_bit_width = BitUtil::log2(max_level + 1);
_max_level = max_level;
_num_levels = num_levels;
size_t byte_length = levels.size;
_rle_decoder = RleDecoder<level_t>((uint8_t*)levels.data, byte_length, _bit_width);
return Status::OK();
}
size_t doris::vectorized::LevelDecoder::get_levels(doris::vectorized::level_t* levels, size_t n) {
if (_encoding == tparquet::Encoding::RLE) {
n = std::min((size_t)_num_levels, n);
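
The difference between init() and the new init_v2() is only in the framing of the RLE runs: a v1 RLE level block starts with a 4-byte little-endian length prefix (the V1_LEVEL_SIZE constant introduced above), while a v2 level block has no prefix because its exact byte length comes from the page header. A small self-contained sketch of the two framings, with a hypothetical read_u32_le helper standing in for decode_fixed32_le:

#include <cstdint>

// Hypothetical little-endian reader, equivalent in spirit to decode_fixed32_le.
static uint32_t read_u32_le(const uint8_t* p) {
    return static_cast<uint32_t>(p[0]) | (static_cast<uint32_t>(p[1]) << 8) |
           (static_cast<uint32_t>(p[2]) << 16) | (static_cast<uint32_t>(p[3]) << 24);
}

// v1: [4-byte length N][N bytes of RLE runs] -- the decoder must read N first.
static const uint8_t* v1_level_runs(const uint8_t* block, uint32_t* out_len) {
    *out_len = read_u32_le(block);
    return block + 4;
}

// v2: the runs are simply the first repetition_levels_byte_length /
// definition_levels_byte_length bytes of the page body, so init_v2() can hand
// the slice straight to RleDecoder with no framing work.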

View File

@@ -35,6 +35,8 @@ public:
Status init(Slice* slice, tparquet::Encoding::type encoding, level_t max_level,
uint32_t num_levels);
Status init_v2(const Slice& levels, level_t max_level, uint32_t num_levels);
inline bool has_levels() const { return _num_levels > 0; }
size_t get_levels(level_t* levels, size_t n);
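
A hedged usage sketch of the new decoder entry point: the calling reader code is outside this diff, so the loop below is illustrative only, v2_def_level_slice and num_values are placeholders, and it assumes get_levels() advances the internal counter that has_levels() checks.

// Illustrative only: decode the definition levels of one v2 page.
LevelDecoder def_decoder;
RETURN_IF_ERROR(def_decoder.init_v2(v2_def_level_slice, /*max_level=*/1,
                                    /*num_levels=*/num_values));
level_t buf[1024];
while (def_decoder.has_levels()) {
    size_t n = def_decoder.get_levels(buf, sizeof(buf) / sizeof(buf[0]));
    // ... consume n definition levels ...
}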

View File

@@ -65,6 +65,10 @@ Status ColumnChunkReader::next_page() {
RETURN_IF_ERROR(_decode_dict_page());
// parse the real first data page
return next_page();
} else if (_page_reader->get_page_header()->type == tparquet::PageType::DATA_PAGE_V2) {
_remaining_num_values = _page_reader->get_page_header()->data_page_header_v2.num_values;
_state = HEADER_PARSED;
return Status::OK();
} else {
_remaining_num_values = _page_reader->get_page_header()->data_page_header.num_values;
_state = HEADER_PARSED;
@@ -72,6 +76,16 @@
}
}
void ColumnChunkReader::_get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2,
Slice& page_data) {
int32_t rl = page_v2.repetition_levels_byte_length;
int32_t dl = page_v2.definition_levels_byte_length;
_v2_rep_levels = Slice(page_data.data, rl);
_v2_def_levels = Slice(page_data.data + rl, dl);
page_data.data += dl + rl;
page_data.size -= dl + rl;
}
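
A worked example of the pointer arithmetic in _get_uncompressed_levels, with made-up byte lengths:

// Suppose the v2 header reports repetition_levels_byte_length = 6 and
// definition_levels_byte_length = 10, and page_data covers the whole page body.
//   _v2_rep_levels = Slice(page_data.data,     6);   // bytes [0, 6)
//   _v2_def_levels = Slice(page_data.data + 6, 10);  // bytes [6, 16)
//   page_data.data += 16;                            // page_data now covers only
//   page_data.size -= 16;                            // the value bytes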
Status ColumnChunkReader::load_page_data() {
if (UNLIKELY(_state != HEADER_PARSED)) {
return Status::Corruption("Should parse page header");
@@ -83,31 +97,58 @@ Status ColumnChunkReader::load_page_data() {
if (_block_compress_codec != nullptr) {
Slice compressed_data;
RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
// check decompressed buffer size
_reserve_decompress_buf(uncompressed_size);
_page_data = Slice(_decompress_buf.get(), uncompressed_size);
SCOPED_RAW_TIMER(&_statistics.decompress_time);
_statistics.decompress_cnt++;
RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data));
if (header.__isset.data_page_header_v2) {
tparquet::DataPageHeaderV2 header_v2 = header.data_page_header_v2;
uncompressed_size -= header_v2.repetition_levels_byte_length +
header_v2.definition_levels_byte_length;
_get_uncompressed_levels(header_v2, compressed_data);
}
bool is_v2_compressed =
header.__isset.data_page_header_v2 && header.data_page_header_v2.is_compressed;
if (header.__isset.data_page_header || is_v2_compressed) {
// check decompressed buffer size
_reserve_decompress_buf(uncompressed_size);
_page_data = Slice(_decompress_buf.get(), uncompressed_size);
SCOPED_RAW_TIMER(&_statistics.decompress_time);
_statistics.decompress_cnt++;
RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data));
} else {
// No need to decompress
_page_data = Slice(compressed_data.data, compressed_data.size);
}
} else {
RETURN_IF_ERROR(_page_reader->get_page_data(_page_data));
if (header.__isset.data_page_header_v2) {
tparquet::DataPageHeaderV2 header_v2 = header.data_page_header_v2;
_get_uncompressed_levels(header_v2, _page_data);
}
}
// Initialize repetition level and definition level. Skip when level = 0, which means required field.
if (_max_rep_level > 0) {
SCOPED_RAW_TIMER(&_statistics.decode_level_time);
RETURN_IF_ERROR(_rep_level_decoder.init(&_page_data,
header.data_page_header.repetition_level_encoding,
_max_rep_level, _remaining_num_values));
if (header.__isset.data_page_header_v2) {
RETURN_IF_ERROR(_rep_level_decoder.init_v2(_v2_rep_levels, _max_rep_level,
_remaining_num_values));
} else {
RETURN_IF_ERROR(_rep_level_decoder.init(
&_page_data, header.data_page_header.repetition_level_encoding, _max_rep_level,
_remaining_num_values));
}
}
if (_max_def_level > 0) {
SCOPED_RAW_TIMER(&_statistics.decode_level_time);
RETURN_IF_ERROR(_def_level_decoder.init(&_page_data,
header.data_page_header.definition_level_encoding,
_max_def_level, _remaining_num_values));
if (header.__isset.data_page_header_v2) {
RETURN_IF_ERROR(_def_level_decoder.init_v2(_v2_def_levels, _max_def_level,
_remaining_num_values));
} else {
RETURN_IF_ERROR(_def_level_decoder.init(
&_page_data, header.data_page_header.definition_level_encoding, _max_def_level,
_remaining_num_values));
}
}
auto encoding = header.data_page_header.encoding;
auto encoding = header.__isset.data_page_header_v2 ? header.data_page_header_v2.encoding
: header.data_page_header.encoding;
// change the deprecated encoding to RLE_DICTIONARY
if (encoding == tparquet::Encoding::PLAIN_DICTIONARY) {
encoding = tparquet::Encoding::RLE_DICTIONARY;
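
A condensed, self-contained sketch of the decompression decision this block now makes (needs_decompress is a hypothetical name; the real code keys off tparquet::PageHeader fields). Note also that for v2 pages the value encoding is taken from data_page_header_v2.encoding, and uncompressed_size is reduced by the two level byte lengths because the levels were already split off uncompressed.

// v1 data pages are always decompressed when a codec is configured;
// v2 data pages are decompressed only when their per-page is_compressed flag
// is set, and in either v2 case only the value section (levels excluded) goes
// through the codec.
static bool needs_decompress(bool has_codec, bool is_v2, bool v2_is_compressed) {
    if (!has_codec) return false;   // column chunk is not compressed at all
    if (!is_v2)     return true;    // v1: whole page body uses the codec
    return v2_is_compressed;        // v2: honor the per-page flag
}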

View File

@@ -148,6 +148,7 @@ private:
Status _decode_dict_page();
void _reserve_decompress_buf(size_t size);
int32_t _get_type_length();
void _get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2, Slice& page_data);
ColumnChunkReaderState _state = NOT_INIT;
FieldSchema* _field_schema;
@@ -168,6 +169,8 @@ private:
Slice _page_data;
std::unique_ptr<uint8_t[]> _decompress_buf;
size_t _decompress_buf_size = 0;
Slice _v2_rep_levels;
Slice _v2_def_levels;
bool _has_dict = false;
Decoder* _page_decoder = nullptr;
// Map: encoding -> Decoder

View File

@@ -80,11 +80,16 @@ Status PageReader::get_page_data(Slice& slice) {
if (UNLIKELY(_state != HEADER_PARSED)) {
return Status::IOError("Should generate page header first to load current page data");
}
slice.size = _cur_page_header.compressed_page_size;
if (_cur_page_header.__isset.data_page_header_v2) {
auto& page_v2 = _cur_page_header.data_page_header_v2;
slice.size = _cur_page_header.compressed_page_size + page_v2.repetition_levels_byte_length +
page_v2.definition_levels_byte_length;
} else {
slice.size = _cur_page_header.compressed_page_size;
}
RETURN_IF_ERROR(_reader->read_bytes(slice, _offset));
_offset += slice.size;
_state = INITIALIZED;
return Status::OK();
}
} // namespace doris::vectorized
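
A minimal sketch of the sizing rule added above, with a hypothetical function name; as implemented in this reader, the uncompressed level bytes of a v2 page are fetched together with the compressed value bytes in a single read and split apart later by ColumnChunkReader::_get_uncompressed_levels.

#include <cstddef>

static size_t v2_page_read_size(size_t compressed_page_size,
                                size_t repetition_levels_byte_length,
                                size_t definition_levels_byte_length) {
    // Mirrors the branch above: the slice handed back for a v2 page spans the
    // level bytes plus compressed_page_size bytes of (possibly compressed) values.
    return compressed_page_size + repetition_levels_byte_length +
           definition_levels_byte_length;
}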