From dd2f834c7922406ab72f9c60e3e00b1ab66f90dc Mon Sep 17 00:00:00 2001
From: Ashin Gau
Date: Thu, 8 Sep 2022 09:54:25 +0800
Subject: [PATCH] [feature-wip](parquet-reader) bug fix, create compress codec before parsing dictionary (#12422)

## Fix five bugs:
1. Parquet dictionary data may be compressed, but `ColumnChunkReader` tries to parse the dictionary data before creating the compression codec, causing unexpected data errors (a sketch of the corrected order follows this list).
2. `FE` doesn't resolve the Hive `array` type.
3. `ParquetFileHdfsScanner` doesn't fill partition values when the table is partitioned.
4. `ParquetFileHdfsScanner` sets `_scanner_eof = true` as soon as one scan range is empty, which ends the whole scanner prematurely and results in data loss.
5. Typographical error in `PageReader`: `get_page_date` is renamed to `get_page_data`.
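For the first bug, the essence of the fix is an ordering change in `ColumnChunkReader::init()`: the dictionary page is compressed with the same codec (`_metadata.codec`) as the data pages of its column chunk, so the codec has to exist before the dictionary page is parsed. A condensed sketch of the corrected order, reusing the identifiers from the diff below (elided parts unchanged):

```cpp
Status ColumnChunkReader::init() {
    // ... create _page_reader over the column chunk ...
    // Create the compression codec first: the dictionary page (if any) is
    // compressed with the same codec as the data pages.
    RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec));
    if (_metadata.__isset.dictionary_page_offset) {
        // seek to the dictionary page
        _page_reader->seek_to_page(_metadata.dictionary_page_offset);
        // ... the dictionary page is parsed from here on, and can now be
        //     decompressed with _block_compress_codec before set_dict() ...
    } else {
        // seek to the first data page
        _page_reader->seek_to_page(_metadata.data_page_offset);
    }
    return Status::OK();
}
```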
---
 be/src/vec/exec/file_hdfs_scanner.cpp         | 18 +++++-----
 be/src/vec/exec/file_hdfs_scanner.h           |  2 +-
 .../exec/format/parquet/parquet_common.cpp    | 34 +++++++++++++------
 .../vec/exec/format/parquet/parquet_common.h  |  6 ++--
 .../parquet/vparquet_column_chunk_reader.cpp  | 20 ++++++-----
 .../format/parquet/vparquet_page_reader.cpp   |  2 +-
 .../format/parquet/vparquet_page_reader.h     |  2 +-
 .../catalog/HiveMetaStoreClientHelper.java    |  6 ++++
 8 files changed, 56 insertions(+), 34 deletions(-)
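As background for the `parquet_common.cpp` change below: a plain-encoded `BYTE_ARRAY` dictionary page is a sequence of entries, each a 4-byte little-endian length followed by that many value bytes, and the new `set_dict` overload rejects dictionaries whose entries do not add up exactly to the uncompressed page size. A self-contained sketch of that offset walk (hypothetical free function, standard library only; slightly stricter than the patch in that it also bounds-checks the 4-byte length prefix):

```cpp
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Walk a BYTE_ARRAY dictionary page of `length` bytes holding `num_values`
// entries. Record the offset of each entry's value bytes and fail if the walk
// does not end exactly at `length`.
bool build_dict_offsets(const uint8_t* dict, uint32_t length, size_t num_values,
                        std::vector<uint32_t>* offsets, std::string* err) {
    offsets->resize(num_values + 1);
    uint32_t cursor = 0;
    for (size_t i = 0; i < num_values; ++i) {
        if (cursor + 4 > length) {
            *err = "Wrong data length in dictionary";
            return false;
        }
        uint32_t entry_len;
        std::memcpy(&entry_len, dict + cursor, sizeof(entry_len)); // assumes a little-endian host
        cursor += 4;
        (*offsets)[i] = cursor; // offset of the value bytes, as in the patch
        cursor += entry_len;
        if (cursor > length) {
            *err = "Wrong data length in dictionary";
            return false;
        }
    }
    if (cursor != length) {
        *err = "Wrong dictionary data for byte array type";
        return false;
    }
    (*offsets)[num_values] = cursor + 4; // sentinel entry, mirroring the patch
    return true;
}
```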
diff --git a/be/src/vec/exec/file_hdfs_scanner.cpp b/be/src/vec/exec/file_hdfs_scanner.cpp
index 1ba650dedd..6ba8fe1b27 100644
--- a/be/src/vec/exec/file_hdfs_scanner.cpp
+++ b/be/src/vec/exec/file_hdfs_scanner.cpp
@@ -37,33 +37,36 @@ Status ParquetFileHdfsScanner::open() {
     if (_ranges.empty()) {
         return Status::OK();
     }
-    RETURN_IF_ERROR(_get_next_reader(_next_range));
+    RETURN_IF_ERROR(_get_next_reader());
     return Status();
 }
 
 void ParquetFileHdfsScanner::_init_profiles(RuntimeProfile* profile) {}
 
 Status ParquetFileHdfsScanner::get_next(vectorized::Block* block, bool* eof) {
-    if (_next_range >= _ranges.size() || _scanner_eof) {
+    if (_scanner_eof) {
         *eof = true;
         return Status::OK();
     }
     RETURN_IF_ERROR(init_block(block));
     bool range_eof = false;
     RETURN_IF_ERROR(_reader->read_next_batch(block, &range_eof));
+    if (block->rows() > 0) {
+        _fill_columns_from_path(block, block->rows());
+    }
     if (range_eof) {
-        _next_range++;
-        RETURN_IF_ERROR(_get_next_reader(_next_range));
+        RETURN_IF_ERROR(_get_next_reader());
+        *eof = _scanner_eof;
     }
     return Status::OK();
 }
 
-Status ParquetFileHdfsScanner::_get_next_reader(int _next_range) {
+Status ParquetFileHdfsScanner::_get_next_reader() {
     if (_next_range >= _ranges.size()) {
         _scanner_eof = true;
         return Status::OK();
     }
-    const TFileRangeDesc& range = _ranges[_next_range];
+    const TFileRangeDesc& range = _ranges[_next_range++];
     std::unique_ptr<FileReader> file_reader;
     RETURN_IF_ERROR(FileFactory::create_file_reader(_state->exec_env(), _profile, _params, range,
                                                     file_reader));
@@ -75,8 +78,7 @@ Status ParquetFileHdfsScanner::_get_next_reader(int _next_range) {
             _reader->init_reader(tuple_desc, _file_slot_descs, _conjunct_ctxs, _state->timezone());
     if (!status.ok()) {
         if (status.is_end_of_file()) {
-            _scanner_eof = true;
-            return Status::OK();
+            return _get_next_reader();
         }
         return status;
     }
diff --git a/be/src/vec/exec/file_hdfs_scanner.h b/be/src/vec/exec/file_hdfs_scanner.h
index ac73444fcd..b9883b88b5 100644
--- a/be/src/vec/exec/file_hdfs_scanner.h
+++ b/be/src/vec/exec/file_hdfs_scanner.h
@@ -48,7 +48,7 @@ protected:
     void _init_profiles(RuntimeProfile* profile) override;
 
 private:
-    Status _get_next_reader(int _next_range);
+    Status _get_next_reader();
 
 private:
     std::shared_ptr<ParquetReader> _reader;
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 26c1ab7735..fd8d5d7428 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -64,12 +64,13 @@ Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type
             decoder.reset(new FixLengthDecoder(type));
             break;
         default:
-            return Status::InternalError("Unsupported type {} in parquet decoder",
-                                         tparquet::to_string(type));
+            return Status::InternalError("Unsupported type {}(encoding={}) in parquet decoder",
+                                         tparquet::to_string(type), tparquet::to_string(encoding));
         }
+        break;
     default:
-        return Status::InternalError("Unsupported encoding {} in parquet decoder",
-                                     tparquet::to_string(encoding));
+        return Status::InternalError("Unsupported encoding {}(type={}) in parquet decoder",
+                                     tparquet::to_string(encoding), tparquet::to_string(type));
     }
     return Status::OK();
 }
@@ -120,7 +121,11 @@ Status Decoder::decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, s
     return decode_values(data_column, data_type, num_values);
 }
 
-Status FixLengthDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) {
+Status FixLengthDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length,
+                                  size_t num_values) {
+    if (num_values * _type_length != length) {
+        return Status::Corruption("Wrong dictionary data for fixed length type");
+    }
     _has_dict = true;
     _dict = std::move(dict);
     return Status::OK();
@@ -261,18 +266,25 @@ Status FixLengthDecoder::decode_values(MutableColumnPtr& doris_column, DataTypeP
                                  tparquet::to_string(_physical_type), getTypeName(logical_type));
 }
 
-Status ByteArrayDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) {
+Status ByteArrayDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length,
+                                  size_t num_values) {
     _has_dict = true;
     _dict = std::move(dict);
-    _dict_offsets.resize(dict_size + 1);
+    _dict_offsets.resize(num_values + 1);
     uint32_t offset_cursor = 0;
-    for (int i = 0; i < dict_size; ++i) {
-        uint32_t length = decode_fixed32_le(_dict.get() + offset_cursor);
+    for (int i = 0; i < num_values; ++i) {
+        uint32_t l = decode_fixed32_le(_dict.get() + offset_cursor);
         offset_cursor += 4;
         _dict_offsets[i] = offset_cursor;
-        offset_cursor += length;
+        offset_cursor += l;
+        if (offset_cursor > length) {
+            return Status::Corruption("Wrong data length in dictionary");
+        }
     }
-    _dict_offsets[dict_size] = offset_cursor + 4;
+    if (offset_cursor != length) {
+        return Status::Corruption("Wrong dictionary data for byte array type");
+    }
+    _dict_offsets[num_values] = offset_cursor + 4;
     return Status::OK();
 }
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
index 9504fd32b5..a56fdb6476 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -113,7 +113,7 @@ public:
     virtual Status skip_values(size_t num_values) = 0;
 
-    virtual Status set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) {
+    virtual Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) {
         return Status::NotSupported("set_dict is not supported");
     }
 
@@ -159,7 +159,7 @@ public:
     Status skip_values(size_t num_values) override;
 
-    Status set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) override;
+    Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override;
 
     void set_data(Slice* data) override;
 
@@ -344,7 +344,7 @@ public:
     void set_data(Slice* data) override;
 
-    Status set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) override;
+    Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override;
 
 protected:
     template
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 0c44446586..c8192f46b2 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -35,6 +35,8 @@ Status ColumnChunkReader::init() {
                                : _metadata.data_page_offset;
     size_t chunk_size = _metadata.total_compressed_size;
     _page_reader = std::make_unique<PageReader>(_stream_reader, start_offset, chunk_size);
+    // get the block compression codec
+    RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec));
     if (_metadata.__isset.dictionary_page_offset) {
         // seek to the directory page
         _page_reader->seek_to_page(_metadata.dictionary_page_offset);
@@ -44,8 +46,6 @@ Status ColumnChunkReader::init() {
         // seek to the first data page
         _page_reader->seek_to_page(_metadata.data_page_offset);
     }
-    // get the block compression codec
-    RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec));
     return Status::OK();
 }
 
@@ -70,13 +70,13 @@ Status ColumnChunkReader::load_page_data() {
 
     if (_block_compress_codec != nullptr) {
         Slice compressed_data;
-        RETURN_IF_ERROR(_page_reader->get_page_date(compressed_data));
+        RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
         // check decompressed buffer size
         _reserve_decompress_buf(uncompressed_size);
         _page_data = Slice(_decompress_buf.get(), uncompressed_size);
         RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &_page_data));
     } else {
-        RETURN_IF_ERROR(_page_reader->get_page_date(_page_data));
+        RETURN_IF_ERROR(_page_reader->get_page_data(_page_data));
     }
     // Initialize repetition level and definition level. Skip when level = 0, which means required field.
@@ -102,7 +102,7 @@ Status ColumnChunkReader::load_page_data() {
         _page_decoder = _decoders[static_cast<int>(encoding)].get();
     } else {
         std::unique_ptr<Decoder> page_decoder;
-        Decoder::get_decoder(_metadata.type, encoding, page_decoder);
+        RETURN_IF_ERROR(Decoder::get_decoder(_metadata.type, encoding, page_decoder));
         // Set type length
         page_decoder->set_type_length(_get_type_length());
         // Initialize the time convert context
@@ -135,25 +135,27 @@ Status ColumnChunkReader::_decode_dict_page() {
     std::unique_ptr<uint8_t[]> dict_data(new uint8_t[uncompressed_size]);
     if (_block_compress_codec != nullptr) {
         Slice compressed_data;
-        RETURN_IF_ERROR(_page_reader->get_page_date(compressed_data));
+        RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
         Slice dict_slice(dict_data.get(), uncompressed_size);
         RETURN_IF_ERROR(_block_compress_codec->decompress(compressed_data, &dict_slice));
     } else {
         Slice dict_slice;
-        RETURN_IF_ERROR(_page_reader->get_page_date(dict_slice));
+        RETURN_IF_ERROR(_page_reader->get_page_data(dict_slice));
         // The data is stored by BufferedStreamReader, we should copy it out
         memcpy(dict_data.get(), dict_slice.data, dict_slice.size);
     }
 
     // Cache page decoder
     std::unique_ptr<Decoder> page_decoder;
-    Decoder::get_decoder(_metadata.type, tparquet::Encoding::RLE_DICTIONARY, page_decoder);
+    RETURN_IF_ERROR(
+            Decoder::get_decoder(_metadata.type, tparquet::Encoding::RLE_DICTIONARY, page_decoder));
     // Set type length
     page_decoder->set_type_length(_get_type_length());
     // Initialize the time convert context
     page_decoder->init(_field_schema, _ctz);
     // Set the dictionary data
-    RETURN_IF_ERROR(page_decoder->set_dict(dict_data, header.dictionary_page_header.num_values));
+    RETURN_IF_ERROR(page_decoder->set_dict(dict_data, uncompressed_size,
+                                           header.dictionary_page_header.num_values));
     _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)] = std::move(page_decoder);
 
     return Status::OK();
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
index fa2631e287..d2a229d7d8 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.cpp
@@ -69,7 +69,7 @@ Status PageReader::skip_page() {
     return Status::OK();
 }
 
-Status PageReader::get_page_date(Slice& slice) {
+Status PageReader::get_page_data(Slice& slice) {
     if (_offset == _next_header_offset) {
         return Status::InternalError("Should call next_page() to generate page header");
     }
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_reader.h b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
index 256ddd13d1..1795cac52d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_reader.h
@@ -40,7 +40,7 @@ public:
     const tparquet::PageHeader* get_page_header() const { return &_cur_page_header; }
 
-    Status get_page_date(Slice& slice);
+    Status get_page_data(Slice& slice);
 
     void seek_to_page(int64_t page_header_offset) {
         _offset = page_header_offset;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index 8d95cad038..7ddd388338 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -813,6 +813,12 @@ public class HiveMetaStoreClientHelper {
             default:
                 break;
         }
+        if (lowerCaseType.startsWith("array")) {
+            if (lowerCaseType.indexOf("<") == 5 && lowerCaseType.lastIndexOf(">") == lowerCaseType.length() - 1) {
+                Type innerType = hiveTypeToDorisType(lowerCaseType.substring(6, lowerCaseType.length() - 1));
+                return ArrayType.create(innerType, true);
+            }
+        }
         if (lowerCaseType.startsWith("char")) {
             ScalarType type = ScalarType.createType(PrimitiveType.CHAR);
             Matcher match = digitPattern.matcher(lowerCaseType);