From ef2fdb79bb98d484a05943a9ba4e159f46cbe877 Mon Sep 17 00:00:00 2001 From: Qi Chen Date: Mon, 20 Feb 2023 11:42:29 +0800 Subject: [PATCH] [Improvement](parquet-reader) Optimize and refactor parquet reader to improve performance. (#16818) Optimize and refactor parquet reader to improve performance. - Improve 2x performance for small dict string by aligned copying. - Refactor code to decrease condition(if) checking. - Don't call skip(0). - Don't read page index if no condition. **ssb-flat-100**: (single-machine, single-thread) | Query | before opt | after opt | | ------------- |:-------------:| ---------:| | SELECT count(lo_revenue) FROM lineorder_flat | 9.23 | 9.12 | | SELECT count(lo_linenumber) FROM lineorder_flat | 4.50 | 4.36 | | SELECT count(c_name) FROM lineorder_flat | 18.22 | 17.88| | **SELECT count(lo_shipmode) FROM lineorder_flat** |**10.09** | **6.15**| --- be/src/vec/CMakeLists.txt | 5 + be/src/vec/columns/column.h | 5 + be/src/vec/columns/column_string.h | 46 ++ .../format/parquet/bool_plain_decoder.cpp | 85 +++ .../exec/format/parquet/bool_plain_decoder.h | 76 +++ .../parquet/byte_array_dict_decoder.cpp | 131 ++++ .../format/parquet/byte_array_dict_decoder.h | 100 +++ .../parquet/byte_array_plain_decoder.cpp | 110 ++++ .../format/parquet/byte_array_plain_decoder.h | 98 +++ be/src/vec/exec/format/parquet/decoder.cpp | 155 +++++ be/src/vec/exec/format/parquet/decoder.h | 157 +++++ .../parquet/fix_length_dict_decoder.hpp | 531 ++++++++++++++++ .../parquet/fix_length_plain_decoder.cpp | 412 +++++++++++++ .../format/parquet/fix_length_plain_decoder.h | 65 ++ .../exec/format/parquet/parquet_common.cpp | 521 ---------------- .../vec/exec/format/parquet/parquet_common.h | 573 +----------------- .../exec/format/parquet/parquet_pred_cmp.h | 3 + .../parquet/vparquet_column_chunk_reader.h | 2 +- .../format/parquet/vparquet_column_reader.cpp | 14 +- .../format/parquet/vparquet_group_reader.cpp | 1 + .../exec/format/parquet/vparquet_page_index.h | 1 + 
.../exec/format/parquet/vparquet_reader.cpp | 5 +
 be/src/vec/exec/format/table/iceberg_reader.h | 1 +
 23 files changed, 2005 insertions(+), 1092 deletions(-)
 create mode 100644 be/src/vec/exec/format/parquet/bool_plain_decoder.cpp
 create mode 100644 be/src/vec/exec/format/parquet/bool_plain_decoder.h
 create mode 100644 be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
 create mode 100644 be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
 create mode 100644 be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp
 create mode 100644 be/src/vec/exec/format/parquet/byte_array_plain_decoder.h
 create mode 100644 be/src/vec/exec/format/parquet/decoder.cpp
 create mode 100644 be/src/vec/exec/format/parquet/decoder.h
 create mode 100644 be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
 create mode 100644 be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp
 create mode 100644 be/src/vec/exec/format/parquet/fix_length_plain_decoder.h
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 6aaae22fef..a57815f3d6 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -281,6 +281,11 @@ set(VEC_FILES
   exec/format/parquet/schema_desc.cpp
   exec/format/parquet/vparquet_column_reader.cpp
   exec/format/parquet/level_decoder.cpp
+  exec/format/parquet/decoder.cpp
+  exec/format/parquet/fix_length_plain_decoder.cpp
+  exec/format/parquet/byte_array_plain_decoder.cpp
+  exec/format/parquet/byte_array_dict_decoder.cpp
+  exec/format/parquet/bool_plain_decoder.cpp
   exec/format/parquet/parquet_common.cpp
   exec/scan/vscan_node.cpp
   exec/scan/vscanner.cpp
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 0132ea6892..af94604749 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -265,6 +265,11 @@ public:
         LOG(FATAL) << "Method insert_many_binary_data is not supported for " << get_name();
     }
 
+    virtual void insert_many_strings_overflow(const StringRef* strings, size_t num,
+                                              size_t max_length) {
+        LOG(FATAL) << "Method insert_many_strings_overflow is not supported for " << get_name();
+    }
+
     // Here `pos` points to the memory data type is the same as the data type of the column.
     // This function is used by `insert_keys_into_columns` in AggregationNode.
     virtual void insert_many_raw_data(const char* pos, size_t num) {
diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h
index 710cba9e3e..4623d02eda 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -280,6 +280,52 @@ public:
         }
     }
 
+#define MAX_STRINGS_OVERFLOW_SIZE 128
+    template
+    void insert_many_strings_fixed_length(const StringRef* strings, size_t num)
+            __attribute__((noinline));
+    // Appends `num` strings; every non-empty one is copied as a fixed copy_length-byte block.
+    template
+    void insert_many_strings_fixed_length(const StringRef* strings, size_t num) {
+        size_t new_size = 0;
+        for (size_t i = 0; i < num; i++) {
+            new_size += strings[i].size;
+        }
+        // Grow by an extra copy_length so the last fixed-size copy stays inside `chars`.
+        const size_t old_size = chars.size();
+        check_chars_length(old_size + new_size, offsets.size() + num);
+        chars.resize(old_size + new_size + copy_length);
+        // The memcpy ignores strings[i].size: each source needs copy_length readable bytes.
+        Char* data = chars.data();
+        size_t offset = old_size;
+        for (size_t i = 0; i < num; i++) {
+            uint32_t len = strings[i].size;
+            if (len) {
+                memcpy(data + offset, strings[i].data, copy_length);
+                offset += len; // advance by the real length; the over-copied tail is overwritten
+            }
+            offsets.push_back(offset);
+        }
+        chars.resize(old_size + new_size); // drop the scratch tail added above
+    }
+    // Picks the smallest fixed copy width (8..128) covering max_length, else a plain insert.
+    void insert_many_strings_overflow(const StringRef* strings, size_t num,
+                                      size_t max_length) override {
+        if (max_length <= 8) {
+            insert_many_strings_fixed_length<8>(strings, num);
+        } else if (max_length <= 16) {
+            insert_many_strings_fixed_length<16>(strings, num);
+        } else if (max_length <= 32) {
+            insert_many_strings_fixed_length<32>(strings, num);
+        } else if (max_length <= 64) {
+            insert_many_strings_fixed_length<64>(strings, num);
+        } else if (max_length <= 128) {
+            insert_many_strings_fixed_length<128>(strings, num);
+        } else {
+            insert_many_strings(strings, num);
+        }
+    }
+
     void insert_many_dict_data(const int32_t* data_array, size_t start_index,
                                const StringRef* dict, size_t num, uint32_t /*dict_num*/) override {
         size_t offset_size = offsets.size();
diff --git a/be/src/vec/exec/format/parquet/bool_plain_decoder.cpp b/be/src/vec/exec/format/parquet/bool_plain_decoder.cpp
new file mode 100644
index 0000000000..87e6a31828
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/bool_plain_decoder.cpp
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/parquet/bool_plain_decoder.h"
+
+namespace doris::vectorized {
+Status BoolPlainDecoder::skip_values(size_t num_values) {
+    size_t skip_cached = std::min<size_t>(num_unpacked_values_ - unpacked_value_idx_, num_values);
+    unpacked_value_idx_ += static_cast<int>(skip_cached); // consume already-unpacked values first
+    if (skip_cached == num_values) {
+        return Status::OK();
+    }
+    int num_remaining = static_cast<int>(num_values - skip_cached);
+    int num_to_skip = BitUtil::RoundDownToPowerOf2(num_remaining, 32);
+    if (num_to_skip > 0) {
+        bool_values_.SkipBatch(1, num_to_skip); // NOTE(review): result is ignored - confirm
+    }
+    num_remaining -= num_to_skip;
+    if (num_remaining > 0) {
+        DCHECK_LE(num_remaining, UNPACKED_BUFFER_LEN);
+        num_unpacked_values_ =
+                bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+        if (UNLIKELY(num_unpacked_values_ < num_remaining)) {
+            return Status::IOError("Can't skip enough booleans in plain decoder");
+        }
+        unpacked_value_idx_ = num_remaining; // the rest is skipped inside the fresh batch
+    }
+    return Status::OK();
+}
+// Appends decoded booleans to doris_column, one run of select_vector at a time.
+Status BoolPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                                       ColumnSelectVector& select_vector) {
+    auto& column_data = static_cast&>(*doris_column).get_data();
+    size_t data_index = column_data.size();
+    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
+    // NULL runs leave their pre-sized slots untouched; filtered runs are decoded but dropped.
+    ColumnSelectVector::DataReadType read_type;
+    while (size_t run_length = select_vector.get_next_run(&read_type)) {
+        switch (read_type) {
+        case ColumnSelectVector::CONTENT: {
+            bool value;
+            for (size_t i = 0; i < run_length; ++i) {
+                if (UNLIKELY(!_decode_value(&value))) {
+                    return Status::IOError("Can't read enough booleans in plain decoder");
+                }
+                column_data[data_index++] = (UInt8)value;
+            }
+            break;
+        }
+        case ColumnSelectVector::NULL_DATA: {
+            data_index += run_length;
+            break;
+        }
+        case ColumnSelectVector::FILTERED_CONTENT: {
+            bool value;
+            for (size_t i = 0; i < run_length; ++i) {
+                if (UNLIKELY(!_decode_value(&value))) {
+                    return Status::IOError("Can't read enough booleans in plain decoder");
+                }
+            }
+            break;
+        }
+        case ColumnSelectVector::FILTERED_NULL: {
+            // do nothing
+            break;
+        }
+        }
+    }
+    return Status::OK();
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/bool_plain_decoder.h b/be/src/vec/exec/format/parquet/bool_plain_decoder.h
new file mode 100644
index 0000000000..77ab7f4ccd
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/bool_plain_decoder.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "util/bit_stream_utils.inline.h"
+#include "vec/exec/format/parquet/decoder.h"
+
+namespace doris::vectorized {
+/// Decoder for bit-packed boolean-encoded values.
+/// Implementation from https://github.com/apache/impala/blob/master/be/src/exec/parquet/parquet-bool-decoder.h
+class BoolPlainDecoder final : public Decoder {
+public:
+    BoolPlainDecoder() = default;
+    ~BoolPlainDecoder() override = default;
+
+    // Set the data to be decoded; also resets the unpacked-value cache and the offset.
+    void set_data(Slice* data) override {
+        bool_values_.Reset((const uint8_t*)data->data, data->size);
+        num_unpacked_values_ = 0;
+        unpacked_value_idx_ = 0;
+        _offset = 0;
+    }
+    // Appends decoded booleans to doris_column as directed by select_vector.
+    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                         ColumnSelectVector& select_vector) override;
+    // Advances past num_values booleans without storing them anywhere.
+    Status skip_values(size_t num_values) override;
+
+protected:
+    inline bool _decode_value(bool* value) {
+        if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) {
+            *value = unpacked_values_[unpacked_value_idx_++];
+        } else { // cache drained: unpack the next batch from the bit reader
+            num_unpacked_values_ =
+                    bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+            if (UNLIKELY(num_unpacked_values_ == 0)) {
+                return false; // UnpackBatch produced no values
+            }
+            *value = unpacked_values_[0];
+            unpacked_value_idx_ = 1;
+        }
+        return true;
+    }
+
+    /// A buffer to store unpacked values. Must be a multiple of 32 size to use the
+    /// batch-oriented interface of BatchedBitReader. We use uint8_t instead of bool because
+    /// bit unpacking is only supported for unsigned integers. The values are converted to
+    /// bool when returned to the user.
+    static const int UNPACKED_BUFFER_LEN = 128;
+    uint8_t unpacked_values_[UNPACKED_BUFFER_LEN];
+
+    /// The number of valid values in 'unpacked_values_'.
+    int num_unpacked_values_ = 0;
+
+    /// The next value to return from 'unpacked_values_'.
+    int unpacked_value_idx_ = 0;
+
+    /// Bit packed decoder, used if 'encoding_' is PLAIN.
+    BatchedBitReader bool_values_;
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
new file mode 100644
index 0000000000..c30d74b19d
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/parquet/byte_array_dict_decoder.h"
+
+#include "util/coding.h"
+#include "vec/columns/column_dictionary.h"
+#include "vec/data_types/data_type_nullable.h"
+
+namespace doris::vectorized {
+// Parses `num_values` length-prefixed entries from `dict` (`length` bytes) into _dict_items.
+Status ByteArrayDictDecoder::set_dict(std::unique_ptr& dict, int32_t length,
+                                      size_t num_values) {
+    _dict = std::move(dict);
+    _dict_items.reserve(num_values);
+    size_t offset_cursor = 0; // size_t so that cursor + entry length cannot wrap around
+    char* dict_item_address = reinterpret_cast(_dict.get());
+    // Pass 1: validate every length prefix against `length` before anything is copied.
+    size_t total_length = 0;
+    for (size_t i = 0; i < num_values; ++i) {
+        if (length < 0 || offset_cursor + 4 > static_cast<size_t>(length)) {
+            return Status::Corruption("Wrong data length in dictionary");
+        }
+        uint32_t l = decode_fixed32_le(_dict.get() + offset_cursor);
+        offset_cursor += 4;
+        offset_cursor += l;
+        total_length += l;
+    }
+    if (offset_cursor != static_cast<size_t>(length)) {
+        return Status::Corruption("Wrong dictionary data for byte array type");
+    }
+    // Pass 2: the layout is now known to fit exactly, so the copies below stay in bounds.
+    // For insert_many_strings_overflow
+    _dict_data.resize(total_length + MAX_STRINGS_OVERFLOW_SIZE);
+    _max_value_length = 0;
+    size_t offset = 0;
+    offset_cursor = 0;
+    for (size_t i = 0; i < num_values; ++i) {
+        uint32_t l = decode_fixed32_le(_dict.get() + offset_cursor);
+        offset_cursor += 4;
+        memcpy(&_dict_data[offset], dict_item_address + offset_cursor, l);
+        _dict_items.emplace_back(&_dict_data[offset], l);
+        offset_cursor += l;
+        offset += l;
+        if (l > _max_value_length) {
+            _max_value_length = l;
+        }
+    }
+    return Status::OK();
+}
+// Reads the page's dictionary indexes and appends the referenced values to doris_column.
+Status ByteArrayDictDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                                           ColumnSelectVector& select_vector) {
+    size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
+    if (doris_column->is_column_dictionary() &&
+        assert_cast(*doris_column).dict_size() == 0) {
+        assert_cast(*doris_column)
+                .insert_many_dict_data(_dict_items.data(), _dict_items.size());
+    }
+    _indexes.resize(non_null_size);
+    _index_batch_decoder->GetBatch(_indexes.data(), non_null_size); // data(): valid when empty
+
+    if (doris_column->is_column_dictionary()) {
+        return _decode_dict_values(doris_column, select_vector);
+    }
+
+    TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
+    switch (logical_type) {
+    case TypeIndex::String:
+    case TypeIndex::FixedString: {
+        size_t dict_index = 0;
+
+        ColumnSelectVector::DataReadType read_type;
+        while (size_t run_length = select_vector.get_next_run(&read_type)) {
+            switch (read_type) {
+            case ColumnSelectVector::CONTENT: {
+                std::vector string_values;
+                string_values.reserve(run_length);
+                for (size_t i = 0; i < run_length; ++i) {
+                    string_values.emplace_back(_dict_items[_indexes[dict_index++]]);
+                }
+                doris_column->insert_many_strings_overflow(&string_values[0], run_length,
+                                                           _max_value_length);
+                break;
+            }
+            case ColumnSelectVector::NULL_DATA: {
+                doris_column->insert_many_defaults(run_length);
+                break;
+            }
+            case ColumnSelectVector::FILTERED_CONTENT: {
+                dict_index += run_length;
+                break;
+            }
+            case ColumnSelectVector::FILTERED_NULL: {
+                // do nothing
+                break;
+            }
+            }
+        }
+        return Status::OK();
+    }
+    case TypeIndex::Decimal32:
+        return _decode_binary_decimal(doris_column, data_type, select_vector);
+    case TypeIndex::Decimal64:
+        return _decode_binary_decimal(doris_column, data_type, select_vector);
+    case TypeIndex::Decimal128:
+        return _decode_binary_decimal(doris_column, data_type, select_vector);
+    case TypeIndex::Decimal128I:
+        return _decode_binary_decimal(doris_column, data_type, select_vector);
+    default:
+        break;
+    }
+    return Status::InvalidArgument(
+            "Can't decode parquet physical type BYTE_ARRAY to doris logical type {}",
+            getTypeName(logical_type));
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
new file mode 100644
index 0000000000..a90226d7c3
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include
+
+#include "gutil/endian.h"
+#include "util/rle_encoding.h"
+#include "vec/exec/format/parquet/decoder.h"
+
+namespace doris::vectorized {
+/// Dictionary decoder for BYTE_ARRAY pages: emits strings or binary-encoded decimals.
+class ByteArrayDictDecoder final : public BaseDictDecoder {
+public:
+    ByteArrayDictDecoder() = default;
+    ~ByteArrayDictDecoder() override = default;
+
+    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                         ColumnSelectVector& select_vector) override;
+
+    Status set_dict(std::unique_ptr& dict, int32_t length, size_t num_values) override;
+
+protected:
+    template
+    Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                                  ColumnSelectVector& select_vector);
+
+    // For dictionary encoding
+    std::vector _dict_items;
+    std::vector _dict_data;
+    size_t _max_value_length = 0; // set by set_dict(); zero until a dictionary is loaded
+};
+// Decodes dictionary-referenced big-endian decimals into the column and rescales them.
+template
+Status ByteArrayDictDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
+                                                    DataTypePtr& data_type,
+                                                    ColumnSelectVector& select_vector) {
+    init_decimal_converter(data_type);
+    auto& column_data =
+            static_cast>&>(*doris_column).get_data();
+    size_t data_index = column_data.size();
+    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
+    size_t dict_index = 0;
+    DecimalScaleParams& scale_params = _decode_params->decimal_scale;
+    ColumnSelectVector::DataReadType read_type;
+    while (size_t run_length = select_vector.get_next_run(&read_type)) {
+        switch (read_type) {
+        case ColumnSelectVector::CONTENT: {
+            for (size_t i = 0; i < run_length; ++i) {
+                StringRef& slice = _dict_items[_indexes[dict_index++]];
+                char* buf_start = const_cast(slice.data);
+                uint32_t length = (uint32_t)slice.size; // NOTE(review): never checked <= 16
+                // When Decimal in parquet is stored in byte arrays, binary and fixed,
+                // the unscaled number must be encoded as two's complement using big-endian byte order.
+                Int128 value = buf_start[0] & 0x80 ? -1 : 0;
+                memcpy(reinterpret_cast(&value) + sizeof(Int128) - length, buf_start,
+                       length);
+                value = BigEndian::ToHost128(value);
+                if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
+                    value *= scale_params.scale_factor;
+                } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) {
+                    value /= scale_params.scale_factor;
+                }
+                auto& v = reinterpret_cast(column_data[data_index++]);
+                v = (DecimalPrimitiveType)value;
+            }
+            break;
+        }
+        case ColumnSelectVector::NULL_DATA: {
+            data_index += run_length;
+            break;
+        }
+        case ColumnSelectVector::FILTERED_CONTENT: {
+            dict_index += run_length;
+            break;
+        }
+        case ColumnSelectVector::FILTERED_NULL: {
+            // do nothing
+            break;
+        }
+        }
+    }
+    return Status::OK();
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp
new file mode 100644
index 0000000000..aac4fcb4a2
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.cpp
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/parquet/byte_array_plain_decoder.h"
+
+#include "vec/data_types/data_type_nullable.h"
+
+namespace doris::vectorized {
+// Skips `num_values` entries, each stored as a 4-byte little-endian length plus payload.
+Status ByteArrayPlainDecoder::skip_values(size_t num_values) {
+    for (size_t i = 0; i < num_values; ++i) {
+        if (UNLIKELY(_offset + 4 > _data->size)) {
+            return Status::IOError("Can't read byte array length from plain decoder");
+        }
+        uint32_t length =
+                decode_fixed32_le(reinterpret_cast(_data->data) + _offset);
+        _offset += 4;
+        if (UNLIKELY(length > _data->size - _offset)) { // whole comparison inside UNLIKELY
+            return Status::IOError("Can't skip enough bytes in plain decoder");
+        }
+        _offset += length;
+    }
+    return Status::OK();
+}
+// Appends the selected BYTE_ARRAY values to doris_column as strings or binary decimals.
+Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                                            ColumnSelectVector& select_vector) {
+    TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
+    switch (logical_type) {
+    case TypeIndex::String:
+    case TypeIndex::FixedString: {
+        ColumnSelectVector::DataReadType read_type;
+        while (size_t run_length = select_vector.get_next_run(&read_type)) {
+            switch (read_type) {
+            case ColumnSelectVector::CONTENT: {
+                std::vector string_values;
+                string_values.reserve(run_length);
+                for (size_t i = 0; i < run_length; ++i) {
+                    if (UNLIKELY(_offset + 4 > _data->size)) {
+                        return Status::IOError("Can't read byte array length from plain decoder");
+                    }
+                    uint32_t length = decode_fixed32_le(
+                            reinterpret_cast(_data->data) + _offset);
+                    _offset += 4;
+                    if (UNLIKELY(length > _data->size - _offset)) { // overflow-safe bounds check
+                        return Status::IOError("Can't read enough bytes in plain decoder");
+                    }
+                    string_values.emplace_back(_data->data + _offset, length);
+                    _offset += length;
+                }
+                doris_column->insert_many_strings(&string_values[0], run_length);
+                break;
+            }
+            case ColumnSelectVector::NULL_DATA: {
+                doris_column->insert_many_defaults(run_length);
+                break;
+            }
+            case ColumnSelectVector::FILTERED_CONTENT: {
+                for (size_t i = 0; i < run_length; ++i) {
+                    if (UNLIKELY(_offset + 4 > _data->size)) {
+                        return Status::IOError("Can't read byte array length from plain decoder");
+                    }
+                    uint32_t length = decode_fixed32_le(
+                            reinterpret_cast(_data->data) + _offset);
+                    _offset += 4;
+                    if (UNLIKELY(length > _data->size - _offset)) { // overflow-safe bounds check
+                        return Status::IOError("Can't read enough bytes in plain decoder");
+                    }
+                    _offset += length;
+                }
+                break;
+            }
+            case ColumnSelectVector::FILTERED_NULL: {
+                // do nothing
+                break;
+            }
+            }
+        }
+        return Status::OK();
+    }
+    case TypeIndex::Decimal32:
+        return _decode_binary_decimal(doris_column, data_type, select_vector);
+    case TypeIndex::Decimal64:
+        return _decode_binary_decimal(doris_column, data_type, select_vector);
+    case TypeIndex::Decimal128:
+        return _decode_binary_decimal(doris_column, data_type, select_vector);
+    case TypeIndex::Decimal128I:
+        return _decode_binary_decimal(doris_column, data_type, select_vector);
+    default:
+        break;
+    }
+    return Status::InvalidArgument(
+            "Can't decode parquet physical type BYTE_ARRAY to doris logical type {}",
+            getTypeName(logical_type));
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/byte_array_plain_decoder.h b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.h
new file mode 100644
index 0000000000..84bed0dec3
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/byte_array_plain_decoder.h
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more
contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include
+
+#include "util/coding.h"
+#include "vec/exec/format/parquet/decoder.h"
+
+namespace doris::vectorized {
+/// PLAIN decoder for BYTE_ARRAY pages: emits strings or binary-encoded decimals.
+class ByteArrayPlainDecoder final : public Decoder {
+public:
+    ByteArrayPlainDecoder() = default;
+    ~ByteArrayPlainDecoder() override = default;
+
+    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                         ColumnSelectVector& select_vector) override;
+
+    Status skip_values(size_t num_values) override;
+
+protected:
+    template
+    Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                                  ColumnSelectVector& select_vector);
+};
+// Decodes length-prefixed big-endian decimals into the column and rescales them.
+template
+Status ByteArrayPlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
+                                                     DataTypePtr& data_type,
+                                                     ColumnSelectVector& select_vector) {
+    init_decimal_converter(data_type);
+    auto& column_data =
+            static_cast>&>(*doris_column).get_data();
+    size_t data_index = column_data.size();
+    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
+    DecimalScaleParams& scale_params = _decode_params->decimal_scale;
+    ColumnSelectVector::DataReadType read_type;
+    while (size_t run_length = select_vector.get_next_run(&read_type)) {
+        switch (read_type) {
+        case ColumnSelectVector::CONTENT: {
+            for (size_t i = 0; i < run_length; ++i) {
+                if (UNLIKELY(_offset + 4 > _data->size)) {
+                    return Status::IOError("Can't read byte array length from plain decoder");
+                }
+                uint32_t length =
+                        decode_fixed32_le(reinterpret_cast(_data->data) + _offset);
+                _offset += 4;
+                char* buf_start = _data->data + _offset; // NOTE(review): length is unchecked here
+                _offset += length;
+                // When Decimal in parquet is stored in byte arrays, binary and fixed,
+                // the unscaled number must be encoded as two's complement using big-endian byte order.
+                Int128 value = buf_start[0] & 0x80 ? -1 : 0;
+                memcpy(reinterpret_cast(&value) + sizeof(Int128) - length, buf_start,
+                       length);
+                value = BigEndian::ToHost128(value);
+                if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
+                    value *= scale_params.scale_factor;
+                } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) {
+                    value /= scale_params.scale_factor;
+                }
+                auto& v = reinterpret_cast(column_data[data_index++]);
+                v = (DecimalPrimitiveType)value;
+            }
+            break;
+        }
+        case ColumnSelectVector::NULL_DATA: {
+            data_index += run_length;
+            break;
+        }
+        case ColumnSelectVector::FILTERED_CONTENT: {
+            RETURN_IF_ERROR(skip_values(run_length)); // entries are length-prefixed, not fixed
+            break;
+        }
+        case ColumnSelectVector::FILTERED_NULL: {
+            // do nothing
+            break;
+        }
+        }
+    }
+    return Status::OK();
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/decoder.cpp b/be/src/vec/exec/format/parquet/decoder.cpp
new file mode 100644
index 0000000000..41313cf327
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/decoder.cpp
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/parquet/decoder.h"
+
+#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/exec/format/parquet/bool_plain_decoder.h"
+#include "vec/exec/format/parquet/byte_array_dict_decoder.h"
+#include "vec/exec/format/parquet/byte_array_plain_decoder.h"
+#include "vec/exec/format/parquet/fix_length_dict_decoder.hpp"
+#include "vec/exec/format/parquet/fix_length_plain_decoder.h"
+
+namespace doris::vectorized {
+
+const cctz::time_zone DecodeParams::utc0 = cctz::utc_time_zone();
+// Creates the decoder for (type, encoding); only PLAIN and RLE_DICTIONARY are handled.
+Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
+                            std::unique_ptr& decoder) {
+    switch (encoding) {
+    case tparquet::Encoding::PLAIN:
+        switch (type) {
+        case tparquet::Type::BOOLEAN:
+            decoder.reset(new BoolPlainDecoder());
+            break;
+        case tparquet::Type::BYTE_ARRAY:
+            decoder.reset(new ByteArrayPlainDecoder());
+            break;
+        case tparquet::Type::INT32:
+        case tparquet::Type::INT64:
+        case tparquet::Type::INT96:
+        case tparquet::Type::FLOAT:
+        case tparquet::Type::DOUBLE:
+        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
+            decoder.reset(new FixLengthPlainDecoder(type));
+            break;
+        default:
+            return Status::InternalError("Unsupported type {}(encoding={}) in parquet decoder",
+                                         tparquet::to_string(type), tparquet::to_string(encoding));
+        }
+        break;
+    case tparquet::Encoding::RLE_DICTIONARY:
+        switch (type) {
+        case tparquet::Type::BOOLEAN:
+            // `encoding` is RLE_DICTIONARY in this branch, so BOOLEAN is always rejected;
+            // return directly instead of a guard that would fall through into BYTE_ARRAY.
+            return Status::InternalError("Bool type can't has dictionary page");
+        case tparquet::Type::BYTE_ARRAY:
+            decoder.reset(new ByteArrayDictDecoder());
+            break;
+        case tparquet::Type::INT32:
+            decoder.reset(new FixLengthDictDecoder(type));
+            break;
+        case tparquet::Type::INT64:
+            decoder.reset(new FixLengthDictDecoder(type));
+            break;
+        case tparquet::Type::INT96:
+            decoder.reset(new FixLengthDictDecoder(type));
+            break;
+        case tparquet::Type::FLOAT:
+            decoder.reset(new FixLengthDictDecoder(type));
+            break;
+        case tparquet::Type::DOUBLE:
+            decoder.reset(new FixLengthDictDecoder(type));
+            break;
+        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
+            decoder.reset(new FixLengthDictDecoder(type));
+            break;
+        default:
+            return Status::InternalError("Unsupported type {}(encoding={}) in parquet decoder",
+                                         tparquet::to_string(type), tparquet::to_string(encoding));
+        }
+        break;
+    default:
+        return Status::InternalError("Unsupported encoding {}(type={}) in parquet decoder",
+                                     tparquet::to_string(encoding), tparquet::to_string(type));
+    }
+    return Status::OK();
+}
+// Binds the field schema and derives time-zone / timestamp-unit parameters from it.
+void Decoder::init(FieldSchema* field_schema, cctz::time_zone* ctz) {
+    _field_schema = field_schema;
+    if (_decode_params == nullptr) {
+        _decode_params.reset(new DecodeParams());
+    }
+    if (ctz != nullptr) {
+        _decode_params->ctz = ctz;
+    }
+    const auto& schema = field_schema->parquet_schema;
+    if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) {
+        const auto& timestamp_info = schema.logicalType.TIMESTAMP;
+        if (!timestamp_info.isAdjustedToUTC) {
+            // should set timezone to utc+0
+            _decode_params->ctz = const_cast(&_decode_params->utc0);
+        }
+        const auto& time_unit = timestamp_info.unit;
+        if (time_unit.__isset.MILLIS) {
+            _decode_params->second_mask = 1000;
+            _decode_params->scale_to_nano_factor = 1000000;
+        } else if (time_unit.__isset.MICROS) {
+            _decode_params->second_mask = 1000000;
+            _decode_params->scale_to_nano_factor = 1000;
+        } else if (time_unit.__isset.NANOS) {
+            _decode_params->second_mask = 1000000000;
+            _decode_params->scale_to_nano_factor = 1;
+        }
+    } else if (schema.__isset.converted_type) {
+        const auto& converted_type = schema.converted_type;
+        if (converted_type == tparquet::ConvertedType::TIMESTAMP_MILLIS) {
+            _decode_params->second_mask = 1000;
+            _decode_params->scale_to_nano_factor = 1000000;
+        } else if (converted_type == tparquet::ConvertedType::TIMESTAMP_MICROS) {
+            _decode_params->second_mask = 1000000;
+            _decode_params->scale_to_nano_factor = 1000;
+        }
+    }
+}
+template
+void Decoder::init_decimal_converter(DataTypePtr& data_type) {
+    if (_decode_params == nullptr || _field_schema == nullptr ||
+        _decode_params->decimal_scale.scale_type != DecimalScaleParams::NOT_INIT) {
+        return; // not initialised yet, or the scale was already computed once
+    }
+    auto scale = _field_schema->parquet_schema.scale;
+    auto* decimal_type = reinterpret_cast>*>(
+            const_cast(remove_nullable(data_type).get()));
+    auto dest_scale = decimal_type->get_scale();
+    if (dest_scale > scale) {
+        _decode_params->decimal_scale.scale_type = DecimalScaleParams::SCALE_UP;
+        _decode_params->decimal_scale.scale_factor =
+                DecimalScaleParams::get_scale_factor(dest_scale - scale);
+    } else if (dest_scale < scale) {
+        _decode_params->decimal_scale.scale_type = DecimalScaleParams::SCALE_DOWN;
+        _decode_params->decimal_scale.scale_factor =
+                DecimalScaleParams::get_scale_factor(scale - dest_scale);
+    } else {
+        _decode_params->decimal_scale.scale_type = DecimalScaleParams::NO_SCALE;
+        _decode_params->decimal_scale.scale_factor = 1;
+    }
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/decoder.h b/be/src/vec/exec/format/parquet/decoder.h
new file mode 100644
index 0000000000..84683ae187
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/decoder.h
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" +#include "gen_cpp/parquet_types.h" +#include "schema_desc.h" +#include "util/rle_encoding.h" +#include "vec/columns/column_dictionary.h" +#include "vec/data_types/data_type.h" +#include "vec/exec/format/format_common.h" +#include "vec/exec/format/parquet/parquet_common.h" + +namespace doris::vectorized { + +#define FOR_LOGICAL_NUMERIC_TYPES(M) \ + M(TypeIndex::Int8, Int8, Int32) \ + M(TypeIndex::UInt8, UInt8, Int32) \ + M(TypeIndex::Int16, Int16, Int32) \ + M(TypeIndex::UInt16, UInt16, Int32) \ + M(TypeIndex::Int32, Int32, Int32) \ + M(TypeIndex::UInt32, UInt32, Int32) \ + M(TypeIndex::Int64, Int64, Int64) \ + M(TypeIndex::UInt64, UInt64, Int64) \ + M(TypeIndex::Float32, Float32, Float32) \ + M(TypeIndex::Float64, Float64, Float64) + +struct DecodeParams { + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == false + static const cctz::time_zone utc0; + // schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set the time zone + cctz::time_zone* ctz = nullptr; + int64_t second_mask = 1; + int64_t scale_to_nano_factor = 1; + DecimalScaleParams decimal_scale; +}; + +class Decoder { +public: + Decoder() = default; + virtual ~Decoder() = default; + + static Status get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding, + std::unique_ptr& decoder); + + // The type with fix length + void 
set_type_length(int32_t type_length) { _type_length = type_length; } + + // Set the data to be decoded + virtual void set_data(Slice* data) { + _data = data; + _offset = 0; + } + + void init(FieldSchema* field_schema, cctz::time_zone* ctz); + + template + void init_decimal_converter(DataTypePtr& data_type); + + // Write the decoded values batch to doris's column + virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) = 0; + + virtual Status skip_values(size_t num_values) = 0; + + virtual Status set_dict(std::unique_ptr& dict, int32_t length, size_t num_values) { + return Status::NotSupported("set_dict is not supported"); + } + +protected: + int32_t _type_length; + Slice* _data = nullptr; + uint32_t _offset = 0; + FieldSchema* _field_schema = nullptr; + std::unique_ptr _decode_params = nullptr; +}; + +class BaseDictDecoder : public Decoder { +public: + BaseDictDecoder() = default; + virtual ~BaseDictDecoder() override = default; + + // Set the data to be decoded + virtual void set_data(Slice* data) override { + _data = data; + _offset = 0; + uint8_t bit_width = *data->data; + _index_batch_decoder.reset( + new RleBatchDecoder(reinterpret_cast(data->data) + 1, + static_cast(data->size) - 1, bit_width)); + } + +protected: + /** + * Decode dictionary-coded values into doris_column, ensure that doris_column is ColumnDictI32 type, + * and the coded values must be read into _indexes previously. 
+ */ + Status _decode_dict_values(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + DCHECK(doris_column->is_column_dictionary()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + auto& column_data = assert_cast(*doris_column).get_data(); + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + uint32_t* start_index = &_indexes[0]; + column_data.insert(start_index + dict_index, start_index + dict_index + run_length); + dict_index += run_length; + break; + } + case ColumnSelectVector::NULL_DATA: { + doris_column->insert_many_defaults(run_length); + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + break; + } + } + } + return Status::OK(); + } + + Status skip_values(size_t num_values) override { + _indexes.resize(num_values); + _index_batch_decoder->GetBatch(&_indexes[0], num_values); + return Status::OK(); + } + +protected: + // For dictionary encoding + std::unique_ptr _dict = nullptr; + std::unique_ptr> _index_batch_decoder = nullptr; + std::vector _indexes; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp new file mode 100644 index 0000000000..8df7a864ac --- /dev/null +++ b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp @@ -0,0 +1,531 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/columns/column_dictionary.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +template +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type physical_type) + : BaseDictDecoder(), _physical_type(physical_type) {}; + ~FixLengthDictDecoder() override = default; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); + if (doris_column->is_column_dictionary() && + assert_cast(*doris_column).dict_size() == 0) { + std::vector dict_items; + dict_items.reserve(_dict_items.size()); + for (int i = 0; i < _dict_items.size(); ++i) { + dict_items.emplace_back((char*)(&_dict_items[i]), _type_length); + } + assert_cast(*doris_column) + .insert_many_dict_data(&dict_items[0], dict_items.size()); + } + _indexes.resize(non_null_size); + _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); + + if (doris_column->is_column_dictionary()) { + return _decode_dict_values(doris_column, select_vector); + } + + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); + switch (logical_type) { +#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \ + case NUMERIC_TYPE: \ + if constexpr (std::is_same_v) { \ + return _decode_numeric(doris_column, select_vector); \ + } + FOR_LOGICAL_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + case 
TypeIndex::Date: + if constexpr (std::is_same_v) { + return _decode_date(doris_column, select_vector); + } + break; + case TypeIndex::DateV2: + if constexpr (std::is_same_v) { + return _decode_date, UInt32>(doris_column, + select_vector); + } + break; + case TypeIndex::DateTime: + if constexpr (std::is_same_v) { + return _decode_datetime96(doris_column, select_vector); + } else if constexpr (std::is_same_v) { + return _decode_datetime64(doris_column, select_vector); + } + break; + case TypeIndex::DateTimeV2: + // Spark can set the timestamp precision by the following configuration: + // spark.sql.parquet.outputTimestampType = INT96(NANOS), TIMESTAMP_MICROS, TIMESTAMP_MILLIS + if constexpr (std::is_same_v) { + return _decode_datetime96, UInt64>(doris_column, + select_vector); + } else if constexpr (std::is_same_v) { + return _decode_datetime64, UInt64>(doris_column, + select_vector); + } + break; + case TypeIndex::Decimal32: + if constexpr (std::is_same_v) { + return _decode_primitive_decimal(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v) { + return _decode_primitive_decimal(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal64: + if constexpr (std::is_same_v) { + return _decode_primitive_decimal(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v) { + return _decode_primitive_decimal(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128: + if constexpr (std::is_same_v) { + return _decode_primitive_decimal(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v) { + return _decode_primitive_decimal(doris_column, data_type, + select_vector); + } + break; + case TypeIndex::Decimal128I: + if constexpr (std::is_same_v) { + return _decode_primitive_decimal(doris_column, data_type, + select_vector); + } else if constexpr (std::is_same_v) { + return _decode_primitive_decimal(doris_column, data_type, + select_vector); + } + 
break; + case TypeIndex::String: + case TypeIndex::FixedString: + break; + default: + break; + } + + return Status::InvalidArgument( + "Can't decode parquet physical type {} to doris logical type {}", + tparquet::to_string(_physical_type), getTypeName(logical_type)); + } + + Status set_dict(std::unique_ptr& dict, int32_t length, size_t num_values) override { + if (num_values * _type_length != length) { + return Status::Corruption("Wrong dictionary data for fixed length type"); + } + _dict = std::move(dict); + char* dict_item_address = reinterpret_cast(_dict.get()); + _dict_items.resize(num_values); + for (size_t i = 0; i < num_values; ++i) { + _dict_items[i] = *(T*)dict_item_address; + dict_item_address += _type_length; + } + return Status::OK(); + } + +protected: + template + Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + column_data[data_index++] = + static_cast(_dict_items[_indexes[dict_index++]]); + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template + Status _decode_date(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - 
select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + int64_t date_value = _dict_items[_indexes[dict_index++]]; + auto& v = reinterpret_cast(column_data[data_index++]); + v.from_unixtime(date_value * 24 * 60 * 60, + *_decode_params->ctz); // day to seconds + if constexpr (std::is_same_v) { + // we should cast to date if using date v1. + v.cast_to_date(); + } + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template + Status _decode_datetime64(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + int64_t date_value = _dict_items[_indexes[dict_index++]]; + auto& v = reinterpret_cast(column_data[data_index++]); + v.from_unixtime(date_value / _decode_params->second_mask, *_decode_params->ctz); + if constexpr (std::is_same_v>) { + // nanoseconds will be ignored. 
+ v.set_microsecond((date_value % _decode_params->second_mask) * + _decode_params->scale_to_nano_factor / 1000); + // TODO: the precision of datetime v1 + } + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template + Status _decode_datetime96(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + auto& column_data = static_cast&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + ParquetInt96& datetime96 = _dict_items[_indexes[dict_index++]]; + auto& v = reinterpret_cast(column_data[data_index++]); + int64_t micros = datetime96.to_timestamp_micros(); + v.from_unixtime(micros / 1000000, *_decode_params->ctz); + if constexpr (std::is_same_v>) { + // spark.sql.parquet.outputTimestampType = INT96(NANOS) will lost precision. + // only keep microseconds. 
+ v.set_microsecond(micros % 1000000); + } + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + template + Status _decode_primitive_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) { + init_decimal_converter(data_type); + auto& column_data = + static_cast>&>(*doris_column) + .get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + DecimalScaleParams& scale_params = _decode_params->decimal_scale; + + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + // we should use decimal128 to scale up/down + Int128 value = static_cast(_dict_items[_indexes[dict_index++]]); + if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } + auto& v = reinterpret_cast(column_data[data_index++]); + v = (DecimalPrimitiveType)value; + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + tparquet::Type::type _physical_type; + + // For dictionary encoding + std::vector _dict_items; +}; + +template <> +class FixLengthDictDecoder final : public BaseDictDecoder { +public: + FixLengthDictDecoder(tparquet::Type::type 
physical_type) + : BaseDictDecoder(), _physical_type(physical_type) {}; + ~FixLengthDictDecoder() override = default; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); + if (doris_column->is_column_dictionary() && + assert_cast(*doris_column).dict_size() == 0) { + std::vector dict_items; + dict_items.reserve(_dict_items.size()); + for (int i = 0; i < _dict_items.size(); ++i) { + dict_items.emplace_back(_dict_items[i], _type_length); + } + assert_cast(*doris_column) + .insert_many_dict_data(&dict_items[0], dict_items.size()); + } + _indexes.resize(non_null_size); + _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); + + if (doris_column->is_column_dictionary()) { + return _decode_dict_values(doris_column, select_vector); + } + + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); + switch (logical_type) { + case TypeIndex::Decimal32: + if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + return _decode_binary_decimal(doris_column, data_type, select_vector); + } + break; + case TypeIndex::Decimal64: + if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + return _decode_binary_decimal(doris_column, data_type, select_vector); + } + break; + case TypeIndex::Decimal128: + if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + return _decode_binary_decimal(doris_column, data_type, select_vector); + } + break; + case TypeIndex::Decimal128I: + if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + return _decode_binary_decimal(doris_column, data_type, select_vector); + } + break; + case TypeIndex::String: + case TypeIndex::FixedString: + if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + return _decode_string(doris_column, select_vector); + } + break; + default: + break; + } + + return Status::InvalidArgument( + "Can't decode parquet 
physical type {} to doris logical type {}", + tparquet::to_string(_physical_type), getTypeName(logical_type)); + } + + Status skip_values(size_t num_values) override { + _indexes.resize(num_values); + _index_batch_decoder->GetBatch(&_indexes[0], num_values); + return Status::OK(); + } + + Status set_dict(std::unique_ptr& dict, int32_t length, size_t num_values) override { + if (num_values * _type_length != length) { + return Status::Corruption("Wrong dictionary data for fixed length type"); + } + _dict = std::move(dict); + char* dict_item_address = reinterpret_cast(_dict.get()); + _dict_items.resize(num_values); + for (size_t i = 0; i < num_values; ++i) { + _dict_items[i] = dict_item_address; + dict_item_address += _type_length; + } + return Status::OK(); + } + +protected: + template + Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) { + init_decimal_converter(data_type); + auto& column_data = + static_cast>&>(*doris_column) + .get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t dict_index = 0; + DecimalScaleParams& scale_params = _decode_params->decimal_scale; + + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + char* buf_start = _dict_items[_indexes[dict_index++]]; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + Int128 value = buf_start[0] & 0x80 ? 
-1 : 0; + memcpy(reinterpret_cast(&value) + sizeof(Int128) - _type_length, + buf_start, _type_length); + value = BigEndian::ToHost128(value); + if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } + auto& v = reinterpret_cast(column_data[data_index++]); + v = (DecimalPrimitiveType)value; + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + Status _decode_string(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) { + size_t dict_index = 0; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + std::vector string_values; + string_values.reserve(run_length); + for (size_t i = 0; i < run_length; ++i) { + string_values.emplace_back(_dict_items[_indexes[dict_index++]], _type_length); + } + doris_column->insert_many_strings(&string_values[0], run_length); + break; + } + case ColumnSelectVector::NULL_DATA: { + doris_column->insert_many_defaults(run_length); + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + dict_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); + } + + tparquet::Type::type _physical_type; + + // For dictionary encoding + std::vector _dict_items; +}; + +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp new file mode 100644 index 0000000000..0bf505f230 --- /dev/null +++ 
b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.cpp @@ -0,0 +1,412 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/parquet/fix_length_plain_decoder.h" + +#include "gutil/endian.h" +#include "vec/columns/column_nullable.h" +#include "vec/data_types/data_type_nullable.h" + +namespace doris::vectorized { + +Status FixLengthPlainDecoder::skip_values(size_t num_values) { + _offset += _type_length * num_values; + if (UNLIKELY(_offset > _data->size)) { + return Status::IOError("Out-of-bounds access in parquet data decoder"); + } + return Status::OK(); +} + +Status FixLengthPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) { + size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); + if (UNLIKELY(_offset + _type_length * non_null_size > _data->size)) { + return Status::IOError("Out-of-bounds access in parquet data decoder"); + } + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); + switch (logical_type) { +#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \ + case NUMERIC_TYPE: \ + return _decode_numeric(doris_column, select_vector); + 
FOR_LOGICAL_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + case TypeIndex::Date: + if (_physical_type == tparquet::Type::INT32) { + return _decode_date(doris_column, select_vector); + } + break; + case TypeIndex::DateV2: + if (_physical_type == tparquet::Type::INT32) { + return _decode_date, UInt32>(doris_column, select_vector); + } + break; + case TypeIndex::DateTime: + if (_physical_type == tparquet::Type::INT96) { + return _decode_datetime96(doris_column, select_vector); + } else if (_physical_type == tparquet::Type::INT64) { + return _decode_datetime64(doris_column, select_vector); + } + break; + case TypeIndex::DateTimeV2: + // Spark can set the timestamp precision by the following configuration: + // spark.sql.parquet.outputTimestampType = INT96(NANOS), TIMESTAMP_MICROS, TIMESTAMP_MILLIS + if (_physical_type == tparquet::Type::INT96) { + return _decode_datetime96, UInt64>(doris_column, + select_vector); + } else if (_physical_type == tparquet::Type::INT64) { + return _decode_datetime64, UInt64>(doris_column, + select_vector); + } + break; + case TypeIndex::Decimal32: + if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + return _decode_binary_decimal(doris_column, data_type, select_vector); + } else if (_physical_type == tparquet::Type::INT32) { + return _decode_primitive_decimal(doris_column, data_type, select_vector); + } else if (_physical_type == tparquet::Type::INT64) { + return _decode_primitive_decimal(doris_column, data_type, select_vector); + } + break; + case TypeIndex::Decimal64: + if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + return _decode_binary_decimal(doris_column, data_type, select_vector); + } else if (_physical_type == tparquet::Type::INT32) { + return _decode_primitive_decimal(doris_column, data_type, select_vector); + } else if (_physical_type == tparquet::Type::INT64) { + return _decode_primitive_decimal(doris_column, data_type, select_vector); + } + break; + case TypeIndex::Decimal128: + if (_physical_type == 
tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + return _decode_binary_decimal(doris_column, data_type, select_vector); + } else if (_physical_type == tparquet::Type::INT32) { + return _decode_primitive_decimal(doris_column, data_type, select_vector); + } else if (_physical_type == tparquet::Type::INT64) { + return _decode_primitive_decimal(doris_column, data_type, select_vector); + } + break; + case TypeIndex::Decimal128I: + if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + return _decode_binary_decimal(doris_column, data_type, select_vector); + } else if (_physical_type == tparquet::Type::INT32) { + return _decode_primitive_decimal(doris_column, data_type, select_vector); + } else if (_physical_type == tparquet::Type::INT64) { + return _decode_primitive_decimal(doris_column, data_type, select_vector); + } + break; + case TypeIndex::String: + case TypeIndex::FixedString: + if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { + return _decode_string(doris_column, select_vector); + } + break; + default: + break; + } + + return Status::InvalidArgument("Can't decode parquet physical type {} to doris logical type {}", + tparquet::to_string(_physical_type), getTypeName(logical_type)); +} + +Status FixLengthPlainDecoder::_decode_string(MutableColumnPtr& doris_column, + ColumnSelectVector& select_vector) { + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + std::vector string_values; + string_values.reserve(run_length); + for (size_t i = 0; i < run_length; ++i) { + char* buf_start = _data->data + _offset; + string_values.emplace_back(buf_start, _type_length); + _offset += _type_length; + } + doris_column->insert_many_strings(&string_values[0], run_length); + break; + } + case ColumnSelectVector::NULL_DATA: { + doris_column->insert_many_defaults(run_length); + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + _offset += 
_type_length * run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); +} +template +Status FixLengthPlainDecoder::_decode_numeric(MutableColumnPtr& doris_column, + ColumnSelectVector& select_vector) { + auto& column_data = static_cast&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + char* buf_start = _data->data + _offset; + column_data[data_index++] = *(Numeric*)buf_start; + _offset += _type_length; + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + _offset += _type_length * run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); +} + +template +Status FixLengthPlainDecoder::_decode_date(MutableColumnPtr& doris_column, + ColumnSelectVector& select_vector) { + auto& column_data = static_cast&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + char* buf_start = _data->data + _offset; + int64_t date_value = static_cast(*reinterpret_cast(buf_start)); + auto& v = reinterpret_cast(column_data[data_index++]); + v.from_unixtime(date_value * 24 * 60 * 60, *_decode_params->ctz); // day to seconds + if constexpr (std::is_same_v) { + // we should cast to date if using date 
v1. + v.cast_to_date(); + } + _offset += _type_length; + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + _offset += _type_length * run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); +} + +template +Status FixLengthPlainDecoder::_decode_datetime64(MutableColumnPtr& doris_column, + ColumnSelectVector& select_vector) { + auto& column_data = static_cast&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + char* buf_start = _data->data + _offset; + int64_t& date_value = *reinterpret_cast(buf_start); + auto& v = reinterpret_cast(column_data[data_index++]); + v.from_unixtime(date_value / _decode_params->second_mask, *_decode_params->ctz); + if constexpr (std::is_same_v>) { + // nanoseconds will be ignored. 
+ v.set_microsecond((date_value % _decode_params->second_mask) * + _decode_params->scale_to_nano_factor / 1000); + // TODO: the precision of datetime v1 + } + _offset += _type_length; + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + _offset += _type_length * run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); +} + +template +Status FixLengthPlainDecoder::_decode_datetime96(MutableColumnPtr& doris_column, + ColumnSelectVector& select_vector) { + auto& column_data = static_cast&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + char* buf_start = _data->data + _offset; + ParquetInt96& datetime96 = *reinterpret_cast(buf_start); + auto& v = reinterpret_cast(column_data[data_index++]); + int64_t micros = datetime96.to_timestamp_micros(); + v.from_unixtime(micros / 1000000, *_decode_params->ctz); + if constexpr (std::is_same_v>) { + // spark.sql.parquet.outputTimestampType = INT96(NANOS) will lost precision. + // only keep microseconds. 
+ v.set_microsecond(micros % 1000000); + } + _offset += _type_length; + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + _offset += _type_length * run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); +} + +template +Status FixLengthPlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column, + DataTypePtr& data_type, + ColumnSelectVector& select_vector) { + init_decimal_converter(data_type); + auto& column_data = + static_cast>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + DecimalScaleParams& scale_params = _decode_params->decimal_scale; + + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + char* buf_start = _data->data + _offset; + // When Decimal in parquet is stored in byte arrays, binary and fixed, + // the unscaled number must be encoded as two's complement using big-endian byte order. + Int128 value = buf_start[0] & 0x80 ? 
-1 : 0; + memcpy(reinterpret_cast(&value) + sizeof(Int128) - _type_length, buf_start, + _type_length); + value = BigEndian::ToHost128(value); + if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } + auto& v = reinterpret_cast(column_data[data_index++]); + v = (DecimalPrimitiveType)value; + _offset += _type_length; + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + _offset += _type_length * run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); +} + +template +Status FixLengthPlainDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_column, + DataTypePtr& data_type, + ColumnSelectVector& select_vector) { + init_decimal_converter(data_type); + auto& column_data = + static_cast>&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + DecimalScaleParams& scale_params = _decode_params->decimal_scale; + + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + for (size_t i = 0; i < run_length; ++i) { + char* buf_start = _data->data + _offset; + // we should use decimal128 to scale up/down + Int128 value = *reinterpret_cast(buf_start); + if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { + value *= scale_params.scale_factor; + } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { + value /= scale_params.scale_factor; + } + auto& v = reinterpret_cast(column_data[data_index++]); + v = (DecimalPrimitiveType)value; + _offset += _type_length; + } + break; + } + case 
ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + _offset += _type_length * run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + return Status::OK(); +} +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h new file mode 100644 index 0000000000..b8f516444e --- /dev/null +++ b/be/src/vec/exec/format/parquet/fix_length_plain_decoder.h @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "common/status.h" +#include "gen_cpp/parquet_types.h" +#include "util/slice.h" +#include "vec/columns/column.h" +#include "vec/data_types/data_type.h" +#include "vec/exec/format/parquet/decoder.h" +#include "vec/exec/format/parquet/parquet_common.h" + +namespace doris::vectorized { + +class FixLengthPlainDecoder final : public Decoder { +public: + FixLengthPlainDecoder(tparquet::Type::type physical_type) : _physical_type(physical_type) {}; + ~FixLengthPlainDecoder() override = default; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override; + + Status skip_values(size_t num_values) override; + +protected: + template + Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector); + + template + Status _decode_date(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector); + + template + Status _decode_datetime64(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector); + + template + Status _decode_datetime96(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector); + + template + Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector); + + template + Status _decode_primitive_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector); + + Status _decode_string(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector); + + tparquet::Type::type _physical_type; +}; +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp index 1449d1bdf3..9250c15438 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.cpp +++ b/be/src/vec/exec/format/parquet/parquet_common.cpp @@ -23,24 +23,10 @@ namespace doris::vectorized { -const cctz::time_zone DecodeParams::utc0 = cctz::utc_time_zone(); - const uint32_t 
ParquetInt96::JULIAN_EPOCH_OFFSET_DAYS = 2440588; const uint64_t ParquetInt96::MICROS_IN_DAY = 86400000000; const uint64_t ParquetInt96::NANOS_PER_MICROSECOND = 1000; -#define FOR_LOGICAL_NUMERIC_TYPES(M) \ - M(TypeIndex::Int8, Int8) \ - M(TypeIndex::UInt8, UInt8) \ - M(TypeIndex::Int16, Int16) \ - M(TypeIndex::UInt16, UInt16) \ - M(TypeIndex::Int32, Int32) \ - M(TypeIndex::UInt32, UInt32) \ - M(TypeIndex::Int64, Int64) \ - M(TypeIndex::UInt64, UInt64) \ - M(TypeIndex::Float32, Float32) \ - M(TypeIndex::Float64, Float64) - ColumnSelectVector::ColumnSelectVector(const uint8_t* filter_map, size_t filter_map_size, bool filter_all) { build(filter_map, filter_map_size, filter_all); @@ -200,511 +186,4 @@ size_t ColumnSelectVector::get_next_run(DataReadType* data_read_type) { return run_length; } } - -Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding, - std::unique_ptr& decoder) { - switch (encoding) { - case tparquet::Encoding::PLAIN: - case tparquet::Encoding::RLE_DICTIONARY: - switch (type) { - case tparquet::Type::BOOLEAN: - if (encoding != tparquet::Encoding::PLAIN) { - return Status::InternalError("Bool type can't has dictionary page"); - } - decoder.reset(new BoolPlainDecoder()); - break; - case tparquet::Type::BYTE_ARRAY: - decoder.reset(new ByteArrayDecoder()); - break; - case tparquet::Type::INT32: - case tparquet::Type::INT64: - case tparquet::Type::INT96: - case tparquet::Type::FLOAT: - case tparquet::Type::DOUBLE: - case tparquet::Type::FIXED_LEN_BYTE_ARRAY: - decoder.reset(new FixLengthDecoder(type)); - break; - default: - return Status::InternalError("Unsupported type {}(encoding={}) in parquet decoder", - tparquet::to_string(type), tparquet::to_string(encoding)); - } - break; - default: - return Status::InternalError("Unsupported encoding {}(type={}) in parquet decoder", - tparquet::to_string(encoding), tparquet::to_string(type)); - } - return Status::OK(); -} - -void Decoder::init(FieldSchema* field_schema, 
cctz::time_zone* ctz) { - _field_schema = field_schema; - if (_decode_params == nullptr) { - _decode_params.reset(new DecodeParams()); - } - if (ctz != nullptr) { - _decode_params->ctz = ctz; - } - const auto& schema = field_schema->parquet_schema; - if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) { - const auto& timestamp_info = schema.logicalType.TIMESTAMP; - if (!timestamp_info.isAdjustedToUTC) { - // should set timezone to utc+0 - _decode_params->ctz = const_cast(&_decode_params->utc0); - } - const auto& time_unit = timestamp_info.unit; - if (time_unit.__isset.MILLIS) { - _decode_params->second_mask = 1000; - _decode_params->scale_to_nano_factor = 1000000; - } else if (time_unit.__isset.MICROS) { - _decode_params->second_mask = 1000000; - _decode_params->scale_to_nano_factor = 1000; - } else if (time_unit.__isset.NANOS) { - _decode_params->second_mask = 1000000000; - _decode_params->scale_to_nano_factor = 1; - } - } else if (schema.__isset.converted_type) { - const auto& converted_type = schema.converted_type; - if (converted_type == tparquet::ConvertedType::TIMESTAMP_MILLIS) { - _decode_params->second_mask = 1000; - _decode_params->scale_to_nano_factor = 1000000; - } else if (converted_type == tparquet::ConvertedType::TIMESTAMP_MICROS) { - _decode_params->second_mask = 1000000; - _decode_params->scale_to_nano_factor = 1000; - } - } -} - -Status Decoder::_decode_dict_values(MutableColumnPtr& doris_column, - ColumnSelectVector& select_vector) { - DCHECK(doris_column->is_column_dictionary()); - size_t dict_index = 0; - ColumnSelectVector::DataReadType read_type; - auto& column_data = assert_cast(*doris_column).get_data(); - while (size_t run_length = select_vector.get_next_run(&read_type)) { - switch (read_type) { - case ColumnSelectVector::CONTENT: { - uint32_t* start_index = &_indexes[0]; - column_data.insert(start_index + dict_index, start_index + dict_index + run_length); - dict_index += run_length; - break; - } - case 
ColumnSelectVector::NULL_DATA: { - doris_column->insert_many_defaults(run_length); - break; - } - case ColumnSelectVector::FILTERED_CONTENT: { - dict_index += run_length; - break; - } - case ColumnSelectVector::FILTERED_NULL: { - break; - } - } - } - return Status::OK(); -} - -Status FixLengthDecoder::set_dict(std::unique_ptr& dict, int32_t length, - size_t num_values) { - if (num_values * _type_length != length) { - return Status::Corruption("Wrong dictionary data for fixed length type"); - } - _has_dict = true; - _dict = std::move(dict); - char* dict_item_address = reinterpret_cast(_dict.get()); - _dict_items.resize(num_values); - for (size_t i = 0; i < num_values; ++i) { - _dict_items[i] = dict_item_address; - dict_item_address += _type_length; - } - return Status::OK(); -} - -void FixLengthDecoder::set_data(Slice* data) { - _data = data; - _offset = 0; - if (_has_dict) { - uint8_t bit_width = *data->data; - _index_batch_decoder.reset( - new RleBatchDecoder(reinterpret_cast(data->data) + 1, - static_cast(data->size) - 1, bit_width)); - } -} - -Status FixLengthDecoder::skip_values(size_t num_values) { - if (_has_dict) { - _indexes.resize(num_values); - _index_batch_decoder->GetBatch(&_indexes[0], num_values); - } else { - _offset += _type_length * num_values; - if (UNLIKELY(_offset > _data->size)) { - return Status::IOError("Out-of-bounds access in parquet data decoder"); - } - } - return Status::OK(); -} - -Status FixLengthDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) { - size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); - if (_has_dict) { - if (doris_column->is_column_dictionary() && - assert_cast(*doris_column).dict_size() == 0) { - std::vector dict_items; - dict_items.reserve(_dict_items.size()); - for (int i = 0; i < _dict_items.size(); ++i) { - dict_items.emplace_back(_dict_items[i], _type_length); - } - assert_cast(*doris_column) - 
.insert_many_dict_data(&dict_items[0], dict_items.size()); - } - _indexes.resize(non_null_size); - _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); - } else if (UNLIKELY(_offset + _type_length * non_null_size > _data->size)) { - return Status::IOError("Out-of-bounds access in parquet data decoder"); - } - - if (doris_column->is_column_dictionary()) { - return _decode_dict_values(doris_column, select_vector); - } - - TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); - switch (logical_type) { -#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE) \ - case NUMERIC_TYPE: \ - return _decode_numeric(doris_column, select_vector); - FOR_LOGICAL_NUMERIC_TYPES(DISPATCH) -#undef DISPATCH - case TypeIndex::Date: - if (_physical_type == tparquet::Type::INT32) { - return _decode_date(doris_column, select_vector); - } - break; - case TypeIndex::DateV2: - if (_physical_type == tparquet::Type::INT32) { - return _decode_date, UInt32>(doris_column, select_vector); - } - break; - case TypeIndex::DateTime: - if (_physical_type == tparquet::Type::INT96) { - return _decode_datetime96(doris_column, select_vector); - } else if (_physical_type == tparquet::Type::INT64) { - return _decode_datetime64(doris_column, select_vector); - } - break; - case TypeIndex::DateTimeV2: - // Spark can set the timestamp precision by the following configuration: - // spark.sql.parquet.outputTimestampType = INT96(NANOS), TIMESTAMP_MICROS, TIMESTAMP_MILLIS - if (_physical_type == tparquet::Type::INT96) { - return _decode_datetime96, UInt64>(doris_column, - select_vector); - } else if (_physical_type == tparquet::Type::INT64) { - return _decode_datetime64, UInt64>(doris_column, - select_vector); - } - break; - case TypeIndex::Decimal32: - if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { - return _decode_binary_decimal(doris_column, data_type, select_vector); - } else if (_physical_type == tparquet::Type::INT32) { - return _decode_primitive_decimal(doris_column, data_type, 
select_vector); - } else if (_physical_type == tparquet::Type::INT64) { - return _decode_primitive_decimal(doris_column, data_type, select_vector); - } - break; - case TypeIndex::Decimal64: - if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { - return _decode_binary_decimal(doris_column, data_type, select_vector); - } else if (_physical_type == tparquet::Type::INT32) { - return _decode_primitive_decimal(doris_column, data_type, select_vector); - } else if (_physical_type == tparquet::Type::INT64) { - return _decode_primitive_decimal(doris_column, data_type, select_vector); - } - break; - case TypeIndex::Decimal128: - if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { - return _decode_binary_decimal(doris_column, data_type, select_vector); - } else if (_physical_type == tparquet::Type::INT32) { - return _decode_primitive_decimal(doris_column, data_type, select_vector); - } else if (_physical_type == tparquet::Type::INT64) { - return _decode_primitive_decimal(doris_column, data_type, select_vector); - } - break; - case TypeIndex::Decimal128I: - if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { - return _decode_binary_decimal(doris_column, data_type, select_vector); - } else if (_physical_type == tparquet::Type::INT32) { - return _decode_primitive_decimal(doris_column, data_type, select_vector); - } else if (_physical_type == tparquet::Type::INT64) { - return _decode_primitive_decimal(doris_column, data_type, select_vector); - } - break; - case TypeIndex::String: - case TypeIndex::FixedString: - if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) { - return _decode_string(doris_column, select_vector); - } - break; - default: - break; - } - - return Status::InvalidArgument("Can't decode parquet physical type {} to doris logical type {}", - tparquet::to_string(_physical_type), getTypeName(logical_type)); -} - -Status FixLengthDecoder::_decode_string(MutableColumnPtr& doris_column, - ColumnSelectVector& select_vector) { - size_t 
dict_index = 0; - ColumnSelectVector::DataReadType read_type; - while (size_t run_length = select_vector.get_next_run(&read_type)) { - switch (read_type) { - case ColumnSelectVector::CONTENT: { - std::vector string_values; - string_values.reserve(run_length); - for (size_t i = 0; i < run_length; ++i) { - char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++); - string_values.emplace_back(buf_start, _type_length); - _FIXED_SHIFT_DATA_OFFSET(); - } - doris_column->insert_many_strings(&string_values[0], run_length); - break; - } - case ColumnSelectVector::NULL_DATA: { - doris_column->insert_many_defaults(run_length); - break; - } - case ColumnSelectVector::FILTERED_CONTENT: { - if (_has_dict) { - dict_index += run_length; - } else { - _offset += _type_length * run_length; - } - break; - } - case ColumnSelectVector::FILTERED_NULL: { - // do nothing - break; - } - } - } - return Status::OK(); -} - -Status ByteArrayDecoder::set_dict(std::unique_ptr& dict, int32_t length, - size_t num_values) { - _has_dict = true; - _dict = std::move(dict); - _dict_items.reserve(num_values); - uint32_t offset_cursor = 0; - char* dict_item_address = reinterpret_cast(_dict.get()); - for (int i = 0; i < num_values; ++i) { - uint32_t l = decode_fixed32_le(_dict.get() + offset_cursor); - offset_cursor += 4; - _dict_items.emplace_back(dict_item_address + offset_cursor, l); - offset_cursor += l; - if (offset_cursor > length) { - return Status::Corruption("Wrong data length in dictionary"); - } - } - if (offset_cursor != length) { - return Status::Corruption("Wrong dictionary data for byte array type"); - } - return Status::OK(); -} - -void ByteArrayDecoder::set_data(Slice* data) { - _data = data; - _offset = 0; - if (_has_dict) { - uint8_t bit_width = *data->data; - _index_batch_decoder.reset( - new RleBatchDecoder(reinterpret_cast(data->data) + 1, - static_cast(data->size) - 1, bit_width)); - } -} - -Status ByteArrayDecoder::skip_values(size_t num_values) { - if (_has_dict) { - 
_indexes.resize(num_values); - _index_batch_decoder->GetBatch(&_indexes[0], num_values); - } else { - for (int i = 0; i < num_values; ++i) { - if (UNLIKELY(_offset + 4 > _data->size)) { - return Status::IOError("Can't read byte array length from plain decoder"); - } - uint32_t length = - decode_fixed32_le(reinterpret_cast(_data->data) + _offset); - _offset += 4; - if (UNLIKELY(_offset + length) > _data->size) { - return Status::IOError("Can't skip enough bytes in plain decoder"); - } - _offset += length; - } - } - return Status::OK(); -} - -Status ByteArrayDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) { - size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); - if (_has_dict) { - if (doris_column->is_column_dictionary() && - assert_cast(*doris_column).dict_size() == 0) { - assert_cast(*doris_column) - .insert_many_dict_data(&_dict_items[0], _dict_items.size()); - } - _indexes.resize(non_null_size); - _index_batch_decoder->GetBatch(&_indexes[0], non_null_size); - } - - if (doris_column->is_column_dictionary()) { - return _decode_dict_values(doris_column, select_vector); - } - - TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); - switch (logical_type) { - case TypeIndex::String: - case TypeIndex::FixedString: { - size_t dict_index = 0; - - ColumnSelectVector::DataReadType read_type; - while (size_t run_length = select_vector.get_next_run(&read_type)) { - switch (read_type) { - case ColumnSelectVector::CONTENT: { - std::vector string_values; - string_values.reserve(run_length); - for (size_t i = 0; i < run_length; ++i) { - if (_has_dict) { - string_values.emplace_back(_dict_items[_indexes[dict_index++]]); - } else { - if (UNLIKELY(_offset + 4 > _data->size)) { - return Status::IOError( - "Can't read byte array length from plain decoder"); - } - uint32_t length = decode_fixed32_le( - reinterpret_cast(_data->data) + _offset); - _offset += 4; - if 
(UNLIKELY(_offset + length) > _data->size) { - return Status::IOError("Can't read enough bytes in plain decoder"); - } - string_values.emplace_back(_data->data + _offset, length); - _offset += length; - } - } - doris_column->insert_many_strings(&string_values[0], run_length); - break; - } - case ColumnSelectVector::NULL_DATA: { - doris_column->insert_many_defaults(run_length); - break; - } - case ColumnSelectVector::FILTERED_CONTENT: { - if (_has_dict) { - dict_index += run_length; - } else { - for (int i = 0; i < run_length; ++i) { - if (UNLIKELY(_offset + 4 > _data->size)) { - return Status::IOError( - "Can't read byte array length from plain decoder"); - } - uint32_t length = decode_fixed32_le( - reinterpret_cast(_data->data) + _offset); - _offset += 4; - if (UNLIKELY(_offset + length) > _data->size) { - return Status::IOError("Can't read enough bytes in plain decoder"); - } - _offset += length; - } - } - break; - } - case ColumnSelectVector::FILTERED_NULL: { - // do nothing - break; - } - } - } - return Status::OK(); - } - case TypeIndex::Decimal32: - return _decode_binary_decimal(doris_column, data_type, select_vector); - case TypeIndex::Decimal64: - return _decode_binary_decimal(doris_column, data_type, select_vector); - case TypeIndex::Decimal128: - return _decode_binary_decimal(doris_column, data_type, select_vector); - case TypeIndex::Decimal128I: - return _decode_binary_decimal(doris_column, data_type, select_vector); - default: - break; - } - return Status::InvalidArgument( - "Can't decode parquet physical type BYTE_ARRAY to doris logical type {}", - getTypeName(logical_type)); -} - -Status BoolPlainDecoder::skip_values(size_t num_values) { - int skip_cached = std::min(num_unpacked_values_ - unpacked_value_idx_, (int)num_values); - unpacked_value_idx_ += skip_cached; - if (skip_cached == num_values) { - return Status::OK(); - } - int num_remaining = num_values - skip_cached; - int num_to_skip = BitUtil::RoundDownToPowerOf2(num_remaining, 32); - if 
(num_to_skip > 0) { - bool_values_.SkipBatch(1, num_to_skip); - } - num_remaining -= num_to_skip; - if (num_remaining > 0) { - DCHECK_LE(num_remaining, UNPACKED_BUFFER_LEN); - num_unpacked_values_ = - bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]); - if (UNLIKELY(num_unpacked_values_ < num_remaining)) { - return Status::IOError("Can't skip enough booleans in plain decoder"); - } - unpacked_value_idx_ = num_remaining; - } - return Status::OK(); -} - -Status BoolPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) { - auto& column_data = static_cast&>(*doris_column).get_data(); - size_t data_index = column_data.size(); - column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); - - ColumnSelectVector::DataReadType read_type; - while (size_t run_length = select_vector.get_next_run(&read_type)) { - switch (read_type) { - case ColumnSelectVector::CONTENT: { - bool value; - for (size_t i = 0; i < run_length; ++i) { - if (UNLIKELY(!_decode_value(&value))) { - return Status::IOError("Can't read enough booleans in plain decoder"); - } - column_data[data_index++] = (UInt8)value; - } - break; - } - case ColumnSelectVector::NULL_DATA: { - data_index += run_length; - break; - } - case ColumnSelectVector::FILTERED_CONTENT: { - bool value; - for (int i = 0; i < run_length; ++i) { - if (UNLIKELY(!_decode_value(&value))) { - return Status::IOError("Can't read enough booleans in plain decoder"); - } - } - break; - } - case ColumnSelectVector::FILTERED_NULL: { - // do nothing - break; - } - } - } - return Status::OK(); -} } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h index 95727ed139..eb2e117bd6 100644 --- a/be/src/vec/exec/format/parquet/parquet_common.h +++ b/be/src/vec/exec/format/parquet/parquet_common.h @@ -28,6 +28,7 @@ #include "util/rle_encoding.h" 
#include "util/simd/bits.h" #include "vec/columns/column_array.h" +#include "vec/columns/column_dictionary.h" #include "vec/columns/column_nullable.h" #include "vec/columns/column_string.h" #include "vec/common/int_exp.h" @@ -58,12 +59,13 @@ struct RowRange { struct ParquetReadColumn { ParquetReadColumn(int parquet_col_id, const std::string& file_slot_name) - : _parquet_col_id(parquet_col_id), _file_slot_name(file_slot_name) {} + : _parquet_col_id(parquet_col_id), _file_slot_name(file_slot_name) {}; int _parquet_col_id; const std::string& _file_slot_name; }; +#pragma pack(1) struct ParquetInt96 { uint64_t lo; // time of nanoseconds in a day uint32_t hi; // days from julian epoch @@ -76,16 +78,8 @@ struct ParquetInt96 { static const uint64_t MICROS_IN_DAY; static const uint64_t NANOS_PER_MICROSECOND; }; - -struct DecodeParams { - // schema.logicalType.TIMESTAMP.isAdjustedToUTC == false - static const cctz::time_zone utc0; - // schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set the time zone - cctz::time_zone* ctz = nullptr; - int64_t second_mask = 1; - int64_t scale_to_nano_factor = 1; - DecimalScaleParams decimal_scale; -}; +#pragma pack() +static_assert(sizeof(ParquetInt96) == 12, "The size of ParquetInt96 is not 12."); class ColumnSelectVector { public: @@ -146,561 +140,4 @@ private: size_t _num_filtered; size_t _read_index; }; - -class Decoder { -public: - Decoder() = default; - virtual ~Decoder() = default; - - static Status get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding, - std::unique_ptr& decoder); - - // The type with fix length - void set_type_length(int32_t type_length) { _type_length = type_length; } - - // Set the data to be decoded - virtual void set_data(Slice* data) { - _data = data; - _offset = 0; - } - - void init(FieldSchema* field_schema, cctz::time_zone* ctz); - - template - void init_decimal_converter(DataTypePtr& data_type); - - // Write the decoded values batch to doris's column - virtual Status 
decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) = 0; - - virtual Status skip_values(size_t num_values) = 0; - - virtual Status set_dict(std::unique_ptr& dict, int32_t length, size_t num_values) { - return Status::NotSupported("set_dict is not supported"); - } - -protected: - /** - * Decode dictionary-coded values into doris_column, ensure that doris_column is ColumnDictI32 type, - * and the coded values must be read into _indexes previously. - */ - Status _decode_dict_values(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector); - - int32_t _type_length; - Slice* _data = nullptr; - uint32_t _offset = 0; - FieldSchema* _field_schema = nullptr; - std::unique_ptr _decode_params = nullptr; - - // For dictionary encoding - bool _has_dict = false; - std::unique_ptr _dict = nullptr; - std::unique_ptr> _index_batch_decoder = nullptr; - std::vector _indexes; -}; - -template -void Decoder::init_decimal_converter(DataTypePtr& data_type) { - if (_decode_params == nullptr || _field_schema == nullptr || - _decode_params->decimal_scale.scale_type != DecimalScaleParams::NOT_INIT) { - return; - } - auto scale = _field_schema->parquet_schema.scale; - auto* decimal_type = reinterpret_cast>*>( - const_cast(remove_nullable(data_type).get())); - auto dest_scale = decimal_type->get_scale(); - if (dest_scale > scale) { - _decode_params->decimal_scale.scale_type = DecimalScaleParams::SCALE_UP; - _decode_params->decimal_scale.scale_factor = - DecimalScaleParams::get_scale_factor(dest_scale - scale); - } else if (dest_scale < scale) { - _decode_params->decimal_scale.scale_type = DecimalScaleParams::SCALE_DOWN; - _decode_params->decimal_scale.scale_factor = - DecimalScaleParams::get_scale_factor(scale - dest_scale); - } else { - _decode_params->decimal_scale.scale_type = DecimalScaleParams::NO_SCALE; - _decode_params->decimal_scale.scale_factor = 1; - } -} - -class FixLengthDecoder final : public Decoder { -public: - 
FixLengthDecoder(tparquet::Type::type physical_type) : _physical_type(physical_type) {} - ~FixLengthDecoder() override = default; - - Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) override; - - Status skip_values(size_t num_values) override; - - Status set_dict(std::unique_ptr& dict, int32_t length, size_t num_values) override; - - void set_data(Slice* data) override; - -protected: - template - Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector); - - template - Status _decode_date(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector); - - template - Status _decode_datetime64(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector); - - template - Status _decode_datetime96(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector); - - template - Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector); - - template - Status _decode_primitive_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector); - - Status _decode_string(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector); - -#define _FIXED_GET_DATA_OFFSET(index) \ - _has_dict ? 
_dict_items[_indexes[index]] : _data->data + _offset - -#define _FIXED_SHIFT_DATA_OFFSET() \ - if (!_has_dict) _offset += _type_length - - tparquet::Type::type _physical_type; - - // For dictionary encoding - std::vector _dict_items; -}; - -template -Status FixLengthDecoder::_decode_numeric(MutableColumnPtr& doris_column, - ColumnSelectVector& select_vector) { - auto& column_data = static_cast&>(*doris_column).get_data(); - size_t data_index = column_data.size(); - column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); - size_t dict_index = 0; - ColumnSelectVector::DataReadType read_type; - while (size_t run_length = select_vector.get_next_run(&read_type)) { - switch (read_type) { - case ColumnSelectVector::CONTENT: { - for (size_t i = 0; i < run_length; ++i) { - char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++); - column_data[data_index++] = *(Numeric*)buf_start; - _FIXED_SHIFT_DATA_OFFSET(); - } - break; - } - case ColumnSelectVector::NULL_DATA: { - data_index += run_length; - break; - } - case ColumnSelectVector::FILTERED_CONTENT: { - if (_has_dict) { - dict_index += run_length; - } else { - _offset += _type_length * run_length; - } - break; - } - case ColumnSelectVector::FILTERED_NULL: { - // do nothing - break; - } - } - } - return Status::OK(); -} - -template -Status FixLengthDecoder::_decode_date(MutableColumnPtr& doris_column, - ColumnSelectVector& select_vector) { - auto& column_data = static_cast&>(*doris_column).get_data(); - size_t data_index = column_data.size(); - column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); - size_t dict_index = 0; - ColumnSelectVector::DataReadType read_type; - while (size_t run_length = select_vector.get_next_run(&read_type)) { - switch (read_type) { - case ColumnSelectVector::CONTENT: { - for (size_t i = 0; i < run_length; ++i) { - char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++); - int64_t date_value = 
static_cast(*reinterpret_cast(buf_start)); - auto& v = reinterpret_cast(column_data[data_index++]); - v.from_unixtime(date_value * 24 * 60 * 60, *_decode_params->ctz); // day to seconds - if constexpr (std::is_same_v) { - // we should cast to date if using date v1. - v.cast_to_date(); - } - _FIXED_SHIFT_DATA_OFFSET(); - } - break; - } - case ColumnSelectVector::NULL_DATA: { - data_index += run_length; - break; - } - case ColumnSelectVector::FILTERED_CONTENT: { - if (_has_dict) { - dict_index += run_length; - } else { - _offset += _type_length * run_length; - } - break; - } - case ColumnSelectVector::FILTERED_NULL: { - // do nothing - break; - } - } - } - return Status::OK(); -} - -template -Status FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column, - ColumnSelectVector& select_vector) { - auto& column_data = static_cast&>(*doris_column).get_data(); - size_t data_index = column_data.size(); - column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); - size_t dict_index = 0; - ColumnSelectVector::DataReadType read_type; - while (size_t run_length = select_vector.get_next_run(&read_type)) { - switch (read_type) { - case ColumnSelectVector::CONTENT: { - for (size_t i = 0; i < run_length; ++i) { - char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++); - int64_t& date_value = *reinterpret_cast(buf_start); - auto& v = reinterpret_cast(column_data[data_index++]); - v.from_unixtime(date_value / _decode_params->second_mask, *_decode_params->ctz); - if constexpr (std::is_same_v>) { - // nanoseconds will be ignored. 
- v.set_microsecond((date_value % _decode_params->second_mask) * - _decode_params->scale_to_nano_factor / 1000); - // TODO: the precision of datetime v1 - } - _FIXED_SHIFT_DATA_OFFSET(); - } - break; - } - case ColumnSelectVector::NULL_DATA: { - data_index += run_length; - break; - } - case ColumnSelectVector::FILTERED_CONTENT: { - if (_has_dict) { - dict_index += run_length; - } else { - _offset += _type_length * run_length; - } - break; - } - case ColumnSelectVector::FILTERED_NULL: { - // do nothing - break; - } - } - } - return Status::OK(); -} - -template -Status FixLengthDecoder::_decode_datetime96(MutableColumnPtr& doris_column, - ColumnSelectVector& select_vector) { - auto& column_data = static_cast&>(*doris_column).get_data(); - size_t data_index = column_data.size(); - column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); - size_t dict_index = 0; - ColumnSelectVector::DataReadType read_type; - while (size_t run_length = select_vector.get_next_run(&read_type)) { - switch (read_type) { - case ColumnSelectVector::CONTENT: { - for (size_t i = 0; i < run_length; ++i) { - char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++); - ParquetInt96& datetime96 = *reinterpret_cast(buf_start); - auto& v = reinterpret_cast(column_data[data_index++]); - int64_t micros = datetime96.to_timestamp_micros(); - v.from_unixtime(micros / 1000000, *_decode_params->ctz); - if constexpr (std::is_same_v>) { - // spark.sql.parquet.outputTimestampType = INT96(NANOS) will lost precision. - // only keep microseconds. 
- v.set_microsecond(micros % 1000000); - } - _FIXED_SHIFT_DATA_OFFSET(); - } - break; - } - case ColumnSelectVector::NULL_DATA: { - data_index += run_length; - break; - } - case ColumnSelectVector::FILTERED_CONTENT: { - if (_has_dict) { - dict_index += run_length; - } else { - _offset += _type_length * run_length; - } - break; - } - case ColumnSelectVector::FILTERED_NULL: { - // do nothing - break; - } - } - } - return Status::OK(); -} - -template -Status FixLengthDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column, - DataTypePtr& data_type, - ColumnSelectVector& select_vector) { - init_decimal_converter(data_type); - auto& column_data = - static_cast>&>(*doris_column).get_data(); - size_t data_index = column_data.size(); - column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); - size_t dict_index = 0; - DecimalScaleParams& scale_params = _decode_params->decimal_scale; - - ColumnSelectVector::DataReadType read_type; - while (size_t run_length = select_vector.get_next_run(&read_type)) { - switch (read_type) { - case ColumnSelectVector::CONTENT: { - for (size_t i = 0; i < run_length; ++i) { - char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++); - // When Decimal in parquet is stored in byte arrays, binary and fixed, - // the unscaled number must be encoded as two's complement using big-endian byte order. - Int128 value = buf_start[0] & 0x80 ? 
-1 : 0; - memcpy(reinterpret_cast(&value) + sizeof(Int128) - _type_length, buf_start, - _type_length); - value = BigEndian::ToHost128(value); - if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { - value *= scale_params.scale_factor; - } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { - value /= scale_params.scale_factor; - } - auto& v = reinterpret_cast(column_data[data_index++]); - v = (DecimalPrimitiveType)value; - _FIXED_SHIFT_DATA_OFFSET(); - } - break; - } - case ColumnSelectVector::NULL_DATA: { - data_index += run_length; - break; - } - case ColumnSelectVector::FILTERED_CONTENT: { - if (_has_dict) { - dict_index += run_length; - } else { - _offset += _type_length * run_length; - } - break; - } - case ColumnSelectVector::FILTERED_NULL: { - // do nothing - break; - } - } - } - return Status::OK(); -} - -template -Status FixLengthDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_column, - DataTypePtr& data_type, - ColumnSelectVector& select_vector) { - init_decimal_converter(data_type); - auto& column_data = - static_cast>&>(*doris_column).get_data(); - size_t data_index = column_data.size(); - column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); - size_t dict_index = 0; - DecimalScaleParams& scale_params = _decode_params->decimal_scale; - - ColumnSelectVector::DataReadType read_type; - while (size_t run_length = select_vector.get_next_run(&read_type)) { - switch (read_type) { - case ColumnSelectVector::CONTENT: { - for (size_t i = 0; i < run_length; ++i) { - char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++); - // we should use decimal128 to scale up/down - Int128 value = *reinterpret_cast(buf_start); - if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { - value *= scale_params.scale_factor; - } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { - value /= scale_params.scale_factor; - } - auto& v = reinterpret_cast(column_data[data_index++]); - v 
= (DecimalPrimitiveType)value; - _FIXED_SHIFT_DATA_OFFSET(); - } - break; - } - case ColumnSelectVector::NULL_DATA: { - data_index += run_length; - break; - } - case ColumnSelectVector::FILTERED_CONTENT: { - if (_has_dict) { - dict_index += run_length; - } else { - _offset += _type_length * run_length; - } - break; - } - case ColumnSelectVector::FILTERED_NULL: { - // do nothing - break; - } - } - } - return Status::OK(); -} - -class ByteArrayDecoder final : public Decoder { -public: - ByteArrayDecoder() = default; - ~ByteArrayDecoder() override = default; - - Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) override; - - Status skip_values(size_t num_values) override; - - void set_data(Slice* data) override; - - Status set_dict(std::unique_ptr& dict, int32_t length, size_t num_values) override; - -protected: - template - Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector); - - // For dictionary encoding - std::vector _dict_items; -}; - -template -Status ByteArrayDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column, - DataTypePtr& data_type, - ColumnSelectVector& select_vector) { - init_decimal_converter(data_type); - auto& column_data = - static_cast>&>(*doris_column).get_data(); - size_t data_index = column_data.size(); - column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); - size_t dict_index = 0; - DecimalScaleParams& scale_params = _decode_params->decimal_scale; - ColumnSelectVector::DataReadType read_type; - while (size_t run_length = select_vector.get_next_run(&read_type)) { - switch (read_type) { - case ColumnSelectVector::CONTENT: { - for (size_t i = 0; i < run_length; ++i) { - char* buf_start; - uint32_t length; - if (_has_dict) { - StringRef& slice = _dict_items[_indexes[dict_index++]]; - buf_start = const_cast(slice.data); - length = (uint32_t)slice.size; - } else { - if 
(UNLIKELY(_offset + 4 > _data->size)) { - return Status::IOError("Can't read byte array length from plain decoder"); - } - length = decode_fixed32_le(reinterpret_cast(_data->data) + - _offset); - _offset += 4; - buf_start = _data->data + _offset; - _offset += length; - } - // When Decimal in parquet is stored in byte arrays, binary and fixed, - // the unscaled number must be encoded as two's complement using big-endian byte order. - Int128 value = buf_start[0] & 0x80 ? -1 : 0; - memcpy(reinterpret_cast(&value) + sizeof(Int128) - length, buf_start, - length); - value = BigEndian::ToHost128(value); - if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { - value *= scale_params.scale_factor; - } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { - value /= scale_params.scale_factor; - } - auto& v = reinterpret_cast(column_data[data_index++]); - v = (DecimalPrimitiveType)value; - } - break; - } - case ColumnSelectVector::NULL_DATA: { - data_index += run_length; - break; - } - case ColumnSelectVector::FILTERED_CONTENT: { - if (_has_dict) { - dict_index += run_length; - } else { - _offset += _type_length * run_length; - } - break; - } - case ColumnSelectVector::FILTERED_NULL: { - // do nothing - break; - } - } - } - return Status::OK(); -} - -/// Decoder bit-packed boolean-encoded values. 
-/// Implementation from https://github.com/apache/impala/blob/master/be/src/exec/parquet/parquet-bool-decoder.h -class BoolPlainDecoder final : public Decoder { -public: - BoolPlainDecoder() = default; - ~BoolPlainDecoder() override = default; - - // Set the data to be decoded - void set_data(Slice* data) override { - bool_values_.Reset((const uint8_t*)data->data, data->size); - num_unpacked_values_ = 0; - unpacked_value_idx_ = 0; - _offset = 0; - } - - Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, - ColumnSelectVector& select_vector) override; - - Status skip_values(size_t num_values) override; - -protected: - inline bool _decode_value(bool* value) { - if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) { - *value = unpacked_values_[unpacked_value_idx_++]; - } else { - num_unpacked_values_ = - bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]); - if (UNLIKELY(num_unpacked_values_ == 0)) { - return false; - } - *value = unpacked_values_[0]; - unpacked_value_idx_ = 1; - } - return true; - } - - /// A buffer to store unpacked values. Must be a multiple of 32 size to use the - /// batch-oriented interface of BatchedBitReader. We use uint8_t instead of bool because - /// bit unpacking is only supported for unsigned integers. The values are converted to - /// bool when returned to the user. - static const int UNPACKED_BUFFER_LEN = 128; - uint8_t unpacked_values_[UNPACKED_BUFFER_LEN]; - - /// The number of valid values in 'unpacked_values_'. - int num_unpacked_values_ = 0; - - /// The next value to return from 'unpacked_values_'. - int unpacked_value_idx_ = 0; - - /// Bit packed decoder, used if 'encoding_' is PLAIN. 
- BatchedBitReader bool_values_; -}; - } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h index adc71dcedd..8dc3bd6b32 100644 --- a/be/src/vec/exec/format/parquet/parquet_pred_cmp.h +++ b/be/src/vec/exec/format/parquet/parquet_pred_cmp.h @@ -21,7 +21,10 @@ #include #include "exec/olap_common.h" +#include "gutil/endian.h" #include "parquet_common.h" +#include "vec/data_types/data_type_decimal.h" +#include "vec/exec/format/format_common.h" namespace doris::vectorized { diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index b73513751c..a852f97cda 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -23,10 +23,10 @@ #include #include "common/status.h" +#include "decoder.h" #include "gen_cpp/parquet_types.h" #include "io/buffered_reader.h" #include "level_decoder.h" -#include "parquet_common.h" #include "schema_desc.h" #include "util/block_compression.h" #include "vparquet_page_reader.h" diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp index da08d1299e..7456962848 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp @@ -21,7 +21,10 @@ #include #include "schema_desc.h" +#include "vec/columns/column_array.h" +#include "vec/columns/column_nullable.h" #include "vec/data_types/data_type_array.h" +#include "vec/data_types/data_type_nullable.h" #include "vparquet_column_chunk_reader.h" namespace doris::vectorized { @@ -96,6 +99,9 @@ Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field, } Status ScalarColumnReader::_skip_values(size_t num_values) { + if (num_values == 0) { + return Status::OK(); + } if 
(_chunk_reader->max_def_level() > 0) { LevelDecoder& def_decoder = _chunk_reader->def_level_decoder(); size_t skipped = 0; @@ -114,8 +120,12 @@ Status ScalarColumnReader::_skip_values(size_t num_values) { } skipped += loop_skip; } - RETURN_IF_ERROR(_chunk_reader->skip_values(null_size, false)); - RETURN_IF_ERROR(_chunk_reader->skip_values(nonnull_size, true)); + if (null_size > 0) { + RETURN_IF_ERROR(_chunk_reader->skip_values(null_size, false)); + } + if (nonnull_size > 0) { + RETURN_IF_ERROR(_chunk_reader->skip_values(nonnull_size, true)); + } } else { RETURN_IF_ERROR(_chunk_reader->skip_values(num_values)); } diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 54ee6164e4..2877e5ccc9 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -18,6 +18,7 @@ #include "vparquet_group_reader.h" #include "schema_desc.h" +#include "util/simd/bits.h" #include "vec/columns/column_const.h" #include "vparquet_column_reader.h" diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h index 00cbbdcfdd..708f79d3fb 100644 --- a/be/src/vec/exec/format/parquet/vparquet_page_index.h +++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h @@ -21,6 +21,7 @@ #include "exec/olap_common.h" #include "parquet_common.h" +#include "schema_desc.h" namespace doris::vectorized { diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp index 3a5d45bfa4..ca346f3b3a 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp @@ -544,6 +544,11 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group, _statistics.read_rows += row_group.num_rows; }; + if (_lazy_read_ctx.vconjunct_ctx == nullptr) { + read_whole_row_group(); + return 
Status::OK(); + } + if (_colname_to_value_range == nullptr || _colname_to_value_range->empty()) { read_whole_row_group(); return Status::OK(); diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index b161c06845..0f3343e3ca 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -21,6 +21,7 @@ #include "table_format_reader.h" #include "vec/columns/column_dictionary.h" +#include "vec/exec/format/format_common.h" #include "vec/exec/format/generic_reader.h" #include "vec/exec/format/parquet/parquet_common.h" #include "vec/exprs/vexpr.h"