[feature](parquet-reader) add rle bool and delta decoder to read AWS Glue (#17112)

Support delta encoding and rle(bool) to read Glue data
add delta bit pack decoder,
add delta length byte array decoder,
add delta byte array decoder.
add rle bool decoder.

Some column types in data written by AWS Glue are stored with delta encoding, so the reader needs to support it.
For the definition of these encodings, refer to the delta encoding section of the Parquet format specification.
This commit is contained in:
slothever
2023-03-12 20:09:58 +08:00
committed by GitHub
parent a452db35da
commit 455c800405
15 changed files with 1080 additions and 20 deletions

View File

@ -111,7 +111,15 @@ public:
// Reads a vlq encoded int from the stream. The encoded int must start at the
// beginning of a byte. Return false if there were not enough bytes in the buffer.
bool GetVlqInt(int32_t* v);
bool GetVlqInt(uint32_t* v);
// Reads a zigzag encoded int `into` v.
bool GetZigZagVlqInt(int32_t* v);
// Reads a vlq encoded int from the stream. The encoded int must start at the
// beginning of a byte. Return false if there were not enough bytes in the buffer.
bool GetVlqInt(uint64_t* v);
// Reads a zigzag encoded int `into` v.
bool GetZigZagVlqInt(int64_t* v);
// Returns the number of bytes left in the stream, not including the current byte (i.e.,
// there may be an additional fraction of a byte).
@ -123,12 +131,18 @@ public:
// Rewind the stream by 'num_bits' bits
void Rewind(int num_bits);
// Advance the stream by 'num_bits' bits
bool Advance(int64_t num_bits);
// Seek to a specific bit in the buffer
void SeekToBit(unsigned int stream_position);
// Maximum byte length of a vlq encoded int
static const int MAX_VLQ_BYTE_LEN = 5;
// Maximum byte length of a vlq encoded int64
static const int MAX_VLQ_BYTE_LEN_FOR_INT64 = 10;
bool is_initialized() const { return buffer_ != nullptr; }
private:

View File

@ -26,6 +26,7 @@
#include "util/alignment.h"
#include "util/bit_packing.inline.h"
#include "util/bit_stream_utils.h"
#include "util/bit_util.h"
using doris::BitUtil;
@ -150,6 +151,18 @@ inline void BitReader::Rewind(int num_bits) {
memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
}
// Moves the read position forward by `num_bits` bits.
// Returns false, without changing the position, when the buffer does not hold
// enough bytes to cover the new position; otherwise updates the byte/bit
// offsets, calls BufferValues() and returns true.
inline bool BitReader::Advance(int64_t num_bits) {
    const int64_t target_bits = bit_offset_ + num_bits;
    const int64_t whole_bytes = target_bits >> 3;
    const int64_t tail_bits = target_bits & 7;
    // A partially consumed trailing byte still has to be present in the buffer.
    const int64_t bytes_needed = whole_bytes + (tail_bits != 0 ? 1 : 0);
    const bool fits = bytes_needed <= max_bytes_ - byte_offset_;
    if (!fits) {
        return false;
    }
    byte_offset_ += static_cast<int>(whole_bytes);
    bit_offset_ = static_cast<int>(tail_bits);
    BufferValues();
    return true;
}
inline void BitReader::SeekToBit(unsigned int stream_position) {
DCHECK_LE(stream_position, max_bytes_ * 8);
@ -195,17 +208,52 @@ inline bool BitReader::GetAligned(int num_bytes, T* v) {
return true;
}
inline bool BitReader::GetVlqInt(int32_t* v) {
*v = 0;
int shift = 0;
int num_bytes = 0;
uint8_t byte = 0;
do {
inline bool BitReader::GetVlqInt(uint32_t* v) {
uint32_t tmp = 0;
for (int num_bytes = 0; num_bytes < MAX_VLQ_BYTE_LEN; num_bytes++) {
uint8_t byte = 0;
if (!GetAligned<uint8_t>(1, &byte)) return false;
*v |= (byte & 0x7F) << shift;
shift += 7;
DCHECK_LE(++num_bytes, MAX_VLQ_BYTE_LEN);
} while ((byte & 0x80) != 0);
tmp |= static_cast<uint32_t>(byte & 0x7F) << (7 * num_bytes);
if ((byte & 0x80) == 0) {
*v = tmp;
return true;
}
}
return false;
}
// Reads a VLQ-encoded 32-bit value and undoes the zigzag mapping, storing the
// signed result in `*v`. Returns false, leaving `*v` untouched, if the
// underlying VLQ read fails.
inline bool BitReader::GetZigZagVlqInt(int32_t* v) {
    uint32_t encoded;
    if (!GetVlqInt(&encoded)) {
        return false;
    }
    // The low bit carries the sign: (0 - 1) wraps to an all-ones mask, (0 - 0) stays zero.
    const uint32_t sign_mask = 0u - (encoded & 1u);
    const uint32_t decoded = (encoded >> 1) ^ sign_mask;
    // Copy the bit pattern into the signed output instead of converting the value.
    std::memcpy(v, &decoded, sizeof(uint32_t));
    return true;
}
// Reads a VLQ-encoded unsigned 64-bit integer, seven payload bits per byte,
// least-significant group first. `*v` is written only on success.
// Returns false when a byte cannot be read, or when the continuation bit is
// still set after MAX_VLQ_BYTE_LEN_FOR_INT64 bytes.
inline bool BitReader::GetVlqInt(uint64_t* v) {
    uint64_t accumulated = 0;
    int shift = 0;
    for (int consumed = 0; consumed < MAX_VLQ_BYTE_LEN_FOR_INT64; ++consumed, shift += 7) {
        uint8_t current = 0;
        if (!GetAligned<uint8_t>(1, &current)) {
            return false;
        }
        accumulated |= static_cast<uint64_t>(current & 0x7F) << shift;
        // A cleared high bit marks the final byte of the encoding.
        const bool has_more = (current & 0x80) != 0;
        if (!has_more) {
            *v = accumulated;
            return true;
        }
    }
    return false;
}
// Reads a VLQ-encoded 64-bit value and undoes the zigzag mapping, storing the
// signed result in `*v`. Returns false, leaving `*v` untouched, if the
// underlying VLQ read fails.
inline bool BitReader::GetZigZagVlqInt(int64_t* v) {
    uint64_t encoded;
    if (!GetVlqInt(&encoded)) {
        return false;
    }
    // The low bit carries the sign: (0 - 1) wraps to an all-ones mask, (0 - 0) stays zero.
    const uint64_t sign_mask = uint64_t {0} - (encoded & 1u);
    const uint64_t decoded = (encoded >> 1) ^ sign_mask;
    // Copy the bit pattern into the signed output instead of converting the value.
    std::memcpy(v, &decoded, sizeof(uint64_t));
    return true;
}
@ -227,12 +275,14 @@ inline int BatchedBitReader::UnpackBatch(int bit_width, int num_values, T* v) {
inline bool BatchedBitReader::SkipBatch(int bit_width, int num_values_to_skip) {
DCHECK(buffer_pos_ != nullptr);
DCHECK_GT(bit_width, 0);
DCHECK_GE(bit_width, 0);
DCHECK_LE(bit_width, MAX_BITWIDTH);
DCHECK_GT(num_values_to_skip, 0);
DCHECK_GE(num_values_to_skip, 0);
int skip_bytes = BitUtil::RoundUpNumBytes(bit_width * num_values_to_skip);
if (skip_bytes > buffer_end_ - buffer_pos_) return false;
if (skip_bytes > buffer_end_ - buffer_pos_) {
return false;
}
buffer_pos_ += skip_bytes;
return true;
}

View File

@ -229,7 +229,7 @@ inline bool RleDecoder<T>::ReadHeader() {
if (PREDICT_FALSE(literal_count_ == 0 && repeat_count_ == 0)) {
// Read the next run's indicator int, it could be a literal or repeated run
// The int is encoded as a vlq-encoded value.
int32_t indicator_value = 0;
uint32_t indicator_value = 0;
bool result = bit_reader_.GetVlqInt(&indicator_value);
if (PREDICT_FALSE(!result)) {
return false;

View File

@ -313,6 +313,8 @@ set(VEC_FILES
exec/format/table/iceberg_reader.cpp
exec/format/file_reader/new_plain_text_line_reader.cpp
exec/format/file_reader/new_plain_binary_line_reader.cpp
exec/format/parquet/delta_bit_pack_decoder.cpp
exec/format/parquet/bool_rle_decoder.cpp
)
if (WITH_MYSQL)

View File

@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/format/parquet/bool_rle_decoder.h"
#include "util/bit_util.h"
namespace doris::vectorized {
// Points the decoder at a new page. The page starts with a 4-byte little-endian
// length, followed by that many bytes of RLE data decoded with bit width 1.
// A page shorter than the prefix, or a prefix larger than the bytes that
// follow it, is reported through LOG(FATAL).
void BoolRLEDecoder::set_data(Slice* slice) {
    constexpr size_t kLengthPrefixBytes = 4;

    _data = slice;
    _offset = 0;
    _num_bytes = slice->size;
    if (_num_bytes < kLengthPrefixBytes) {
        LOG(FATAL) << "Received invalid length : " << std::to_string(_num_bytes)
                   << " (corrupt data page?)";
    }

    // Load the first 4 bytes in little-endian, which indicates the length
    const auto* page_start = reinterpret_cast<const uint8_t*>(_data->data);
    const uint32_t payload_bytes = decode_fixed32_le(page_start);
    const auto available_bytes = static_cast<uint32_t>(_num_bytes - kLengthPrefixBytes);
    if (payload_bytes > available_bytes) {
        LOG(FATAL) << "Received invalid number of bytes : " << std::to_string(payload_bytes)
                   << " (corrupt data page?)";
    }

    _num_bytes = payload_bytes;
    _decoder = RleDecoder<uint8_t>(page_start + kLengthPrefixBytes, payload_bytes, 1);
}
// Skips `num_values` encoded booleans.
// The RLE stream can only be read sequentially, so the values are decoded into
// the scratch buffer and discarded; this keeps the stream position in step
// with what the caller considers consumed.
Status BoolRLEDecoder::skip_values(size_t num_values) {
    if (num_values == 0) {
        return Status::OK();
    }
    _values.resize(num_values);
    if (!_decoder.get_values(_values.data(), num_values)) {
        return Status::IOError("Can't read enough booleans in rle decoder");
    }
    return Status::OK();
}

// Decodes one batch of booleans into `doris_column`.
// The column grows by (num_values - num_filtered) rows. Only the non-null
// values of this batch (num_values - num_nulls) are present in the encoded
// stream, so exactly that many are pulled from the RLE decoder.
// Returns IOError when the decoder cannot supply the requested values.
Status BoolRLEDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                     ColumnSelectVector& select_vector) {
    auto& column_data = static_cast<ColumnVector<UInt8>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());

    // CONTENT and FILTERED_CONTENT runs together consume the non-null values.
    const size_t encoded_count = select_vector.num_values() - select_vector.num_nulls();
    _values.resize(encoded_count);
    if (encoded_count > 0 && !_decoder.get_values(_values.data(), encoded_count)) {
        return Status::IOError("Can't read enough booleans in rle decoder");
    }

    // `_values` is refilled on every call, so the read cursor is local to the call.
    size_t value_idx = 0;
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            for (size_t i = 0; i < run_length; ++i) {
                // Normalize through bool so the column only ever holds 0 or 1.
                const bool value = _values[value_idx++] != 0;
                column_data[data_index++] = static_cast<UInt8>(value);
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            // Null rows keep their slot in the column but have no encoded value.
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            // Filtered rows were decoded but are not emitted.
            value_idx += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            break;
        }
        }
    }
    return Status::OK();
}
} // namespace doris::vectorized

View File

@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "util/rle_encoding.h"
#include "vec/exec/format/parquet/bool_plain_decoder.h"
#include "vec/exec/format/parquet/decoder.h"
namespace doris::vectorized {
// Decoder for BOOLEAN pages that use the RLE encoding.
// set_data() expects a 4-byte little-endian length prefix followed by the RLE
// payload, which is decoded with a bit width of 1.
class BoolRLEDecoder final : public Decoder {
public:
    BoolRLEDecoder() = default;
    ~BoolRLEDecoder() override = default;

    // Binds the decoder to a new page and re-creates the RLE decoder over it.
    void set_data(Slice* slice) override;

    // Appends the values selected by `select_vector` to `doris_column`.
    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                         ColumnSelectVector& select_vector) override;

    Status skip_values(size_t num_values) override;

private:
    RleDecoder<uint8_t> _decoder;
    // Scratch buffer holding the booleans decoded for the current call.
    std::vector<uint8_t> _values;
    // Payload size in bytes taken from the page's length prefix; 0 until set_data().
    size_t _num_bytes = 0;
    size_t _current_value_idx = 0;
};
} // namespace doris::vectorized

View File

@ -20,8 +20,10 @@
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exec/format/parquet/bool_plain_decoder.h"
#include "vec/exec/format/parquet/bool_rle_decoder.h"
#include "vec/exec/format/parquet/byte_array_dict_decoder.h"
#include "vec/exec/format/parquet/byte_array_plain_decoder.h"
#include "vec/exec/format/parquet/delta_bit_pack_decoder.h"
#include "vec/exec/format/parquet/fix_length_dict_decoder.hpp"
#include "vec/exec/format/parquet/fix_length_plain_decoder.h"
@ -88,6 +90,48 @@ Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type
tparquet::to_string(type), tparquet::to_string(encoding));
}
break;
case tparquet::Encoding::RLE:
switch (type) {
case tparquet::Type::BOOLEAN:
decoder.reset(new BoolRLEDecoder());
break;
default:
return Status::InternalError("Unsupported type {}(encoding={}) in parquet decoder",
tparquet::to_string(type), tparquet::to_string(encoding));
}
break;
case tparquet::Encoding::DELTA_BINARY_PACKED:
// Supports only INT32 and INT64.
switch (type) {
case tparquet::Type::INT32:
decoder.reset(new DeltaBitPackDecoder<Int32>(type));
break;
case tparquet::Type::INT64:
decoder.reset(new DeltaBitPackDecoder<Int64>(type));
break;
default:
return Status::InternalError("DELTA_BINARY_PACKED only supports INT32 and INT64");
}
break;
case tparquet::Encoding::DELTA_BYTE_ARRAY:
switch (type) {
case tparquet::Type::BYTE_ARRAY:
decoder.reset(new DeltaByteArrayDecoder(type));
break;
default:
return Status::InternalError("DELTA_BYTE_ARRAY only supports BYTE_ARRAY.");
}
break;
case tparquet::Encoding::DELTA_LENGTH_BYTE_ARRAY:
switch (type) {
case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
decoder.reset(new DeltaLengthByteArrayDecoder(type));
break;
default:
return Status::InternalError(
"DELTA_LENGTH_BYTE_ARRAY only supports FIXED_LEN_BYTE_ARRAY.");
}
break;
default:
return Status::InternalError("Unsupported encoding {}(type={}) in parquet decoder",
tparquet::to_string(encoding), tparquet::to_string(type));

View File

@ -0,0 +1,319 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "delta_bit_pack_decoder.h"
namespace doris::vectorized {
// Copies already-decoded byte arrays into `doris_column`, following the runs
// reported by `select_vector`. Only String and FixedString logical types are
// accepted; any other type yields InvalidArgument.
// `_current_value_idx` is the cursor into `decoded_vals` and is reset to 0
// once all runs have been processed.
Status DeltaDecoder::decode_byte_array(const std::vector<Slice>& decoded_vals,
                                       MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                       ColumnSelectVector& select_vector) {
    const TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
    const bool is_string_like =
            logical_type == TypeIndex::String || logical_type == TypeIndex::FixedString;
    if (!is_string_like) {
        return Status::InvalidArgument(
                "Can't decode parquet physical type BYTE_ARRAY to doris logical type {}",
                getTypeName(logical_type));
    }

    ColumnSelectVector::DataReadType read_type;
    for (;;) {
        const size_t run_length = select_vector.get_next_run(&read_type);
        if (run_length == 0) {
            break;
        }
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            std::vector<StringRef> refs;
            refs.reserve(run_length);
            const size_t run_end = _current_value_idx + run_length;
            for (; _current_value_idx < run_end; ++_current_value_idx) {
                const Slice& source = decoded_vals[_current_value_idx];
                const size_t length = source.size;
                refs.emplace_back(source.data, length);
            }
            doris_column->insert_many_strings(refs.data(), run_length);
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            doris_column->insert_many_defaults(run_length);
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            // Filtered values are stepped over without being inserted.
            _current_value_idx += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // Nothing to insert and nothing to step over.
            break;
        }
        }
    }
    _current_value_idx = 0;
    return Status::OK();
}
// Parses the page header:
//   [values per block] [mini blocks per block] [total value count] [first value]
// and validates the block geometry. Resets all per-block state so that the
// first call to _get_internal() starts from a clean slate.
// Returns IOError if the header is truncated and InvalidArgument if the block
// or miniblock sizes are zero or not multiples of 128 / 32 respectively.
template <typename T>
Status DeltaBitPackDecoder<T>::_init_header() {
    if (!_bit_reader->GetVlqInt(&_values_per_block) ||
        !_bit_reader->GetVlqInt(&_mini_blocks_per_block) ||
        !_bit_reader->GetVlqInt(&_total_value_count) ||
        !_bit_reader->GetZigZagVlqInt(&_last_value)) {
        return Status::IOError("Init header eof");
    }
    if (_values_per_block == 0) {
        return Status::InvalidArgument("Cannot have zero value per block");
    }
    if (_values_per_block % 128 != 0) {
        return Status::InvalidArgument(
                "the number of values in a block must be multiple of 128, but it's " +
                std::to_string(_values_per_block));
    }
    if (_mini_blocks_per_block == 0) {
        return Status::InvalidArgument("Cannot have zero miniblock per block");
    }
    _values_per_mini_block = _values_per_block / _mini_blocks_per_block;
    if (_values_per_mini_block == 0) {
        return Status::InvalidArgument("Cannot have zero value per miniblock");
    }
    if (_values_per_mini_block % 32 != 0) {
        return Status::InvalidArgument(
                "The number of values in a miniblock must be multiple of 32, but it's " +
                std::to_string(_values_per_mini_block));
    }
    _total_values_remaining = _total_value_count;
    _delta_bit_widths.resize(_mini_blocks_per_block);
    // init as empty property
    _block_initialized = false;
    _values_remaining_current_mini_block = 0;
    // A page holding a single value never initializes a block, yet the
    // end-of-page padding skip in _get_internal() still reads the bit width.
    // Give both per-block fields a defined value for every page.
    _mini_block_idx = 0;
    _delta_bit_width = 0;
    return Status::OK();
}
// Starts a new block: reads its zigzag-encoded minimum delta, then one
// bit-width byte per miniblock, and finally activates the first miniblock.
// Returns IOError if the stream ends before the block header is complete.
template <typename T>
Status DeltaBitPackDecoder<T>::_init_block() {
    DCHECK_GT(_total_values_remaining, 0) << "InitBlock called at EOF";

    const bool got_min_delta = _bit_reader->GetZigZagVlqInt(&_min_delta);
    if (!got_min_delta) {
        return Status::IOError("Init block eof");
    }

    // Bit widths are only validated when a miniblock is actually used (see
    // _init_mini_block()), because the Parquet spec allows non-conformant
    // entries for the unused trailing miniblocks of the last block (GH-14923).
    uint8_t* widths = _delta_bit_widths.data();
    uint32_t mini_block = 0;
    while (mini_block < _mini_blocks_per_block) {
        if (!_bit_reader->GetAligned<uint8_t>(1, &widths[mini_block])) {
            return Status::IOError("Decode bit-width EOF");
        }
        ++mini_block;
    }

    _mini_block_idx = 0;
    _block_initialized = true;
    return _init_mini_block(widths[0]);
}
// Activates a miniblock with the given delta bit width.
// Rejects widths wider than the value type itself with InvalidArgument.
template <typename T>
Status DeltaBitPackDecoder<T>::_init_mini_block(int bit_width) {
    const bool width_fits = bit_width <= kMaxDeltaBitWidth;
    if (PREDICT_FALSE(!width_fits)) {
        return Status::InvalidArgument("delta bit width larger than integer bit width");
    }
    _values_remaining_current_mini_block = _values_per_mini_block;
    _delta_bit_width = bit_width;
    return Status::OK();
}
// Decodes up to `num_values` values into `buffer`, clamped to the number of
// values still remaining in the page, and reports the decoded count through
// `out_num_values`. A non-positive request decodes nothing.
// When the last value of the page has been produced, the unused remainder of
// the current miniblock is skipped so the bit reader ends up positioned after
// this delta-encoded section.
// Returns IOError on a truncated stream and forwards errors from block /
// miniblock initialization.
template <typename T>
Status DeltaBitPackDecoder<T>::_get_internal(T* buffer, int num_values, int* out_num_values) {
    num_values = static_cast<int>(std::min<int64_t>(num_values, _total_values_remaining));
    // `<=` rather than `==`: a negative request must not fall through and
    // increase _total_values_remaining below.
    if (num_values <= 0) {
        *out_num_values = 0;
        return Status::OK();
    }
    int i = 0;
    while (i < num_values) {
        if (PREDICT_FALSE(_values_remaining_current_mini_block == 0)) {
            if (PREDICT_FALSE(!_block_initialized)) {
                // The first value of the page is stored in the header.
                buffer[i++] = _last_value;
                DCHECK_EQ(i, 1); // we're at the beginning of the page
                if (i == num_values) {
                    // When block is uninitialized and i reaches num_values we have two
                    // different possibilities:
                    // 1. _total_value_count == 1, which means that the page may have only
                    //    one value (encoded in the header), and we should not initialize
                    //    any block.
                    // 2. _total_value_count != 1, which means we should initialize the
                    //    incoming block for subsequent reads.
                    if (_total_value_count != 1) {
                        RETURN_IF_ERROR(_init_block());
                    }
                    break;
                }
                RETURN_IF_ERROR(_init_block());
            } else {
                ++_mini_block_idx;
                if (_mini_block_idx < _mini_blocks_per_block) {
                    RETURN_IF_ERROR(_init_mini_block(_delta_bit_widths.data()[_mini_block_idx]));
                } else {
                    RETURN_IF_ERROR(_init_block());
                }
            }
        }
        int values_decode = std::min(_values_remaining_current_mini_block,
                                     static_cast<uint32_t>(num_values - i));
        for (int j = 0; j < values_decode; ++j) {
            if (!_bit_reader->GetValue(_delta_bit_width, buffer + i + j)) {
                return Status::IOError("Get batch EOF");
            }
        }
        for (int j = 0; j < values_decode; ++j) {
            // Addition between min_delta, packed int and last_value should be treated as
            // unsigned addition. Overflow is as expected.
            buffer[i + j] = static_cast<UT>(_min_delta) + static_cast<UT>(buffer[i + j]) +
                            static_cast<UT>(_last_value);
            _last_value = buffer[i + j];
        }
        _values_remaining_current_mini_block -= values_decode;
        i += values_decode;
    }
    _total_values_remaining -= num_values;
    if (PREDICT_FALSE(_total_values_remaining == 0)) {
        // Widen before multiplying: both factors come from the (untrusted) page
        // header, and a 32-bit product could wrap and hide a truncated page.
        const int64_t padding_bits = static_cast<int64_t>(_delta_bit_width) *
                                     static_cast<int64_t>(_values_remaining_current_mini_block);
        if (!_bit_reader->Advance(padding_bits)) {
            return Status::IOError("Skip padding EOF");
        }
        _values_remaining_current_mini_block = 0;
    }
    *out_num_values = num_values;
    return Status::OK();
}
// Hands the shared bit reader to the length decoder and decodes every length
// of the page into `_buffered_length`. A decode failure is reported through
// LOG(FATAL). Afterwards the length cursor is reset and `_num_valid_values`
// holds the number of lengths read.
void DeltaLengthByteArrayDecoder::_decode_lengths() {
    _len_decoder.set_bit_reader(_bit_reader);

    // get the number of encoded lengths
    const int expected_lengths = _len_decoder.valid_values_count();
    _buffered_length.resize(expected_lengths);

    // decode all the lengths. all the lengths are buffered in buffered_length_.
    int decoded_lengths = 0;
    Status st = _len_decoder.decode(_buffered_length.data(), expected_lengths, &decoded_lengths);
    if (st != Status::OK()) {
        LOG(FATAL) << "Fail to decode delta length, status: " << st;
    }
    DCHECK_EQ(decoded_lengths, expected_lengths);

    _num_valid_values = expected_lengths;
    _length_idx = 0;
}
// Decodes up to `max_values` strings. The bytes are read into the internal
// `_buffered_data` buffer and each entry of `buffer` is pointed at its slice
// of that buffer, so the results are only valid until the next call.
// Returns InvalidArgument for a negative length or a total size that
// overflows int32, and IOError if the byte stream ends early.
Status DeltaLengthByteArrayDecoder::_get_internal(Slice* buffer, int max_values,
                                                  int* out_num_values) {
    const int count = std::min(max_values, _num_valid_values);
    if (count == 0) {
        *out_num_values = 0;
        return Status::OK();
    }

    // First pass: validate the lengths and total them up.
    const int32_t* lengths = _buffered_length.data() + _length_idx;
    int32_t total_bytes = 0;
    for (int i = 0; i < count; ++i) {
        const int32_t len = lengths[i];
        if (PREDICT_FALSE(len < 0)) {
            return Status::InvalidArgument("Negative string delta length");
        }
        buffer[i].size = len;
        if (common::add_overflow(total_bytes, len, total_bytes)) {
            return Status::InvalidArgument("Excess expansion in DELTA_(LENGTH_)BYTE_ARRAY");
        }
    }
    _length_idx += count;

    // Second pass: pull the concatenated string bytes out of the bit reader.
    _buffered_data.resize(total_bytes);
    char* const base = _buffered_data.data();
    for (int32_t offset = 0; offset < total_bytes; ++offset) {
        if (!_bit_reader->GetValue(8, base + offset)) {
            return Status::IOError("Get length bytes EOF");
        }
    }

    // Third pass: carve the buffer into consecutive slices.
    char* cursor = base;
    for (int i = 0; i < count; ++i) {
        buffer[i].data = cursor;
        cursor += buffer[i].size;
    }

    _num_valid_values -= count;
    *out_num_values = count;
    return Status::OK();
}
// Decodes up to `max_values` strings by joining each suffix with a prefix of
// the previously decoded string. Results are materialized in `_buffered_data`
// and referenced from `buffer`, so they are only valid until the next call.
// Returns IOError when the suffix decoder yields fewer values than requested
// and InvalidArgument for negative, oversized or overflowing prefix lengths.
Status DeltaByteArrayDecoder::_get_internal(Slice* buffer, int max_values, int* out_num_values) {
    max_values = std::min(max_values, _num_valid_values);
    if (max_values == 0) {
        *out_num_values = max_values;
        return Status::OK();
    }

    // On return, every buffer[i] references the i-th suffix.
    int suffixes_decoded;
    RETURN_IF_ERROR(_suffix_decoder.decode(buffer, max_values, &suffixes_decoded));
    if (PREDICT_FALSE(suffixes_decoded != max_values)) {
        return Status::IOError("Read {}, expecting {} from suffix decoder",
                               std::to_string(suffixes_decoded), std::to_string(max_values));
    }

    // Work out how many bytes the fully expanded strings need.
    const int32_t* prefix_lengths = _buffered_prefix_length.data() + _prefix_len_offset;
    int64_t total_bytes = 0;
    for (int i = 0; i < max_values; ++i) {
        const int32_t prefix_len = prefix_lengths[i];
        if (PREDICT_FALSE(prefix_len < 0)) {
            return Status::InvalidArgument("negative prefix length in DELTA_BYTE_ARRAY");
        }
        const bool overflowed =
                common::add_overflow(total_bytes, static_cast<int64_t>(prefix_len),
                                     total_bytes) ||
                common::add_overflow(total_bytes, static_cast<int64_t>(buffer[i].size),
                                     total_bytes);
        if (PREDICT_FALSE(overflowed)) {
            return Status::InvalidArgument("excess expansion in DELTA_BYTE_ARRAY");
        }
    }
    _buffered_data.resize(total_bytes);

    // Rebuild each string as <prefix of previous string> + <suffix>.
    std::string_view previous {_last_value};
    char* write_pos = _buffered_data.data();
    for (int i = 0; i < max_values; ++i) {
        const int32_t prefix_len = prefix_lengths[i];
        if (PREDICT_FALSE(static_cast<size_t>(prefix_len) > previous.length())) {
            return Status::InvalidArgument("prefix length too large in DELTA_BYTE_ARRAY");
        }
        memcpy(write_pos, previous.data(), prefix_len);
        // buffer[i] currently points to the string suffix
        memcpy(write_pos + prefix_len, buffer[i].data, buffer[i].size);
        buffer[i].data = write_pos;
        buffer[i].size += prefix_len;
        write_pos += buffer[i].size;
        previous = std::string_view {buffer[i].data, buffer[i].size};
    }

    _prefix_len_offset += max_values;
    _num_valid_values -= max_values;
    // Keep an owned copy: `previous` points into _buffered_data, which is
    // resized on the next call.
    _last_value = std::string {previous};
    if (_num_valid_values == 0) {
        _last_value_in_previous_page = _last_value;
    }
    *out_num_values = max_values;
    return Status::OK();
}
} // namespace doris::vectorized

View File

@ -0,0 +1,280 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "util/bit_stream_utils.h"
#include "vec/exec/format/parquet/byte_array_plain_decoder.h"
#include "vec/exec/format/parquet/fix_length_plain_decoder.h"
namespace doris::vectorized {
// Common base of the delta-family decoders. It optionally owns a second
// decoder (`_type_converted_decoder`) used to turn decoded values into Doris
// column values, and provides the shared byte-array insertion routine.
class DeltaDecoder : public Decoder {
public:
    // Takes ownership of `decoder`.
    DeltaDecoder(Decoder* decoder) : _type_converted_decoder(decoder) {}

    ~DeltaDecoder() override = default;

    // Forwards the skip to the owned converter decoder.
    Status skip_values(size_t num_values) override {
        return _type_converted_decoder->skip_values(num_values);
    }

    // Inserts `decoded_vals` into `doris_column` following `select_vector`.
    Status decode_byte_array(const std::vector<Slice>& decoded_vals, MutableColumnPtr& doris_column,
                             DataTypePtr& data_type, ColumnSelectVector& select_vector);

protected:
    // Passes the current data slice, type length and schema/timezone settings
    // on to the converter decoder.
    void init_values_converter() {
        _type_converted_decoder->set_data(_data);
        _type_converted_decoder->set_type_length(_type_length);
        _type_converted_decoder->init(_field_schema, _decode_params->ctz);
    }

    // Convert decoded value to doris type value.
    std::unique_ptr<Decoder> _type_converted_decoder;
    // Cursor into the decoded values while they are copied into a column.
    size_t _current_value_idx = 0;
};
/**
* Format
* [header] [block 1] [block 2] ... [block N]
* Header
* [block size] [_mini_blocks_per_block] [_total_value_count] [first value]
* Block
* [min delta] [list of bitwidths of the mini blocks] [miniblocks]
*/
template <typename T>
class DeltaBitPackDecoder final : public DeltaDecoder {
public:
using UT = std::make_unsigned_t<T>;
DeltaBitPackDecoder(const tparquet::Type::type& physical_type)
: DeltaDecoder(new FixLengthPlainDecoder(physical_type)) {}
~DeltaBitPackDecoder() override = default;
Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
ColumnSelectVector& select_vector) override {
size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
// decode values
_values.resize(non_null_size);
int decoded_count = 0;
RETURN_IF_ERROR(_get_internal(_values.data(), non_null_size, &decoded_count));
_data->data = reinterpret_cast<char*>(_values.data());
_type_length = sizeof(T);
_data->size = _values.size() * _type_length;
// set decoded value with fix plain decoder
init_values_converter();
return _type_converted_decoder->decode_values(doris_column, data_type, select_vector);
}
Status decode(T* buffer, int num_values, int* out_num_values) {
return _get_internal(buffer, num_values, out_num_values);
}
int valid_values_count() {
// _total_value_count in header ignores of null values
return static_cast<int>(_total_values_remaining);
}
void set_data(Slice* slice) override {
_bit_reader.reset(new BitReader((const uint8_t*)slice->data, slice->size));
Status st = _init_header();
if (st != Status::OK()) {
LOG(FATAL) << "Fail to init delta encoding header for " << st.to_string();
}
_data = slice;
_offset = 0;
}
// Set BitReader which is already initialized by DeltaLengthByteArrayDecoder or
// DeltaByteArrayDecoder
void set_bit_reader(std::shared_ptr<BitReader> bit_reader) {
_bit_reader = std::move(bit_reader);
Status st = _init_header();
if (st != Status::OK()) {
LOG(FATAL) << "Fail to init delta encoding header for " << st.to_string();
}
}
private:
static constexpr int kMaxDeltaBitWidth = static_cast<int>(sizeof(T) * 8);
Status _init_header();
Status _init_block();
Status _init_mini_block(int bit_width);
Status _get_internal(T* buffer, int max_values, int* out_num_values);
std::vector<T> _values;
std::shared_ptr<BitReader> _bit_reader;
uint32_t _values_per_block;
uint32_t _mini_blocks_per_block;
uint32_t _values_per_mini_block;
uint32_t _total_value_count;
T _min_delta;
T _last_value;
uint32_t _mini_block_idx;
std::vector<uint8_t> _delta_bit_widths;
int _delta_bit_width;
// If the page doesn't contain any block, `_block_initialized` will
// always be false. Otherwise, it will be true when first block initialized.
bool _block_initialized;
uint32_t _total_values_remaining;
// Remaining values in current mini block. If the current block is the last mini block,
// _values_remaining_current_mini_block may greater than _total_values_remaining.
uint32_t _values_remaining_current_mini_block;
};
template class DeltaBitPackDecoder<int32_t>;
template class DeltaBitPackDecoder<int64_t>;
// Decoder for the DELTA_LENGTH_BYTE_ARRAY encoding: all string lengths are
// delta-bit-packed at the start of the page, followed by the concatenated
// string bytes.
class DeltaLengthByteArrayDecoder final : public DeltaDecoder {
public:
    explicit DeltaLengthByteArrayDecoder(const tparquet::Type::type& physical_type)
            : DeltaDecoder(nullptr),
              _len_decoder(physical_type),
              _buffered_length(0),
              _buffered_data(0) {}

    // Consumes `num_values` strings without emitting them. The bytes are
    // stored back to back, so the values are decoded into the scratch buffer
    // and dropped; this advances both the length cursor and the bit reader.
    Status skip_values(size_t num_values) override {
        _values.resize(num_values);
        int skipped = 0;
        return _get_internal(_values.data(), static_cast<int>(num_values), &skipped);
    }

    // Decodes the non-null strings of the batch and inserts them into
    // `doris_column`. Returns IOError if fewer strings are available than the
    // batch requires.
    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                         ColumnSelectVector& select_vector) override {
        size_t num_values = select_vector.num_values();
        size_t null_count = select_vector.num_nulls();
        // init read buffer
        _values.resize(num_values - null_count);
        int num_valid_values = 0;
        RETURN_IF_ERROR(_get_internal(_values.data(), num_values - null_count, &num_valid_values));

        if (PREDICT_FALSE(num_values - null_count != num_valid_values)) {
            return Status::IOError("Expected to decode {} values, but decoded {} values.",
                                   num_values - null_count, num_valid_values);
        }
        return decode_byte_array(_values, doris_column, data_type, select_vector);
    }

    // Decodes up to `num_values` strings into `buffer`; the slices reference
    // an internal buffer that is overwritten by the next decode.
    Status decode(Slice* buffer, int num_values, int* out_num_values) {
        return _get_internal(buffer, num_values, out_num_values);
    }

    // Binds the decoder to a new page and eagerly decodes all lengths.
    void set_data(Slice* slice) override {
        if (slice->size == 0) {
            // An empty page carries no values; drop any counters left over
            // from the previous page so they cannot be decoded again.
            _num_valid_values = 0;
            _length_idx = 0;
            return;
        }
        _bit_reader = std::make_shared<BitReader>((const uint8_t*)slice->data, slice->size);
        _data = slice;
        _offset = 0;
        _decode_lengths();
    }

    // Shares a bit reader that another decoder has already positioned at the
    // start of this section, then eagerly decodes all lengths.
    void set_bit_reader(std::shared_ptr<BitReader> bit_reader) {
        _bit_reader = std::move(bit_reader);
        _decode_lengths();
    }

private:
    // Decode all the encoded lengths. The decoder_ will be at the start of the encoded data
    // after that.
    void _decode_lengths();
    Status _get_internal(Slice* buffer, int max_values, int* out_num_values);

    std::vector<Slice> _values;
    std::shared_ptr<BitReader> _bit_reader;
    DeltaBitPackDecoder<int32_t> _len_decoder;

    // Number of strings of the current page that have not been decoded yet.
    int _num_valid_values = 0;
    // Index of the next unread entry in `_buffered_length`.
    uint32_t _length_idx = 0;
    std::vector<int32_t> _buffered_length;
    std::vector<char> _buffered_data;
};
// Decoder for the DELTA_BYTE_ARRAY encoding: each string is stored as the
// length of the prefix it shares with the previous string plus a suffix. The
// prefix lengths are delta-bit-packed; the suffixes use the delta-length
// byte-array layout.
class DeltaByteArrayDecoder : public DeltaDecoder {
public:
    explicit DeltaByteArrayDecoder(const tparquet::Type::type& physical_type)
            : DeltaDecoder(nullptr),
              _prefix_len_decoder(physical_type),
              _suffix_decoder(physical_type),
              _buffered_prefix_length(0),
              _buffered_data(0) {}

    // Consumes `num_values` strings without emitting them. Every string
    // depends on its predecessor, so the values are decoded and dropped to
    // keep `_last_value`, the prefix cursor and the suffix stream in step.
    Status skip_values(size_t num_values) override {
        _values.resize(num_values);
        int skipped = 0;
        return _get_internal(_values.data(), static_cast<int>(num_values), &skipped);
    }

    // Decodes the non-null strings of the batch and inserts them into
    // `doris_column`. Returns IOError if fewer strings are available than the
    // batch requires.
    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                         ColumnSelectVector& select_vector) override {
        size_t num_values = select_vector.num_values();
        size_t null_count = select_vector.num_nulls();
        _values.resize(num_values - null_count);
        int num_valid_values = 0;
        RETURN_IF_ERROR(_get_internal(_values.data(), num_values - null_count, &num_valid_values));
        // Report a short read in release builds too, not only through a DCHECK.
        if (PREDICT_FALSE(num_values - null_count != num_valid_values)) {
            return Status::IOError("Expected to decode {} values, but decoded {} values.",
                                   num_values - null_count, num_valid_values);
        }
        return decode_byte_array(_values, doris_column, data_type, select_vector);
    }

    // Binds the decoder to a new page: decodes all prefix lengths up front and
    // leaves the shared bit reader at the start of the suffix section.
    void set_data(Slice* slice) override {
        _bit_reader = std::make_shared<BitReader>((const uint8_t*)slice->data, slice->size);
        _prefix_len_decoder.set_bit_reader(_bit_reader);

        // get the number of encoded prefix lengths
        int num_prefix = _prefix_len_decoder.valid_values_count();
        // call _prefix_len_decoder.Decode to decode all the prefix lengths.
        // all the prefix lengths are buffered in _buffered_prefix_length.
        _buffered_prefix_length.resize(num_prefix);
        int ret = 0;
        Status st = _prefix_len_decoder.decode(_buffered_prefix_length.data(), num_prefix, &ret);
        if (st != Status::OK()) {
            LOG(FATAL) << "Fail to decode delta prefix, status: " << st;
        }
        DCHECK_EQ(ret, num_prefix);
        _prefix_len_offset = 0;
        _num_valid_values = num_prefix;

        // at this time, the decoder_ will be at the start of the encoded suffix data.
        _suffix_decoder.set_bit_reader(_bit_reader);

        // TODO: read corrupted files written with bug(PARQUET-246). _last_value should be set
        // to _last_value_in_previous_page when decoding a new page(except the first page)
        _last_value = "";
    }

    // Decodes up to `num_values` strings into `buffer`; the slices reference
    // an internal buffer that is overwritten by the next decode.
    Status decode(Slice* buffer, int num_values, int* out_num_values) {
        return _get_internal(buffer, num_values, out_num_values);
    }

private:
    Status _get_internal(Slice* buffer, int max_values, int* out_num_values);

    std::vector<Slice> _values;
    std::shared_ptr<BitReader> _bit_reader;
    DeltaBitPackDecoder<int32_t> _prefix_len_decoder;
    DeltaLengthByteArrayDecoder _suffix_decoder;
    // Last string decoded so far; the next string's prefix is taken from it.
    std::string _last_value;
    // string buffer for last value in previous page
    std::string _last_value_in_previous_page;
    // Number of strings of the current page that have not been decoded yet.
    int _num_valid_values = 0;
    // Index of the next unread entry in `_buffered_prefix_length`.
    uint32_t _prefix_len_offset = 0;
    std::vector<int32_t> _buffered_prefix_length;
    std::vector<char> _buffered_data;
};
} // namespace doris::vectorized

View File

@ -104,7 +104,7 @@ CREATE CATALOG hive PROPERTIES (
);
```
Or to connect to Hive data stored in JuiceFS:
Or to connect to Hive data stored on JuiceFS:
```sql
CREATE CATALOG hive PROPERTIES (
@ -117,6 +117,23 @@ CREATE CATALOG hive PROPERTIES (
);
```
Or to connect to Hive metadata stored in AWS Glue, with the data stored on S3:
```sql
CREATE CATALOG hive PROPERTIES (
"type"="hms",
"hive.metastore.type" = "glue",
"aws.region" = "us-east-1",
"aws.glue.access-key" = "ak",
"aws.glue.secret-key" = "sk",
"AWS_ENDPOINT" = "s3.us-east-1.amazonaws.com",
"AWS_REGION" = "us-east-1",
"AWS_ACCESS_KEY" = "ak",
"AWS_SECRET_KEY" = "sk",
"use_path_style" = "true"
);
```
<version since="dev">
When connecting to a Hive Metastore that has Ranger authorization enabled, some additional properties are required and the FE runtime environment needs to be updated.

View File

@ -61,7 +61,7 @@ Access metadata with the iceberg API. The Hive, REST, Glue and other services ca
</version>
- Using Iceberg Hive Catalog
#### Using Iceberg Hive Catalog
```sql
CREATE CATALOG iceberg PROPERTIES (
@ -77,7 +77,7 @@ CREATE CATALOG iceberg PROPERTIES (
);
```
- Using Iceberg Glue Catalog
#### Using Iceberg Glue Catalog
```sql
CREATE CATALOG glue PROPERTIES (
@ -97,6 +97,8 @@ CREATE CATALOG glue PROPERTIES (
`warehouse`: the Glue warehouse location, i.e. the root path of the data warehouse in storage.
For the other properties, refer to [Iceberg Glue Catalog](https://iceberg.apache.org/docs/latest/aws/#glue-catalog)
- Using Iceberg REST Catalog
This approach requires a RESTful service, provided in advance, to act as the server side; the service must implement Iceberg's RESTCatalog interface to serve metadata.

View File

@ -113,6 +113,23 @@ CREATE CATALOG hive PROPERTIES (
);
```
hive元数据存储在Glue,数据存储在S3,示例如下:
```sql
CREATE CATALOG hive PROPERTIES (
"type"="hms",
"hive.metastore.type" = "glue",
"aws.region" = "us-east-1",
"aws.glue.access-key" = "ak",
"aws.glue.secret-key" = "sk",
"AWS_ENDPOINT" = "s3.us-east-1.amazonaws.com",
"AWS_REGION" = "us-east-1",
"AWS_ACCESS_KEY" = "ak",
"AWS_SECRET_KEY" = "sk",
"use_path_style" = "true"
);
```
<version since="dev">
连接开启 Ranger 权限校验的 Hive Metastore 需要增加配置 & 配置环境:

View File

@ -59,7 +59,7 @@ CREATE CATALOG iceberg PROPERTIES (
</version>
- Hive Metastore作为元数据服务
#### Hive Metastore作为元数据服务
```sql
CREATE CATALOG iceberg PROPERTIES (
@ -75,7 +75,7 @@ CREATE CATALOG iceberg PROPERTIES (
);
```
- Glue Catalog作为元数据服务
#### Glue Catalog作为元数据服务
```sql
CREATE CATALOG glue PROPERTIES (
@ -95,6 +95,8 @@ CREATE CATALOG glue PROPERTIES (
`warehouse`: Glue Warehouse Location. Glue Catalog的根路径,用于指定数据存放位置。
属性详情参见 [Iceberg Glue Catalog](https://iceberg.apache.org/docs/latest/aws/#glue-catalog)
- REST Catalog作为元数据服务
该方式需要预先提供REST服务,用户需实现获取Iceberg元数据的REST接口。

View File

@ -0,0 +1,131 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !q01 --
2967
3158
15505
20726
21843
-- !q02 --
1809714008
1979816070
2147483647
2147483647
2147483647
-- !q03 --
\N
15821
\N
\N
\N
-- !q04 --
1604.7639
1583.2013
1031.6219
1295.7802
182.5588
-- !q05 --
1937.7762425702406
992.21123681735253
56.682069922520562
940.70481552186243
1876.4831949153224
-- !q06 --
2023-03-07 20:34:59
2023-03-07 20:34:59
2023-03-07 20:34:59
2023-03-07 20:34:59
2023-03-07 20:34:59
2023-03-07 20:34:59
2023-03-07 20:34:59
2023-03-07 20:35
2023-03-07 20:35
2023-03-07 20:35
2023-03-07 20:35
2023-03-07 20:35
2023-03-07 20:35
2023-03-07 20:35
2023-03-07 20:35
2023-03-07 20:35
2023-03-07 20:35
2023-03-07 20:35
2023-03-07 20:35
2023-03-07 20:35
-- !q07 --
6f77a7baae184d
88
fbbf69fc81374
f14889
33d471ce
-- !q08 --
c2b69a82f074e4f
81b1152fa774b8
73df8eaccf
2ed59df3c824dc78b
5e3e98e07e
-- !q09 --
dfaac2a43
28c5f21b8
a20faeee91e34ce
b5e6bf2b5
8bc56e
-- !q10 --
true
true
false
false
false
-- !q11 --
54078 8184
122067 9731
140902 170
143594 5714
170289 4567
175294 1959
202483 857
222664 6449
230156 2480
266339 6845
-- !q12 --
\N
\N
\N
-- !q13 --
27
27
34
50
59
97
99
101
107
114
-- !q14 --
dfaac2a43
28c5f21b8
a20faeee91e34ce
b5e6bf2b5
8bc56e
-- !q15 --
5000
-- !q16 --
2023-03-07 20:35:59
2023-03-07 20:35:59
2023-03-07 20:35:59
2023-03-07 20:35:59
2023-03-07 20:35:59

View File

@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Regression suite: creates an HMS catalog and runs a fixed list of tagged
// queries (q01..q16) against the `iceberg_glue_types` table.
suite("test_external_catalog_glue_table", "p2") {
    // The suite is a no-op unless external Hive tests are enabled in the config.
    String hiveTestSwitch = context.config.otherConfigs.get("enableExternalHiveTest")
    if (hiveTestSwitch == null || !hiveTestSwitch.equalsIgnoreCase("true")) {
        return
    }

    String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
    String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")

    // Recreate the catalog from scratch so earlier runs cannot interfere.
    sql """drop catalog if exists test_external_catalog_glue;"""
    // The statement text below is kept flush-left so the SQL sent to the
    // server is unchanged.
    sql """
create catalog if not exists test_external_catalog_glue properties (
'type'='hms',
'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
);
"""
    sql """switch test_external_catalog_glue;"""

    // One tagged query per check; the tag names and their order are unchanged.
    def runGlueTypeQueries = {
        qt_q01 """ select glue_int from iceberg_glue_types order by glue_int limit 5 """
        qt_q02 """ select glue_bigint from iceberg_glue_types order by glue_int limit 5 """
        qt_q03 """ select glue_smallint from iceberg_glue_types order by glue_int limit 5 """
        qt_q04 """ select glue_decimal from iceberg_glue_types order by glue_int limit 5 """
        qt_q05 """ select glue_double from iceberg_glue_types order by glue_int limit 5 """
        qt_q06 """ select glue_timstamp from iceberg_glue_types order by glue_timstamp limit 20 """
        qt_q07 """ select glue_char from iceberg_glue_types order by glue_int limit 5 """
        qt_q08 """ select glue_varchar from iceberg_glue_types order by glue_int limit 5 """
        qt_q09 """ select glue_string from iceberg_glue_types order by glue_int limit 5 """
        qt_q10 """ select glue_bool from iceberg_glue_types order by glue_int limit 5 """
        qt_q11 """ select glue_int,glue_smallint from iceberg_glue_types where glue_int > 2000 and glue_smallint < 10000 order by glue_int limit 10 """
        qt_q12 """ select glue_smallint from iceberg_glue_types where glue_smallint is null order by glue_smallint limit 3 """
        qt_q13 """ select glue_smallint from iceberg_glue_types where glue_smallint is not null order by glue_smallint limit 10 """
        qt_q14 """ select glue_string from iceberg_glue_types where glue_string>'040abff1da4748e4b' order by glue_int limit 5 """
        qt_q15 """ select count(1) from iceberg_glue_types """
        qt_q16 """ select glue_timstamp from iceberg_glue_types where glue_timstamp > '2023-03-07 20:35:59' order by glue_timstamp limit 5 """
    }

    sql """ use `iceberg_catalog`; """
    runGlueTypeQueries()
}