From 598038e6745526ec6acdc9101cbcfc2abdb0f815 Mon Sep 17 00:00:00 2001 From: slothever <18522955+wsjz@users.noreply.github.com> Date: Tue, 28 Feb 2023 14:23:45 +0800 Subject: [PATCH] [improvement](parquet-reader)support parquet data page v2 (#17054) Support parquet data page v2 Now the parquet data on AWS Glue uses data page v2, which we did not support before. --- .../vec/exec/format/parquet/level_decoder.cpp | 24 +++++-- .../vec/exec/format/parquet/level_decoder.h | 2 + .../parquet/vparquet_column_chunk_reader.cpp | 69 +++++++++++++++---- .../parquet/vparquet_column_chunk_reader.h | 3 + .../format/parquet/vparquet_page_reader.cpp | 9 ++- 5 files changed, 86 insertions(+), 21 deletions(-) diff --git a/be/src/vec/exec/format/parquet/level_decoder.cpp b/be/src/vec/exec/format/parquet/level_decoder.cpp index 616da10b8e..387d032c05 100644 --- a/be/src/vec/exec/format/parquet/level_decoder.cpp +++ b/be/src/vec/exec/format/parquet/level_decoder.cpp @@ -20,6 +20,8 @@ #include "util/bit_util.h" #include "util/coding.h" +static constexpr size_t V1_LEVEL_SIZE = 4; + doris::Status doris::vectorized::LevelDecoder::init(doris::Slice* slice, tparquet::Encoding::type encoding, doris::vectorized::level_t max_level, @@ -30,19 +32,19 @@ doris::Status doris::vectorized::LevelDecoder::init(doris::Slice* slice, _num_levels = num_levels; switch (encoding) { case tparquet::Encoding::RLE: { - if (slice->size < 4) { + if (slice->size < V1_LEVEL_SIZE) { return Status::Corruption("Wrong parquet level format"); } uint8_t* data = (uint8_t*)slice->data; uint32_t num_bytes = decode_fixed32_le(data); - if (num_bytes > slice->size - 4) { + if (num_bytes > slice->size - V1_LEVEL_SIZE) { return Status::Corruption("Wrong parquet level format"); } - _rle_decoder = RleDecoder(data + 4, num_bytes, _bit_width); + _rle_decoder = RleDecoder(data + V1_LEVEL_SIZE, num_bytes, _bit_width); - slice->data += 4 + num_bytes; - slice->size -= 4 + num_bytes; + slice->data += V1_LEVEL_SIZE + num_bytes; + slice->size -= 
V1_LEVEL_SIZE + num_bytes; break; } case tparquet::Encoding::BIT_PACKED: { @@ -63,6 +65,18 @@ doris::Status doris::vectorized::LevelDecoder::init(doris::Slice* slice, return Status::OK(); } +doris::Status doris::vectorized::LevelDecoder::init_v2(const doris::Slice& levels, + doris::vectorized::level_t max_level, + uint32_t num_levels) { + _encoding = tparquet::Encoding::RLE; + _bit_width = BitUtil::log2(max_level + 1); + _max_level = max_level; + _num_levels = num_levels; + size_t byte_length = levels.size; + _rle_decoder = RleDecoder((uint8_t*)levels.data, byte_length, _bit_width); + return Status::OK(); +} + size_t doris::vectorized::LevelDecoder::get_levels(doris::vectorized::level_t* levels, size_t n) { if (_encoding == tparquet::Encoding::RLE) { n = std::min((size_t)_num_levels, n); diff --git a/be/src/vec/exec/format/parquet/level_decoder.h b/be/src/vec/exec/format/parquet/level_decoder.h index 63f3bfc34b..324f914993 100644 --- a/be/src/vec/exec/format/parquet/level_decoder.h +++ b/be/src/vec/exec/format/parquet/level_decoder.h @@ -35,6 +35,8 @@ public: Status init(Slice* slice, tparquet::Encoding::type encoding, level_t max_level, uint32_t num_levels); + Status init_v2(const Slice& levels, level_t max_level, uint32_t num_levels); + inline bool has_levels() const { return _num_levels > 0; } size_t get_levels(level_t* levels, size_t n); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index 2bfe74628d..d53023c1a2 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -65,6 +65,10 @@ Status ColumnChunkReader::next_page() { RETURN_IF_ERROR(_decode_dict_page()); // parse the real first data page return next_page(); + } else if (_page_reader->get_page_header()->type == tparquet::PageType::DATA_PAGE_V2) { + _remaining_num_values = 
_page_reader->get_page_header()->data_page_header_v2.num_values; + _state = HEADER_PARSED; + return Status::OK(); } else { _remaining_num_values = _page_reader->get_page_header()->data_page_header.num_values; _state = HEADER_PARSED; @@ -72,6 +76,16 @@ Status ColumnChunkReader::next_page() { } } +void ColumnChunkReader::_get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2, + Slice& page_data) { + int32_t rl = page_v2.repetition_levels_byte_length; + int32_t dl = page_v2.definition_levels_byte_length; + _v2_rep_levels = Slice(page_data.data, rl); + _v2_def_levels = Slice(page_data.data + rl, dl); + page_data.data += dl + rl; + page_data.size -= dl + rl; +} + Status ColumnChunkReader::load_page_data() { if (UNLIKELY(_state != HEADER_PARSED)) { return Status::Corruption("Should parse page header"); @@ -83,31 +97,58 @@ Status ColumnChunkReader::load_page_data() { if (_block_compress_codec != nullptr) { Slice compressed_data; RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data)); - // check decompressed buffer size - _reserve_decompress_buf(uncompressed_size); - _page_data = Slice(_decompress_buf.get(), uncompressed_size); - SCOPED_RAW_TIMER(&_statistics.decompress_time); - _statistics.decompress_cnt++; - RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data)); + if (header.__isset.data_page_header_v2) { + tparquet::DataPageHeaderV2 header_v2 = header.data_page_header_v2; + uncompressed_size -= header_v2.repetition_levels_byte_length + + header_v2.definition_levels_byte_length; + _get_uncompressed_levels(header_v2, compressed_data); + } + bool is_v2_compressed = + header.__isset.data_page_header_v2 && header.data_page_header_v2.is_compressed; + if (header.__isset.data_page_header || is_v2_compressed) { + // check decompressed buffer size + _reserve_decompress_buf(uncompressed_size); + _page_data = Slice(_decompress_buf.get(), uncompressed_size); + SCOPED_RAW_TIMER(&_statistics.decompress_time); + 
_statistics.decompress_cnt++; + RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data)); + } else { + // Don't need decompress + _page_data = Slice(compressed_data.data, compressed_data.size); + } } else { RETURN_IF_ERROR(_page_reader->get_page_data(_page_data)); + if (header.__isset.data_page_header_v2) { + tparquet::DataPageHeaderV2 header_v2 = header.data_page_header_v2; + _get_uncompressed_levels(header_v2, _page_data); + } } // Initialize repetition level and definition level. Skip when level = 0, which means required field. if (_max_rep_level > 0) { SCOPED_RAW_TIMER(&_statistics.decode_level_time); - RETURN_IF_ERROR(_rep_level_decoder.init(&_page_data, - header.data_page_header.repetition_level_encoding, - _max_rep_level, _remaining_num_values)); + if (header.__isset.data_page_header_v2) { + RETURN_IF_ERROR(_rep_level_decoder.init_v2(_v2_rep_levels, _max_rep_level, + _remaining_num_values)); + } else { + RETURN_IF_ERROR(_rep_level_decoder.init( + &_page_data, header.data_page_header.repetition_level_encoding, _max_rep_level, + _remaining_num_values)); + } } if (_max_def_level > 0) { SCOPED_RAW_TIMER(&_statistics.decode_level_time); - RETURN_IF_ERROR(_def_level_decoder.init(&_page_data, - header.data_page_header.definition_level_encoding, - _max_def_level, _remaining_num_values)); + if (header.__isset.data_page_header_v2) { + RETURN_IF_ERROR(_def_level_decoder.init_v2(_v2_def_levels, _max_def_level, + _remaining_num_values)); + } else { + RETURN_IF_ERROR(_def_level_decoder.init( + &_page_data, header.data_page_header.definition_level_encoding, _max_def_level, + _remaining_num_values)); + } } - - auto encoding = header.data_page_header.encoding; + auto encoding = header.__isset.data_page_header_v2 ? 
header.data_page_header_v2.encoding + : header.data_page_header.encoding; // change the deprecated encoding to RLE_DICTIONARY if (encoding == tparquet::Encoding::PLAIN_DICTIONARY) { encoding = tparquet::Encoding::RLE_DICTIONARY; diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index a852f97cda..2bf21fbf7b 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -148,6 +148,7 @@ private: Status _decode_dict_page(); void _reserve_decompress_buf(size_t size); int32_t _get_type_length(); + void _get_uncompressed_levels(const tparquet::DataPageHeaderV2& page_v2, Slice& page_data); ColumnChunkReaderState _state = NOT_INIT; FieldSchema* _field_schema; @@ -168,6 +169,8 @@ private: Slice _page_data; std::unique_ptr _decompress_buf; size_t _decompress_buf_size = 0; + Slice _v2_rep_levels; + Slice _v2_def_levels; bool _has_dict = false; Decoder* _page_decoder = nullptr; // Map: encoding -> Decoder diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp index cd018f4107..f32d571a7a 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp @@ -80,11 +80,16 @@ Status PageReader::get_page_data(Slice& slice) { if (UNLIKELY(_state != HEADER_PARSED)) { return Status::IOError("Should generate page header first to load current page data"); } - slice.size = _cur_page_header.compressed_page_size; + if (_cur_page_header.__isset.data_page_header_v2) { + auto& page_v2 = _cur_page_header.data_page_header_v2; + slice.size = _cur_page_header.compressed_page_size + page_v2.repetition_levels_byte_length + + page_v2.definition_levels_byte_length; + } else { + slice.size = _cur_page_header.compressed_page_size; + } RETURN_IF_ERROR(_reader->read_bytes(slice, _offset)); _offset += 
slice.size; _state = INITIALIZED; return Status::OK(); } - } // namespace doris::vectorized