[feature-wip](parquet-reader) parquet dictionary decoder (#11981)

Parse parquet data with dictionary encoding.

Using the PLAIN_DICTIONARY enum value is deprecated in the Parquet 2.0 specification.
Prefer using RLE_DICTIONARY in a data page and PLAIN in a dictionary page for Parquet 2.0+ files.
refer: https://github.com/apache/parquet-format/blob/master/Encodings.md
This commit is contained in:
Ashin Gau
2022-08-26 19:24:37 +08:00
committed by GitHub
parent 818afa4c66
commit 0b5bb565a7
9 changed files with 657 additions and 366 deletions

View File

@ -568,4 +568,295 @@ inline void RleEncoder<T>::Clear() {
bit_writer_.Clear();
}
// Copy from https://github.com/apache/impala/blob/master/be/src/util/rle-encoding.h
// Utility classes to do run length encoding (RLE) for fixed bit width values. If runs
// are sufficiently long, RLE is used, otherwise, the values are just bit-packed
// (literal encoding).
//
// For both types of runs, there is a byte-aligned indicator which encodes the length
// of the run and the type of the run.
//
// This encoding has the benefit that when there aren't any long enough runs, values
// are always decoded at fixed (can be precomputed) bit offsets OR both the value and
// the run length are byte aligned. This allows for very efficient decoding
// implementations.
// The encoding is:
// encoded-block := run*
// run := literal-run | repeated-run
// literal-run := literal-indicator < literal bytes >
// repeated-run := repeated-indicator < repeated value. padded to byte boundary >
// literal-indicator := varint_encode( number_of_groups << 1 | 1)
// repeated-indicator := varint_encode( number_of_repetitions << 1 )
//
// Each run is preceded by a varint. The varint's least significant bit is
// used to indicate whether the run is a literal run or a repeated run. The rest
// of the varint is used to determine the length of the run (eg how many times the
// value repeats).
//
// In the case of literal runs, the run length is always a multiple of 8 (i.e. encode
// in groups of 8), so that no matter the bit-width of the value, the sequence will end
// on a byte boundary without padding.
// Given that we know it is a multiple of 8, we store the number of 8-groups rather than
// the actual number of encoded ints. (This means that the total number of encoded values
// can not be determined from the encoded data, since the number of values in the last
// group may not be a multiple of 8). For the last group of literal runs, we pad
// the group to 8 with zeros. This allows for 8 at a time decoding on the read side
// without the need for additional checks.
//
// There is a break-even point when it is more storage efficient to do run length
// encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes
// for either the repeated encoding or the literal encoding. This value can always
// be computed based on the bit-width.
// TODO: For 1 bit-width values it can be optimal to use 16 or 24 values, but more
// investigation is needed to do this efficiently, see the reverted IMPALA-6658.
// TODO: think about how to use this for strings. The bit packing isn't quite the same.
//
// Examples with bit-width 1 (eg encoding booleans):
// ----------------------------------------
// 100 1s followed by 100 0s:
// <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte>
// - (total 4 bytes)
//
// alternating 1s and 0s (200 total):
// 200 ints = 25 groups of 8
// <varint((25 << 1) | 1)> <25 bytes of values, bitpacked>
// (total 26 bytes, 1 byte overhead)
// RLE decoder with a batch-oriented interface that enables fast decoding.
// Users of this class must first initialize the class to point to a buffer of
// RLE-encoded data, passed into the constructor or Reset(). The provided
// bit_width must be at most min(sizeof(T) * 8, BatchedBitReader::MAX_BITWIDTH).
// Then they can decode data by checking NextNumRepeats()/NextNumLiterals() to
// see if the next run is a repeated or literal run, then calling
// GetRepeatedValue() or GetLiteralValues() respectively to read the values.
//
// End-of-input is signalled by NextNumRepeats() == NextNumLiterals() == 0.
// Other decoding errors are signalled by functions returning false. If an
// error is encountered then it is not valid to read any more data until
// Reset() is called.
template <typename T>
class RleBatchDecoder {
public:
    // Construct and immediately point the decoder at 'buffer'. 'bit_width' must be
    // at most min(sizeof(T) * 8, BatchedBitReader::MAX_BITWIDTH).
    RleBatchDecoder(uint8_t* buffer, int buffer_len, int bit_width) {
        Reset(buffer, buffer_len, bit_width);
    }

    // Default-constructed decoder is uninitialized (bit_width_ == -1); call Reset()
    // before decoding.
    RleBatchDecoder() = default;

    // Reset the decoder to read from a new buffer.
    void Reset(uint8_t* buffer, int buffer_len, int bit_width);

    // Return the size of the current repeated run. Returns zero if the current run is
    // a literal run or if no more runs can be read from the input.
    int32_t NextNumRepeats();

    // Get the value of the current repeated run and consume the given number of repeats.
    // Only valid to call when NextNumRepeats() > 0. The given number of repeats cannot
    // be greater than the remaining number of repeats in the run. 'num_repeats_to_consume'
    // can be set to 0 to peek at the value without consuming repeats.
    T GetRepeatedValue(int32_t num_repeats_to_consume);

    // Return the size of the current literal run. Returns zero if the current run is
    // a repeated run or if no more runs can be read from the input.
    int32_t NextNumLiterals();

    // Consume 'num_literals_to_consume' literals from the current literal run,
    // copying the values to 'values'. 'num_literals_to_consume' must be <=
    // NextNumLiterals(). Returns true if the requested number of literals were
    // successfully read or false if an error was encountered, e.g. the input was
    // truncated.
    bool GetLiteralValues(int32_t num_literals_to_consume, T* values) WARN_UNUSED_RESULT;

    // Consume up to 'batch_num' values and copy them to 'values'. Returns the number
    // of consumed values (possibly fewer than 'batch_num' if the input ends early)
    // or 0 if a decoding error occurred.
    int32_t GetBatch(T* values, int32_t batch_num);

private:
    // Called when both 'literal_count_' and 'repeat_count_' have been exhausted.
    // Sets either 'literal_count_' or 'repeat_count_' to the size of the next literal
    // or repeated run, or leaves both at 0 if no more values can be read (either because
    // the end of the input was reached or an error was encountered decoding).
    void NextCounts();

    /// Fill the literal buffer. Invalid to call if there are already buffered literals.
    /// Return false if the input was truncated. This does not advance 'literal_count_'.
    bool FillLiteralBuffer() WARN_UNUSED_RESULT;

    // True while 'literal_buffer_' still holds literals not yet handed to the caller.
    bool HaveBufferedLiterals() const { return literal_buffer_pos_ < num_buffered_literals_; }

    /// Output buffered literals, advancing 'literal_buffer_pos_' and decrementing
    /// 'literal_count_'. Returns the number of literals outputted.
    int32_t OutputBufferedLiterals(int32_t max_to_output, T* values);

    BatchedBitReader bit_reader_;

    // Number of bits needed to encode the value. Must be between 0 and 64 after
    // the decoder is initialized with a buffer. -1 indicates the decoder was not
    // initialized.
    int bit_width_ = -1;

    // If a repeated run, the number of repeats remaining in the current run to be read.
    // If the current run is a literal run, this is 0.
    int32_t repeat_count_ = 0;

    // If a literal run, the number of literals remaining in the current run to be read.
    // If the current run is a repeated run, this is 0.
    int32_t literal_count_ = 0;

    // If a repeated run, the current repeated value.
    T repeated_value_;

    // Size of buffer for literal values. Large enough to decode a full batch of 32
    // literals. The buffer is needed to allow clients to read in batches that are not
    // multiples of 32.
    static constexpr int LITERAL_BUFFER_LEN = 32;

    // Buffer containing 'num_buffered_literals_' values. 'literal_buffer_pos_' is the
    // position of the next literal to be read from the buffer.
    T literal_buffer_[LITERAL_BUFFER_LEN];
    int num_buffered_literals_ = 0;
    int literal_buffer_pos_ = 0;
};
// Copy up to 'max_to_output' already-decoded literals from 'literal_buffer_' into
// 'values', advancing the buffer cursor and shrinking the current literal run.
// Returns how many values were copied.
template <typename T>
inline int32_t RleBatchDecoder<T>::OutputBufferedLiterals(int32_t max_to_output, T* values) {
    const int32_t available = num_buffered_literals_ - literal_buffer_pos_;
    const int32_t out_count = max_to_output < available ? max_to_output : available;
    memcpy(values, literal_buffer_ + literal_buffer_pos_, out_count * sizeof(T));
    literal_buffer_pos_ += out_count;
    literal_count_ -= out_count;
    return out_count;
}
// Point the decoder at a new input buffer and clear all per-run decoding state.
template <typename T>
inline void RleBatchDecoder<T>::Reset(uint8_t* buffer, int buffer_len, int bit_width) {
    bit_width_ = bit_width;
    repeat_count_ = 0;
    literal_count_ = 0;
    num_buffered_literals_ = 0;
    literal_buffer_pos_ = 0;
    bit_reader_.Reset(buffer, buffer_len);
}
// Size of the current repeated run, advancing to the next run only when both
// counters are exhausted. Zero means the current run is literal or input ended.
template <typename T>
inline int32_t RleBatchDecoder<T>::NextNumRepeats() {
    if (repeat_count_ == 0 && literal_count_ == 0) {
        NextCounts();
    }
    return repeat_count_;
}
// Decode the next run's header. Each run begins with a ULEB128-encoded indicator:
// the least significant bit distinguishes literal (1) from repeated (0) runs, and
// the remaining bits give the run length (in groups of 8 for literal runs). On any
// error or end of input, both counts are left at 0.
template <typename T>
inline void RleBatchDecoder<T>::NextCounts() {
    uint32_t indicator = 0;
    if (UNLIKELY(!bit_reader_.GetUleb128<uint32_t>(&indicator))) {
        return;
    }
    const uint32_t run_len = indicator >> 1;
    if ((indicator & 1) != 0) {
        // Literal run: stored as the number of 8-value groups. Widen to int64_t so
        // the multiplication cannot overflow, and reject lengths that do not fit in
        // an int32_t — the Parquet standard disallows longer runs (PARQUET-1290).
        const int64_t num_literals = static_cast<int64_t>(run_len) * 8;
        if (UNLIKELY(num_literals > std::numeric_limits<int32_t>::max())) {
            return;
        }
        literal_count_ = static_cast<int32_t>(num_literals);
    } else {
        // Repeated run: a zero-length repeat is malformed input.
        if (UNLIKELY(run_len == 0)) {
            return;
        }
        // The repeated value is padded to a whole number of bytes.
        if (UNLIKELY(!bit_reader_.GetBytes<T>(BitUtil::Ceil(bit_width_, 8), &repeated_value_))) {
            return;
        }
        repeat_count_ = static_cast<int32_t>(run_len);
    }
}
// Consume 'num_repeats_to_consume' repeats of the current repeated run and return the
// repeated value. Only valid when NextNumRepeats() > 0 and 'num_repeats_to_consume'
// does not exceed the remaining repeats; pass 0 to peek without consuming.
template <typename T>
inline T RleBatchDecoder<T>::GetRepeatedValue(int32_t num_repeats_to_consume) {
    repeat_count_ -= num_repeats_to_consume;
    return repeated_value_;
}
// Size of the current literal run, advancing to the next run only when both
// counters are exhausted. Zero means the current run is repeated or input ended.
template <typename T>
inline int32_t RleBatchDecoder<T>::NextNumLiterals() {
    if (literal_count_ == 0 && repeat_count_ == 0) {
        NextCounts();
    }
    return literal_count_;
}
// Copy 'num_literals_to_consume' literals from the current literal run into 'values'.
// Caller must ensure num_literals_to_consume <= NextNumLiterals(). Returns false if
// the input turned out to be truncated.
template <typename T>
inline bool RleBatchDecoder<T>::GetLiteralValues(int32_t num_literals_to_consume, T* values) {
    // First drain any literals buffered by a previous partial read.
    int32_t num_consumed = HaveBufferedLiterals()
            ? OutputBufferedLiterals(num_literals_to_consume, values)
            : 0;
    int32_t num_remaining = num_literals_to_consume - num_consumed;
    // Unpack literals straight into the output where possible, bypassing
    // 'literal_buffer_'. When consuming only part of the current run, round down to
    // a batch of 32 to avoid ending on a non-byte boundary.
    const int32_t num_direct =
            std::min<int32_t>(literal_count_, BitUtil::RoundDownToPowerOf2(num_remaining, 32));
    if (num_direct > 0) {
        const int num_read = bit_reader_.UnpackBatch(bit_width_, num_direct, values + num_consumed);
        // Reading fewer values than requested means the input was truncated.
        if (num_read < num_direct) return false;
        literal_count_ -= num_direct;
        num_consumed += num_direct;
        num_remaining = num_literals_to_consume - num_consumed;
    }
    if (num_remaining > 0) {
        // The tail is smaller than a full batch: stage it in 'literal_buffer_'
        // and copy the requested number out of the buffer.
        if (UNLIKELY(!FillLiteralBuffer())) return false;
        OutputBufferedLiterals(num_remaining, values + num_consumed);
    }
    return true;
}
// Unpack up to LITERAL_BUFFER_LEN literals of the current run into 'literal_buffer_'
// and rewind the buffer cursor. Returns false if the input was truncated. Does not
// touch 'literal_count_'.
template <typename T>
inline bool RleBatchDecoder<T>::FillLiteralBuffer() {
    const int32_t want = std::min<int32_t>(LITERAL_BUFFER_LEN, literal_count_);
    num_buffered_literals_ = bit_reader_.UnpackBatch(bit_width_, want, literal_buffer_);
    // Getting fewer values than requested means the input was truncated.
    if (UNLIKELY(num_buffered_literals_ < want)) return false;
    literal_buffer_pos_ = 0;
    return true;
}
// Decode up to 'batch_num' values into 'values', alternating between repeated and
// literal runs as they appear in the input. Returns the number of values produced
// (fewer than 'batch_num' if the input ends) or 0 on a decoding error.
template <typename T>
inline int32_t RleBatchDecoder<T>::GetBatch(T* values, int32_t batch_num) {
    int32_t num_consumed = 0;
    while (num_consumed < batch_num) {
        const int32_t remaining = batch_num - num_consumed;
        // A repeated run is materialized by filling the output with its value.
        const int32_t num_repeats = NextNumRepeats();
        if (num_repeats > 0) {
            const int32_t n = num_repeats < remaining ? num_repeats : remaining;
            const T value = GetRepeatedValue(n);
            std::fill(values + num_consumed, values + num_consumed + n, value);
            num_consumed += n;
            continue;
        }
        // Otherwise copy from the current literal run; zero literals means the
        // input is exhausted.
        const int32_t num_literals = NextNumLiterals();
        if (num_literals == 0) {
            break;
        }
        const int32_t n = std::min(num_literals, remaining);
        if (!GetLiteralValues(n, values + num_consumed)) {
            return 0;
        }
        num_consumed += n;
    }
    return num_consumed;
}
} // namespace doris

View File

@ -44,12 +44,16 @@ Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type
std::unique_ptr<Decoder>& decoder) {
switch (encoding) {
case tparquet::Encoding::PLAIN:
case tparquet::Encoding::RLE_DICTIONARY:
switch (type) {
case tparquet::Type::BOOLEAN:
if (encoding != tparquet::Encoding::PLAIN) {
return Status::InternalError("Bool type can't has dictionary page");
}
decoder.reset(new BoolPlainDecoder());
break;
case tparquet::Type::BYTE_ARRAY:
decoder.reset(new ByteArrayPlainDecoder());
decoder.reset(new ByteArrayDecoder());
break;
case tparquet::Type::INT32:
case tparquet::Type::INT64:
@ -57,14 +61,12 @@ Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type
case tparquet::Type::FLOAT:
case tparquet::Type::DOUBLE:
case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
decoder.reset(new PlainDecoder(type));
decoder.reset(new FixLengthDecoder(type));
break;
default:
return Status::InternalError("Unsupported plain type {} in parquet decoder",
return Status::InternalError("Unsupported type {} in parquet decoder",
tparquet::to_string(type));
}
case tparquet::Encoding::RLE_DICTIONARY:
break;
default:
return Status::InternalError("Unsupported encoding {} in parquet decoder",
tparquet::to_string(encoding));
@ -118,39 +120,55 @@ Status Decoder::decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, s
return decode_values(data_column, data_type, num_values);
}
Status PlainDecoder::decode_values(Slice& slice, size_t num_values) {
size_t to_read_bytes = _type_length * num_values;
if (UNLIKELY(to_read_bytes > slice.size)) {
return Status::IOError("Slice does not have enough space to write out the decoding data");
}
memcpy(slice.data, _data->data + _offset, to_read_bytes);
_offset += to_read_bytes;
Status FixLengthDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) {
_has_dict = true;
_dict = std::move(dict);
return Status::OK();
}
Status PlainDecoder::skip_values(size_t num_values) {
_offset += _type_length * num_values;
if (UNLIKELY(_offset > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data decoder");
void FixLengthDecoder::set_data(Slice* data) {
_data = data;
_offset = 0;
if (_has_dict) {
uint8_t bit_width = *data->data;
_index_batch_decoder.reset(
new RleBatchDecoder<uint32_t>(reinterpret_cast<uint8_t*>(data->data) + 1,
static_cast<int>(data->size) - 1, bit_width));
}
}
Status FixLengthDecoder::skip_values(size_t num_values) {
if (_has_dict) {
_indexes.resize(num_values);
_index_batch_decoder->GetBatch(&_indexes[0], num_values);
} else {
_offset += _type_length * num_values;
if (UNLIKELY(_offset > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data decoder");
}
}
return Status::OK();
}
Status PlainDecoder::_decode_short_int(MutableColumnPtr& doris_column, size_t num_values,
size_t real_length) {
Status FixLengthDecoder::_decode_short_int(MutableColumnPtr& doris_column, size_t num_values,
size_t real_length) {
if (UNLIKELY(_physical_type != tparquet::Type::INT32)) {
return Status::InternalError("Short int can only be decoded from INT32");
}
for (int i = 0; i < num_values; ++i) {
doris_column->insert_data(_data->data + _offset, real_length);
_offset += _type_length;
char* buf_start = _FIXED_GET_DATA_OFFSET(i);
doris_column->insert_data(buf_start, real_length);
_FIXED_SHIFT_DATA_OFFSET();
}
return Status::OK();
}
Status PlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) {
if (UNLIKELY(_offset + _type_length * num_values > _data->size)) {
Status FixLengthDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) {
if (_has_dict) {
_indexes.resize(num_values);
_index_batch_decoder->GetBatch(&_indexes[0], num_values);
} else if (UNLIKELY(_offset + _type_length * num_values > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data decoder");
}
TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
@ -228,8 +246,9 @@ Status PlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr&
case TypeIndex::FixedString:
if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
for (int i = 0; i < num_values; ++i) {
doris_column->insert_data(_data->data + _offset, _type_length);
_offset += _type_length;
char* buf_start = _FIXED_GET_DATA_OFFSET(i);
doris_column->insert_data(buf_start, _type_length);
_FIXED_SHIFT_DATA_OFFSET();
}
return Status::OK();
}
@ -243,48 +262,37 @@ Status PlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr&
getTypeName(data_type->get_type_id()));
}
Status ByteArrayPlainDecoder::decode_values(Slice& slice, size_t num_values) {
uint32_t slice_offset = 0;
for (int i = 0; i < num_values; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
}
uint32_t length =
decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
_offset += 4;
if (UNLIKELY(_offset + length) > _data->size) {
return Status::IOError("Can't read enough bytes in plain decoder");
}
memcpy(slice.data + slice_offset, _data->data + _offset, length);
slice_offset += length + 1;
slice.data[slice_offset - 1] = '\0';
_offset += length;
Status ByteArrayDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) {
_has_dict = true;
_dict = std::move(dict);
_dict_offsets.resize(dict_size + 1);
uint32_t offset_cursor = 0;
for (int i = 0; i < dict_size; ++i) {
uint32_t length = decode_fixed32_le(_dict.get() + offset_cursor);
offset_cursor += 4;
_dict_offsets[i] = offset_cursor;
offset_cursor += length;
}
_dict_offsets[dict_size] = offset_cursor + 4;
return Status::OK();
}
Status ByteArrayPlainDecoder::skip_values(size_t num_values) {
for (int i = 0; i < num_values; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
}
uint32_t length =
decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
_offset += 4;
if (UNLIKELY(_offset + length) > _data->size) {
return Status::IOError("Can't skip enough bytes in plain decoder");
}
_offset += length;
void ByteArrayDecoder::set_data(Slice* data) {
_data = data;
_offset = 0;
if (_has_dict) {
uint8_t bit_width = *data->data;
_index_batch_decoder.reset(
new RleBatchDecoder<uint32_t>(reinterpret_cast<uint8_t*>(data->data) + 1,
static_cast<int>(data->size) - 1, bit_width));
}
return Status::OK();
}
Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) {
TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
switch (logical_type) {
case TypeIndex::String:
case TypeIndex::FixedString:
Status ByteArrayDecoder::skip_values(size_t num_values) {
if (_has_dict) {
_indexes.resize(num_values);
_index_batch_decoder->GetBatch(&_indexes[0], num_values);
} else {
for (int i = 0; i < num_values; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
@ -293,11 +301,44 @@ Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, Data
decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
_offset += 4;
if (UNLIKELY(_offset + length) > _data->size) {
return Status::IOError("Can't read enough bytes in plain decoder");
return Status::IOError("Can't skip enough bytes in plain decoder");
}
doris_column->insert_data(_data->data + _offset, length);
_offset += length;
}
}
return Status::OK();
}
Status ByteArrayDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) {
if (_has_dict) {
_indexes.resize(num_values);
_index_batch_decoder->GetBatch(&_indexes[0], num_values);
}
TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
switch (logical_type) {
case TypeIndex::String:
case TypeIndex::FixedString:
for (int i = 0; i < num_values; ++i) {
if (_has_dict) {
uint32_t idx = _indexes[i];
uint32_t idx_cursor = _dict_offsets[idx];
char* buff_start = reinterpret_cast<char*>(_dict.get() + idx_cursor);
doris_column->insert_data(buff_start, _dict_offsets[idx + 1] - idx_cursor - 4);
} else {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
}
uint32_t length =
decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
_offset += 4;
if (UNLIKELY(_offset + length) > _data->size) {
return Status::IOError("Can't read enough bytes in plain decoder");
}
doris_column->insert_data(_data->data + _offset, length);
_offset += length;
}
}
return Status::OK();
case TypeIndex::Decimal32:
return _decode_binary_decimal<Int32>(doris_column, data_type, num_values);
@ -313,17 +354,6 @@ Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, Data
getTypeName(data_type->get_type_id()));
}
Status BoolPlainDecoder::decode_values(Slice& slice, size_t num_values) {
bool value;
for (int i = 0; i < num_values; ++i) {
if (UNLIKELY(!_decode_value(&value))) {
return Status::IOError("Can't read enough booleans in plain decoder");
}
slice.data[i] = value ? 1 : 0;
}
return Status::OK();
}
Status BoolPlainDecoder::skip_values(size_t num_values) {
int skip_cached = std::min(num_unpacked_values_ - unpacked_value_idx_, (int)num_values);
unpacked_value_idx_ += skip_cached;

View File

@ -25,6 +25,7 @@
#include "schema_desc.h"
#include "util/bit_stream_utils.inline.h"
#include "util/coding.h"
#include "util/rle_encoding.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
@ -110,10 +111,12 @@ public:
virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) = 0;
virtual Status decode_values(Slice& slice, size_t num_values) = 0;
virtual Status skip_values(size_t num_values) = 0;
virtual Status set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) {
return Status::NotSupported("set_dict is not supported");
}
protected:
int32_t _type_length;
Slice* _data = nullptr;
@ -146,33 +149,25 @@ void Decoder::init_decimal_converter(DataTypePtr& data_type) {
}
}
class PlainDecoder final : public Decoder {
class FixLengthDecoder final : public Decoder {
public:
PlainDecoder(tparquet::Type::type physical_type) : _physical_type(physical_type) {};
~PlainDecoder() override = default;
FixLengthDecoder(tparquet::Type::type physical_type) : _physical_type(physical_type) {};
~FixLengthDecoder() override = default;
Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) override;
Status decode_values(Slice& slice, size_t num_values) override;
Status skip_values(size_t num_values) override;
Status set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) override;
void set_data(Slice* data) override;
protected:
Status _decode_short_int(MutableColumnPtr& doris_column, size_t num_values, size_t real_length);
template <typename Numeric>
Status _decode_numeric(MutableColumnPtr& doris_column, size_t num_values) {
size_t to_read_bytes = _type_length * num_values;
if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data decoder");
}
auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
const auto* raw_data = reinterpret_cast<const Numeric*>(_data->data + _offset);
column_data.insert(raw_data, raw_data + num_values);
_offset += to_read_bytes;
return Status::OK();
}
Status _decode_numeric(MutableColumnPtr& doris_column, size_t num_values);
template <typename CppType, typename ColumnType>
Status _decode_date(MutableColumnPtr& doris_column, TypeIndex& logical_type, size_t num_values);
@ -193,16 +188,44 @@ protected:
Status _decode_primitive_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values);
#define _FIXED_GET_DATA_OFFSET(index) \
_has_dict ? reinterpret_cast<char*>(_dict.get() + _indexes[index] * _type_length) \
: _data->data + _offset
#define _FIXED_SHIFT_DATA_OFFSET() \
if (!_has_dict) _offset += _type_length
tparquet::Type::type _physical_type;
// For dictionary encoding
bool _has_dict = false;
std::unique_ptr<uint8_t[]> _dict = nullptr;
std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
std::vector<uint32_t> _indexes;
};
template <typename Numeric>
Status FixLengthDecoder::_decode_numeric(MutableColumnPtr& doris_column, size_t num_values) {
if (_has_dict) {
for (int i = 0; i < num_values; ++i) {
char* buf_start = _FIXED_GET_DATA_OFFSET(i);
doris_column->insert_data(buf_start, _type_length);
}
} else {
auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
const auto* raw_data = reinterpret_cast<const Numeric*>(_data->data + _offset);
column_data.insert(raw_data, raw_data + num_values);
_offset += _type_length * num_values;
}
return Status::OK();
}
template <typename CppType, typename ColumnType>
Status PlainDecoder::_decode_date(MutableColumnPtr& doris_column, TypeIndex& logical_type,
size_t num_values) {
Status FixLengthDecoder::_decode_date(MutableColumnPtr& doris_column, TypeIndex& logical_type,
size_t num_values) {
auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
for (int i = 0; i < num_values; ++i) {
int64_t date_value =
static_cast<int64_t>(*reinterpret_cast<int32_t*>(_data->data + _offset));
char* buf_start = _FIXED_GET_DATA_OFFSET(i);
int64_t date_value = static_cast<int64_t>(*reinterpret_cast<int32_t*>(buf_start));
CppType v;
v.from_unixtime(date_value * 24 * 60 * 60, *_decode_params->ctz); // day to seconds
if constexpr (std::is_same_v<CppType, VecDateTimeValue>) {
@ -211,17 +234,18 @@ Status PlainDecoder::_decode_date(MutableColumnPtr& doris_column, TypeIndex& log
}
ColumnType& cast_value = *reinterpret_cast<ColumnType*>(&v);
column_data.emplace_back(cast_value);
_offset += _type_length;
_FIXED_SHIFT_DATA_OFFSET();
}
return Status::OK();
}
template <typename CppType, typename ColumnType>
Status PlainDecoder::_decode_datetime64(MutableColumnPtr& doris_column, TypeIndex& logical_type,
size_t num_values) {
Status FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column, TypeIndex& logical_type,
size_t num_values) {
auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
for (int i = 0; i < num_values; i++) {
int64_t& date_value = *reinterpret_cast<int64_t*>(_data->data + _offset);
char* buf_start = _FIXED_GET_DATA_OFFSET(i);
int64_t& date_value = *reinterpret_cast<int64_t*>(buf_start);
CppType v;
v.from_unixtime(date_value / _decode_params->second_mask, *_decode_params->ctz);
if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) {
@ -231,17 +255,18 @@ Status PlainDecoder::_decode_datetime64(MutableColumnPtr& doris_column, TypeInde
}
ColumnType& cast_value = *reinterpret_cast<ColumnType*>(&v);
column_data.emplace_back(cast_value);
_offset += _type_length;
_FIXED_SHIFT_DATA_OFFSET();
}
return Status::OK();
}
template <typename CppType, typename ColumnType>
Status PlainDecoder::_decode_datetime96(MutableColumnPtr& doris_column, TypeIndex& logical_type,
size_t num_values) {
Status FixLengthDecoder::_decode_datetime96(MutableColumnPtr& doris_column, TypeIndex& logical_type,
size_t num_values) {
auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
for (int i = 0; i < num_values; ++i) {
ParquetInt96& datetime96 = *reinterpret_cast<ParquetInt96*>(_data->data + _offset);
char* buf_start = _FIXED_GET_DATA_OFFSET(i);
ParquetInt96& datetime96 = *reinterpret_cast<ParquetInt96*>(buf_start);
CppType v;
int64_t micros = datetime96.to_timestamp_micros();
v.from_unixtime(micros / 1000000, *_decode_params->ctz);
@ -252,20 +277,20 @@ Status PlainDecoder::_decode_datetime96(MutableColumnPtr& doris_column, TypeInde
}
ColumnType& cast_value = *reinterpret_cast<ColumnType*>(&v);
column_data.emplace_back(cast_value);
_offset += _type_length;
_FIXED_SHIFT_DATA_OFFSET();
}
return Status::OK();
}
template <typename DecimalPrimitiveType>
Status PlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) {
Status FixLengthDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
DataTypePtr& data_type, size_t num_values) {
init_decimal_converter<DecimalPrimitiveType>(data_type);
auto& column_data =
static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
DecimalScaleParams& scale_params = _decode_params->decimal_scale;
for (int i = 0; i < num_values; ++i) {
char* buf_start = _data->data + _offset;
char* buf_start = _FIXED_GET_DATA_OFFSET(i);
// When Decimal in parquet is stored in byte arrays, binary and fixed,
// the unscaled number must be encoded as two's complement using big-endian byte order.
Int128 value = buf_start[0] & 0x80 ? -1 : 0;
@ -279,21 +304,22 @@ Status PlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column, Data
}
DecimalPrimitiveType cast_value(value);
column_data.emplace_back(*reinterpret_cast<Decimal<DecimalPrimitiveType>*>(&cast_value));
_offset += _type_length;
_FIXED_SHIFT_DATA_OFFSET();
}
return Status::OK();
}
template <typename DecimalPrimitiveType, typename DecimalPhysicalType>
Status PlainDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_column,
DataTypePtr& data_type, size_t num_values) {
Status FixLengthDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_column,
DataTypePtr& data_type, size_t num_values) {
init_decimal_converter<DecimalPrimitiveType>(data_type);
auto& column_data =
static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
DecimalScaleParams& scale_params = _decode_params->decimal_scale;
for (int i = 0; i < num_values; ++i) {
char* buf_start = _FIXED_GET_DATA_OFFSET(i);
// we should use decimal128 to scale up/down
Int128 value = *reinterpret_cast<DecimalPhysicalType*>(_data->data + _offset);
Int128 value = *reinterpret_cast<DecimalPhysicalType*>(buf_start);
if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
value *= scale_params.scale_factor;
} else if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
@ -301,44 +327,62 @@ Status PlainDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_column,
}
DecimalPrimitiveType cast_value(value);
column_data.emplace_back(*reinterpret_cast<Decimal<DecimalPrimitiveType>*>(&cast_value));
_offset += _type_length;
_FIXED_SHIFT_DATA_OFFSET();
}
return Status::OK();
}
class ByteArrayPlainDecoder final : public Decoder {
class ByteArrayDecoder final : public Decoder {
public:
ByteArrayPlainDecoder() = default;
~ByteArrayPlainDecoder() override = default;
ByteArrayDecoder() = default;
~ByteArrayDecoder() override = default;
Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) override;
Status decode_values(Slice& slice, size_t num_values) override;
Status skip_values(size_t num_values) override;
void set_data(Slice* data) override;
Status set_dict(std::unique_ptr<uint8_t[]>& dict, size_t dict_size) override;
protected:
template <typename DecimalPrimitiveType>
Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values);
// For dictionary encoding
bool _has_dict = false;
std::unique_ptr<uint8_t[]> _dict = nullptr;
std::vector<uint32_t> _dict_offsets;
std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
std::vector<uint32_t> _indexes;
};
template <typename DecimalPrimitiveType>
Status ByteArrayPlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
DataTypePtr& data_type, size_t num_values) {
Status ByteArrayDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
DataTypePtr& data_type, size_t num_values) {
init_decimal_converter<DecimalPrimitiveType>(data_type);
auto& column_data =
static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
DecimalScaleParams& scale_params = _decode_params->decimal_scale;
for (int i = 0; i < num_values; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
char* buf_start;
uint32_t length;
if (_has_dict) {
uint32_t idx = _indexes[i];
uint32_t idx_cursor = _dict_offsets[idx];
buf_start = reinterpret_cast<char*>(_dict.get() + idx_cursor);
length = _dict_offsets[idx + 1] - idx_cursor - 4;
} else {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
}
length = decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
_offset += 4;
buf_start = _data->data + _offset;
_offset += length;
}
uint32_t length =
decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
_offset += 4;
char* buf_start = _data->data + _offset;
// When Decimal in parquet is stored in byte arrays, binary and fixed,
// the unscaled number must be encoded as two's complement using big-endian byte order.
Int128 value = buf_start[0] & 0x80 ? -1 : 0;
@ -351,7 +395,6 @@ Status ByteArrayPlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_col
}
DecimalPrimitiveType cast_value(value);
column_data.emplace_back(*reinterpret_cast<Decimal<DecimalPrimitiveType>*>(&cast_value));
_offset += length;
}
return Status::OK();
}
@ -374,8 +417,6 @@ public:
Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) override;
Status decode_values(Slice& slice, size_t num_values) override;
Status skip_values(size_t num_values) override;
protected:

View File

@ -119,7 +119,43 @@ Status ColumnChunkReader::load_page_data() {
Status ColumnChunkReader::_decode_dict_page() {
    const tparquet::PageHeader& header = *_page_reader->get_page_header();
    DCHECK_EQ(tparquet::PageType::DICTIONARY_PAGE, header.type);
    // The Parquet 2.0 specification deprecates the PLAIN_DICTIONARY enum value:
    // dictionary pages should be encoded as PLAIN, with RLE_DICTIONARY used in data pages.
    // refer: https://github.com/apache/parquet-format/blob/master/Encodings.md
    const tparquet::Encoding::type dict_encoding = header.dictionary_page_header.encoding;
    const bool encoding_supported = dict_encoding == tparquet::Encoding::PLAIN_DICTIONARY ||
                                    dict_encoding == tparquet::Encoding::PLAIN;
    if (!encoding_supported) {
        return Status::InternalError("Unsupported dictionary encoding {}",
                                     tparquet::to_string(dict_encoding));
    }
    // Materialize the (possibly compressed) dictionary page into a buffer the
    // decoder can own for the lifetime of the column chunk.
    const int32_t uncompressed_size = header.uncompressed_page_size;
    std::unique_ptr<uint8_t[]> dict_buf(new uint8_t[uncompressed_size]);
    if (_block_compress_codec == nullptr) {
        Slice raw_slice;
        RETURN_IF_ERROR(_page_reader->get_page_date(raw_slice));
        // The data is stored by BufferedStreamReader, so it must be copied out
        // before the stream buffer is reused for subsequent pages.
        memcpy(dict_buf.get(), raw_slice.data, raw_slice.size);
    } else {
        Slice compressed;
        RETURN_IF_ERROR(_page_reader->get_page_date(compressed));
        Slice uncompressed(dict_buf.get(), uncompressed_size);
        RETURN_IF_ERROR(_block_compress_codec->decompress(compressed, &uncompressed));
    }
    // Build a dictionary-aware page decoder and cache it under RLE_DICTIONARY,
    // so later data pages with that encoding reuse the decoded dictionary.
    std::unique_ptr<Decoder> decoder;
    Decoder::get_decoder(_metadata.type, tparquet::Encoding::RLE_DICTIONARY, decoder);
    // Fixed-length physical types need the type length before decoding.
    decoder->set_type_length(_get_type_length());
    // Initialize the time-conversion context (field schema + timezone).
    decoder->init(_field_schema, _ctz);
    // Hand over ownership of the dictionary buffer to the decoder.
    RETURN_IF_ERROR(decoder->set_dict(dict_buf, header.dictionary_page_header.num_values));
    _decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)] = std::move(decoder);
    return Status::OK();
}
@ -138,6 +174,16 @@ Status ColumnChunkReader::skip_values(size_t num_values) {
return _page_decoder->skip_values(num_values);
}
// Append `num_values` null entries to a nullable column.
// Only the nested (data) column is padded with defaults here; the null map
// itself is maintained by the caller, which derives it from definition levels.
void ColumnChunkReader::insert_null_values(ColumnPtr& doris_column, size_t num_values) {
    DCHECK_GE(_remaining_num_values, num_values);
    CHECK(doris_column->is_nullable());
    auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
            (*std::move(doris_column)).mutate().get());
    MutableColumnPtr data_column = nullable_column->get_nested_column_ptr();
    // BUG FIX: one default value must be inserted per null; previously a single
    // insert_default() ran regardless of num_values, leaving the nested column
    // shorter than the null map and desynchronizing row counts.
    for (size_t i = 0; i < num_values; ++i) {
        data_column->insert_default();
    }
    _remaining_num_values -= num_values;
}
size_t ColumnChunkReader::get_rep_levels(level_t* levels, size_t n) {
DCHECK_GT(_max_rep_level, 0);
return _rep_level_decoder.get_levels(levels, n);
@ -166,14 +212,6 @@ Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataType
return _page_decoder->decode_values(doris_column, data_type, num_values);
}
// Decode `num_values` values from the current page into `slice`.
// Fails if the request exceeds what is left in the page.
Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) {
    // Guard against reading past the end of the current page.
    if (UNLIKELY(num_values > _remaining_num_values)) {
        return Status::IOError("Decode too many values in current page");
    }
    _remaining_num_values -= num_values;
    return _page_decoder->decode_values(slice, num_values);
}
int32_t ColumnChunkReader::_get_type_length() {
switch (_field_schema->physical_type) {
case tparquet::Type::INT32:

View File

@ -87,7 +87,7 @@ public:
uint32_t remaining_num_values() const { return _remaining_num_values; };
// null values are generated from definition levels
// the caller should maintain the consistency after analyzing null values from definition levels.
void dec_num_values(uint32_t dec_num) { _remaining_num_values -= dec_num; };
void insert_null_values(ColumnPtr& doris_column, size_t num_values);
// Get the raw data of current page.
Slice& get_page_data() { return _page_data; }
@ -99,8 +99,6 @@ public:
// Decode values in current page into doris column.
Status decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values);
Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values);
// For test, Decode values in current page into slice.
Status decode_values(Slice& slice, size_t num_values);
// Get the repetition level decoder of current page.
LevelDecoder& rep_level_decoder() { return _rep_level_decoder; }

View File

@ -0,0 +1,16 @@
+---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
|tinyint_col(Nullable(Int8))|smallint_col(Nullable(Int16))|int_col(Nullable(Int32))|bigint_col(Nullable(Int64))|boolean_col(Nullable(UInt8))|float_col(Nullable(Float32))|double_col(Nullable(Float64))|string_col(Nullable(String))|binary_col(Nullable(String))|timestamp_col(Nullable(DateTime))|decimal_col(Nullable(Decimal(27, 9)))|char_col(Nullable(String))|varchar_col(Nullable(String))|date_col(Nullable(Date))|date_v2_col(Nullable(DateV2))|timestamp_v2_col(Nullable(DateTimeV2))|
+---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17|
| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17|
| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17|
| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17|
| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17|
| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17|
| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17|
| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17|
| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17|
| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17|
| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17|
| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17|
+---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+

View File

@ -0,0 +1,14 @@
+---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
|tinyint_col(Nullable(Int8))|smallint_col(Nullable(Int16))|int_col(Nullable(Int32))|bigint_col(Nullable(Int64))|boolean_col(Nullable(UInt8))|float_col(Nullable(Float32))|double_col(Nullable(Float64))|string_col(Nullable(String))|binary_col(Nullable(String))|timestamp_col(Nullable(DateTime))|decimal_col(Nullable(Decimal(27, 9)))|char_col(Nullable(String))|varchar_col(Nullable(String))|date_col(Nullable(Date))|date_v2_col(Nullable(DateV2))|timestamp_v2_col(Nullable(DateTimeV2))|
+---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+
| -1| -1| -1| -1| 0| -1.140000| -1.140000| s-row0| b-row0| 2022-08-01 07:23:17| -1.140000000| c-row0| vc-row0| 2022-08-01| 2022-08-01| 2022-08-01 07:23:17|
| 2| 2| 2| 2| 1| 2.140000| 2.140000| NULL| b-row1| 2022-08-02 07:23:18| 2.140000000| c-row1| vc-row1| 2022-08-02| 2022-08-02| 2022-08-02 07:23:18|
| -3| -3| -3| -3| 0| -3.140000| -3.140000| s-row2| b-row2| 2022-08-03 07:23:19| -3.140000000| c-row2| vc-row2| 2022-08-03| 2022-08-03| 2022-08-03 07:23:19|
| 4| 4| 4| 4| 1| 4.140000| 4.140000| NULL| b-row3| 2022-08-04 07:24:17| 4.140000000| c-row3| vc-row3| 2022-08-04| 2022-08-04| 2022-08-04 07:24:17|
| -5| -5| -5| -5| 0| -5.140000| -5.140000| s-row4| b-row4| 2022-08-05 07:25:17| -5.140000000| c-row4| vc-row4| 2022-08-05| 2022-08-05| 2022-08-05 07:25:17|
| 6| 6| 6| 6| 0| 6.140000| 6.140000| s-row5| b-row5| 2022-08-06 07:26:17| 6.140000000| c-row5| vc-row5| 2022-08-06| 2022-08-06| 2022-08-06 07:26:17|
| -7| -7| -7| -7| 1| -7.140000| -7.140000| s-row6| b-row6| 2022-08-07 07:27:17| -7.140000000| c-row6| vc-row6| 2022-08-07| 2022-08-07| 2022-08-07 07:27:17|
| 8| 8| 8| 8| 0| 8.140000| 8.140000| NULL| b-row7| 2022-08-08 07:28:17| 8.140000000| c-row7| vc-row7| 2022-08-08| 2022-08-08| 2022-08-08 07:28:17|
| -9| -9| -9| -9| 0| -9.140000| -9.140000| s-row8| b-row8| 2022-08-09 07:29:17| -9.140000000| c-row8| vc-row8| 2022-08-09| 2022-08-09| 2022-08-09 07:29:17|
| 10| 10| 10| 10| 0| 10.140000| 10.140000| s-row9| b-row9| 2022-08-10 07:21:17| 10.140000000| c-row9| vc-row9| 2022-08-10| 2022-08-10| 2022-08-10 07:21:17|
+---------------------------+-----------------------------+------------------------+---------------------------+----------------------------+----------------------------+-----------------------------+----------------------------+----------------------------+---------------------------------+-------------------------------------+--------------------------+-----------------------------+------------------------+-----------------------------+--------------------------------------+

View File

@ -131,9 +131,25 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
ASSERT_EQ(schemaDescriptor.get_column_index("mark"), 4);
}
// Build the null map of a nullable column from parquet definition levels:
// a definition level of 0 marks a null value for a top-level field.
// Returns the number of nulls appended.
static int fill_nullable_column(ColumnPtr& doris_column, level_t* definitions, size_t num_values) {
    CHECK(doris_column->is_nullable());
    auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
            (*std::move(doris_column)).mutate().get());
    NullMap& map_data = nullable_column->get_null_map_data();
    int null_cnt = 0;
    // size_t index avoids the signed/unsigned comparison of `int i < num_values`.
    for (size_t i = 0; i < num_values; ++i) {
        const bool is_null = definitions[i] == 0;
        if (is_null) {
            null_cnt++;
        }
        map_data.emplace_back(is_null);
    }
    return null_cnt;
}
static Status get_column_values(FileReader* file_reader, tparquet::ColumnChunk* column_chunk,
FieldSchema* field_schema, ColumnPtr& doris_column,
DataTypePtr& data_type) {
DataTypePtr& data_type, level_t* definitions) {
tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data;
size_t start_offset = chunk_meta.__isset.dictionary_page_offset
? chunk_meta.dictionary_page_offset
@ -150,8 +166,46 @@ static Status get_column_values(FileReader* file_reader, tparquet::ColumnChunk*
chunk_reader.next_page();
// load page data into underlying container
chunk_reader.load_page_data();
int rows = chunk_reader.remaining_num_values();
// definition levels
if (field_schema->definition_level == 0) { // required field
std::fill(definitions, definitions + rows, 1);
} else {
chunk_reader.get_def_levels(definitions, rows);
}
// fill nullable values
fill_nullable_column(doris_column, definitions, rows);
// decode page data
return chunk_reader.decode_values(doris_column, data_type, chunk_reader.remaining_num_values());
if (field_schema->definition_level == 0) {
// required column
return chunk_reader.decode_values(doris_column, data_type, rows);
} else {
// column with null values
level_t level_type = definitions[0];
int num_values = 1;
for (int i = 1; i < rows; ++i) {
if (definitions[i] != level_type) {
if (level_type == 0) {
// null values
chunk_reader.insert_null_values(doris_column, num_values);
} else {
RETURN_IF_ERROR(
chunk_reader.decode_values(doris_column, data_type, num_values));
}
level_type = definitions[i];
num_values = 1;
} else {
num_values++;
}
}
if (level_type == 0) {
// null values
chunk_reader.insert_null_values(doris_column, num_values);
} else {
RETURN_IF_ERROR(chunk_reader.decode_values(doris_column, data_type, num_values));
}
return Status::OK();
}
}
static void create_block(std::unique_ptr<vectorized::Block>& block) {
@ -192,10 +246,11 @@ static void create_block(std::unique_ptr<vectorized::Block>& block) {
}
}
TEST_F(ParquetThriftReaderTest, type_decoder) {
static void read_parquet_data_and_check(const std::string& parquet_file,
const std::string& result_file, int rows) {
/*
* type-decoder.parquet is the part of following table:
* create table `type_decoder`(
* table schema in parquet file:
* create table `decoder`(
* `tinyint_col` tinyint, // 0
* `smallint_col` smallint, // 1
* `int_col` int, // 2
@ -213,20 +268,7 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
* `list_string` array<string>) // 14
*/
LocalFileReader reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
/*
* Data in type-decoder.parquet:
* -1 -1 -1 -1 false -1.14 -1.14 s-row0 b-row0 2022-08-01 07:23:17 -1.14 c-row0 vc-row0 2022-08-01 ["as-0","as-1"]
* 2 2 2 2 true 2.14 2.14 NULL b-row1 2022-08-02 07:23:18 2.14 c-row1 vc-row1 2022-08-02 [null,"as-3"]
* -3 -3 -3 -3 false -3.14 -3.14 s-row2 b-row2 2022-08-03 07:23:19 -3.14 c-row2 vc-row2 2022-08-03 []
* 4 4 4 4 true 4.14 4.14 NULL b-row3 2022-08-04 07:24:17 4.14 c-row3 vc-row3 2022-08-04 ["as-4"]
* -5 -5 -5 -5 false -5.14 -5.14 s-row4 b-row4 2022-08-05 07:25:17 -5.14 c-row4 vc-row4 2022-08-05 ["as-5",null]
* 6 6 6 6 false 6.14 6.14 s-row5 b-row5 2022-08-06 07:26:17 6.14 c-row5 vc-row5 2022-08-06 [null,null]
* -7 -7 -7 -7 true -7.14 -7.14 s-row6 b-row6 2022-08-07 07:27:17 -7.14 c-row6 vc-row6 2022-08-07 ["as-6","as-7"]
* 8 8 8 8 false 8.14 8.14 NULL b-row7 2022-08-08 07:28:17 8.14 c-row7 vc-row7 2022-08-08 ["as-0","as-8"]
* -9 -9 -9 -9 false -9.14 -9.14 s-row8 b-row8 2022-08-09 07:29:17 -9.14 c-row8 vc-row8 2022-08-09 ["as-9","as-10"]
* 10 10 10 10 false 10.14 10.14 s-row9 b-row9 2022-08-10 07:21:17 10.14 c-row9 vc-row9 2022-08-10 ["as-11","as-12"]
*/
LocalFileReader reader(parquet_file, 0);
auto st = reader.open();
EXPECT_TRUE(st.ok());
@ -237,194 +279,15 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata();
FieldDescriptor schema_descriptor;
schema_descriptor.parse_from_thrift(t_metadata.schema);
int rows = 10;
level_t defs[rows];
// the physical_type of tinyint_col, smallint_col and int_col are all INT32
// they are distinguished by converted_type(in FieldSchema.parquet_schema.converted_type)
{
auto& column_name_with_type = block->get_by_position(0);
for (int c = 0; c < 14; ++c) {
auto& column_name_with_type = block->get_by_position(c);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[0],
const_cast<FieldSchema*>(schema_descriptor.get_column(0)), data_column,
data_type);
int int_sum = 0;
for (int i = 0; i < rows; ++i) {
int_sum += (int8_t)data_column->get64(i);
}
ASSERT_EQ(int_sum, 5);
}
{
auto& column_name_with_type = block->get_by_position(1);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[1],
const_cast<FieldSchema*>(schema_descriptor.get_column(1)), data_column,
data_type);
int int_sum = 0;
for (int i = 0; i < rows; ++i) {
int_sum += (int16_t)data_column->get64(i);
}
ASSERT_EQ(int_sum, 5);
}
{
auto& column_name_with_type = block->get_by_position(2);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[2],
const_cast<FieldSchema*>(schema_descriptor.get_column(2)), data_column,
data_type);
int int_sum = 0;
for (int i = 0; i < rows; ++i) {
int_sum += (int32_t)data_column->get64(i);
}
ASSERT_EQ(int_sum, 5);
}
{
auto& column_name_with_type = block->get_by_position(3);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[3],
const_cast<FieldSchema*>(schema_descriptor.get_column(3)), data_column,
data_type);
int64_t int_sum = 0;
for (int i = 0; i < rows; ++i) {
int_sum += (int64_t)data_column->get64(i);
}
ASSERT_EQ(int_sum, 5);
}
// `boolean_col` boolean, // 4
{
auto& column_name_with_type = block->get_by_position(4);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[4],
const_cast<FieldSchema*>(schema_descriptor.get_column(4)), data_column,
data_type);
ASSERT_FALSE(static_cast<bool>(data_column->get64(0)));
ASSERT_TRUE(static_cast<bool>(data_column->get64(1)));
ASSERT_FALSE(static_cast<bool>(data_column->get64(2)));
ASSERT_TRUE(static_cast<bool>(data_column->get64(3)));
ASSERT_FALSE(static_cast<bool>(data_column->get64(4)));
ASSERT_FALSE(static_cast<bool>(data_column->get64(5)));
ASSERT_TRUE(static_cast<bool>(data_column->get64(6)));
ASSERT_FALSE(static_cast<bool>(data_column->get64(7)));
ASSERT_FALSE(static_cast<bool>(data_column->get64(8)));
ASSERT_FALSE(static_cast<bool>(data_column->get64(9)));
}
// `double_col` double, // 6
{
auto& column_name_with_type = block->get_by_position(6);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[6],
const_cast<FieldSchema*>(schema_descriptor.get_column(6)), data_column,
data_type);
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(data_column)).mutate().get());
MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
ASSERT_EQ(nested_column->get_float64(0), -1.14);
ASSERT_EQ(nested_column->get_float64(1), 2.14);
ASSERT_EQ(nested_column->get_float64(2), -3.14);
ASSERT_EQ(nested_column->get_float64(3), 4.14);
}
// `string_col` string, // 7
{
auto& column_name_with_type = block->get_by_position(7);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
tparquet::ColumnChunk column_chunk = t_metadata.row_groups[0].columns[7];
tparquet::ColumnMetaData chunk_meta = column_chunk.meta_data;
size_t start_offset = chunk_meta.__isset.dictionary_page_offset
? chunk_meta.dictionary_page_offset
: chunk_meta.data_page_offset;
size_t chunk_size = chunk_meta.total_compressed_size;
BufferedFileStreamReader stream_reader(&reader, start_offset, chunk_size);
cctz::time_zone ctz;
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
ColumnChunkReader chunk_reader(&stream_reader, &column_chunk,
const_cast<FieldSchema*>(schema_descriptor.get_column(7)),
&ctz);
// initialize chunk reader
chunk_reader.init();
// seek to next page header
chunk_reader.next_page();
// load page data into underlying container
chunk_reader.load_page_data();
level_t defs[rows];
// Analyze null string
chunk_reader.get_def_levels(defs, rows);
ASSERT_EQ(defs[1], 0);
ASSERT_EQ(defs[3], 0);
ASSERT_EQ(defs[7], 0);
chunk_reader.decode_values(data_column, data_type, 7);
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(data_column)).mutate().get());
MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
auto row0 = nested_column->get_data_at(0).data;
auto row2 = nested_column->get_data_at(1).data;
ASSERT_STREQ("s-row0", row0);
ASSERT_STREQ("s-row2", row2);
}
// `timestamp_col` timestamp, // 9, DATETIME
{
auto& column_name_with_type = block->get_by_position(9);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[9],
const_cast<FieldSchema*>(schema_descriptor.get_column(9)), data_column,
data_type);
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(data_column)).mutate().get());
MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
int64_t date_value = (int64_t)nested_column->get64(0);
VecDateTimeInt64Union conv = {.i64 = date_value};
auto dt = conv.dt;
ASSERT_EQ(dt.hour(), 7);
ASSERT_EQ(dt.minute(), 23);
ASSERT_EQ(dt.second(), 17);
}
// `decimal_col` decimal, // 10
{
auto& column_name_with_type = block->get_by_position(10);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[10],
const_cast<FieldSchema*>(schema_descriptor.get_column(10)), data_column,
data_type);
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(data_column)).mutate().get());
MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
int neg = 1;
for (int i = 0; i < rows; ++i) {
neg *= -1;
auto decimal_field = nested_column->operator[](i)
.get<vectorized::DecimalField<vectorized::Decimal128>>();
EXPECT_EQ(DecimalV2Value(decimal_field.get_value()),
DecimalV2Value(std::to_string(neg * (1.14 + i))));
}
}
// `date_col` date, // 13, DATE
{
auto& column_name_with_type = block->get_by_position(13);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[13],
const_cast<FieldSchema*>(schema_descriptor.get_column(13)), data_column,
data_type);
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(data_column)).mutate().get());
MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
for (int i = 0; i < rows; ++i) {
int64_t date_value = (int64_t)nested_column->get64(i);
VecDateTimeInt64Union conv = {.i64 = date_value};
auto dt = conv.dt;
ASSERT_EQ(dt.year(), 2022);
ASSERT_EQ(dt.month(), 8);
ASSERT_EQ(dt.day(), i + 1);
}
get_column_values(&reader, &t_metadata.row_groups[0].columns[c],
const_cast<FieldSchema*>(schema_descriptor.get_column(c)), data_column,
data_type, defs);
}
// `date_v2_col` date, // 14 - 13, DATEV2
{
@ -433,18 +296,7 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[13],
const_cast<FieldSchema*>(schema_descriptor.get_column(13)), data_column,
data_type);
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(data_column)).mutate().get());
MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
for (int i = 0; i < rows; ++i) {
uint32_t date_value = (uint32_t)nested_column->get64(i);
DateV2UInt32Union conv = {.ui32 = date_value};
auto dt = conv.dt;
ASSERT_EQ(dt.year(), 2022);
ASSERT_EQ(dt.month(), 8);
ASSERT_EQ(dt.day(), i + 1);
}
data_type, defs);
}
// `timestamp_v2_col` timestamp, // 15 - 9, DATETIMEV2
{
@ -453,17 +305,28 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[9],
const_cast<FieldSchema*>(schema_descriptor.get_column(9)), data_column,
data_type);
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(data_column)).mutate().get());
MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
uint64_t date_value = nested_column->get64(0);
DateTimeV2UInt64Union conv = {.ui64 = date_value};
auto dt = conv.dt;
ASSERT_EQ(dt.hour(), 7);
ASSERT_EQ(dt.minute(), 23);
ASSERT_EQ(dt.second(), 17);
data_type, defs);
}
LocalFileReader result(result_file, 0);
auto rst = result.open();
EXPECT_TRUE(rst.ok());
uint8_t result_buf[result.size() + 1];
result_buf[result.size()] = '\0';
int64_t bytes_read;
bool eof;
result.read(result_buf, result.size(), &bytes_read, &eof);
ASSERT_STREQ(block->dump_data(0, rows).c_str(), reinterpret_cast<char*>(result_buf));
}
// Decode a plain-encoded parquet file and compare the dumped block
// against the expected text fixture (10 data rows).
TEST_F(ParquetThriftReaderTest, type_decoder) {
    const std::string parquet_path = "./be/test/exec/test_data/parquet_scanner/type-decoder.parquet";
    const std::string expected_path = "./be/test/exec/test_data/parquet_scanner/type-decoder.txt";
    read_parquet_data_and_check(parquet_path, expected_path, 10);
}
// Decode a dictionary-encoded parquet file and compare the dumped block
// against the expected text fixture (12 data rows).
TEST_F(ParquetThriftReaderTest, dict_decoder) {
    const std::string parquet_path = "./be/test/exec/test_data/parquet_scanner/dict-decoder.parquet";
    const std::string expected_path = "./be/test/exec/test_data/parquet_scanner/dict-decoder.txt";
    read_parquet_data_and_check(parquet_path, expected_path, 12);
}
TEST_F(ParquetThriftReaderTest, column_reader) {