diff --git a/be/src/util/bit_stream_utils.h b/be/src/util/bit_stream_utils.h index d8efe163bb..d5abbc4d94 100644 --- a/be/src/util/bit_stream_utils.h +++ b/be/src/util/bit_stream_utils.h @@ -111,7 +111,15 @@ public: // Reads a vlq encoded int from the stream. The encoded int must start at the // beginning of a byte. Return false if there were not enough bytes in the buffer. - bool GetVlqInt(int32_t* v); + bool GetVlqInt(uint32_t* v); + // Reads a zigzag encoded int `into` v. + bool GetZigZagVlqInt(int32_t* v); + + // Reads a vlq encoded int from the stream. The encoded int must start at the + // beginning of a byte. Return false if there were not enough bytes in the buffer. + bool GetVlqInt(uint64_t* v); + // Reads a zigzag encoded int `into` v. + bool GetZigZagVlqInt(int64_t* v); // Returns the number of bytes left in the stream, not including the current byte (i.e., // there may be an additional fraction of a byte). @@ -123,12 +131,18 @@ public: // Rewind the stream by 'num_bits' bits void Rewind(int num_bits); + // Advance the stream by 'num_bits' bits + bool Advance(int64_t num_bits); + // Seek to a specific bit in the buffer void SeekToBit(unsigned int stream_position); // Maximum byte length of a vlq encoded int static const int MAX_VLQ_BYTE_LEN = 5; + // Maximum byte length of a vlq encoded int64 + static const int MAX_VLQ_BYTE_LEN_FOR_INT64 = 10; + bool is_initialized() const { return buffer_ != nullptr; } private: diff --git a/be/src/util/bit_stream_utils.inline.h b/be/src/util/bit_stream_utils.inline.h index 88a059758c..fb62e9e3ae 100644 --- a/be/src/util/bit_stream_utils.inline.h +++ b/be/src/util/bit_stream_utils.inline.h @@ -26,6 +26,7 @@ #include "util/alignment.h" #include "util/bit_packing.inline.h" #include "util/bit_stream_utils.h" +#include "util/bit_util.h" using doris::BitUtil; @@ -150,6 +151,18 @@ inline void BitReader::Rewind(int num_bits) { memcpy(&buffered_values_, buffer_ + byte_offset_, 8); } +inline bool BitReader::Advance(int64_t 
num_bits) { + int64_t bits_required = bit_offset_ + num_bits; + int64_t bytes_required = (bits_required >> 3) + ((bits_required & 7) != 0); + if (bytes_required > max_bytes_ - byte_offset_) { + return false; + } + byte_offset_ += static_cast(bits_required >> 3); + bit_offset_ = static_cast(bits_required & 7); + BufferValues(); + return true; +} + inline void BitReader::SeekToBit(unsigned int stream_position) { DCHECK_LE(stream_position, max_bytes_ * 8); @@ -195,17 +208,52 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) { return true; } -inline bool BitReader::GetVlqInt(int32_t* v) { - *v = 0; - int shift = 0; - int num_bytes = 0; - uint8_t byte = 0; - do { +inline bool BitReader::GetVlqInt(uint32_t* v) { + uint32_t tmp = 0; + for (int num_bytes = 0; num_bytes < MAX_VLQ_BYTE_LEN; num_bytes++) { + uint8_t byte = 0; if (!GetAligned(1, &byte)) return false; - *v |= (byte & 0x7F) << shift; - shift += 7; - DCHECK_LE(++num_bytes, MAX_VLQ_BYTE_LEN); - } while ((byte & 0x80) != 0); + tmp |= static_cast(byte & 0x7F) << (7 * num_bytes); + if ((byte & 0x80) == 0) { + *v = tmp; + return true; + } + } + return false; +} + +inline bool BitReader::GetZigZagVlqInt(int32_t* v) { + uint32_t u; + if (!GetVlqInt(&u)) { + return false; + } + u = (u >> 1) ^ (~(u & 1) + 1); + // copy uint32_t to int32_t + std::memcpy(v, &u, sizeof(uint32_t)); + return true; +} + +inline bool BitReader::GetVlqInt(uint64_t* v) { + uint64_t tmp = 0; + for (int num_bytes = 0; num_bytes < MAX_VLQ_BYTE_LEN_FOR_INT64; num_bytes++) { + uint8_t byte = 0; + if (!GetAligned(1, &byte)) return false; + tmp |= static_cast(byte & 0x7F) << (7 * num_bytes); + if ((byte & 0x80) == 0) { + *v = tmp; + return true; + } + } + return false; +} + +inline bool BitReader::GetZigZagVlqInt(int64_t* v) { + uint64_t u; + if (!GetVlqInt(&u)) { + return false; + } + u = (u >> 1) ^ (~(u & 1) + 1); + std::memcpy(v, &u, sizeof(uint64_t)); return true; } @@ -227,12 +275,14 @@ inline int BatchedBitReader::UnpackBatch(int bit_width, 
int num_values, T* v) { inline bool BatchedBitReader::SkipBatch(int bit_width, int num_values_to_skip) { DCHECK(buffer_pos_ != nullptr); - DCHECK_GT(bit_width, 0); + DCHECK_GE(bit_width, 0); DCHECK_LE(bit_width, MAX_BITWIDTH); - DCHECK_GT(num_values_to_skip, 0); + DCHECK_GE(num_values_to_skip, 0); int skip_bytes = BitUtil::RoundUpNumBytes(bit_width * num_values_to_skip); - if (skip_bytes > buffer_end_ - buffer_pos_) return false; + if (skip_bytes > buffer_end_ - buffer_pos_) { + return false; + } buffer_pos_ += skip_bytes; return true; } diff --git a/be/src/util/rle_encoding.h b/be/src/util/rle_encoding.h index 1409473a09..1e83599265 100644 --- a/be/src/util/rle_encoding.h +++ b/be/src/util/rle_encoding.h @@ -229,7 +229,7 @@ inline bool RleDecoder::ReadHeader() { if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) { // Read the next run's indicator int, it could be a literal or repeated run // The int is encoded as a vlq-encoded value. - int32_t indicator_value = 0; + uint32_t indicator_value = 0; bool result = bit_reader_.GetVlqInt(&indicator_value); if (PREDICT_FALSE(!result)) { return false; diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index ab364f56ea..29bea35d47 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -313,6 +313,8 @@ set(VEC_FILES exec/format/table/iceberg_reader.cpp exec/format/file_reader/new_plain_text_line_reader.cpp exec/format/file_reader/new_plain_binary_line_reader.cpp + exec/format/parquet/delta_bit_pack_decoder.cpp + exec/format/parquet/bool_rle_decoder.cpp ) if (WITH_MYSQL) diff --git a/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp b/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp new file mode 100644 index 0000000000..563b6c68df --- /dev/null +++ b/be/src/vec/exec/format/parquet/bool_rle_decoder.cpp @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "vec/exec/format/parquet/bool_rle_decoder.h" + +#include "util/bit_util.h" + +namespace doris::vectorized { +void BoolRLEDecoder::set_data(Slice* slice) { + _data = slice; + _num_bytes = slice->size; + _offset = 0; + if (_num_bytes < 4) { + LOG(FATAL) << "Received invalid length : " + std::to_string(_num_bytes) + + " (corrupt data page?)"; + } + // Load the first 4 bytes in little-endian, which indicates the length + const uint8_t* data = reinterpret_cast(_data->data); + uint32_t num_bytes = decode_fixed32_le(data); + if (num_bytes > static_cast(_num_bytes - 4)) { + LOG(FATAL) << ("Received invalid number of bytes : " + std::to_string(num_bytes) + + " (corrupt data page?)"); + } + _num_bytes = num_bytes; + auto decoder_data = data + 4; + _decoder = RleDecoder(decoder_data, num_bytes, 1); +} + +Status BoolRLEDecoder::skip_values(size_t num_values) { + _current_value_idx += num_values; + return Status::OK(); +} + +Status BoolRLEDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) { + auto& column_data = static_cast&>(*doris_column).get_data(); + size_t data_index = column_data.size(); + column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered()); + size_t max_values = 
column_data.size(); + _values.resize(max_values); + if (!_decoder.get_values(_values.data(), max_values)) { + return Status::IOError("Can't read enough booleans in rle decoder"); + } + // _num_bytes -= max_values; + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + bool value; // Can't use uint8_t directly, we should correct it. + for (size_t i = 0; i < run_length; ++i) { + value = _values[_current_value_idx++]; + column_data[data_index++] = (UInt8)value; + } + break; + } + case ColumnSelectVector::NULL_DATA: { + data_index += run_length; + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + _current_value_idx += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + break; + } + } + } + return Status::OK(); +} +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/bool_rle_decoder.h b/be/src/vec/exec/format/parquet/bool_rle_decoder.h new file mode 100644 index 0000000000..0b3ba6e05d --- /dev/null +++ b/be/src/vec/exec/format/parquet/bool_rle_decoder.h @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "util/rle_encoding.h" +#include "vec/exec/format/parquet/bool_plain_decoder.h" +#include "vec/exec/format/parquet/decoder.h" + +namespace doris::vectorized { +class BoolRLEDecoder final : public Decoder { +public: + BoolRLEDecoder() = default; + ~BoolRLEDecoder() override = default; + + void set_data(Slice* slice) override; + + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override; + + Status skip_values(size_t num_values) override; + +private: + RleDecoder _decoder; + std::vector _values; + size_t _num_bytes; + size_t _current_value_idx = 0; +}; +} // namespace doris::vectorized \ No newline at end of file diff --git a/be/src/vec/exec/format/parquet/decoder.cpp b/be/src/vec/exec/format/parquet/decoder.cpp index e35a0ff58c..e625994a21 100644 --- a/be/src/vec/exec/format/parquet/decoder.cpp +++ b/be/src/vec/exec/format/parquet/decoder.cpp @@ -20,8 +20,10 @@ #include "vec/data_types/data_type_decimal.h" #include "vec/data_types/data_type_nullable.h" #include "vec/exec/format/parquet/bool_plain_decoder.h" +#include "vec/exec/format/parquet/bool_rle_decoder.h" #include "vec/exec/format/parquet/byte_array_dict_decoder.h" #include "vec/exec/format/parquet/byte_array_plain_decoder.h" +#include "vec/exec/format/parquet/delta_bit_pack_decoder.h" #include "vec/exec/format/parquet/fix_length_dict_decoder.hpp" #include "vec/exec/format/parquet/fix_length_plain_decoder.h" @@ -88,6 +90,48 @@ Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type tparquet::to_string(type), tparquet::to_string(encoding)); } break; + case tparquet::Encoding::RLE: + switch (type) { + case tparquet::Type::BOOLEAN: + decoder.reset(new BoolRLEDecoder()); + break; + default: + return Status::InternalError("Unsupported type {}(encoding={}) in parquet decoder", + tparquet::to_string(type), tparquet::to_string(encoding)); + } + break; + case tparquet::Encoding::DELTA_BINARY_PACKED: + // 
Supports only INT32 and INT64. + switch (type) { + case tparquet::Type::INT32: + decoder.reset(new DeltaBitPackDecoder(type)); + break; + case tparquet::Type::INT64: + decoder.reset(new DeltaBitPackDecoder(type)); + break; + default: + return Status::InternalError("DELTA_BINARY_PACKED only supports INT32 and INT64"); + } + break; + case tparquet::Encoding::DELTA_BYTE_ARRAY: + switch (type) { + case tparquet::Type::BYTE_ARRAY: + decoder.reset(new DeltaByteArrayDecoder(type)); + break; + default: + return Status::InternalError("DELTA_BYTE_ARRAY only supports BYTE_ARRAY."); + } + break; + case tparquet::Encoding::DELTA_LENGTH_BYTE_ARRAY: + switch (type) { + case tparquet::Type::FIXED_LEN_BYTE_ARRAY: + decoder.reset(new DeltaLengthByteArrayDecoder(type)); + break; + default: + return Status::InternalError( + "DELTA_LENGTH_BYTE_ARRAY only supports FIXED_LEN_BYTE_ARRAY."); + } + break; default: return Status::InternalError("Unsupported encoding {}(type={}) in parquet decoder", tparquet::to_string(encoding), tparquet::to_string(type)); diff --git a/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.cpp b/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.cpp new file mode 100644 index 0000000000..c550e7262d --- /dev/null +++ b/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.cpp @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "delta_bit_pack_decoder.h" + +namespace doris::vectorized { + +Status DeltaDecoder::decode_byte_array(const std::vector& decoded_vals, + MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) { + TypeIndex logical_type = remove_nullable(data_type)->get_type_id(); + switch (logical_type) { + case TypeIndex::String: + [[fallthrough]]; + case TypeIndex::FixedString: { + ColumnSelectVector::DataReadType read_type; + while (size_t run_length = select_vector.get_next_run(&read_type)) { + switch (read_type) { + case ColumnSelectVector::CONTENT: { + std::vector string_values; + string_values.reserve(run_length); + for (size_t i = 0; i < run_length; ++i) { + size_t length = decoded_vals[_current_value_idx].size; + string_values.emplace_back(decoded_vals[_current_value_idx].data, length); + _current_value_idx++; + } + doris_column->insert_many_strings(&string_values[0], run_length); + break; + } + case ColumnSelectVector::NULL_DATA: { + doris_column->insert_many_defaults(run_length); + break; + } + case ColumnSelectVector::FILTERED_CONTENT: { + _current_value_idx += run_length; + break; + } + case ColumnSelectVector::FILTERED_NULL: { + // do nothing + break; + } + } + } + _current_value_idx = 0; + return Status::OK(); + } + default: + break; + } + return Status::InvalidArgument( + "Can't decode parquet physical type BYTE_ARRAY to doris logical type {}", + getTypeName(logical_type)); +} + +template +Status DeltaBitPackDecoder::_init_header() { + if (!_bit_reader->GetVlqInt(&_values_per_block) || + 
!_bit_reader->GetVlqInt(&_mini_blocks_per_block) || + !_bit_reader->GetVlqInt(&_total_value_count) || + !_bit_reader->GetZigZagVlqInt(&_last_value)) { + return Status::IOError("Init header eof"); + } + if (_values_per_block == 0) { + return Status::InvalidArgument("Cannot have zero value per block"); + } + if (_values_per_block % 128 != 0) { + return Status::InvalidArgument( + "the number of values in a block must be multiple of 128, but it's " + + std::to_string(_values_per_block)); + } + if (_mini_blocks_per_block == 0) { + return Status::InvalidArgument("Cannot have zero miniblock per block"); + } + _values_per_mini_block = _values_per_block / _mini_blocks_per_block; + if (_values_per_mini_block == 0) { + return Status::InvalidArgument("Cannot have zero value per miniblock"); + } + if (_values_per_mini_block % 32 != 0) { + return Status::InvalidArgument( + "The number of values in a miniblock must be multiple of 32, but it's " + + std::to_string(_values_per_mini_block)); + } + _total_values_remaining = _total_value_count; + _delta_bit_widths.resize(_mini_blocks_per_block); + // init as empty property + _block_initialized = false; + _values_remaining_current_mini_block = 0; + return Status::OK(); +} + +template +Status DeltaBitPackDecoder::_init_block() { + DCHECK_GT(_total_values_remaining, 0) << "InitBlock called at EOF"; + if (!_bit_reader->GetZigZagVlqInt(&_min_delta)) { + return Status::IOError("Init block eof"); + } + + // read the bitwidth of each miniblock + uint8_t* bit_width_data = _delta_bit_widths.data(); + for (uint32_t i = 0; i < _mini_blocks_per_block; ++i) { + if (!_bit_reader->GetAligned(1, bit_width_data + i)) { + return Status::IOError("Decode bit-width EOF"); + } + // Note that non-conformant bitwidth entries are allowed by the Parquet spec + // for extraneous miniblocks in the last block (GH-14923), so we check + // the bitwidths when actually using them (see InitMiniBlock()). 
+ } + _mini_block_idx = 0; + _block_initialized = true; + RETURN_IF_ERROR(_init_mini_block(bit_width_data[0])); + return Status::OK(); +} + +template +Status DeltaBitPackDecoder::_init_mini_block(int bit_width) { + if (PREDICT_FALSE(bit_width > kMaxDeltaBitWidth)) { + return Status::InvalidArgument("delta bit width larger than integer bit width"); + } + _delta_bit_width = bit_width; + _values_remaining_current_mini_block = _values_per_mini_block; + return Status::OK(); +} + +template +Status DeltaBitPackDecoder::_get_internal(T* buffer, int num_values, int* out_num_values) { + num_values = static_cast(std::min(num_values, _total_values_remaining)); + if (num_values == 0) { + *out_num_values = 0; + return Status::OK(); + } + int i = 0; + while (i < num_values) { + if (PREDICT_FALSE(_values_remaining_current_mini_block == 0)) { + if (PREDICT_FALSE(!_block_initialized)) { + buffer[i++] = _last_value; + DCHECK_EQ(i, 1); // we're at the beginning of the page + if (i == num_values) { + // When block is uninitialized and i reaches num_values we have two + // different possibilities: + // 1. _total_value_count == 1, which means that the page may have only + // one value (encoded in the header), and we should not initialize + // any block. + // 2. _total_value_count != 1, which means we should initialize the + // incoming block for subsequent reads. 
+ if (_total_value_count != 1) { + RETURN_IF_ERROR(_init_block()); + } + break; + } + RETURN_IF_ERROR(_init_block()); + } else { + ++_mini_block_idx; + if (_mini_block_idx < _mini_blocks_per_block) { + RETURN_IF_ERROR(_init_mini_block(_delta_bit_widths.data()[_mini_block_idx])); + } else { + RETURN_IF_ERROR(_init_block()); + } + } + } + + int values_decode = std::min(_values_remaining_current_mini_block, + static_cast(num_values - i)); + for (int j = 0; j < values_decode; ++j) { + if (!_bit_reader->GetValue(_delta_bit_width, buffer + i + j)) { + return Status::IOError("Get batch EOF"); + } + } + for (int j = 0; j < values_decode; ++j) { + // Addition between min_delta, packed int and last_value should be treated as + // unsigned addition. Overflow is as expected. + buffer[i + j] = static_cast(_min_delta) + static_cast(buffer[i + j]) + + static_cast(_last_value); + _last_value = buffer[i + j]; + } + _values_remaining_current_mini_block -= values_decode; + i += values_decode; + } + _total_values_remaining -= num_values; + + if (PREDICT_FALSE(_total_values_remaining == 0)) { + if (!_bit_reader->Advance(_delta_bit_width * _values_remaining_current_mini_block)) { + return Status::IOError("Skip padding EOF"); + } + _values_remaining_current_mini_block = 0; + } + *out_num_values = num_values; + return Status::OK(); +} + +void DeltaLengthByteArrayDecoder::_decode_lengths() { + _len_decoder.set_bit_reader(_bit_reader); + // get the number of encoded lengths + int num_length = _len_decoder.valid_values_count(); + _buffered_length.resize(num_length); + + // decode all the lengths. all the lengths are buffered in buffered_length_. 
+ int ret; + Status st = _len_decoder.decode(_buffered_length.data(), num_length, &ret); + if (st != Status::OK()) { + LOG(FATAL) << "Fail to decode delta length, status: " << st; + } + DCHECK_EQ(ret, num_length); + _length_idx = 0; + _num_valid_values = num_length; +} + +Status DeltaLengthByteArrayDecoder::_get_internal(Slice* buffer, int max_values, + int* out_num_values) { + // Decode up to `max_values` strings into an internal buffer + // and reference them into `buffer`. + max_values = std::min(max_values, _num_valid_values); + if (max_values == 0) { + *out_num_values = 0; + return Status::OK(); + } + + int32_t data_size = 0; + const int32_t* length_ptr = _buffered_length.data() + _length_idx; + for (int i = 0; i < max_values; ++i) { + int32_t len = length_ptr[i]; + if (PREDICT_FALSE(len < 0)) { + return Status::InvalidArgument("Negative string delta length"); + } + buffer[i].size = len; + if (common::add_overflow(data_size, len, data_size)) { + return Status::InvalidArgument("Excess expansion in DELTA_(LENGTH_)BYTE_ARRAY"); + } + } + _length_idx += max_values; + + _buffered_data.resize(data_size); + char* data_ptr = _buffered_data.data(); + for (int j = 0; j < data_size; j++) { + if (!_bit_reader->GetValue(8, data_ptr + j)) { + return Status::IOError("Get length bytes EOF"); + } + } + + for (int i = 0; i < max_values; ++i) { + buffer[i].data = data_ptr; + data_ptr += buffer[i].size; + } + // this->num_values_ -= max_values; + _num_valid_values -= max_values; + *out_num_values = max_values; + return Status::OK(); +} + +Status DeltaByteArrayDecoder::_get_internal(Slice* buffer, int max_values, int* out_num_values) { + // Decode up to `max_values` strings into an internal buffer + // and reference them into `buffer`. 
+ max_values = std::min(max_values, _num_valid_values); + if (max_values == 0) { + *out_num_values = max_values; + return Status::OK(); + } + + int suffix_read; + RETURN_IF_ERROR(_suffix_decoder.decode(buffer, max_values, &suffix_read)); + if (PREDICT_FALSE(suffix_read != max_values)) { + return Status::IOError("Read {}, expecting {} from suffix decoder", + std::to_string(suffix_read), std::to_string(max_values)); + } + + int64_t data_size = 0; + const int32_t* prefix_len_ptr = _buffered_prefix_length.data() + _prefix_len_offset; + for (int i = 0; i < max_values; ++i) { + if (PREDICT_FALSE(prefix_len_ptr[i] < 0)) { + return Status::InvalidArgument("negative prefix length in DELTA_BYTE_ARRAY"); + } + if (PREDICT_FALSE(common::add_overflow(data_size, static_cast(prefix_len_ptr[i]), + data_size) || + common::add_overflow(data_size, static_cast(buffer[i].size), + data_size))) { + return Status::InvalidArgument("excess expansion in DELTA_BYTE_ARRAY"); + } + } + _buffered_data.resize(data_size); + + std::string_view prefix {_last_value}; + + char* data_ptr = _buffered_data.data(); + for (int i = 0; i < max_values; ++i) { + if (PREDICT_FALSE(static_cast(prefix_len_ptr[i]) > prefix.length())) { + return Status::InvalidArgument("prefix length too large in DELTA_BYTE_ARRAY"); + } + memcpy(data_ptr, prefix.data(), prefix_len_ptr[i]); + // buffer[i] currently points to the string suffix + memcpy(data_ptr + prefix_len_ptr[i], buffer[i].data, buffer[i].size); + buffer[i].data = data_ptr; + buffer[i].size += prefix_len_ptr[i]; + data_ptr += buffer[i].size; + prefix = std::string_view {buffer[i].data, buffer[i].size}; + } + _prefix_len_offset += max_values; + _num_valid_values -= max_values; + _last_value = std::string {prefix}; + + if (_num_valid_values == 0) { + _last_value_in_previous_page = _last_value; + } + *out_num_values = max_values; + return Status::OK(); +} +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h 
b/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h new file mode 100644 index 0000000000..bdebd4d82f --- /dev/null +++ b/be/src/vec/exec/format/parquet/delta_bit_pack_decoder.h @@ -0,0 +1,280 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "util/bit_stream_utils.h" +#include "vec/exec/format/parquet/byte_array_plain_decoder.h" +#include "vec/exec/format/parquet/fix_length_plain_decoder.h" + +namespace doris::vectorized { + +class DeltaDecoder : public Decoder { +public: + DeltaDecoder(Decoder* decoder) { _type_converted_decoder.reset(decoder); } + + ~DeltaDecoder() override = default; + + Status skip_values(size_t num_values) override { + return _type_converted_decoder->skip_values(num_values); + } + + Status decode_byte_array(const std::vector& decoded_vals, MutableColumnPtr& doris_column, + DataTypePtr& data_type, ColumnSelectVector& select_vector); + +protected: + void init_values_converter() { + _type_converted_decoder->set_data(_data); + _type_converted_decoder->set_type_length(_type_length); + _type_converted_decoder->init(_field_schema, _decode_params->ctz); + } + // Convert decoded value to doris type value. 
+ std::unique_ptr _type_converted_decoder; + size_t _current_value_idx = 0; +}; + +/** + * Format + * [header] [block 1] [block 2] ... [block N] + * Header + * [block size] [_mini_blocks_per_block] [_total_value_count] [first value] + * Block + * [min delta] [list of bitwidths of the mini blocks] [miniblocks] + */ +template +class DeltaBitPackDecoder final : public DeltaDecoder { +public: + using UT = std::make_unsigned_t; + + DeltaBitPackDecoder(const tparquet::Type::type& physical_type) + : DeltaDecoder(new FixLengthPlainDecoder(physical_type)) {} + ~DeltaBitPackDecoder() override = default; + Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, + ColumnSelectVector& select_vector) override { + size_t non_null_size = select_vector.num_values() - select_vector.num_nulls(); + // decode values + _values.resize(non_null_size); + int decoded_count = 0; + RETURN_IF_ERROR(_get_internal(_values.data(), non_null_size, &decoded_count)); + _data->data = reinterpret_cast(_values.data()); + _type_length = sizeof(T); + _data->size = _values.size() * _type_length; + // set decoded value with fix plain decoder + init_values_converter(); + return _type_converted_decoder->decode_values(doris_column, data_type, select_vector); + } + + Status decode(T* buffer, int num_values, int* out_num_values) { + return _get_internal(buffer, num_values, out_num_values); + } + + int valid_values_count() { + // _total_value_count in header ignores of null values + return static_cast(_total_values_remaining); + } + + void set_data(Slice* slice) override { + _bit_reader.reset(new BitReader((const uint8_t*)slice->data, slice->size)); + Status st = _init_header(); + if (st != Status::OK()) { + LOG(FATAL) << "Fail to init delta encoding header for " << st.to_string(); + } + _data = slice; + _offset = 0; + } + + // Set BitReader which is already initialized by DeltaLengthByteArrayDecoder or + // DeltaByteArrayDecoder + void set_bit_reader(std::shared_ptr bit_reader) { + 
_bit_reader = std::move(bit_reader); + Status st = _init_header(); + if (st != Status::OK()) { + LOG(FATAL) << "Fail to init delta encoding header for " << st.to_string(); + } + } + +private: + static constexpr int kMaxDeltaBitWidth = static_cast(sizeof(T) * 8); + Status _init_header(); + Status _init_block(); + Status _init_mini_block(int bit_width); + Status _get_internal(T* buffer, int max_values, int* out_num_values); + + std::vector _values; + + std::shared_ptr _bit_reader; + uint32_t _values_per_block; + uint32_t _mini_blocks_per_block; + uint32_t _values_per_mini_block; + uint32_t _total_value_count; + + T _min_delta; + T _last_value; + + uint32_t _mini_block_idx; + std::vector _delta_bit_widths; + int _delta_bit_width; + // If the page doesn't contain any block, `_block_initialized` will + // always be false. Otherwise, it will be true when first block initialized. + bool _block_initialized; + + uint32_t _total_values_remaining; + // Remaining values in current mini block. If the current block is the last mini block, + // _values_remaining_current_mini_block may greater than _total_values_remaining. 
+    uint32_t _values_remaining_current_mini_block;
+};
+template class DeltaBitPackDecoder;
+template class DeltaBitPackDecoder;
+
+// Byte-array decoder whose value lengths are read through a DeltaBitPackDecoder
+// (_len_decoder). The lengths are decoded up front by _decode_lengths(); that
+// function and _get_internal() are defined outside this header.
+// NOTE(review): the class name suggests Parquet DELTA_LENGTH_BYTE_ARRAY pages - confirm.
+class DeltaLengthByteArrayDecoder final : public DeltaDecoder {
+public:
+    explicit DeltaLengthByteArrayDecoder(const tparquet::Type::type& physical_type)
+            : DeltaDecoder(nullptr),
+              _len_decoder(physical_type),
+              _buffered_length(0),
+              _buffered_data(0) {}
+
+    // Advances the value index and forwards the skip to the length decoder.
+    Status skip_values(size_t num_values) override {
+        _current_value_idx += num_values;
+        RETURN_IF_ERROR(_len_decoder.skip_values(num_values));
+        return Status::OK();
+    }
+
+    // Decodes the non-null values described by select_vector into doris_column.
+    // Returns IOError when fewer values than requested could be decoded.
+    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                         ColumnSelectVector& select_vector) override {
+        size_t num_values = select_vector.num_values();
+        size_t null_count = select_vector.num_nulls();
+        // init read buffer
+        _values.resize(num_values - null_count);
+        int num_valid_values = 0;
+        RETURN_IF_ERROR(_get_internal(_values.data(), num_values - null_count, &num_valid_values));
+
+        if (PREDICT_FALSE(num_values - null_count != num_valid_values)) {
+            return Status::IOError("Expected to decode {} values, but decoded {} values.",
+                                   num_values - null_count, num_valid_values);
+        }
+        return decode_byte_array(_values, doris_column, data_type, select_vector);
+    }
+
+    // Decodes up to num_values slices into buffer; the count actually produced is
+    // written to out_num_values.
+    Status decode(Slice* buffer, int num_values, int* out_num_values) {
+        return _get_internal(buffer, num_values, out_num_values);
+    }
+
+    // Binds the decoder to a new data slice and decodes its lengths.
+    // An empty slice is ignored: no reader is created and no lengths are decoded.
+    void set_data(Slice* slice) override {
+        if (slice->size == 0) {
+            return;
+        }
+        _bit_reader = std::make_shared((const uint8_t*)slice->data, slice->size);
+        _data = slice;
+        _offset = 0;
+        _decode_lengths();
+    }
+
+    // Adopts a reader owned by another decoder (DeltaByteArrayDecoder passes the one
+    // it shares with its prefix-length decoder) and decodes the lengths from it.
+    void set_bit_reader(std::shared_ptr bit_reader) {
+        _bit_reader = std::move(bit_reader);
+        _decode_lengths();
+    }
+
+private:
+    // Decode all the encoded lengths. _bit_reader will be at the start of the encoded data
+    // after that.
+    void _decode_lengths();
+    Status _get_internal(Slice* buffer, int max_values, int* out_num_values);
+
+    std::vector _values;
+    std::shared_ptr _bit_reader;
+    DeltaBitPackDecoder _len_decoder;
+
+    // Zero-initialized so the state is well defined even when set_data() returns
+    // early on an empty slice and _decode_lengths() never runs.
+    int _num_valid_values = 0;
+    uint32_t _length_idx = 0;
+    std::vector _buffered_length;
+    std::vector _buffered_data;
+};
+
+// Byte-array decoder that combines a prefix-length stream (_prefix_len_decoder) with a
+// suffix stream (_suffix_decoder); both read from the same _bit_reader.
+// _get_internal() is defined outside this header.
+// NOTE(review): the class name suggests Parquet DELTA_BYTE_ARRAY pages - confirm.
+class DeltaByteArrayDecoder : public DeltaDecoder {
+public:
+    explicit DeltaByteArrayDecoder(const tparquet::Type::type& physical_type)
+            : DeltaDecoder(nullptr),
+              _prefix_len_decoder(physical_type),
+              _suffix_decoder(physical_type),
+              _buffered_prefix_length(0),
+              _buffered_data(0) {}
+
+    // Advances the value index and forwards the skip to both underlying decoders.
+    Status skip_values(size_t num_values) override {
+        _current_value_idx += num_values;
+        RETURN_IF_ERROR(_prefix_len_decoder.skip_values(num_values));
+        RETURN_IF_ERROR(_suffix_decoder.skip_values(num_values));
+        return Status::OK();
+    }
+
+    // Decodes the non-null values described by select_vector into doris_column.
+    // Returns IOError when fewer values than requested could be decoded, matching
+    // DeltaLengthByteArrayDecoder::decode_values (a debug-only DCHECK would let a
+    // short read pass unnoticed in release builds).
+    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
+                         ColumnSelectVector& select_vector) override {
+        size_t num_values = select_vector.num_values();
+        size_t null_count = select_vector.num_nulls();
+        _values.resize(num_values - null_count);
+        int num_valid_values = 0;
+        RETURN_IF_ERROR(_get_internal(_values.data(), num_values - null_count, &num_valid_values));
+        if (PREDICT_FALSE(num_values - null_count != num_valid_values)) {
+            return Status::IOError("Expected to decode {} values, but decoded {} values.",
+                                   num_values - null_count, num_valid_values);
+        }
+        return decode_byte_array(_values, doris_column, data_type, select_vector);
+    }
+
+    // Binds the decoder to a new data slice: buffers every prefix length, then hands
+    // the shared reader to the suffix decoder.
+    void set_data(Slice* slice) override {
+        _bit_reader = std::make_shared((const uint8_t*)slice->data, slice->size);
+        _prefix_len_decoder.set_bit_reader(_bit_reader);
+
+        // get the number of encoded prefix lengths
+        int num_prefix = _prefix_len_decoder.valid_values_count();
+        // call _prefix_len_decoder.decode to decode all the prefix lengths.
+        // all the prefix lengths are buffered in _buffered_prefix_length.
+        _buffered_prefix_length.resize(num_prefix);
+        int ret = 0;
+        Status st = _prefix_len_decoder.decode(_buffered_prefix_length.data(), num_prefix, &ret);
+        // NOTE(review): set_data() returns void, so a decode failure cannot be reported
+        // to the caller and the process is aborted instead; consider a Status-returning
+        // path if the base interface allows it.
+        if (st != Status::OK()) {
+            LOG(FATAL) << "Fail to decode delta prefix, status: " << st;
+        }
+        DCHECK_EQ(ret, num_prefix);
+        _prefix_len_offset = 0;
+        _num_valid_values = num_prefix;
+
+        // at this time, _bit_reader will be at the start of the encoded suffix data.
+        _suffix_decoder.set_bit_reader(_bit_reader);
+
+        // TODO: read corrupted files written with bug(PARQUET-246). _last_value should be set
+        // to _last_value_in_previous_page when decoding a new page(except the first page)
+        _last_value = "";
+    }
+
+    // Decodes up to num_values slices into buffer; the count actually produced is
+    // written to out_num_values.
+    Status decode(Slice* buffer, int num_values, int* out_num_values) {
+        return _get_internal(buffer, num_values, out_num_values);
+    }
+
+private:
+    Status _get_internal(Slice* buffer, int max_values, int* out_num_values);
+
+    std::vector _values;
+    std::shared_ptr _bit_reader;
+    DeltaBitPackDecoder _prefix_len_decoder;
+    DeltaLengthByteArrayDecoder _suffix_decoder;
+    std::string _last_value;
+    // string buffer for last value in previous page
+    std::string _last_value_in_previous_page;
+    // Zero-initialized so the state is well defined before the first set_data() call.
+    int _num_valid_values = 0;
+    uint32_t _prefix_len_offset = 0;
+    std::vector _buffered_prefix_length;
+    std::vector _buffered_data;
+};
+} // namespace doris::vectorized
diff --git a/docs/en/docs/lakehouse/multi-catalog/hive.md b/docs/en/docs/lakehouse/multi-catalog/hive.md index 8b83e2e0be..c6b59bb802 100644 --- a/docs/en/docs/lakehouse/multi-catalog/hive.md +++ b/docs/en/docs/lakehouse/multi-catalog/hive.md @@ -104,7 +104,7 @@ CREATE CATALOG hive PROPERTIES ( ); ``` -Or to connect to Hive data stored in JuiceFS: +Or to connect to Hive data stored on JuiceFS: ```sql CREATE CATALOG hive PROPERTIES ( @@ -117,6 +117,23 @@ CREATE CATALOG hive PROPERTIES ( ); ``` +Or to connect to Glue and data stored on S3: + +```sql +CREATE CATALOG hive PROPERTIES ( + "type"="hms", + "hive.metastore.type" = "glue", + "aws.region" = "us-east-1", + 
"aws.glue.access-key" = "ak", + "aws.glue.secret-key" = "sk", + "AWS_ENDPOINT" = "s3.us-east-1.amazonaws.com", + "AWS_REGION" = "us-east-1", + "AWS_ACCESS_KEY" = "ak", + "AWS_SECRET_KEY" = "sk", + "use_path_style" = "true" +); +``` + when connecting to Hive Metastore which is authorized by Ranger, need some properties and update FE runtime environment. diff --git a/docs/en/docs/lakehouse/multi-catalog/iceberg.md b/docs/en/docs/lakehouse/multi-catalog/iceberg.md index f73f1464b2..4b870dfea8 100644 --- a/docs/en/docs/lakehouse/multi-catalog/iceberg.md +++ b/docs/en/docs/lakehouse/multi-catalog/iceberg.md @@ -61,7 +61,7 @@ Access metadata with the iceberg API. The Hive, REST, Glue and other services ca -- Using Iceberg Hive Catalog +#### Using Iceberg Hive Catalog ```sql CREATE CATALOG iceberg PROPERTIES ( @@ -77,7 +77,7 @@ CREATE CATALOG iceberg PROPERTIES ( ); ``` -- Using Iceberg Glue Catalog +#### Using Iceberg Glue Catalog ```sql CREATE CATALOG glue PROPERTIES ( @@ -97,6 +97,8 @@ CREATE CATALOG glue PROPERTIES ( `warehouse`: Glue Warehouse Location. To determine the root path of the data warehouse in storage. +For the other properties, refer to the [Iceberg Glue Catalog](https://iceberg.apache.org/docs/latest/aws/#glue-catalog) documentation. + - Using Iceberg REST Catalog RESTful service as the server side. Implementing RESTCatalog interface of iceberg to obtain metadata. 
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md b/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md index 8c7ddcbc58..89334a1272 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/hive.md @@ -113,6 +113,23 @@ CREATE CATALOG hive PROPERTIES ( ); ``` +hive元数据存储在Glue,数据存储在S3,示例如下: + +```sql +CREATE CATALOG hive PROPERTIES ( + "type"="hms", + "hive.metastore.type" = "glue", + "aws.region" = "us-east-1", + "aws.glue.access-key" = "ak", + "aws.glue.secret-key" = "sk", + "AWS_ENDPOINT" = "s3.us-east-1.amazonaws.com", + "AWS_REGION" = "us-east-1", + "AWS_ACCESS_KEY" = "ak", + "AWS_SECRET_KEY" = "sk", + "use_path_style" = "true" +); +``` + 连接开启 Ranger 权限校验的 Hive Metastore 需要增加配置 & 配置环境: diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md index 9363767160..10e12af675 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md @@ -59,7 +59,7 @@ CREATE CATALOG iceberg PROPERTIES ( -- Hive Metastore作为元数据服务 +#### Hive Metastore作为元数据服务 ```sql CREATE CATALOG iceberg PROPERTIES ( @@ -75,7 +75,7 @@ CREATE CATALOG iceberg PROPERTIES ( ); ``` -- Glue Catalog作为元数据服务 +#### Glue Catalog作为元数据服务 ```sql CREATE CATALOG glue PROPERTIES ( @@ -95,6 +95,8 @@ CREATE CATALOG glue PROPERTIES ( `warehouse`: Glue Warehouse Location. Glue Catalog的根路径,用于指定数据存放位置。 +属性详情参见 [Iceberg Glue Catalog](https://iceberg.apache.org/docs/latest/aws/#glue-catalog) + - REST Catalog作为元数据服务 该方式需要预先提供REST服务,用户需实现获取Iceberg元数据的REST接口。 diff --git a/regression-test/data/external_table_emr_p2/hive/test_external_catalog_glue_table.out b/regression-test/data/external_table_emr_p2/hive/test_external_catalog_glue_table.out new file mode 100644 index 0000000000..23cb814ab1 --- /dev/null +++ b/regression-test/data/external_table_emr_p2/hive/test_external_catalog_glue_table.out @@ -0,0 +1,131 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !q01 -- +2967 +3158 +15505 +20726 +21843 + +-- !q02 -- +1809714008 +1979816070 +2147483647 +2147483647 +2147483647 + +-- !q03 -- +\N +15821 +\N +\N +\N + +-- !q04 -- +1604.7639 +1583.2013 +1031.6219 +1295.7802 +182.5588 + +-- !q05 -- +1937.7762425702406 +992.21123681735253 +56.682069922520562 +940.70481552186243 +1876.4831949153224 + +-- !q06 -- +2023-03-07 20:34:59 +2023-03-07 20:34:59 +2023-03-07 20:34:59 +2023-03-07 20:34:59 +2023-03-07 20:34:59 +2023-03-07 20:34:59 +2023-03-07 20:34:59 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 +2023-03-07 20:35 + +-- !q07 -- +6f77a7baae184d +88 +fbbf69fc81374 +f14889 +33d471ce + +-- !q08 -- +c2b69a82f074e4f +81b1152fa774b8 +73df8eaccf +2ed59df3c824dc78b +5e3e98e07e + +-- !q09 -- +dfaac2a43 +28c5f21b8 +a20faeee91e34ce +b5e6bf2b5 +8bc56e + +-- !q10 -- +true +true +false +false +false + +-- !q11 -- +54078 8184 +122067 9731 +140902 170 +143594 5714 +170289 4567 +175294 1959 +202483 857 +222664 6449 +230156 2480 +266339 6845 + +-- !q12 -- +\N +\N +\N + +-- !q13 -- +27 +27 +34 +50 +59 +97 +99 +101 +107 +114 + +-- !q14 -- +dfaac2a43 +28c5f21b8 +a20faeee91e34ce +b5e6bf2b5 +8bc56e + +-- !q15 -- +5000 + +-- !q16 -- +2023-03-07 20:35:59 +2023-03-07 20:35:59 +2023-03-07 20:35:59 +2023-03-07 20:35:59 +2023-03-07 20:35:59 diff --git a/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_glue_table.groovy b/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_glue_table.groovy new file mode 100644 index 0000000000..d1f014d28c --- /dev/null +++ b/regression-test/suites/external_table_emr_p2/hive/test_external_catalog_glue_table.groovy @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_external_catalog_glue_table", "p2") {
+    // Opt-in switch: nothing runs unless enableExternalHiveTest is "true" (any case).
+    String hiveTestSwitch = context.config.otherConfigs.get("enableExternalHiveTest")
+    if (!"true".equalsIgnoreCase(hiveTestSwitch)) {
+        return
+    }
+
+    // Metastore endpoint taken from the regression configuration.
+    String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
+    String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
+
+    // Drop and re-create the catalog, then select it and the database under test.
+    sql """drop catalog if exists test_external_catalog_glue;"""
+    sql """ create catalog if not exists test_external_catalog_glue properties ( 'type'='hms', 'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}' ); """
+    sql """switch test_external_catalog_glue;"""
+    sql """ use `iceberg_catalog`; """
+
+    // Queries against iceberg_glue_types, tagged q01..q16.
+    qt_q01 """ select glue_int from iceberg_glue_types order by glue_int limit 5 """
+    qt_q02 """ select glue_bigint from iceberg_glue_types order by glue_int limit 5 """
+    qt_q03 """ select glue_smallint from iceberg_glue_types order by glue_int limit 5 """
+    qt_q04 """ select glue_decimal from iceberg_glue_types order by glue_int limit 5 """
+    qt_q05 """ select glue_double from iceberg_glue_types order by glue_int limit 5 """
+    qt_q06 """ select glue_timstamp from iceberg_glue_types order by glue_timstamp limit 20 """
+    qt_q07 """ select glue_char from iceberg_glue_types order by glue_int limit 5 """
+    qt_q08 """ select glue_varchar from iceberg_glue_types order by glue_int limit 5 """
+    qt_q09 """ select glue_string from iceberg_glue_types order by glue_int limit 5 """
+    qt_q10 """ select glue_bool from iceberg_glue_types order by glue_int limit 5 """
+    qt_q11 """ select glue_int,glue_smallint from iceberg_glue_types where glue_int > 2000 and glue_smallint < 10000 order by glue_int limit 10 """
+    qt_q12 """ select glue_smallint from iceberg_glue_types where glue_smallint is null order by glue_smallint limit 3 """
+    qt_q13 """ select glue_smallint from iceberg_glue_types where glue_smallint is not null order by glue_smallint limit 10 """
+    qt_q14 """ select glue_string from iceberg_glue_types where glue_string>'040abff1da4748e4b' order by glue_int limit 5 """
+    qt_q15 """ select count(1) from iceberg_glue_types """
+    qt_q16 """ select glue_timstamp from iceberg_glue_types where glue_timstamp > '2023-03-07 20:35:59' order by glue_timstamp limit 5 """
+}