[Improvement](parquet-reader) Optimize and refactor parquet reader to improve performance. (#16818)

Optimize and refactor parquet reader to improve performance.
- Improve 2x performance for small dict string by aligned copying.
- Refactor code to reduce conditional (if) checks.
- Don't call skip(0).
- Don't read page index if no condition.

**ssb-flat-100**: (single-machine, single-thread)
| Query        | before opt           | after opt  |
| ------------- |:-------------:| ---------:|
| SELECT count(lo_revenue) FROM lineorder_flat       | 9.23   | 9.12 |
| SELECT count(lo_linenumber) FROM lineorder_flat | 4.50    | 4.36 |
| SELECT count(c_name) FROM lineorder_flat             | 18.22 | 17.88| 
| **SELECT count(lo_shipmode) FROM lineorder_flat**     |**10.09** | **6.15**|
This commit is contained in:
Qi Chen
2023-02-20 11:42:29 +08:00
committed by GitHub
parent 2bc014d83a
commit ef2fdb79bb
23 changed files with 2005 additions and 1092 deletions

View File

@ -281,6 +281,11 @@ set(VEC_FILES
exec/format/parquet/schema_desc.cpp
exec/format/parquet/vparquet_column_reader.cpp
exec/format/parquet/level_decoder.cpp
exec/format/parquet/decoder.cpp
exec/format/parquet/fix_length_plain_decoder.cpp
exec/format/parquet/byte_array_plain_decoder.cpp
exec/format/parquet/byte_array_dict_decoder.cpp
exec/format/parquet/bool_plain_decoder.cpp
exec/format/parquet/parquet_common.cpp
exec/scan/vscan_node.cpp
exec/scan/vscanner.cpp

View File

@ -265,6 +265,11 @@ public:
LOG(FATAL) << "Method insert_many_binary_data is not supported for " << get_name();
}
// Bulk-insert `num` strings where an implementation may copy each string in a
// fixed-size chunk of up to `max_length` bytes instead of its exact length
// (see ColumnString's override in this change, which pads its buffers so the
// overrun is safe). Default implementation: unsupported, aborts.
virtual void insert_many_strings_overflow(const StringRef* strings, size_t num,
                                          size_t max_length) {
    LOG(FATAL) << "Method insert_many_strings_overflow is not supported for " << get_name();
}
// Here `pos` points to the memory data type is the same as the data type of the column.
// This function is used by `insert_keys_into_columns` in AggregationNode.
virtual void insert_many_raw_data(const char* pos, size_t num) {

View File

@ -280,6 +280,52 @@ public:
}
}
#define MAX_STRINGS_OVERFLOW_SIZE 128
// Insert `num` strings, each of length <= copy_length, using a fixed-size
// memcpy per string so the compiler emits a single aligned copy instead of a
// variable-length one.
//
// Callers must guarantee every source string has at least copy_length readable
// bytes (ByteArrayDictDecoder pads its dictionary buffer with
// MAX_STRINGS_OVERFLOW_SIZE bytes for exactly this reason).
//
// BUG FIX: the forward declaration used to read
//     template <typename T, size_t copy_length> ... __attribute__((noinline));
// The extra `typename T` made it a *different* member template from the
// definition below, so that declaration was dead and `noinline` never applied
// to the function actually called. Attribute moved onto the real definition.
template <size_t copy_length>
__attribute__((noinline)) void insert_many_strings_fixed_length(const StringRef* strings,
                                                                size_t num) {
    size_t new_size = 0;
    for (size_t i = 0; i < num; i++) {
        new_size += strings[i].size;
    }
    const size_t old_size = chars.size();
    check_chars_length(old_size + new_size, offsets.size() + num);
    // Over-allocate copy_length bytes so the fixed-size memcpy below may write
    // past the logical end of the last string without touching unowned memory.
    chars.resize(old_size + new_size + copy_length);
    Char* data = chars.data();
    size_t offset = old_size;
    for (size_t i = 0; i < num; i++) {
        uint32_t len = strings[i].size;
        if (len) {
            // Always copy copy_length bytes, but advance only by the true
            // length; the trailing garbage is overwritten by the next string
            // or trimmed by the final resize.
            memcpy(data + offset, strings[i].data, copy_length);
            offset += len;
        }
        offsets.push_back(offset);
    }
    // Trim the overflow padding back off.
    chars.resize(old_size + new_size);
}
// Route to the fixed-length fast path whose copy size is the smallest
// power-of-two bucket (8..128 bytes) covering max_length; anything larger
// falls back to the generic variable-length insert.
void insert_many_strings_overflow(const StringRef* strings, size_t num,
                                  size_t max_length) override {
    if (max_length <= 8) {
        return insert_many_strings_fixed_length<8>(strings, num);
    }
    if (max_length <= 16) {
        return insert_many_strings_fixed_length<16>(strings, num);
    }
    if (max_length <= 32) {
        return insert_many_strings_fixed_length<32>(strings, num);
    }
    if (max_length <= 64) {
        return insert_many_strings_fixed_length<64>(strings, num);
    }
    if (max_length <= 128) {
        return insert_many_strings_fixed_length<128>(strings, num);
    }
    insert_many_strings(strings, num);
}
void insert_many_dict_data(const int32_t* data_array, size_t start_index, const StringRef* dict,
size_t num, uint32_t /*dict_num*/) override {
size_t offset_size = offsets.size();

View File

@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/format/parquet/bool_plain_decoder.h"
namespace doris::vectorized {
// Skip `num_values` booleans without materializing them: drain the unpack
// buffer first, skip whole 32-value batches in the bit reader, then unpack one
// final buffer for the remainder.
Status BoolPlainDecoder::skip_values(size_t num_values) {
    // Consume values already sitting in the unpack buffer.
    const int cached = std::min(num_unpacked_values_ - unpacked_value_idx_, (int)num_values);
    unpacked_value_idx_ += cached;
    if (cached == num_values) {
        return Status::OK();
    }
    int remaining = num_values - cached;
    // Skip as many full 32-value batches as possible directly in the reader.
    const int batched = BitUtil::RoundDownToPowerOf2(remaining, 32);
    if (batched > 0) {
        bool_values_.SkipBatch(1, batched);
    }
    remaining -= batched;
    if (remaining > 0) {
        // Unpack one more buffer and park the cursor just past the skipped tail.
        DCHECK_LE(remaining, UNPACKED_BUFFER_LEN);
        num_unpacked_values_ =
                bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
        if (UNLIKELY(num_unpacked_values_ < remaining)) {
            return Status::IOError("Can't skip enough booleans in plain decoder");
        }
        unpacked_value_idx_ = remaining;
    }
    return Status::OK();
}
// Decode PLAIN booleans into a UInt8 column, honoring the select vector:
// CONTENT rows are materialized, NULL_DATA rows are skipped over in the output,
// FILTERED_* rows are consumed from the bit stream but not stored.
Status BoolPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                       ColumnSelectVector& select_vector) {
    auto& column_data = static_cast<ColumnVector<UInt8>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    // Reserve room for every surviving (non-filtered) row up front.
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            bool value;
            for (size_t i = 0; i < run_length; ++i) {
                if (UNLIKELY(!_decode_value(&value))) {
                    return Status::IOError("Can't read enough booleans in plain decoder");
                }
                column_data[data_index++] = (UInt8)value;
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            // Null slots are left as-is; presumably the caller maintains the
            // null map separately — confirm against the column reader.
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            bool value;
            // Consume (and discard) filtered values.
            // FIX: loop index was `int`, a signed/unsigned mismatch against
            // the size_t run_length; now size_t for consistency with CONTENT.
            for (size_t i = 0; i < run_length; ++i) {
                if (UNLIKELY(!_decode_value(&value))) {
                    return Status::IOError("Can't read enough booleans in plain decoder");
                }
            }
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // Filtered nulls consume nothing from the stream and store nothing.
            break;
        }
        }
    }
    return Status::OK();
}
} // namespace doris::vectorized

View File

@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "util/bit_stream_utils.inline.h"
#include "vec/exec/format/parquet/decoder.h"
namespace doris::vectorized {
/// Decoder bit-packed boolean-encoded values.
/// Implementation from https://github.com/apache/impala/blob/master/be/src/exec/parquet/parquet-bool-decoder.h
/// Decodes PLAIN-encoded (bit-packed) boolean values.
/// Ported from Impala's parquet-bool-decoder:
/// https://github.com/apache/impala/blob/master/be/src/exec/parquet/parquet-bool-decoder.h
class BoolPlainDecoder final : public Decoder {
public:
    BoolPlainDecoder() = default;
    ~BoolPlainDecoder() override = default;

    /// Point the decoder at a fresh page of packed booleans and reset cursors.
    void set_data(Slice* data) override {
        _offset = 0;
        unpacked_value_idx_ = 0;
        num_unpacked_values_ = 0;
        bool_values_.Reset((const uint8_t*)data->data, data->size);
    }

    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                         ColumnSelectVector& select_vector) override;

    Status skip_values(size_t num_values) override;

protected:
    /// Pop one boolean, refilling the unpack buffer from the bit reader when
    /// it runs dry. Returns false once the page is exhausted.
    inline bool _decode_value(bool* value) {
        if (UNLIKELY(unpacked_value_idx_ >= num_unpacked_values_)) {
            num_unpacked_values_ =
                    bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
            if (UNLIKELY(num_unpacked_values_ == 0)) {
                return false;
            }
            unpacked_value_idx_ = 0;
        }
        *value = unpacked_values_[unpacked_value_idx_++];
        return true;
    }

    /// Staging buffer for unpacked values. Sized as a multiple of 32 to match
    /// the batch API of BatchedBitReader; uint8_t because bit unpacking only
    /// supports unsigned integers — converted to bool on the way out.
    static const int UNPACKED_BUFFER_LEN = 128;
    uint8_t unpacked_values_[UNPACKED_BUFFER_LEN];

    /// Count of valid entries in 'unpacked_values_'.
    int num_unpacked_values_ = 0;

    /// Read cursor into 'unpacked_values_'.
    int unpacked_value_idx_ = 0;

    /// Underlying bit-packed reader for PLAIN encoding.
    BatchedBitReader bool_values_;
};
} // namespace doris::vectorized

View File

@ -0,0 +1,131 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/format/parquet/byte_array_dict_decoder.h"
#include "util/coding.h"
#include "vec/columns/column_dictionary.h"
#include "vec/data_types/data_type_nullable.h"
namespace doris::vectorized {
// Parse a dictionary page of `num_values` length-prefixed byte arrays
// (`length` total bytes) into a contiguous, padded buffer plus per-item views.
//
// FIX: the first pass used to read each 4-byte length prefix *without* any
// bounds check, so a corrupt dictionary could drive reads past the end of the
// buffer; the second pass validated `offset_cursor > length` only after the
// memcpy had already read out of bounds. All layout validation now happens in
// the first pass, before any payload bytes are touched.
Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length,
                                      size_t num_values) {
    if (UNLIKELY(length < 0)) {
        return Status::Corruption("Wrong data length in dictionary");
    }
    _dict = std::move(dict);
    _dict_items.reserve(num_values);
    uint32_t offset_cursor = 0;
    char* dict_item_address = reinterpret_cast<char*>(_dict.get());
    size_t total_length = 0;
    // First pass: validate every length prefix and accumulate the payload size.
    for (size_t i = 0; i < num_values; ++i) {
        if (UNLIKELY(offset_cursor + 4 > (uint32_t)length)) {
            return Status::Corruption("Wrong data length in dictionary");
        }
        uint32_t l = decode_fixed32_le(_dict.get() + offset_cursor);
        offset_cursor += 4;
        // 64-bit addition guards against uint32 wrap-around on a corrupt `l`.
        if (UNLIKELY((uint64_t)offset_cursor + l > (uint64_t)length)) {
            return Status::Corruption("Wrong data length in dictionary");
        }
        offset_cursor += l;
        total_length += l;
    }
    if (offset_cursor != length) {
        return Status::Corruption("Wrong dictionary data for byte array type");
    }
    // Second pass (layout now known-good): copy items into one contiguous
    // buffer, padded so insert_many_strings_overflow's fixed-size copies may
    // read past the last item safely.
    _dict_data.resize(total_length + MAX_STRINGS_OVERFLOW_SIZE);
    _max_value_length = 0;
    size_t offset = 0;
    offset_cursor = 0;
    for (size_t i = 0; i < num_values; ++i) {
        uint32_t l = decode_fixed32_le(_dict.get() + offset_cursor);
        offset_cursor += 4;
        memcpy(&_dict_data[offset], dict_item_address + offset_cursor, l);
        _dict_items.emplace_back(&_dict_data[offset], l);
        offset_cursor += l;
        offset += l;
        if (l > _max_value_length) {
            _max_value_length = l;
        }
    }
    return Status::OK();
}
// Decode one batch of dictionary-encoded BYTE_ARRAY values.
// Reads the RLE-packed dictionary indexes for all non-null rows into _indexes,
// then either forwards raw codes (dictionary destination column), materializes
// strings, or converts to decimals depending on the destination logical type.
Status ByteArrayDictDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                           ColumnSelectVector& select_vector) {
    size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
    // First batch into a dict-encoded destination: publish the dictionary once.
    if (doris_column->is_column_dictionary() &&
        assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
        assert_cast<ColumnDictI32&>(*doris_column)
                .insert_many_dict_data(&_dict_items[0], _dict_items.size());
    }
    _indexes.resize(non_null_size);
    _index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
    if (doris_column->is_column_dictionary()) {
        // Destination keeps dictionary codes; no string materialization needed.
        return _decode_dict_values(doris_column, select_vector);
    }
    TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
    switch (logical_type) {
    case TypeIndex::String:
    case TypeIndex::FixedString: {
        size_t dict_index = 0;
        ColumnSelectVector::DataReadType read_type;
        while (size_t run_length = select_vector.get_next_run(&read_type)) {
            switch (read_type) {
            case ColumnSelectVector::CONTENT: {
                // Gather this run's dictionary entries, then bulk-insert via the
                // overflow variant so small strings copy with fixed-size memcpys
                // (bucketed by _max_value_length).
                std::vector<StringRef> string_values;
                string_values.reserve(run_length);
                for (size_t i = 0; i < run_length; ++i) {
                    string_values.emplace_back(_dict_items[_indexes[dict_index++]]);
                }
                doris_column->insert_many_strings_overflow(&string_values[0], run_length,
                                                           _max_value_length);
                break;
            }
            case ColumnSelectVector::NULL_DATA: {
                doris_column->insert_many_defaults(run_length);
                break;
            }
            case ColumnSelectVector::FILTERED_CONTENT: {
                // Filtered rows still consumed index slots; advance the cursor.
                dict_index += run_length;
                break;
            }
            case ColumnSelectVector::FILTERED_NULL: {
                // do nothing
                break;
            }
            }
        }
        return Status::OK();
    }
    case TypeIndex::Decimal32:
        return _decode_binary_decimal<Int32>(doris_column, data_type, select_vector);
    case TypeIndex::Decimal64:
        return _decode_binary_decimal<Int64>(doris_column, data_type, select_vector);
    case TypeIndex::Decimal128:
        return _decode_binary_decimal<Int128>(doris_column, data_type, select_vector);
    case TypeIndex::Decimal128I:
        return _decode_binary_decimal<Int128>(doris_column, data_type, select_vector);
    default:
        break;
    }
    return Status::InvalidArgument(
            "Can't decode parquet physical type BYTE_ARRAY to doris logical type {}",
            getTypeName(logical_type));
}
} // namespace doris::vectorized

View File

@ -0,0 +1,100 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include "gutil/endian.h"
#include "util/rle_encoding.h"
#include "vec/exec/format/parquet/decoder.h"
namespace doris::vectorized {
/// Decodes RLE_DICTIONARY-encoded BYTE_ARRAY columns. The dictionary page is
/// parsed once by set_dict(); data pages then carry only RLE-packed indexes.
class ByteArrayDictDecoder final : public BaseDictDecoder {
public:
    ByteArrayDictDecoder() = default;
    ~ByteArrayDictDecoder() override = default;

    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                         ColumnSelectVector& select_vector) override;

    Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override;

protected:
    // Decode dictionary entries interpreted as big-endian two's-complement
    // decimal unscaled values.
    template <typename DecimalPrimitiveType>
    Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                  ColumnSelectVector& select_vector);

    // For dictionary encoding
    std::vector<StringRef> _dict_items; // views into _dict_data, one per entry
    std::vector<uint8_t> _dict_data;    // contiguous payload + overflow padding
    // Longest dictionary entry in bytes; selects the fixed-size-copy bucket in
    // insert_many_strings_overflow.
    // FIX: was uninitialized — reading it before set_dict() was indeterminate.
    size_t _max_value_length = 0;
};
// Convert dictionary entries (big-endian two's-complement unscaled decimals)
// into the destination decimal column, applying the scale adjustment computed
// by init_decimal_converter.
template <typename DecimalPrimitiveType>
Status ByteArrayDictDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
                                                    DataTypePtr& data_type,
                                                    ColumnSelectVector& select_vector) {
    init_decimal_converter<DecimalPrimitiveType>(data_type);
    auto& column_data =
            static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
    size_t dict_index = 0;
    DecimalScaleParams& scale_params = _decode_params->decimal_scale;
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            for (size_t i = 0; i < run_length; ++i) {
                StringRef& slice = _dict_items[_indexes[dict_index++]];
                char* buf_start = const_cast<char*>(slice.data);
                uint32_t length = (uint32_t)slice.size;
                // FIX: guard the memcpy below — an entry longer than Int128
                // would produce a negative destination offset and write out of
                // bounds. set_dict only validates the overall layout, not
                // per-entry decimal width.
                if (UNLIKELY(length > sizeof(Int128))) {
                    return Status::Corruption("Decimal value in dictionary is too long");
                }
                // When Decimal in parquet is stored in byte arrays, binary and fixed,
                // the unscaled number must be encoded as two's complement using big-endian byte order.
                // Sign-extend into the Int128, fill its high-address tail, then
                // swap to host order.
                Int128 value = buf_start[0] & 0x80 ? -1 : 0;
                memcpy(reinterpret_cast<char*>(&value) + sizeof(Int128) - length, buf_start,
                       length);
                value = BigEndian::ToHost128(value);
                if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
                    value *= scale_params.scale_factor;
                } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) {
                    value /= scale_params.scale_factor;
                }
                auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[data_index++]);
                v = (DecimalPrimitiveType)value;
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            // Filtered rows still consumed index slots; advance the cursor.
            dict_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // do nothing
            break;
        }
        }
    }
    return Status::OK();
}
} // namespace doris::vectorized

View File

@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/format/parquet/byte_array_plain_decoder.h"
#include "vec/data_types/data_type_nullable.h"
namespace doris::vectorized {
// Advance _offset past `num_values` length-prefixed entries without decoding.
Status ByteArrayPlainDecoder::skip_values(size_t num_values) {
    for (size_t i = 0; i < num_values; ++i) {
        if (UNLIKELY(_offset + 4 > _data->size)) {
            return Status::IOError("Can't read byte array length from plain decoder");
        }
        uint32_t length =
                decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
        _offset += 4;
        // BUG FIX: was `if (UNLIKELY(_offset + length) > _data->size)` — the
        // misplaced parenthesis compared UNLIKELY's 0/1 result against the
        // size, so the bounds check effectively never fired.
        if (UNLIKELY(_offset + length > _data->size)) {
            return Status::IOError("Can't skip enough bytes in plain decoder");
        }
        _offset += length;
    }
    return Status::OK();
}
// Decode PLAIN BYTE_ARRAY values (4-byte LE length prefix + payload) into the
// destination column; strings are batched per run, decimals are delegated to
// _decode_binary_decimal.
Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                            ColumnSelectVector& select_vector) {
    TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
    switch (logical_type) {
    case TypeIndex::String:
    case TypeIndex::FixedString: {
        ColumnSelectVector::DataReadType read_type;
        while (size_t run_length = select_vector.get_next_run(&read_type)) {
            switch (read_type) {
            case ColumnSelectVector::CONTENT: {
                std::vector<StringRef> string_values;
                string_values.reserve(run_length);
                for (size_t i = 0; i < run_length; ++i) {
                    if (UNLIKELY(_offset + 4 > _data->size)) {
                        return Status::IOError("Can't read byte array length from plain decoder");
                    }
                    uint32_t length = decode_fixed32_le(
                            reinterpret_cast<const uint8_t*>(_data->data) + _offset);
                    _offset += 4;
                    // BUG FIX: was `UNLIKELY(_offset + length) > _data->size` —
                    // the misplaced parenthesis compared UNLIKELY's 0/1 result
                    // against the size, disabling the bounds check.
                    if (UNLIKELY(_offset + length > _data->size)) {
                        return Status::IOError("Can't read enough bytes in plain decoder");
                    }
                    string_values.emplace_back(_data->data + _offset, length);
                    _offset += length;
                }
                doris_column->insert_many_strings(&string_values[0], run_length);
                break;
            }
            case ColumnSelectVector::NULL_DATA: {
                doris_column->insert_many_defaults(run_length);
                break;
            }
            case ColumnSelectVector::FILTERED_CONTENT: {
                // Skip filtered values without materializing them.
                // FIX: loop index was `int` (signed/unsigned mismatch against
                // run_length) and the bounds check had the same UNLIKELY
                // parenthesis bug as above.
                for (size_t i = 0; i < run_length; ++i) {
                    if (UNLIKELY(_offset + 4 > _data->size)) {
                        return Status::IOError("Can't read byte array length from plain decoder");
                    }
                    uint32_t length = decode_fixed32_le(
                            reinterpret_cast<const uint8_t*>(_data->data) + _offset);
                    _offset += 4;
                    if (UNLIKELY(_offset + length > _data->size)) {
                        return Status::IOError("Can't read enough bytes in plain decoder");
                    }
                    _offset += length;
                }
                break;
            }
            case ColumnSelectVector::FILTERED_NULL: {
                // Filtered nulls consume no bytes and store nothing.
                break;
            }
            }
        }
        return Status::OK();
    }
    case TypeIndex::Decimal32:
        return _decode_binary_decimal<Int32>(doris_column, data_type, select_vector);
    case TypeIndex::Decimal64:
        return _decode_binary_decimal<Int64>(doris_column, data_type, select_vector);
    case TypeIndex::Decimal128:
        return _decode_binary_decimal<Int128>(doris_column, data_type, select_vector);
    case TypeIndex::Decimal128I:
        return _decode_binary_decimal<Int128>(doris_column, data_type, select_vector);
    default:
        break;
    }
    return Status::InvalidArgument(
            "Can't decode parquet physical type BYTE_ARRAY to doris logical type {}",
            getTypeName(logical_type));
}
} // namespace doris::vectorized

View File

@ -0,0 +1,98 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include "util/coding.h"
#include "vec/exec/format/parquet/decoder.h"
namespace doris::vectorized {
/// Decodes PLAIN-encoded BYTE_ARRAY columns, i.e. values laid out back-to-back
/// as a 4-byte little-endian length prefix followed by that many payload bytes.
class ByteArrayPlainDecoder final : public Decoder {
public:
    ByteArrayPlainDecoder() = default;
    ~ByteArrayPlainDecoder() override = default;

    // Decode (or skip, per select_vector) values into doris_column; supports
    // String/FixedString plus Decimal32/64/128 logical types.
    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                         ColumnSelectVector& select_vector) override;

    // Advance past num_values length-prefixed entries without decoding.
    Status skip_values(size_t num_values) override;

protected:
    // Decode byte-array decimals (big-endian two's-complement unscaled values).
    template <typename DecimalPrimitiveType>
    Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                  ColumnSelectVector& select_vector);
};
// Decode PLAIN BYTE_ARRAY decimals: each value is a 4-byte LE length prefix
// followed by a big-endian two's-complement unscaled number, converted with
// the scale adjustment computed by init_decimal_converter.
template <typename DecimalPrimitiveType>
Status ByteArrayPlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
                                                     DataTypePtr& data_type,
                                                     ColumnSelectVector& select_vector) {
    init_decimal_converter<DecimalPrimitiveType>(data_type);
    auto& column_data =
            static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
    DecimalScaleParams& scale_params = _decode_params->decimal_scale;
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            for (size_t i = 0; i < run_length; ++i) {
                if (UNLIKELY(_offset + 4 > _data->size)) {
                    return Status::IOError("Can't read byte array length from plain decoder");
                }
                uint32_t length =
                        decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
                _offset += 4;
                // FIX: validate the payload fits both the page and an Int128
                // before the memcpy below — the old code checked neither, so a
                // corrupt length could read past the page or (length > 16)
                // write before `value`.
                if (UNLIKELY(_offset + length > _data->size)) {
                    return Status::IOError("Can't read enough bytes in plain decoder");
                }
                if (UNLIKELY(length > sizeof(Int128))) {
                    return Status::Corruption("Decimal BYTE_ARRAY value is too long");
                }
                char* buf_start = _data->data + _offset;
                _offset += length;
                // When Decimal in parquet is stored in byte arrays, binary and fixed,
                // the unscaled number must be encoded as two's complement using big-endian byte order.
                Int128 value = buf_start[0] & 0x80 ? -1 : 0;
                memcpy(reinterpret_cast<char*>(&value) + sizeof(Int128) - length, buf_start,
                       length);
                value = BigEndian::ToHost128(value);
                if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
                    value *= scale_params.scale_factor;
                } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) {
                    value /= scale_params.scale_factor;
                }
                auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[data_index++]);
                v = (DecimalPrimitiveType)value;
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            // BUG FIX: the old code advanced `_offset += _type_length * run_length`,
            // but PLAIN BYTE_ARRAY values are variable-length with a 4-byte
            // prefix — a fixed stride desynchronizes every subsequent read.
            // Walk the length prefixes instead.
            for (size_t i = 0; i < run_length; ++i) {
                if (UNLIKELY(_offset + 4 > _data->size)) {
                    return Status::IOError("Can't read byte array length from plain decoder");
                }
                uint32_t length =
                        decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
                _offset += 4;
                if (UNLIKELY(_offset + length > _data->size)) {
                    return Status::IOError("Can't skip enough bytes in plain decoder");
                }
                _offset += length;
            }
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // Filtered nulls consume no bytes and store nothing.
            break;
        }
        }
    }
    return Status::OK();
}
} // namespace doris::vectorized

View File

@ -0,0 +1,155 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/format/parquet/decoder.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exec/format/parquet/bool_plain_decoder.h"
#include "vec/exec/format/parquet/byte_array_dict_decoder.h"
#include "vec/exec/format/parquet/byte_array_plain_decoder.h"
#include "vec/exec/format/parquet/fix_length_dict_decoder.hpp"
#include "vec/exec/format/parquet/fix_length_plain_decoder.h"
namespace doris::vectorized {
const cctz::time_zone DecodeParams::utc0 = cctz::utc_time_zone();
// Instantiate the decoder for a (physical type, encoding) pair.
// @param type      parquet physical type of the column chunk
// @param encoding  page encoding (PLAIN or RLE_DICTIONARY supported)
// @param decoder   out: receives the newly created decoder
// @return InternalError for unsupported combinations.
Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
                            std::unique_ptr<Decoder>& decoder) {
    switch (encoding) {
    case tparquet::Encoding::PLAIN:
        switch (type) {
        case tparquet::Type::BOOLEAN:
            decoder.reset(new BoolPlainDecoder());
            break;
        case tparquet::Type::BYTE_ARRAY:
            decoder.reset(new ByteArrayPlainDecoder());
            break;
        case tparquet::Type::INT32:
        case tparquet::Type::INT64:
        case tparquet::Type::INT96:
        case tparquet::Type::FLOAT:
        case tparquet::Type::DOUBLE:
        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
            decoder.reset(new FixLengthPlainDecoder(type));
            break;
        default:
            return Status::InternalError("Unsupported type {}(encoding={}) in parquet decoder",
                                         tparquet::to_string(type), tparquet::to_string(encoding));
        }
        break;
    case tparquet::Encoding::RLE_DICTIONARY:
        switch (type) {
        case tparquet::Type::BOOLEAN:
            // Dictionary encoding is not defined for booleans. FIX: the old
            // guard `if (encoding != PLAIN)` was always true here (we are in
            // the RLE_DICTIONARY branch) and, had it ever been false, control
            // would have fallen through and built a ByteArrayDictDecoder for a
            // BOOLEAN column — reject unconditionally instead.
            return Status::InternalError("Bool type can't have dictionary page");
        case tparquet::Type::BYTE_ARRAY:
            decoder.reset(new ByteArrayDictDecoder());
            break;
        case tparquet::Type::INT32:
            decoder.reset(new FixLengthDictDecoder<Int32>(type));
            break;
        case tparquet::Type::INT64:
            decoder.reset(new FixLengthDictDecoder<Int64>(type));
            break;
        case tparquet::Type::INT96:
            decoder.reset(new FixLengthDictDecoder<ParquetInt96>(type));
            break;
        case tparquet::Type::FLOAT:
            decoder.reset(new FixLengthDictDecoder<Float32>(type));
            break;
        case tparquet::Type::DOUBLE:
            decoder.reset(new FixLengthDictDecoder<Float64>(type));
            break;
        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
            decoder.reset(new FixLengthDictDecoder<char*>(type));
            break;
        default:
            return Status::InternalError("Unsupported type {}(encoding={}) in parquet decoder",
                                         tparquet::to_string(type), tparquet::to_string(encoding));
        }
        break;
    default:
        return Status::InternalError("Unsupported encoding {}(type={}) in parquet decoder",
                                     tparquet::to_string(encoding), tparquet::to_string(type));
    }
    return Status::OK();
}
// Capture the field schema and (optionally) the session time zone, and derive
// the timestamp unit factors from the parquet logical/converted type.
void Decoder::init(FieldSchema* field_schema, cctz::time_zone* ctz) {
    _field_schema = field_schema;
    if (_decode_params == nullptr) {
        _decode_params.reset(new DecodeParams());
    }
    if (ctz != nullptr) {
        _decode_params->ctz = ctz;
    }
    const auto& parquet_schema = field_schema->parquet_schema;
    const bool has_logical_timestamp =
            parquet_schema.__isset.logicalType && parquet_schema.logicalType.__isset.TIMESTAMP;
    if (has_logical_timestamp) {
        const auto& timestamp_info = parquet_schema.logicalType.TIMESTAMP;
        if (!timestamp_info.isAdjustedToUTC) {
            // should set timezone to utc+0
            _decode_params->ctz = const_cast<cctz::time_zone*>(&_decode_params->utc0);
        }
        const auto& unit = timestamp_info.unit;
        if (unit.__isset.MILLIS) {
            _decode_params->second_mask = 1000;
            _decode_params->scale_to_nano_factor = 1000000;
        } else if (unit.__isset.MICROS) {
            _decode_params->second_mask = 1000000;
            _decode_params->scale_to_nano_factor = 1000;
        } else if (unit.__isset.NANOS) {
            _decode_params->second_mask = 1000000000;
            _decode_params->scale_to_nano_factor = 1;
        }
    } else if (parquet_schema.__isset.converted_type) {
        // Legacy files only carry a converted_type (MILLIS/MICROS).
        switch (parquet_schema.converted_type) {
        case tparquet::ConvertedType::TIMESTAMP_MILLIS:
            _decode_params->second_mask = 1000;
            _decode_params->scale_to_nano_factor = 1000000;
            break;
        case tparquet::ConvertedType::TIMESTAMP_MICROS:
            _decode_params->second_mask = 1000000;
            _decode_params->scale_to_nano_factor = 1000;
            break;
        default:
            break;
        }
    }
}
// Compute the decimal rescaling (up/down factor) between the parquet schema
// scale and the destination column's scale. Runs at most once per decoder.
template <typename DecimalPrimitiveType>
void Decoder::init_decimal_converter(DataTypePtr& data_type) {
    // Bail out when params aren't ready or the scale is already initialized.
    if (_decode_params == nullptr || _field_schema == nullptr ||
        _decode_params->decimal_scale.scale_type != DecimalScaleParams::NOT_INIT) {
        return;
    }
    auto& decimal_scale = _decode_params->decimal_scale;
    const auto src_scale = _field_schema->parquet_schema.scale;
    auto* decimal_type = reinterpret_cast<DataTypeDecimal<Decimal<DecimalPrimitiveType>>*>(
            const_cast<IDataType*>(remove_nullable(data_type).get()));
    const auto dst_scale = decimal_type->get_scale();
    if (dst_scale == src_scale) {
        decimal_scale.scale_type = DecimalScaleParams::NO_SCALE;
        decimal_scale.scale_factor = 1;
    } else if (dst_scale > src_scale) {
        decimal_scale.scale_type = DecimalScaleParams::SCALE_UP;
        decimal_scale.scale_factor =
                DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(dst_scale - src_scale);
    } else {
        decimal_scale.scale_type = DecimalScaleParams::SCALE_DOWN;
        decimal_scale.scale_factor =
                DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(src_scale - dst_scale);
    }
}
} // namespace doris::vectorized

View File

@ -0,0 +1,157 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include "common/status.h"
#include "gen_cpp/parquet_types.h"
#include "schema_desc.h"
#include "util/rle_encoding.h"
#include "vec/columns/column_dictionary.h"
#include "vec/data_types/data_type.h"
#include "vec/exec/format/format_common.h"
#include "vec/exec/format/parquet/parquet_common.h"
namespace doris::vectorized {
#define FOR_LOGICAL_NUMERIC_TYPES(M) \
M(TypeIndex::Int8, Int8, Int32) \
M(TypeIndex::UInt8, UInt8, Int32) \
M(TypeIndex::Int16, Int16, Int32) \
M(TypeIndex::UInt16, UInt16, Int32) \
M(TypeIndex::Int32, Int32, Int32) \
M(TypeIndex::UInt32, UInt32, Int32) \
M(TypeIndex::Int64, Int64, Int64) \
M(TypeIndex::UInt64, UInt64, Int64) \
M(TypeIndex::Float32, Float32, Float32) \
M(TypeIndex::Float64, Float64, Float64)
// Shared timestamp/decimal conversion parameters, populated by Decoder::init().
struct DecodeParams {
    // schema.logicalType.TIMESTAMP.isAdjustedToUTC == false
    static const cctz::time_zone utc0;
    // schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set the time zone
    cctz::time_zone* ctz = nullptr;
    // Ticks per second of the stored unit: 1000 for MILLIS, 1e6 for MICROS,
    // 1e9 for NANOS (set in Decoder::init); stays 1 when the schema carries
    // no recognized time unit.
    int64_t second_mask = 1;
    // Multiplier from one stored tick to nanoseconds; paired with second_mask
    // in Decoder::init.
    int64_t scale_to_nano_factor = 1;
    // Scale-up/down parameters for decimals; see Decoder::init_decimal_converter.
    DecimalScaleParams decimal_scale;
};
/// Base class of all parquet page decoders. A concrete decoder is obtained via
/// get_decoder() keyed on the column's physical type and page encoding.
class Decoder {
public:
    Decoder() = default;
    virtual ~Decoder() = default;

    /// Instantiate the decoder matching `type` + `encoding` into `decoder`.
    /// @return InternalError for unsupported combinations.
    static Status get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
                              std::unique_ptr<Decoder>& decoder);

    // The type with fix length
    void set_type_length(int32_t type_length) { _type_length = type_length; }

    // Set the data to be decoded
    virtual void set_data(Slice* data) {
        _data = data;
        _offset = 0;
    }

    /// Capture the field schema and (optionally) the session time zone, and
    /// derive timestamp unit factors from the parquet logical/converted type.
    void init(FieldSchema* field_schema, cctz::time_zone* ctz);

    /// Compute decimal rescaling between the schema scale and the destination
    /// column's scale; runs at most once per decoder.
    template <typename DecimalPrimitiveType>
    void init_decimal_converter(DataTypePtr& data_type);

    // Write the decoded values batch to doris's column
    virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                 ColumnSelectVector& select_vector) = 0;

    virtual Status skip_values(size_t num_values) = 0;

    virtual Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) {
        return Status::NotSupported("set_dict is not supported");
    }

protected:
    // Fixed value width for fixed-length physical types.
    // FIX: was uninitialized — any read before set_type_length() was
    // indeterminate; default to 0.
    int32_t _type_length = 0;
    Slice* _data = nullptr;
    uint32_t _offset = 0;
    FieldSchema* _field_schema = nullptr;
    std::unique_ptr<DecodeParams> _decode_params = nullptr;
};
// Common base of the dictionary-encoded decoders: owns the dictionary bytes
// and the RLE/bit-packed index stream, and bulk-decodes dictionary codes into
// _indexes for the concrete subclasses.
class BaseDictDecoder : public Decoder {
public:
    BaseDictDecoder() = default;
    virtual ~BaseDictDecoder() override = default;

    // Set the data to be decoded.
    // Page layout for dictionary encoding: the first byte is the RLE bit
    // width, the remaining bytes are the RLE/bit-packed index stream.
    virtual void set_data(Slice* data) override {
        _data = data;
        _offset = 0;
        uint8_t bit_width = *data->data;
        _index_batch_decoder.reset(
                new RleBatchDecoder<uint32_t>(reinterpret_cast<uint8_t*>(data->data) + 1,
                                              static_cast<int>(data->size) - 1, bit_width));
    }

protected:
    /**
     * Decode dictionary-coded values into doris_column, ensure that doris_column is ColumnDictI32 type,
     * and the coded values must be read into _indexes previously.
     */
    Status _decode_dict_values(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) {
        DCHECK(doris_column->is_column_dictionary());
        size_t dict_index = 0;
        ColumnSelectVector::DataReadType read_type;
        auto& column_data = assert_cast<ColumnDictI32&>(*doris_column).get_data();
        while (size_t run_length = select_vector.get_next_run(&read_type)) {
            switch (read_type) {
            case ColumnSelectVector::CONTENT: {
                // Bulk-append the run of dictionary codes.
                uint32_t* start_index = &_indexes[0];
                column_data.insert(start_index + dict_index, start_index + dict_index + run_length);
                dict_index += run_length;
                break;
            }
            case ColumnSelectVector::NULL_DATA: {
                doris_column->insert_many_defaults(run_length);
                break;
            }
            case ColumnSelectVector::FILTERED_CONTENT: {
                // Filtered rows still consumed codes from _indexes; step over them.
                dict_index += run_length;
                break;
            }
            case ColumnSelectVector::FILTERED_NULL: {
                // Filtered nulls consumed nothing.
                break;
            }
            }
        }
        return Status::OK();
    }

    // Skipping only needs to drain `num_values` codes from the index stream.
    Status skip_values(size_t num_values) override {
        _indexes.resize(num_values);
        _index_batch_decoder->GetBatch(&_indexes[0], num_values);
        return Status::OK();
    }

protected:
    // For dictionary encoding
    std::unique_ptr<uint8_t[]> _dict = nullptr;
    std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
    std::vector<uint32_t> _indexes;
};
} // namespace doris::vectorized

View File

@ -0,0 +1,531 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "vec/columns/column_dictionary.h"
#include "vec/columns/column_nullable.h"
#include "vec/data_types/data_type_nullable.h"
namespace doris::vectorized {
// Dictionary decoder for fixed-length physical types whose values fit in a
// primitive T (Int32/Int64/ParquetInt96/...). The dictionary page is decoded
// once into _dict_items; each data page is then a stream of indexes into it.
template <typename T>
class FixLengthDictDecoder final : public BaseDictDecoder {
public:
    FixLengthDictDecoder(tparquet::Type::type physical_type)
            : BaseDictDecoder(), _physical_type(physical_type) {};
    ~FixLengthDictDecoder() override = default;

    // Decode one batch. Dispatches on the Doris logical type; only the
    // (T, logical type) combinations listed below are valid, anything else
    // falls through to InvalidArgument.
    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                         ColumnSelectVector& select_vector) override {
        size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
        // Lazy-load the dictionary into the Doris dict column on first use.
        if (doris_column->is_column_dictionary() &&
            assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
            std::vector<StringRef> dict_items;
            dict_items.reserve(_dict_items.size());
            for (int i = 0; i < _dict_items.size(); ++i) {
                dict_items.emplace_back((char*)(&_dict_items[i]), _type_length);
            }
            assert_cast<ColumnDictI32&>(*doris_column)
                    .insert_many_dict_data(&dict_items[0], dict_items.size());
        }
        // Only non-null rows have codes in the index stream.
        _indexes.resize(non_null_size);
        _index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
        if (doris_column->is_column_dictionary()) {
            return _decode_dict_values(doris_column, select_vector);
        }
        TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
        switch (logical_type) {
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \
    case NUMERIC_TYPE: \
        if constexpr (std::is_same_v<T, PHYSICAL_TYPE>) { \
            return _decode_numeric<CPP_NUMERIC_TYPE>(doris_column, select_vector); \
        }
            FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
        case TypeIndex::Date:
            if constexpr (std::is_same_v<T, Int32>) {
                return _decode_date<VecDateTimeValue, Int64>(doris_column, select_vector);
            }
            break;
        case TypeIndex::DateV2:
            if constexpr (std::is_same_v<T, Int32>) {
                return _decode_date<DateV2Value<DateV2ValueType>, UInt32>(doris_column,
                                                                          select_vector);
            }
            break;
        case TypeIndex::DateTime:
            if constexpr (std::is_same_v<T, ParquetInt96>) {
                return _decode_datetime96<VecDateTimeValue, Int64>(doris_column, select_vector);
            } else if constexpr (std::is_same_v<T, Int64>) {
                return _decode_datetime64<VecDateTimeValue, Int64>(doris_column, select_vector);
            }
            break;
        case TypeIndex::DateTimeV2:
            // Spark can set the timestamp precision by the following configuration:
            // spark.sql.parquet.outputTimestampType = INT96(NANOS), TIMESTAMP_MICROS, TIMESTAMP_MILLIS
            if constexpr (std::is_same_v<T, ParquetInt96>) {
                return _decode_datetime96<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column,
                                                                                    select_vector);
            } else if constexpr (std::is_same_v<T, Int64>) {
                return _decode_datetime64<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column,
                                                                                    select_vector);
            }
            break;
        case TypeIndex::Decimal32:
            if constexpr (std::is_same_v<T, Int32>) {
                return _decode_primitive_decimal<Int32, Int32>(doris_column, data_type,
                                                               select_vector);
            } else if constexpr (std::is_same_v<T, Int64>) {
                return _decode_primitive_decimal<Int32, Int64>(doris_column, data_type,
                                                               select_vector);
            }
            break;
        case TypeIndex::Decimal64:
            if constexpr (std::is_same_v<T, Int32>) {
                return _decode_primitive_decimal<Int64, Int32>(doris_column, data_type,
                                                               select_vector);
            } else if constexpr (std::is_same_v<T, Int64>) {
                return _decode_primitive_decimal<Int64, Int64>(doris_column, data_type,
                                                               select_vector);
            }
            break;
        case TypeIndex::Decimal128:
            if constexpr (std::is_same_v<T, Int32>) {
                return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type,
                                                                select_vector);
            } else if constexpr (std::is_same_v<T, Int64>) {
                return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type,
                                                                select_vector);
            }
            break;
        case TypeIndex::Decimal128I:
            if constexpr (std::is_same_v<T, Int32>) {
                return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type,
                                                                select_vector);
            } else if constexpr (std::is_same_v<T, Int64>) {
                return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type,
                                                                select_vector);
            }
            break;
        case TypeIndex::String:
        case TypeIndex::FixedString:
            // Strings are handled by the FixLengthDictDecoder<char*> specialization.
            break;
        default:
            break;
        }
        return Status::InvalidArgument(
                "Can't decode parquet physical type {} to doris logical type {}",
                tparquet::to_string(_physical_type), getTypeName(logical_type));
    }

    // Split the raw dictionary page (`num_values` values of _type_length bytes
    // each) into _dict_items; takes ownership of the page bytes.
    Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override {
        if (num_values * _type_length != length) {
            return Status::Corruption("Wrong dictionary data for fixed length type");
        }
        _dict = std::move(dict);
        char* dict_item_address = reinterpret_cast<char*>(_dict.get());
        _dict_items.resize(num_values);
        for (size_t i = 0; i < num_values; ++i) {
            _dict_items[i] = *(T*)dict_item_address;
            dict_item_address += _type_length;
        }
        return Status::OK();
    }

protected:
    // Resolve codes to T, cast to Numeric, and append into the vector column.
    template <typename Numeric>
    Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) {
        auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
        size_t data_index = column_data.size();
        // Pre-size for content + nulls (filtered rows are not emitted).
        column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
        size_t dict_index = 0;
        ColumnSelectVector::DataReadType read_type;
        while (size_t run_length = select_vector.get_next_run(&read_type)) {
            switch (read_type) {
            case ColumnSelectVector::CONTENT: {
                for (size_t i = 0; i < run_length; ++i) {
                    column_data[data_index++] =
                            static_cast<Numeric>(_dict_items[_indexes[dict_index++]]);
                }
                break;
            }
            case ColumnSelectVector::NULL_DATA: {
                // Leave the pre-sized slots for nulls; just advance the cursor.
                data_index += run_length;
                break;
            }
            case ColumnSelectVector::FILTERED_CONTENT: {
                // Filtered rows consumed codes but emit nothing.
                dict_index += run_length;
                break;
            }
            case ColumnSelectVector::FILTERED_NULL: {
                // do nothing
                break;
            }
            }
        }
        return Status::OK();
    }

    // Decode INT32 day-since-epoch values into a date column.
    template <typename CppType, typename ColumnType>
    Status _decode_date(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) {
        auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
        size_t data_index = column_data.size();
        column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
        size_t dict_index = 0;
        ColumnSelectVector::DataReadType read_type;
        while (size_t run_length = select_vector.get_next_run(&read_type)) {
            switch (read_type) {
            case ColumnSelectVector::CONTENT: {
                for (size_t i = 0; i < run_length; ++i) {
                    int64_t date_value = _dict_items[_indexes[dict_index++]];
                    auto& v = reinterpret_cast<CppType&>(column_data[data_index++]);
                    v.from_unixtime(date_value * 24 * 60 * 60,
                                    *_decode_params->ctz); // day to seconds
                    if constexpr (std::is_same_v<CppType, VecDateTimeValue>) {
                        // we should cast to date if using date v1.
                        v.cast_to_date();
                    }
                }
                break;
            }
            case ColumnSelectVector::NULL_DATA: {
                data_index += run_length;
                break;
            }
            case ColumnSelectVector::FILTERED_CONTENT: {
                dict_index += run_length;
                break;
            }
            case ColumnSelectVector::FILTERED_NULL: {
                // do nothing
                break;
            }
            }
        }
        return Status::OK();
    }

    // Decode INT64 timestamps; second_mask/scale_to_nano_factor carry the
    // MILLIS/MICROS/NANOS unit conversion.
    template <typename CppType, typename ColumnType>
    Status _decode_datetime64(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) {
        auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
        size_t data_index = column_data.size();
        column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
        size_t dict_index = 0;
        ColumnSelectVector::DataReadType read_type;
        while (size_t run_length = select_vector.get_next_run(&read_type)) {
            switch (read_type) {
            case ColumnSelectVector::CONTENT: {
                for (size_t i = 0; i < run_length; ++i) {
                    int64_t date_value = _dict_items[_indexes[dict_index++]];
                    auto& v = reinterpret_cast<CppType&>(column_data[data_index++]);
                    v.from_unixtime(date_value / _decode_params->second_mask, *_decode_params->ctz);
                    if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) {
                        // nanoseconds will be ignored.
                        v.set_microsecond((date_value % _decode_params->second_mask) *
                                          _decode_params->scale_to_nano_factor / 1000);
                        // TODO: the precision of datetime v1
                    }
                }
                break;
            }
            case ColumnSelectVector::NULL_DATA: {
                data_index += run_length;
                break;
            }
            case ColumnSelectVector::FILTERED_CONTENT: {
                dict_index += run_length;
                break;
            }
            case ColumnSelectVector::FILTERED_NULL: {
                // do nothing
                break;
            }
            }
        }
        return Status::OK();
    }

    // Decode INT96 (legacy Impala/Spark) timestamps via microsecond conversion.
    template <typename CppType, typename ColumnType>
    Status _decode_datetime96(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) {
        auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
        size_t data_index = column_data.size();
        column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
        size_t dict_index = 0;
        ColumnSelectVector::DataReadType read_type;
        while (size_t run_length = select_vector.get_next_run(&read_type)) {
            switch (read_type) {
            case ColumnSelectVector::CONTENT: {
                for (size_t i = 0; i < run_length; ++i) {
                    ParquetInt96& datetime96 = _dict_items[_indexes[dict_index++]];
                    auto& v = reinterpret_cast<CppType&>(column_data[data_index++]);
                    int64_t micros = datetime96.to_timestamp_micros();
                    v.from_unixtime(micros / 1000000, *_decode_params->ctz);
                    if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) {
                        // spark.sql.parquet.outputTimestampType = INT96(NANOS) will lost precision.
                        // only keep microseconds.
                        v.set_microsecond(micros % 1000000);
                    }
                }
                break;
            }
            case ColumnSelectVector::NULL_DATA: {
                data_index += run_length;
                break;
            }
            case ColumnSelectVector::FILTERED_CONTENT: {
                dict_index += run_length;
                break;
            }
            case ColumnSelectVector::FILTERED_NULL: {
                // do nothing
                break;
            }
            }
        }
        return Status::OK();
    }

    // Decode INT32/INT64-backed decimals, rescaling through Int128.
    template <typename DecimalPrimitiveType, typename DecimalPhysicalType>
    Status _decode_primitive_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                     ColumnSelectVector& select_vector) {
        init_decimal_converter<DecimalPrimitiveType>(data_type);
        auto& column_data =
                static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column)
                        .get_data();
        size_t data_index = column_data.size();
        column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
        size_t dict_index = 0;
        DecimalScaleParams& scale_params = _decode_params->decimal_scale;
        ColumnSelectVector::DataReadType read_type;
        while (size_t run_length = select_vector.get_next_run(&read_type)) {
            switch (read_type) {
            case ColumnSelectVector::CONTENT: {
                for (size_t i = 0; i < run_length; ++i) {
                    // we should use decimal128 to scale up/down
                    Int128 value = static_cast<Int128>(_dict_items[_indexes[dict_index++]]);
                    if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
                        value *= scale_params.scale_factor;
                    } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) {
                        value /= scale_params.scale_factor;
                    }
                    auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[data_index++]);
                    v = (DecimalPrimitiveType)value;
                }
                break;
            }
            case ColumnSelectVector::NULL_DATA: {
                data_index += run_length;
                break;
            }
            case ColumnSelectVector::FILTERED_CONTENT: {
                dict_index += run_length;
                break;
            }
            case ColumnSelectVector::FILTERED_NULL: {
                // do nothing
                break;
            }
            }
        }
        return Status::OK();
    }

    tparquet::Type::type _physical_type;
    // For dictionary encoding
    std::vector<T> _dict_items;
};
// Specialization for fixed-length values that do not fit a primitive
// (FIXED_LEN_BYTE_ARRAY): each dictionary item is kept as a raw pointer into
// the owned dictionary page instead of being copied out.
template <>
class FixLengthDictDecoder<char*> final : public BaseDictDecoder {
public:
    FixLengthDictDecoder(tparquet::Type::type physical_type)
            : BaseDictDecoder(), _physical_type(physical_type) {};
    ~FixLengthDictDecoder() override = default;

    // Decode one batch; supports binary decimals and fixed-length strings.
    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                         ColumnSelectVector& select_vector) override {
        size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
        // Lazy-load the dictionary into the Doris dict column on first use.
        if (doris_column->is_column_dictionary() &&
            assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
            std::vector<StringRef> dict_items;
            dict_items.reserve(_dict_items.size());
            for (int i = 0; i < _dict_items.size(); ++i) {
                dict_items.emplace_back(_dict_items[i], _type_length);
            }
            assert_cast<ColumnDictI32&>(*doris_column)
                    .insert_many_dict_data(&dict_items[0], dict_items.size());
        }
        // Only non-null rows have codes in the index stream.
        _indexes.resize(non_null_size);
        _index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
        if (doris_column->is_column_dictionary()) {
            return _decode_dict_values(doris_column, select_vector);
        }
        TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
        switch (logical_type) {
        case TypeIndex::Decimal32:
            if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
                return _decode_binary_decimal<Int32>(doris_column, data_type, select_vector);
            }
            break;
        case TypeIndex::Decimal64:
            if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
                return _decode_binary_decimal<Int64>(doris_column, data_type, select_vector);
            }
            break;
        case TypeIndex::Decimal128:
            if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
                return _decode_binary_decimal<Int128>(doris_column, data_type, select_vector);
            }
            break;
        case TypeIndex::Decimal128I:
            if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
                return _decode_binary_decimal<Int128>(doris_column, data_type, select_vector);
            }
            break;
        case TypeIndex::String:
        case TypeIndex::FixedString:
            if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
                return _decode_string(doris_column, select_vector);
            }
            break;
        default:
            break;
        }
        return Status::InvalidArgument(
                "Can't decode parquet physical type {} to doris logical type {}",
                tparquet::to_string(_physical_type), getTypeName(logical_type));
    }

    // Skipping drains `num_values` codes from the index stream.
    Status skip_values(size_t num_values) override {
        _indexes.resize(num_values);
        _index_batch_decoder->GetBatch(&_indexes[0], num_values);
        return Status::OK();
    }

    // Take ownership of the dictionary page and record a pointer per item;
    // _dict must stay alive as long as _dict_items is referenced.
    Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override {
        if (num_values * _type_length != length) {
            return Status::Corruption("Wrong dictionary data for fixed length type");
        }
        _dict = std::move(dict);
        char* dict_item_address = reinterpret_cast<char*>(_dict.get());
        _dict_items.resize(num_values);
        for (size_t i = 0; i < num_values; ++i) {
            _dict_items[i] = dict_item_address;
            dict_item_address += _type_length;
        }
        return Status::OK();
    }

protected:
    // Decode FIXED_LEN_BYTE_ARRAY decimals: big-endian two's-complement bytes
    // widened into Int128, then rescaled.
    template <typename DecimalPrimitiveType>
    Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                  ColumnSelectVector& select_vector) {
        init_decimal_converter<DecimalPrimitiveType>(data_type);
        auto& column_data =
                static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column)
                        .get_data();
        size_t data_index = column_data.size();
        column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
        size_t dict_index = 0;
        DecimalScaleParams& scale_params = _decode_params->decimal_scale;
        ColumnSelectVector::DataReadType read_type;
        while (size_t run_length = select_vector.get_next_run(&read_type)) {
            switch (read_type) {
            case ColumnSelectVector::CONTENT: {
                for (size_t i = 0; i < run_length; ++i) {
                    char* buf_start = _dict_items[_indexes[dict_index++]];
                    // When Decimal in parquet is stored in byte arrays, binary and fixed,
                    // the unscaled number must be encoded as two's complement using big-endian byte order.
                    // Sign-extend: fill with 0xFF when the top bit is set.
                    Int128 value = buf_start[0] & 0x80 ? -1 : 0;
                    memcpy(reinterpret_cast<char*>(&value) + sizeof(Int128) - _type_length,
                           buf_start, _type_length);
                    value = BigEndian::ToHost128(value);
                    if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
                        value *= scale_params.scale_factor;
                    } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) {
                        value /= scale_params.scale_factor;
                    }
                    auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[data_index++]);
                    v = (DecimalPrimitiveType)value;
                }
                break;
            }
            case ColumnSelectVector::NULL_DATA: {
                data_index += run_length;
                break;
            }
            case ColumnSelectVector::FILTERED_CONTENT: {
                dict_index += run_length;
                break;
            }
            case ColumnSelectVector::FILTERED_NULL: {
                // do nothing
                break;
            }
            }
        }
        return Status::OK();
    }

    // Decode fixed-length strings by referencing dictionary bytes directly.
    Status _decode_string(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector) {
        size_t dict_index = 0;
        ColumnSelectVector::DataReadType read_type;
        while (size_t run_length = select_vector.get_next_run(&read_type)) {
            switch (read_type) {
            case ColumnSelectVector::CONTENT: {
                std::vector<StringRef> string_values;
                string_values.reserve(run_length);
                for (size_t i = 0; i < run_length; ++i) {
                    string_values.emplace_back(_dict_items[_indexes[dict_index++]], _type_length);
                }
                doris_column->insert_many_strings(&string_values[0], run_length);
                break;
            }
            case ColumnSelectVector::NULL_DATA: {
                doris_column->insert_many_defaults(run_length);
                break;
            }
            case ColumnSelectVector::FILTERED_CONTENT: {
                dict_index += run_length;
                break;
            }
            case ColumnSelectVector::FILTERED_NULL: {
                // do nothing
                break;
            }
            }
        }
        return Status::OK();
    }

    tparquet::Type::type _physical_type;
    // For dictionary encoding; pointers into _dict (owned by the base class).
    std::vector<char*> _dict_items;
};
} // namespace doris::vectorized

View File

@ -0,0 +1,412 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/exec/format/parquet/fix_length_plain_decoder.h"
#include "gutil/endian.h"
#include "vec/columns/column_nullable.h"
#include "vec/data_types/data_type_nullable.h"
namespace doris::vectorized {
// Skip `num_values` fixed-width values by advancing the page cursor; fails if
// the cursor would land past the end of the page.
Status FixLengthPlainDecoder::skip_values(size_t num_values) {
    const size_t bytes_to_skip = _type_length * num_values;
    _offset += bytes_to_skip;
    if (UNLIKELY(_offset > _data->size)) {
        return Status::IOError("Out-of-bounds access in parquet data decoder");
    }
    return Status::OK();
}
// Decode one batch of PLAIN-encoded fixed-length values, dispatching on the
// Doris logical type. A single up-front bounds check covers the whole batch
// since every non-null value consumes exactly _type_length bytes.
Status FixLengthPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                            ColumnSelectVector& select_vector) {
    size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
    if (UNLIKELY(_offset + _type_length * non_null_size > _data->size)) {
        return Status::IOError("Out-of-bounds access in parquet data decoder");
    }
    TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
    switch (logical_type) {
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \
    case NUMERIC_TYPE: \
        return _decode_numeric<CPP_NUMERIC_TYPE>(doris_column, select_vector);
        FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
    case TypeIndex::Date:
        if (_physical_type == tparquet::Type::INT32) {
            return _decode_date<VecDateTimeValue, Int64>(doris_column, select_vector);
        }
        break;
    case TypeIndex::DateV2:
        if (_physical_type == tparquet::Type::INT32) {
            return _decode_date<DateV2Value<DateV2ValueType>, UInt32>(doris_column, select_vector);
        }
        break;
    case TypeIndex::DateTime:
        if (_physical_type == tparquet::Type::INT96) {
            return _decode_datetime96<VecDateTimeValue, Int64>(doris_column, select_vector);
        } else if (_physical_type == tparquet::Type::INT64) {
            return _decode_datetime64<VecDateTimeValue, Int64>(doris_column, select_vector);
        }
        break;
    case TypeIndex::DateTimeV2:
        // Spark can set the timestamp precision by the following configuration:
        // spark.sql.parquet.outputTimestampType = INT96(NANOS), TIMESTAMP_MICROS, TIMESTAMP_MILLIS
        if (_physical_type == tparquet::Type::INT96) {
            return _decode_datetime96<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column,
                                                                                select_vector);
        } else if (_physical_type == tparquet::Type::INT64) {
            return _decode_datetime64<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column,
                                                                                select_vector);
        }
        break;
    case TypeIndex::Decimal32:
        if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
            return _decode_binary_decimal<Int32>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT32) {
            return _decode_primitive_decimal<Int32, Int32>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT64) {
            return _decode_primitive_decimal<Int32, Int64>(doris_column, data_type, select_vector);
        }
        break;
    case TypeIndex::Decimal64:
        if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
            return _decode_binary_decimal<Int64>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT32) {
            return _decode_primitive_decimal<Int64, Int32>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT64) {
            return _decode_primitive_decimal<Int64, Int64>(doris_column, data_type, select_vector);
        }
        break;
    case TypeIndex::Decimal128:
        if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
            return _decode_binary_decimal<Int128>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT32) {
            return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT64) {
            return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, select_vector);
        }
        break;
    case TypeIndex::Decimal128I:
        if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
            return _decode_binary_decimal<Int128>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT32) {
            return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT64) {
            return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, select_vector);
        }
        break;
    case TypeIndex::String:
    case TypeIndex::FixedString:
        if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
            return _decode_string(doris_column, select_vector);
        }
        break;
    default:
        break;
    }
    return Status::InvalidArgument("Can't decode parquet physical type {} to doris logical type {}",
                                   tparquet::to_string(_physical_type), getTypeName(logical_type));
}
// Decode PLAIN fixed-length strings (FIXED_LEN_BYTE_ARRAY) into a string
// column. Bounds were validated for the whole batch by decode_values().
// Fix: the StringRef staging buffer used to be constructed inside the run
// loop, allocating once per CONTENT run; it is now hoisted and reused
// (clear() keeps the allocated capacity), avoiding repeated allocations.
Status FixLengthPlainDecoder::_decode_string(MutableColumnPtr& doris_column,
                                             ColumnSelectVector& select_vector) {
    ColumnSelectVector::DataReadType read_type;
    // Reused across runs; StringRefs point into the page buffer (_data).
    std::vector<StringRef> string_values;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            string_values.clear();
            string_values.reserve(run_length);
            for (size_t i = 0; i < run_length; ++i) {
                char* buf_start = _data->data + _offset;
                string_values.emplace_back(buf_start, _type_length);
                _offset += _type_length;
            }
            doris_column->insert_many_strings(&string_values[0], run_length);
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            doris_column->insert_many_defaults(run_length);
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            // Filtered rows still consume their bytes from the page.
            _offset += _type_length * run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // do nothing
            break;
        }
        }
    }
    return Status::OK();
}
// Decode PLAIN numeric values, casting each from the physical layout to the
// target Numeric. NOTE: a per-element copy is required because _type_length
// (physical width) may differ from sizeof(Numeric), e.g. INT32 pages decoded
// into an Int8 column, so a bulk memcpy would be incorrect in general.
template <typename Numeric>
Status FixLengthPlainDecoder::_decode_numeric(MutableColumnPtr& doris_column,
                                              ColumnSelectVector& select_vector) {
    auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    // Pre-size for content + nulls (filtered rows are not emitted).
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            for (size_t i = 0; i < run_length; ++i) {
                char* buf_start = _data->data + _offset;
                column_data[data_index++] = *(Numeric*)buf_start;
                _offset += _type_length;
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            // Leave the pre-sized slots for nulls; just advance the cursor.
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            // Filtered rows still consume their bytes from the page.
            _offset += _type_length * run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // do nothing
            break;
        }
        }
    }
    return Status::OK();
}
// Decode PLAIN INT32 day-since-epoch values into a date column (v1 or v2
// depending on CppType/ColumnType).
template <typename CppType, typename ColumnType>
Status FixLengthPlainDecoder::_decode_date(MutableColumnPtr& doris_column,
                                           ColumnSelectVector& select_vector) {
    auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            for (size_t i = 0; i < run_length; ++i) {
                char* buf_start = _data->data + _offset;
                int64_t date_value = static_cast<int64_t>(*reinterpret_cast<int32_t*>(buf_start));
                auto& v = reinterpret_cast<CppType&>(column_data[data_index++]);
                v.from_unixtime(date_value * 24 * 60 * 60, *_decode_params->ctz); // day to seconds
                if constexpr (std::is_same_v<CppType, VecDateTimeValue>) {
                    // we should cast to date if using date v1.
                    v.cast_to_date();
                }
                _offset += _type_length;
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            _offset += _type_length * run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // do nothing
            break;
        }
        }
    }
    return Status::OK();
}
// Decode PLAIN INT64 timestamps; second_mask/scale_to_nano_factor (set up
// from the column's TIMESTAMP unit) convert the raw value to seconds plus a
// sub-second remainder.
template <typename CppType, typename ColumnType>
Status FixLengthPlainDecoder::_decode_datetime64(MutableColumnPtr& doris_column,
                                                 ColumnSelectVector& select_vector) {
    auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            for (size_t i = 0; i < run_length; ++i) {
                char* buf_start = _data->data + _offset;
                int64_t& date_value = *reinterpret_cast<int64_t*>(buf_start);
                auto& v = reinterpret_cast<CppType&>(column_data[data_index++]);
                v.from_unixtime(date_value / _decode_params->second_mask, *_decode_params->ctz);
                if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) {
                    // nanoseconds will be ignored.
                    v.set_microsecond((date_value % _decode_params->second_mask) *
                                      _decode_params->scale_to_nano_factor / 1000);
                    // TODO: the precision of datetime v1
                }
                _offset += _type_length;
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            _offset += _type_length * run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // do nothing
            break;
        }
        }
    }
    return Status::OK();
}
// Decode PLAIN INT96 (legacy Impala/Spark) timestamps by converting each
// 12-byte value to microseconds first.
template <typename CppType, typename ColumnType>
Status FixLengthPlainDecoder::_decode_datetime96(MutableColumnPtr& doris_column,
                                                 ColumnSelectVector& select_vector) {
    auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            for (size_t i = 0; i < run_length; ++i) {
                char* buf_start = _data->data + _offset;
                ParquetInt96& datetime96 = *reinterpret_cast<ParquetInt96*>(buf_start);
                auto& v = reinterpret_cast<CppType&>(column_data[data_index++]);
                int64_t micros = datetime96.to_timestamp_micros();
                v.from_unixtime(micros / 1000000, *_decode_params->ctz);
                if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) {
                    // spark.sql.parquet.outputTimestampType = INT96(NANOS) will lost precision.
                    // only keep microseconds.
                    v.set_microsecond(micros % 1000000);
                }
                _offset += _type_length;
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            _offset += _type_length * run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // do nothing
            break;
        }
        }
    }
    return Status::OK();
}
// Decode PLAIN FIXED_LEN_BYTE_ARRAY decimals: each value is a big-endian
// two's-complement integer of _type_length bytes, widened into Int128 and
// then rescaled to the column's target scale.
template <typename DecimalPrimitiveType>
Status FixLengthPlainDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
                                                     DataTypePtr& data_type,
                                                     ColumnSelectVector& select_vector) {
    init_decimal_converter<DecimalPrimitiveType>(data_type);
    auto& column_data =
            static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
    DecimalScaleParams& scale_params = _decode_params->decimal_scale;
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            for (size_t i = 0; i < run_length; ++i) {
                char* buf_start = _data->data + _offset;
                // When Decimal in parquet is stored in byte arrays, binary and fixed,
                // the unscaled number must be encoded as two's complement using big-endian byte order.
                // Sign-extend: pre-fill with 0xFF bytes when the top bit is set.
                Int128 value = buf_start[0] & 0x80 ? -1 : 0;
                memcpy(reinterpret_cast<char*>(&value) + sizeof(Int128) - _type_length, buf_start,
                       _type_length);
                value = BigEndian::ToHost128(value);
                if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
                    value *= scale_params.scale_factor;
                } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) {
                    value /= scale_params.scale_factor;
                }
                auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[data_index++]);
                v = (DecimalPrimitiveType)value;
                _offset += _type_length;
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            _offset += _type_length * run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // do nothing
            break;
        }
        }
    }
    return Status::OK();
}
// Decode decimals stored as primitive INT32/INT64 physical values.
// DecimalPhysicalType is the on-page integer type; DecimalPrimitiveType is the
// destination column's underlying integer type.
template <typename DecimalPrimitiveType, typename DecimalPhysicalType>
Status FixLengthPlainDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_column,
                                                        DataTypePtr& data_type,
                                                        ColumnSelectVector& select_vector) {
    init_decimal_converter<DecimalPrimitiveType>(data_type);
    auto& column_data =
            static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
    DecimalScaleParams& scale_params = _decode_params->decimal_scale;
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            for (size_t i = 0; i < run_length; ++i) {
                char* buf_start = _data->data + _offset;
                // we should use decimal128 to scale up/down
                Int128 value = *reinterpret_cast<DecimalPhysicalType*>(buf_start);
                if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
                    value *= scale_params.scale_factor;
                } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) {
                    value /= scale_params.scale_factor;
                }
                auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[data_index++]);
                v = (DecimalPrimitiveType)value;
                _offset += _type_length;
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            // Null rows: advance the output cursor only.
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            // Filtered rows: advance the page cursor only.
            _offset += _type_length * run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // do nothing
            break;
        }
        }
    }
    return Status::OK();
}
} // namespace doris::vectorized

View File

@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "common/status.h"
#include "gen_cpp/parquet_types.h"
#include "util/slice.h"
#include "vec/columns/column.h"
#include "vec/data_types/data_type.h"
#include "vec/exec/format/parquet/decoder.h"
#include "vec/exec/format/parquet/parquet_common.h"
namespace doris::vectorized {
// Plain-encoded decoder for fixed-length physical types
// (INT32/INT64/INT96/FLOAT/DOUBLE/FIXED_LEN_BYTE_ARRAY). Values are read
// sequentially from the page buffer, _type_length bytes apiece.
class FixLengthPlainDecoder final : public Decoder {
public:
    FixLengthPlainDecoder(tparquet::Type::type physical_type) : _physical_type(physical_type) {};
    ~FixLengthPlainDecoder() override = default;
    // Materialize a batch into doris_column, dispatching on the logical type.
    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                         ColumnSelectVector& select_vector) override;
    // Advance the page cursor past num_values values without decoding them.
    Status skip_values(size_t num_values) override;
protected:
    template <typename Numeric>
    Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector);
    template <typename CppType, typename ColumnType>
    Status _decode_date(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector);
    template <typename CppType, typename ColumnType>
    Status _decode_datetime64(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector);
    template <typename CppType, typename ColumnType>
    Status _decode_datetime96(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector);
    template <typename DecimalPrimitiveType>
    Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                  ColumnSelectVector& select_vector);
    template <typename DecimalPrimitiveType, typename DecimalPhysicalType>
    Status _decode_primitive_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                     ColumnSelectVector& select_vector);
    Status _decode_string(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector);
    // Parquet physical type of the column; selects the valid decode paths.
    tparquet::Type::type _physical_type;
};
} // namespace doris::vectorized

View File

@ -23,24 +23,10 @@
namespace doris::vectorized {
const cctz::time_zone DecodeParams::utc0 = cctz::utc_time_zone();
const uint32_t ParquetInt96::JULIAN_EPOCH_OFFSET_DAYS = 2440588;
const uint64_t ParquetInt96::MICROS_IN_DAY = 86400000000;
const uint64_t ParquetInt96::NANOS_PER_MICROSECOND = 1000;
#define FOR_LOGICAL_NUMERIC_TYPES(M) \
M(TypeIndex::Int8, Int8) \
M(TypeIndex::UInt8, UInt8) \
M(TypeIndex::Int16, Int16) \
M(TypeIndex::UInt16, UInt16) \
M(TypeIndex::Int32, Int32) \
M(TypeIndex::UInt32, UInt32) \
M(TypeIndex::Int64, Int64) \
M(TypeIndex::UInt64, UInt64) \
M(TypeIndex::Float32, Float32) \
M(TypeIndex::Float64, Float64)
ColumnSelectVector::ColumnSelectVector(const uint8_t* filter_map, size_t filter_map_size,
bool filter_all) {
build(filter_map, filter_map_size, filter_all);
@ -200,511 +186,4 @@ size_t ColumnSelectVector::get_next_run(DataReadType* data_read_type) {
return run_length;
}
}
// Factory: construct the concrete decoder for a (physical type, page encoding)
// pair. Only PLAIN and RLE_DICTIONARY encodings are supported; everything else
// is rejected with InternalError.
//
// @param type     parquet physical type of the column chunk
// @param encoding encoding of the data page being decoded
// @param decoder  out-parameter receiving the newly created decoder
// @return OK on success, InternalError for unsupported combinations
Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
                            std::unique_ptr<Decoder>& decoder) {
    switch (encoding) {
    case tparquet::Encoding::PLAIN:
    case tparquet::Encoding::RLE_DICTIONARY:
        switch (type) {
        case tparquet::Type::BOOLEAN:
            if (encoding != tparquet::Encoding::PLAIN) {
                // Booleans are bit-packed on plain pages; a dictionary page for
                // them is malformed input. (Message grammar fixed: "can't has"
                // -> "can't have".)
                return Status::InternalError("Bool type can't have dictionary page");
            }
            decoder.reset(new BoolPlainDecoder());
            break;
        case tparquet::Type::BYTE_ARRAY:
            decoder.reset(new ByteArrayDecoder());
            break;
        case tparquet::Type::INT32:
        case tparquet::Type::INT64:
        case tparquet::Type::INT96:
        case tparquet::Type::FLOAT:
        case tparquet::Type::DOUBLE:
        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
            // All fixed-width physical types share one decoder; the type only
            // determines the value width.
            decoder.reset(new FixLengthDecoder(type));
            break;
        default:
            return Status::InternalError("Unsupported type {}(encoding={}) in parquet decoder",
                                         tparquet::to_string(type), tparquet::to_string(encoding));
        }
        break;
    default:
        return Status::InternalError("Unsupported encoding {}(type={}) in parquet decoder",
                                     tparquet::to_string(encoding), tparquet::to_string(type));
    }
    return Status::OK();
}
// Bind the parquet schema node and derive timestamp conversion parameters:
// the effective time zone, the divisor that turns raw values into seconds
// (second_mask), and the multiplier to nanoseconds (scale_to_nano_factor).
// Prefers the modern logicalType annotation; falls back to legacy
// converted_type (which has no NANOS variant).
void Decoder::init(FieldSchema* field_schema, cctz::time_zone* ctz) {
    _field_schema = field_schema;
    if (_decode_params == nullptr) {
        _decode_params.reset(new DecodeParams());
    }
    if (ctz != nullptr) {
        _decode_params->ctz = ctz;
    }
    const auto& schema = field_schema->parquet_schema;
    if (schema.__isset.logicalType && schema.logicalType.__isset.TIMESTAMP) {
        const auto& timestamp_info = schema.logicalType.TIMESTAMP;
        if (!timestamp_info.isAdjustedToUTC) {
            // should set timezone to utc+0
            _decode_params->ctz = const_cast<cctz::time_zone*>(&_decode_params->utc0);
        }
        const auto& time_unit = timestamp_info.unit;
        if (time_unit.__isset.MILLIS) {
            _decode_params->second_mask = 1000;
            _decode_params->scale_to_nano_factor = 1000000;
        } else if (time_unit.__isset.MICROS) {
            _decode_params->second_mask = 1000000;
            _decode_params->scale_to_nano_factor = 1000;
        } else if (time_unit.__isset.NANOS) {
            _decode_params->second_mask = 1000000000;
            _decode_params->scale_to_nano_factor = 1;
        }
    } else if (schema.__isset.converted_type) {
        // Legacy annotation path: only MILLIS and MICROS exist here.
        const auto& converted_type = schema.converted_type;
        if (converted_type == tparquet::ConvertedType::TIMESTAMP_MILLIS) {
            _decode_params->second_mask = 1000;
            _decode_params->scale_to_nano_factor = 1000000;
        } else if (converted_type == tparquet::ConvertedType::TIMESTAMP_MICROS) {
            _decode_params->second_mask = 1000000;
            _decode_params->scale_to_nano_factor = 1000;
        }
    }
}
// Append dictionary codes (already batched into _indexes by the caller) into a
// ColumnDictI32, honoring the select vector's null/filter runs.
Status Decoder::_decode_dict_values(MutableColumnPtr& doris_column,
                                    ColumnSelectVector& select_vector) {
    DCHECK(doris_column->is_column_dictionary());
    // Read cursor into _indexes; advances over CONTENT and FILTERED_CONTENT runs.
    size_t dict_index = 0;
    ColumnSelectVector::DataReadType read_type;
    auto& column_data = assert_cast<ColumnDictI32&>(*doris_column).get_data();
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            // Bulk-insert the run of dictionary codes.
            uint32_t* start_index = &_indexes[0];
            column_data.insert(start_index + dict_index, start_index + dict_index + run_length);
            dict_index += run_length;
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            doris_column->insert_many_defaults(run_length);
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            // Codes exist on the page but the rows are filtered out.
            dict_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            break;
        }
        }
    }
    return Status::OK();
}
// Take ownership of a fixed-length dictionary page and index every entry.
// The page must hold exactly num_values entries of _type_length bytes each.
Status FixLengthDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length,
                                  size_t num_values) {
    if (length != num_values * _type_length) {
        return Status::Corruption("Wrong dictionary data for fixed length type");
    }
    _has_dict = true;
    _dict = std::move(dict);
    // Record the start address of each dictionary item for O(1) lookup.
    _dict_items.resize(num_values);
    char* cursor = reinterpret_cast<char*>(_dict.get());
    for (size_t idx = 0; idx < num_values; ++idx, cursor += _type_length) {
        _dict_items[idx] = cursor;
    }
    return Status::OK();
}
// Point the decoder at a new data page and rewind the read cursor.
void FixLengthDecoder::set_data(Slice* data) {
    _offset = 0;
    _data = data;
    if (!_has_dict) {
        return;
    }
    // Dictionary-encoded page layout: [1-byte RLE bit width][RLE/bit-packed indexes].
    const uint8_t index_bit_width = *data->data;
    _index_batch_decoder.reset(new RleBatchDecoder<uint32_t>(
            reinterpret_cast<uint8_t*>(data->data) + 1, static_cast<int>(data->size) - 1,
            index_bit_width));
}
// Skip num_values values without materializing them. For dictionary pages the
// RLE codes must still be consumed (into _indexes); for plain pages only the
// byte cursor advances.
Status FixLengthDecoder::skip_values(size_t num_values) {
    if (_has_dict) {
        _indexes.resize(num_values);
        _index_batch_decoder->GetBatch(&_indexes[0], num_values);
    } else {
        _offset += _type_length * num_values;
        // Bounds check after the jump; the cursor is not used until the next read.
        if (UNLIKELY(_offset > _data->size)) {
            return Status::IOError("Out-of-bounds access in parquet data decoder");
        }
    }
    return Status::OK();
}
// Decode one batch into doris_column. First pulls the dictionary codes (if
// any) and lazily populates the column's dictionary, then dispatches on the
// destination logical type; each (logical, physical) pairing maps to one
// typed decode helper. Unsupported pairings fall through to InvalidArgument.
Status FixLengthDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                       ColumnSelectVector& select_vector) {
    size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
    if (_has_dict) {
        // Populate the column dictionary once (dict_size == 0 means first batch).
        if (doris_column->is_column_dictionary() &&
            assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
            std::vector<StringRef> dict_items;
            dict_items.reserve(_dict_items.size());
            for (int i = 0; i < _dict_items.size(); ++i) {
                dict_items.emplace_back(_dict_items[i], _type_length);
            }
            assert_cast<ColumnDictI32&>(*doris_column)
                    .insert_many_dict_data(&dict_items[0], dict_items.size());
        }
        // Decode all non-null codes for this batch up front.
        _indexes.resize(non_null_size);
        _index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
    } else if (UNLIKELY(_offset + _type_length * non_null_size > _data->size)) {
        return Status::IOError("Out-of-bounds access in parquet data decoder");
    }
    if (doris_column->is_column_dictionary()) {
        return _decode_dict_values(doris_column, select_vector);
    }
    TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
    switch (logical_type) {
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
    case NUMERIC_TYPE:                           \
        return _decode_numeric<CPP_NUMERIC_TYPE>(doris_column, select_vector);
        FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
    case TypeIndex::Date:
        if (_physical_type == tparquet::Type::INT32) {
            return _decode_date<VecDateTimeValue, Int64>(doris_column, select_vector);
        }
        break;
    case TypeIndex::DateV2:
        if (_physical_type == tparquet::Type::INT32) {
            return _decode_date<DateV2Value<DateV2ValueType>, UInt32>(doris_column, select_vector);
        }
        break;
    case TypeIndex::DateTime:
        if (_physical_type == tparquet::Type::INT96) {
            return _decode_datetime96<VecDateTimeValue, Int64>(doris_column, select_vector);
        } else if (_physical_type == tparquet::Type::INT64) {
            return _decode_datetime64<VecDateTimeValue, Int64>(doris_column, select_vector);
        }
        break;
    case TypeIndex::DateTimeV2:
        // Spark can set the timestamp precision by the following configuration:
        // spark.sql.parquet.outputTimestampType = INT96(NANOS), TIMESTAMP_MICROS, TIMESTAMP_MILLIS
        if (_physical_type == tparquet::Type::INT96) {
            return _decode_datetime96<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column,
                                                                               select_vector);
        } else if (_physical_type == tparquet::Type::INT64) {
            return _decode_datetime64<DateV2Value<DateTimeV2ValueType>, UInt64>(doris_column,
                                                                               select_vector);
        }
        break;
    case TypeIndex::Decimal32:
        // FIXED_LEN_BYTE_ARRAY stores big-endian two's complement; INT32/INT64
        // store the unscaled value directly.
        if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
            return _decode_binary_decimal<Int32>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT32) {
            return _decode_primitive_decimal<Int32, Int32>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT64) {
            return _decode_primitive_decimal<Int32, Int64>(doris_column, data_type, select_vector);
        }
        break;
    case TypeIndex::Decimal64:
        if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
            return _decode_binary_decimal<Int64>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT32) {
            return _decode_primitive_decimal<Int64, Int32>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT64) {
            return _decode_primitive_decimal<Int64, Int64>(doris_column, data_type, select_vector);
        }
        break;
    case TypeIndex::Decimal128:
        if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
            return _decode_binary_decimal<Int128>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT32) {
            return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT64) {
            return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, select_vector);
        }
        break;
    case TypeIndex::Decimal128I:
        if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
            return _decode_binary_decimal<Int128>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT32) {
            return _decode_primitive_decimal<Int128, Int32>(doris_column, data_type, select_vector);
        } else if (_physical_type == tparquet::Type::INT64) {
            return _decode_primitive_decimal<Int128, Int64>(doris_column, data_type, select_vector);
        }
        break;
    case TypeIndex::String:
    case TypeIndex::FixedString:
        if (_physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
            return _decode_string(doris_column, select_vector);
        }
        break;
    default:
        break;
    }
    return Status::InvalidArgument("Can't decode parquet physical type {} to doris logical type {}",
                                   tparquet::to_string(_physical_type), getTypeName(logical_type));
}
// Decode FIXED_LEN_BYTE_ARRAY values into a string column. Works for both
// plain pages (values read sequentially via _offset) and dictionary pages
// (values looked up through _indexes) via the _FIXED_* macros.
Status FixLengthDecoder::_decode_string(MutableColumnPtr& doris_column,
                                        ColumnSelectVector& select_vector) {
    size_t dict_index = 0;
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            // Gather StringRefs for the run, then insert them in one call.
            std::vector<StringRef> string_values;
            string_values.reserve(run_length);
            for (size_t i = 0; i < run_length; ++i) {
                char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
                string_values.emplace_back(buf_start, _type_length);
                _FIXED_SHIFT_DATA_OFFSET();
            }
            doris_column->insert_many_strings(&string_values[0], run_length);
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            doris_column->insert_many_defaults(run_length);
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            // Skip filtered values in whichever representation is active.
            if (_has_dict) {
                dict_index += run_length;
            } else {
                _offset += _type_length * run_length;
            }
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // do nothing
            break;
        }
        }
    }
    return Status::OK();
}
// Take ownership of a BYTE_ARRAY dictionary page. Entries are laid out as
// [4-byte LE length][payload] back to back; each is indexed as a StringRef.
Status ByteArrayDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length,
                                  size_t num_values) {
    _has_dict = true;
    _dict = std::move(dict);
    _dict_items.reserve(num_values);
    uint32_t offset_cursor = 0;
    char* dict_item_address = reinterpret_cast<char*>(_dict.get());
    for (int i = 0; i < num_values; ++i) {
        uint32_t l = decode_fixed32_le(_dict.get() + offset_cursor);
        offset_cursor += 4;
        _dict_items.emplace_back(dict_item_address + offset_cursor, l);
        offset_cursor += l;
        // Reject entries that claim to extend past the page.
        if (offset_cursor > length) {
            return Status::Corruption("Wrong data length in dictionary");
        }
    }
    // The entries must consume the page exactly; trailing bytes are corruption.
    if (offset_cursor != length) {
        return Status::Corruption("Wrong dictionary data for byte array type");
    }
    return Status::OK();
}
// Start decoding from the beginning of a new data page.
void ByteArrayDecoder::set_data(Slice* data) {
    _offset = 0;
    _data = data;
    if (!_has_dict) {
        return;
    }
    // Dictionary-encoded page layout: [1-byte RLE bit width][RLE/bit-packed indexes].
    const uint8_t index_bit_width = *data->data;
    _index_batch_decoder.reset(new RleBatchDecoder<uint32_t>(
            reinterpret_cast<uint8_t*>(data->data) + 1, static_cast<int>(data->size) - 1,
            index_bit_width));
}
// Skip num_values values. On dictionary pages the RLE codes are consumed (and
// discarded) into _indexes; on plain pages each value is a 4-byte LE length
// followed by that many payload bytes, so we walk them one by one.
//
// @return OK, or IOError if the page is truncated.
Status ByteArrayDecoder::skip_values(size_t num_values) {
    if (_has_dict) {
        _indexes.resize(num_values);
        _index_batch_decoder->GetBatch(&_indexes[0], num_values);
    } else {
        for (int i = 0; i < num_values; ++i) {
            if (UNLIKELY(_offset + 4 > _data->size)) {
                return Status::IOError("Can't read byte array length from plain decoder");
            }
            uint32_t length =
                    decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) + _offset);
            _offset += 4;
            // BUGFIX: the original wrote `UNLIKELY(_offset + length) > _data->size`,
            // which compares the macro's 0/1 result against size and effectively
            // disables the bounds check. The whole comparison must be inside
            // UNLIKELY().
            if (UNLIKELY(_offset + length > _data->size)) {
                return Status::IOError("Can't skip enough bytes in plain decoder");
            }
            _offset += length;
        }
    }
    return Status::OK();
}
// Decode one batch of BYTE_ARRAY values into doris_column. Handles lazy
// population of the column dictionary, dictionary-coded and plain string
// reads, and binary-encoded decimals.
//
// BUGFIX: two bounds checks below originally read
// `UNLIKELY(_offset + length) > _data->size` — the misplaced parenthesis made
// the check compare the macro's 0/1 result with the page size, so truncated
// pages were read out of bounds instead of returning IOError.
Status ByteArrayDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                       ColumnSelectVector& select_vector) {
    size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
    if (_has_dict) {
        // First batch into a dictionary column: install the page dictionary.
        if (doris_column->is_column_dictionary() &&
            assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
            assert_cast<ColumnDictI32&>(*doris_column)
                    .insert_many_dict_data(&_dict_items[0], _dict_items.size());
        }
        _indexes.resize(non_null_size);
        _index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
    }
    if (doris_column->is_column_dictionary()) {
        return _decode_dict_values(doris_column, select_vector);
    }
    TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
    switch (logical_type) {
    case TypeIndex::String:
    case TypeIndex::FixedString: {
        size_t dict_index = 0;
        ColumnSelectVector::DataReadType read_type;
        while (size_t run_length = select_vector.get_next_run(&read_type)) {
            switch (read_type) {
            case ColumnSelectVector::CONTENT: {
                // Collect the run's values, then insert them in one call.
                std::vector<StringRef> string_values;
                string_values.reserve(run_length);
                for (size_t i = 0; i < run_length; ++i) {
                    if (_has_dict) {
                        string_values.emplace_back(_dict_items[_indexes[dict_index++]]);
                    } else {
                        if (UNLIKELY(_offset + 4 > _data->size)) {
                            return Status::IOError(
                                    "Can't read byte array length from plain decoder");
                        }
                        uint32_t length = decode_fixed32_le(
                                reinterpret_cast<const uint8_t*>(_data->data) + _offset);
                        _offset += 4;
                        // BUGFIX: comparison moved inside UNLIKELY().
                        if (UNLIKELY(_offset + length > _data->size)) {
                            return Status::IOError("Can't read enough bytes in plain decoder");
                        }
                        string_values.emplace_back(_data->data + _offset, length);
                        _offset += length;
                    }
                }
                doris_column->insert_many_strings(&string_values[0], run_length);
                break;
            }
            case ColumnSelectVector::NULL_DATA: {
                doris_column->insert_many_defaults(run_length);
                break;
            }
            case ColumnSelectVector::FILTERED_CONTENT: {
                // Filtered rows still occupy page bytes on plain pages.
                if (_has_dict) {
                    dict_index += run_length;
                } else {
                    for (int i = 0; i < run_length; ++i) {
                        if (UNLIKELY(_offset + 4 > _data->size)) {
                            return Status::IOError(
                                    "Can't read byte array length from plain decoder");
                        }
                        uint32_t length = decode_fixed32_le(
                                reinterpret_cast<const uint8_t*>(_data->data) + _offset);
                        _offset += 4;
                        // BUGFIX: comparison moved inside UNLIKELY().
                        if (UNLIKELY(_offset + length > _data->size)) {
                            return Status::IOError("Can't read enough bytes in plain decoder");
                        }
                        _offset += length;
                    }
                }
                break;
            }
            case ColumnSelectVector::FILTERED_NULL: {
                // do nothing
                break;
            }
            }
        }
        return Status::OK();
    }
    case TypeIndex::Decimal32:
        return _decode_binary_decimal<Int32>(doris_column, data_type, select_vector);
    case TypeIndex::Decimal64:
        return _decode_binary_decimal<Int64>(doris_column, data_type, select_vector);
    case TypeIndex::Decimal128:
        return _decode_binary_decimal<Int128>(doris_column, data_type, select_vector);
    case TypeIndex::Decimal128I:
        return _decode_binary_decimal<Int128>(doris_column, data_type, select_vector);
    default:
        break;
    }
    return Status::InvalidArgument(
            "Can't decode parquet physical type BYTE_ARRAY to doris logical type {}",
            getTypeName(logical_type));
}
// Skip num_values packed booleans. Consumes cached unpacked bits first, then
// skips whole 32-bit groups directly in the bit reader, and finally unpacks
// one more buffer to cover a sub-32 remainder.
Status BoolPlainDecoder::skip_values(size_t num_values) {
    // Drain values already unpacked into the local buffer.
    int skip_cached = std::min(num_unpacked_values_ - unpacked_value_idx_, (int)num_values);
    unpacked_value_idx_ += skip_cached;
    if (skip_cached == num_values) {
        return Status::OK();
    }
    int num_remaining = num_values - skip_cached;
    // SkipBatch works in multiples of 32 bits; round down and skip in bulk.
    int num_to_skip = BitUtil::RoundDownToPowerOf2(num_remaining, 32);
    if (num_to_skip > 0) {
        bool_values_.SkipBatch(1, num_to_skip);
    }
    num_remaining -= num_to_skip;
    if (num_remaining > 0) {
        // Unpack a fresh buffer and advance the cursor past the leftover bits.
        DCHECK_LE(num_remaining, UNPACKED_BUFFER_LEN);
        num_unpacked_values_ =
                bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
        if (UNLIKELY(num_unpacked_values_ < num_remaining)) {
            return Status::IOError("Can't skip enough booleans in plain decoder");
        }
        unpacked_value_idx_ = num_remaining;
    }
    return Status::OK();
}
// Decode packed booleans into a UInt8 column. Note that filtered values must
// still be decoded (not skipped) to keep the bit cursor in sync.
Status BoolPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                       ColumnSelectVector& select_vector) {
    auto& column_data = static_cast<ColumnVector<UInt8>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            bool value;
            for (size_t i = 0; i < run_length; ++i) {
                if (UNLIKELY(!_decode_value(&value))) {
                    return Status::IOError("Can't read enough booleans in plain decoder");
                }
                column_data[data_index++] = (UInt8)value;
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            // Nulls occupy output slots but no page bits.
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            // Decode and discard: the values consume page bits even when filtered.
            bool value;
            for (int i = 0; i < run_length; ++i) {
                if (UNLIKELY(!_decode_value(&value))) {
                    return Status::IOError("Can't read enough booleans in plain decoder");
                }
            }
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // do nothing
            break;
        }
        }
    }
    return Status::OK();
}
} // namespace doris::vectorized

View File

@ -28,6 +28,7 @@
#include "util/rle_encoding.h"
#include "util/simd/bits.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_dictionary.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/common/int_exp.h"
@ -58,12 +59,13 @@ struct RowRange {
struct ParquetReadColumn {
ParquetReadColumn(int parquet_col_id, const std::string& file_slot_name)
: _parquet_col_id(parquet_col_id), _file_slot_name(file_slot_name) {}
: _parquet_col_id(parquet_col_id), _file_slot_name(file_slot_name) {};
int _parquet_col_id;
const std::string& _file_slot_name;
};
#pragma pack(1)
struct ParquetInt96 {
uint64_t lo; // time of nanoseconds in a day
uint32_t hi; // days from julian epoch
@ -76,16 +78,8 @@ struct ParquetInt96 {
static const uint64_t MICROS_IN_DAY;
static const uint64_t NANOS_PER_MICROSECOND;
};
// Per-column conversion parameters resolved once in Decoder::init().
struct DecodeParams {
    // schema.logicalType.TIMESTAMP.isAdjustedToUTC == false
    static const cctz::time_zone utc0;
    // schema.logicalType.TIMESTAMP.isAdjustedToUTC == true, we should set the time zone
    cctz::time_zone* ctz = nullptr;
    // Divisor converting a raw timestamp value to whole seconds
    // (1000 for millis, 1000000 for micros, 1000000000 for nanos).
    int64_t second_mask = 1;
    // Multiplier converting the sub-second remainder to nanoseconds.
    int64_t scale_to_nano_factor = 1;
    // Decimal rescaling state (direction and factor), set lazily.
    DecimalScaleParams decimal_scale;
};
#pragma pack()
static_assert(sizeof(ParquetInt96) == 12, "The size of ParquetInt96 is not 12.");
class ColumnSelectVector {
public:
@ -146,561 +140,4 @@ private:
size_t _num_filtered;
size_t _read_index;
};
// Base class for parquet value decoders. A Decoder is bound to one column
// chunk's pages; concrete subclasses handle one physical-type/encoding family.
class Decoder {
public:
    Decoder() = default;
    virtual ~Decoder() = default;
    // Factory creating the concrete decoder for (type, encoding).
    static Status get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
                              std::unique_ptr<Decoder>& decoder);
    // The type with fix length
    void set_type_length(int32_t type_length) { _type_length = type_length; }
    // Set the data to be decoded
    virtual void set_data(Slice* data) {
        _data = data;
        _offset = 0;
    }
    // Bind the schema node and resolve timestamp/time-zone conversion params.
    void init(FieldSchema* field_schema, cctz::time_zone* ctz);
    template <typename DecimalPrimitiveType>
    void init_decimal_converter(DataTypePtr& data_type);
    // Write the decoded values batch to doris's column
    virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                 ColumnSelectVector& select_vector) = 0;
    // Advance past num_values values without materializing them.
    virtual Status skip_values(size_t num_values) = 0;
    // Install a dictionary page; overridden by dictionary-capable decoders.
    virtual Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) {
        return Status::NotSupported("set_dict is not supported");
    }
protected:
    /**
     * Decode dictionary-coded values into doris_column, ensure that doris_column is ColumnDictI32 type,
     * and the coded values must be read into _indexes previously.
     */
    Status _decode_dict_values(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector);
    // Byte width of each value for fixed-length physical types.
    int32_t _type_length;
    // Current data page and read cursor within it.
    Slice* _data = nullptr;
    uint32_t _offset = 0;
    FieldSchema* _field_schema = nullptr;
    std::unique_ptr<DecodeParams> _decode_params = nullptr;
    // For dictionary encoding
    bool _has_dict = false;
    std::unique_ptr<uint8_t[]> _dict = nullptr;
    std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
    // Dictionary codes decoded for the current batch.
    std::vector<uint32_t> _indexes;
};
// Compute the decimal rescaling direction and factor between the file's scale
// and the destination column's scale. Runs once per decoder; subsequent calls
// return early because scale_type is no longer NOT_INIT.
template <typename DecimalPrimitiveType>
void Decoder::init_decimal_converter(DataTypePtr& data_type) {
    if (_decode_params == nullptr || _field_schema == nullptr ||
        _decode_params->decimal_scale.scale_type != DecimalScaleParams::NOT_INIT) {
        return;
    }
    auto scale = _field_schema->parquet_schema.scale;
    auto* decimal_type = reinterpret_cast<DataTypeDecimal<Decimal<DecimalPrimitiveType>>*>(
            const_cast<IDataType*>(remove_nullable(data_type).get()));
    auto dest_scale = decimal_type->get_scale();
    if (dest_scale > scale) {
        // Destination keeps more fractional digits: multiply values up.
        _decode_params->decimal_scale.scale_type = DecimalScaleParams::SCALE_UP;
        _decode_params->decimal_scale.scale_factor =
                DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(dest_scale - scale);
    } else if (dest_scale < scale) {
        // Destination keeps fewer fractional digits: divide values down.
        _decode_params->decimal_scale.scale_type = DecimalScaleParams::SCALE_DOWN;
        _decode_params->decimal_scale.scale_factor =
                DecimalScaleParams::get_scale_factor<DecimalPrimitiveType>(scale - dest_scale);
    } else {
        _decode_params->decimal_scale.scale_type = DecimalScaleParams::NO_SCALE;
        _decode_params->decimal_scale.scale_factor = 1;
    }
}
// Decoder for fixed-length physical types, supporting both plain and
// RLE_DICTIONARY encoded pages.
class FixLengthDecoder final : public Decoder {
public:
    FixLengthDecoder(tparquet::Type::type physical_type) : _physical_type(physical_type) {}
    ~FixLengthDecoder() override = default;
    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                         ColumnSelectVector& select_vector) override;
    Status skip_values(size_t num_values) override;
    Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override;
    void set_data(Slice* data) override;
protected:
    template <typename Numeric>
    Status _decode_numeric(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector);
    template <typename CppType, typename ColumnType>
    Status _decode_date(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector);
    template <typename CppType, typename ColumnType>
    Status _decode_datetime64(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector);
    template <typename CppType, typename ColumnType>
    Status _decode_datetime96(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector);
    template <typename DecimalPrimitiveType>
    Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                  ColumnSelectVector& select_vector);
    template <typename DecimalPrimitiveType, typename DecimalPhysicalType>
    Status _decode_primitive_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                     ColumnSelectVector& select_vector);
    Status _decode_string(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector);
// Resolve the address of the next value: dictionary lookup on dict pages,
// raw page cursor on plain pages. `index` is only consumed when _has_dict.
#define _FIXED_GET_DATA_OFFSET(index) \
    _has_dict ? _dict_items[_indexes[index]] : _data->data + _offset
// Advance the plain-page cursor by one value; no-op on dictionary pages.
#define _FIXED_SHIFT_DATA_OFFSET() \
    if (!_has_dict) _offset += _type_length
    tparquet::Type::type _physical_type;
    // For dictionary encoding
    std::vector<char*> _dict_items;
};
// Decode fixed-width numeric values into a ColumnVector<Numeric>, reading
// either from the dictionary (via _indexes) or straight from the plain page.
template <typename Numeric>
Status FixLengthDecoder::_decode_numeric(MutableColumnPtr& doris_column,
                                         ColumnSelectVector& select_vector) {
    auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
    size_t dict_index = 0;
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            for (size_t i = 0; i < run_length; ++i) {
                char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
                column_data[data_index++] = *(Numeric*)buf_start;
                _FIXED_SHIFT_DATA_OFFSET();
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            // Nulls consume output slots only; the null map masks the values.
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            // Skip filtered values in whichever representation is active.
            if (_has_dict) {
                dict_index += run_length;
            } else {
                _offset += _type_length * run_length;
            }
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // do nothing
            break;
        }
        }
    }
    return Status::OK();
}
// Decode INT32 days-since-epoch values into a date column (v1 or v2).
template <typename CppType, typename ColumnType>
Status FixLengthDecoder::_decode_date(MutableColumnPtr& doris_column,
                                      ColumnSelectVector& select_vector) {
    auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
    size_t dict_index = 0;
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            for (size_t i = 0; i < run_length; ++i) {
                char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
                int64_t date_value = static_cast<int64_t>(*reinterpret_cast<int32_t*>(buf_start));
                auto& v = reinterpret_cast<CppType&>(column_data[data_index++]);
                v.from_unixtime(date_value * 24 * 60 * 60, *_decode_params->ctz); // day to seconds
                if constexpr (std::is_same_v<CppType, VecDateTimeValue>) {
                    // we should cast to date if using date v1.
                    v.cast_to_date();
                }
                _FIXED_SHIFT_DATA_OFFSET();
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            // Skip filtered values in whichever representation is active.
            if (_has_dict) {
                dict_index += run_length;
            } else {
                _offset += _type_length * run_length;
            }
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // do nothing
            break;
        }
        }
    }
    return Status::OK();
}
// Decode INT64 timestamps into a datetime column, using the second_mask /
// scale_to_nano_factor resolved in Decoder::init() to normalize the unit.
template <typename CppType, typename ColumnType>
Status FixLengthDecoder::_decode_datetime64(MutableColumnPtr& doris_column,
                                            ColumnSelectVector& select_vector) {
    auto& column_data = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
    size_t dict_index = 0;
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            for (size_t i = 0; i < run_length; ++i) {
                char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
                int64_t& date_value = *reinterpret_cast<int64_t*>(buf_start);
                auto& v = reinterpret_cast<CppType&>(column_data[data_index++]);
                v.from_unixtime(date_value / _decode_params->second_mask, *_decode_params->ctz);
                if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) {
                    // nanoseconds will be ignored.
                    v.set_microsecond((date_value % _decode_params->second_mask) *
                                      _decode_params->scale_to_nano_factor / 1000);
                    // TODO: the precision of datetime v1
                }
                _FIXED_SHIFT_DATA_OFFSET();
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            // Skip filtered values in whichever representation is active.
            if (_has_dict) {
                dict_index += run_length;
            } else {
                _offset += _type_length * run_length;
            }
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // do nothing
            break;
        }
        }
    }
    return Status::OK();
}
// Decode parquet INT96 timestamps (12-byte legacy layout, e.g. produced by
// spark.sql.parquet.outputTimestampType=INT96) into a Doris datetime column.
// Precision beyond microseconds is discarded.
template <typename CppType, typename ColumnType>
Status FixLengthDecoder::_decode_datetime96(MutableColumnPtr& doris_column,
                                            ColumnSelectVector& select_vector) {
    // Destination vector: new values are appended after any existing rows.
    auto& dst = static_cast<ColumnVector<ColumnType>&>(*doris_column).get_data();
    size_t write_idx = dst.size();
    dst.resize(write_idx + select_vector.num_values() - select_vector.num_filtered());
    size_t value_idx = 0; // next physical value (dict slot) to consume
    ColumnSelectVector::DataReadType read_type;
    while (size_t run = select_vector.get_next_run(&read_type)) {
        if (read_type == ColumnSelectVector::CONTENT) {
            for (size_t i = 0; i < run; ++i) {
                char* raw = _FIXED_GET_DATA_OFFSET(value_idx++);
                auto& ts96 = *reinterpret_cast<ParquetInt96*>(raw);
                // Convert the 12-byte INT96 representation to unix microseconds.
                int64_t micros = ts96.to_timestamp_micros();
                auto& cell = reinterpret_cast<CppType&>(dst[write_idx++]);
                cell.from_unixtime(micros / 1000000, *_decode_params->ctz);
                if constexpr (std::is_same_v<CppType, DateV2Value<DateTimeV2ValueType>>) {
                    // Only microseconds are kept; nanosecond precision is lost.
                    cell.set_microsecond(micros % 1000000);
                }
                _FIXED_SHIFT_DATA_OFFSET();
            }
        } else if (read_type == ColumnSelectVector::NULL_DATA) {
            // Null slots were reserved by the resize; leave them default-valued.
            write_idx += run;
        } else if (read_type == ColumnSelectVector::FILTERED_CONTENT) {
            // Rows filtered out: advance the source cursor without writing output.
            if (_has_dict) {
                value_idx += run;
            } else {
                _offset += _type_length * run;
            }
        }
        // FILTERED_NULL consumes neither storage nor an output slot.
    }
    return Status::OK();
}
// Decode fixed-length (FIXED_LEN_BYTE_ARRAY) parquet decimals into a Doris
// decimal column. Each value is the unscaled number stored as a
// '_type_length'-byte big-endian two's-complement integer.
// @param doris_column destination ColumnDecimal<Decimal<DecimalPrimitiveType>>
// @param data_type    Doris decimal type, used to initialize the scale converter
// @param select_vector run-length description of content/null/filtered rows
template <typename DecimalPrimitiveType>
Status FixLengthDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
                                                DataTypePtr& data_type,
                                                ColumnSelectVector& select_vector) {
    init_decimal_converter<DecimalPrimitiveType>(data_type);
    auto& column_data =
            static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    // Reserve room for every surviving (non-filtered) value up front.
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
    size_t dict_index = 0;
    DecimalScaleParams& scale_params = _decode_params->decimal_scale;
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            for (size_t i = 0; i < run_length; ++i) {
                // NOTE(review): _FIXED_GET_DATA_OFFSET presumably yields the dict
                // entry or the current plain-buffer position — defined elsewhere.
                char* buf_start = _FIXED_GET_DATA_OFFSET(dict_index++);
                // When Decimal in parquet is stored in byte arrays, binary and fixed,
                // the unscaled number must be encoded as two's complement using big-endian byte order.
                // Pre-fill with the sign (all ones when the MSB is set) so the memcpy
                // below sign-extends the _type_length-byte value to 128 bits.
                Int128 value = buf_start[0] & 0x80 ? -1 : 0;
                memcpy(reinterpret_cast<char*>(&value) + sizeof(Int128) - _type_length, buf_start,
                       _type_length);
                value = BigEndian::ToHost128(value);
                // Rescale the unscaled integer to the column's target scale.
                if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
                    value *= scale_params.scale_factor;
                } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) {
                    value /= scale_params.scale_factor;
                }
                auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[data_index++]);
                v = (DecimalPrimitiveType)value;
                _FIXED_SHIFT_DATA_OFFSET();
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            // Null slots were already reserved by the resize; just skip over them.
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            // Values exist in the page but their rows are filtered out: advance
            // the source cursor (dict indexes or raw offset) without writing.
            if (_has_dict) {
                dict_index += run_length;
            } else {
                _offset += _type_length * run_length;
            }
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // do nothing
            break;
        }
        }
    }
    return Status::OK();
}
// Decode decimals stored in a primitive physical type (INT32/INT64) into a
// Doris decimal column, rescaling to the column's target scale on the way.
template <typename DecimalPrimitiveType, typename DecimalPhysicalType>
Status FixLengthDecoder::_decode_primitive_decimal(MutableColumnPtr& doris_column,
                                                   DataTypePtr& data_type,
                                                   ColumnSelectVector& select_vector) {
    init_decimal_converter<DecimalPrimitiveType>(data_type);
    // Destination column; new values are appended after any existing rows.
    auto& dst =
            static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
    size_t write_idx = dst.size();
    dst.resize(write_idx + select_vector.num_values() - select_vector.num_filtered());
    size_t value_idx = 0; // next physical value (dict slot) to consume
    DecimalScaleParams& scale_params = _decode_params->decimal_scale;
    ColumnSelectVector::DataReadType read_type;
    while (size_t run = select_vector.get_next_run(&read_type)) {
        if (read_type == ColumnSelectVector::CONTENT) {
            for (size_t i = 0; i < run; ++i) {
                char* raw = _FIXED_GET_DATA_OFFSET(value_idx++);
                // Widen to Int128 before scaling so the multiply/divide cannot
                // overflow the narrower physical representation.
                Int128 unscaled = *reinterpret_cast<DecimalPhysicalType*>(raw);
                if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
                    unscaled *= scale_params.scale_factor;
                } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) {
                    unscaled /= scale_params.scale_factor;
                }
                reinterpret_cast<DecimalPrimitiveType&>(dst[write_idx++]) =
                        (DecimalPrimitiveType)unscaled;
                _FIXED_SHIFT_DATA_OFFSET();
            }
        } else if (read_type == ColumnSelectVector::NULL_DATA) {
            // Null slots were reserved by the resize; leave them default-valued.
            write_idx += run;
        } else if (read_type == ColumnSelectVector::FILTERED_CONTENT) {
            // Rows filtered out: advance the source cursor without writing output.
            if (_has_dict) {
                value_idx += run;
            } else {
                _offset += _type_length * run;
            }
        }
        // FILTERED_NULL consumes neither storage nor an output slot.
    }
    return Status::OK();
}
// Decoder for parquet BYTE_ARRAY columns, supporting both plain and dictionary
// encodings. Plain values are a 4-byte little-endian length prefix followed by
// the payload; dictionary entries are kept as views in '_dict_items'.
class ByteArrayDecoder final : public Decoder {
public:
    ByteArrayDecoder() = default;
    ~ByteArrayDecoder() override = default;
    // Decode the run of values described by 'select_vector' into 'doris_column'.
    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                         ColumnSelectVector& select_vector) override;
    // Skip 'num_values' physical values without producing any output.
    Status skip_values(size_t num_values) override;
    // Point the decoder at a new page buffer.
    void set_data(Slice* data) override;
    // Install the dictionary page. NOTE(review): takes a unique_ptr by reference,
    // so it presumably assumes ownership of the buffer — confirm at call sites.
    Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override;
protected:
    template <typename DecimalPrimitiveType>
    Status _decode_binary_decimal(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                                  ColumnSelectVector& select_vector);
    // For dictionary encoding: views into the dictionary page, indexed by code.
    std::vector<StringRef> _dict_items;
};
// Decode BYTE_ARRAY-encoded parquet decimals into a Doris decimal column.
// Each value is the unscaled number encoded as a big-endian two's-complement
// byte string: plain values carry a 4-byte little-endian length prefix, dict
// values are looked up in '_dict_items'.
// @param doris_column destination ColumnDecimal<Decimal<DecimalPrimitiveType>>
// @param data_type    Doris decimal type, used to initialize the scale converter
// @param select_vector run-length description of content/null/filtered rows
template <typename DecimalPrimitiveType>
Status ByteArrayDecoder::_decode_binary_decimal(MutableColumnPtr& doris_column,
                                                DataTypePtr& data_type,
                                                ColumnSelectVector& select_vector) {
    init_decimal_converter<DecimalPrimitiveType>(data_type);
    auto& column_data =
            static_cast<ColumnDecimal<Decimal<DecimalPrimitiveType>>&>(*doris_column).get_data();
    size_t data_index = column_data.size();
    // Reserve room for every surviving (non-filtered) value up front.
    column_data.resize(data_index + select_vector.num_values() - select_vector.num_filtered());
    size_t dict_index = 0;
    DecimalScaleParams& scale_params = _decode_params->decimal_scale;
    ColumnSelectVector::DataReadType read_type;
    while (size_t run_length = select_vector.get_next_run(&read_type)) {
        switch (read_type) {
        case ColumnSelectVector::CONTENT: {
            for (size_t i = 0; i < run_length; ++i) {
                char* buf_start;
                uint32_t length;
                if (_has_dict) {
                    StringRef& slice = _dict_items[_indexes[dict_index++]];
                    buf_start = const_cast<char*>(slice.data);
                    length = (uint32_t)slice.size;
                } else {
                    if (UNLIKELY(_offset + 4 > _data->size)) {
                        return Status::IOError("Can't read byte array length from plain decoder");
                    }
                    length = decode_fixed32_le(reinterpret_cast<const uint8_t*>(_data->data) +
                                               _offset);
                    _offset += 4;
                    buf_start = _data->data + _offset;
                    _offset += length;
                }
                if (UNLIKELY(length > sizeof(Int128))) {
                    // Corrupt or unsupported file: an unscaled value wider than
                    // 16 bytes would make `sizeof(Int128) - length` underflow in
                    // the memcpy offset below.
                    return Status::IOError("Decimal value size exceeds 16 bytes");
                }
                // When Decimal in parquet is stored in byte arrays, binary and fixed,
                // the unscaled number must be encoded as two's complement using big-endian byte order.
                // Pre-fill with the sign so the copy sign-extends to 128 bits.
                Int128 value = buf_start[0] & 0x80 ? -1 : 0;
                memcpy(reinterpret_cast<char*>(&value) + sizeof(Int128) - length, buf_start,
                       length);
                value = BigEndian::ToHost128(value);
                // Rescale the unscaled integer to the column's target scale.
                if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) {
                    value *= scale_params.scale_factor;
                } else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) {
                    value /= scale_params.scale_factor;
                }
                auto& v = reinterpret_cast<DecimalPrimitiveType&>(column_data[data_index++]);
                v = (DecimalPrimitiveType)value;
            }
            break;
        }
        case ColumnSelectVector::NULL_DATA: {
            // Null slots were already reserved by the resize; just skip over them.
            data_index += run_length;
            break;
        }
        case ColumnSelectVector::FILTERED_CONTENT: {
            if (_has_dict) {
                dict_index += run_length;
            } else {
                // BUGFIX: plain BYTE_ARRAY values are variable-length (4-byte
                // little-endian size prefix + payload), so skipping with the
                // fixed stride `_type_length * run_length` desynchronizes the
                // cursor; walk each length prefix instead.
                for (size_t i = 0; i < run_length; ++i) {
                    if (UNLIKELY(_offset + 4 > _data->size)) {
                        return Status::IOError(
                                "Can't read byte array length from plain decoder");
                    }
                    uint32_t length = decode_fixed32_le(
                            reinterpret_cast<const uint8_t*>(_data->data) + _offset);
                    _offset += 4 + length;
                }
            }
            break;
        }
        case ColumnSelectVector::FILTERED_NULL: {
            // Filtered nulls consume neither storage nor an output slot.
            break;
        }
        }
    }
    return Status::OK();
}
/// Decoder for bit-packed boolean-encoded values (parquet PLAIN encoding for
/// BOOLEAN: one bit per value, LSB first).
/// Implementation from https://github.com/apache/impala/blob/master/be/src/exec/parquet/parquet-bool-decoder.h
class BoolPlainDecoder final : public Decoder {
public:
    BoolPlainDecoder() = default;
    ~BoolPlainDecoder() override = default;
    // Set the data to be decoded: reset the bit reader over the new page buffer
    // and invalidate the cache of already-unpacked values.
    void set_data(Slice* data) override {
        bool_values_.Reset((const uint8_t*)data->data, data->size);
        num_unpacked_values_ = 0;
        unpacked_value_idx_ = 0;
        _offset = 0;
    }
    Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
                         ColumnSelectVector& select_vector) override;
    Status skip_values(size_t num_values) override;
protected:
    // Fetch the next boolean into '*value', refilling the unpacked cache in
    // batches of up to UNPACKED_BUFFER_LEN bits when it runs dry.
    // Returns false when the input is exhausted.
    inline bool _decode_value(bool* value) {
        if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) {
            *value = unpacked_values_[unpacked_value_idx_++];
        } else {
            num_unpacked_values_ =
                    bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
            if (UNLIKELY(num_unpacked_values_ == 0)) {
                return false;
            }
            *value = unpacked_values_[0];
            unpacked_value_idx_ = 1;
        }
        return true;
    }
    /// A buffer to store unpacked values. Must be a multiple of 32 size to use the
    /// batch-oriented interface of BatchedBitReader. We use uint8_t instead of bool because
    /// bit unpacking is only supported for unsigned integers. The values are converted to
    /// bool when returned to the user.
    static const int UNPACKED_BUFFER_LEN = 128;
    uint8_t unpacked_values_[UNPACKED_BUFFER_LEN];
    /// The number of valid values in 'unpacked_values_'.
    int num_unpacked_values_ = 0;
    /// The next value to return from 'unpacked_values_'.
    int unpacked_value_idx_ = 0;
    /// Bit packed decoder, used if 'encoding_' is PLAIN.
    BatchedBitReader bool_values_;
};
} // namespace doris::vectorized

View File

@ -21,7 +21,10 @@
#include <vector>
#include "exec/olap_common.h"
#include "gutil/endian.h"
#include "parquet_common.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/exec/format/format_common.h"
namespace doris::vectorized {

View File

@ -23,10 +23,10 @@
#include <vector>
#include "common/status.h"
#include "decoder.h"
#include "gen_cpp/parquet_types.h"
#include "io/buffered_reader.h"
#include "level_decoder.h"
#include "parquet_common.h"
#include "schema_desc.h"
#include "util/block_compression.h"
#include "vparquet_page_reader.h"

View File

@ -21,7 +21,10 @@
#include <gen_cpp/parquet_types.h>
#include "schema_desc.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_nullable.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_nullable.h"
#include "vparquet_column_chunk_reader.h"
namespace doris::vectorized {
@ -96,6 +99,9 @@ Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field,
}
Status ScalarColumnReader::_skip_values(size_t num_values) {
if (num_values == 0) {
return Status::OK();
}
if (_chunk_reader->max_def_level() > 0) {
LevelDecoder& def_decoder = _chunk_reader->def_level_decoder();
size_t skipped = 0;
@ -114,8 +120,12 @@ Status ScalarColumnReader::_skip_values(size_t num_values) {
}
skipped += loop_skip;
}
RETURN_IF_ERROR(_chunk_reader->skip_values(null_size, false));
RETURN_IF_ERROR(_chunk_reader->skip_values(nonnull_size, true));
if (null_size > 0) {
RETURN_IF_ERROR(_chunk_reader->skip_values(null_size, false));
}
if (nonnull_size > 0) {
RETURN_IF_ERROR(_chunk_reader->skip_values(nonnull_size, true));
}
} else {
RETURN_IF_ERROR(_chunk_reader->skip_values(num_values));
}

View File

@ -18,6 +18,7 @@
#include "vparquet_group_reader.h"
#include "schema_desc.h"
#include "util/simd/bits.h"
#include "vec/columns/column_const.h"
#include "vparquet_column_reader.h"

View File

@ -21,6 +21,7 @@
#include "exec/olap_common.h"
#include "parquet_common.h"
#include "schema_desc.h"
namespace doris::vectorized {

View File

@ -544,6 +544,11 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
_statistics.read_rows += row_group.num_rows;
};
if (_lazy_read_ctx.vconjunct_ctx == nullptr) {
read_whole_row_group();
return Status::OK();
}
if (_colname_to_value_range == nullptr || _colname_to_value_range->empty()) {
read_whole_row_group();
return Status::OK();

View File

@ -21,6 +21,7 @@
#include "table_format_reader.h"
#include "vec/columns/column_dictionary.h"
#include "vec/exec/format/format_common.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/format/parquet/parquet_common.h"
#include "vec/exprs/vexpr.h"