[opt](parquet-reader) Opt parquet decimal type reading. (#29825)

This commit is contained in:
Qi Chen
2024-01-11 20:45:53 +08:00
committed by yiguolei
parent ad986a78ae
commit d494674ff4
5 changed files with 137 additions and 50 deletions

View File

@ -103,14 +103,29 @@ Status FixLengthPlainDecoder<PhysicalType>::_decode_string(MutableColumnPtr& dor
while (size_t run_length = select_vector.get_next_run<has_filter>(&read_type)) {
switch (read_type) {
case ColumnSelectVector::CONTENT: {
std::vector<StringRef> string_values;
string_values.reserve(run_length);
for (size_t i = 0; i < run_length; ++i) {
char* buf_start = _data->data + _offset;
string_values.emplace_back(buf_start, _type_length);
_offset += _type_length;
auto* column_string = assert_cast<ColumnString*>(doris_column.get());
auto& chars = column_string->get_chars();
auto& offsets = column_string->get_offsets();
size_t bytes_size = chars.size();
// Append the fixed-width values of this run to the string column's chars buffer.
size_t data_size = run_length * _type_length;
size_t old_size = chars.size();
chars.resize(old_size + data_size);
// Read from the decoder's current position. `_offset` is advanced by `data_size` after
// every run, so copying from the start of `_data->data` would re-read the same leading
// bytes for each run instead of the values that belong to this run.
memcpy(chars.data() + old_size, _data->data + _offset, data_size);
// copy offsets
offsets.resize(offsets.size() + run_length);
auto* offsets_data = offsets.data() + offsets.size() - run_length;
int i = 0;
for (; i < run_length; i++) {
bytes_size += _type_length;
*(offsets_data++) = bytes_size;
}
doris_column->insert_many_strings(&string_values[0], run_length);
//doris_column->insert_many_strings_fixed_length<_type_length>(&string_values[0], run_length);
_offset += data_size;
break;
}
case ColumnSelectVector::NULL_DATA: {

View File

@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/parquet_types.h>
@ -219,7 +221,7 @@ struct ColumnConvert {
}
public:
ConvertParams* _convert_params = nullptr;
std::unique_ptr<ConvertParams> _convert_params;
};
template <tparquet::Type::type parquet_physical_type, typename dst_type>
@ -349,6 +351,91 @@ public:
}
};
/// Converts fixed-length byte-array decimals, staged in a `ColumnString`, into a
/// `ColumnDecimal<DecimalType>`.
///
/// `ScaleType` selects at compile time whether each decoded value is multiplied by,
/// divided by, or left unchanged relative to `_convert_params->decimal_scale.scale_factor`.
/// The per-value byte width is fixed at construction and dispatched once per `convert()`
/// call to a width-specialised loop.
template <typename DecimalType, DecimalScaleParams::ScaleType ScaleType>
class FixedStringToDecimal : public ColumnConvert {
public:
    /// @param type_length number of bytes each encoded value occupies; widths 1..16 are
    ///        supported by convert().
    explicit FixedStringToDecimal(int32_t type_length)
            : ColumnConvert(), _type_length(type_length) {}

    /// Appends one decimal per row of `src_col` to `dst_col`.
    ///
    /// @return the result of the width-specialised conversion, or an error status when
    ///         `_type_length` is outside 1..16 (previously this aborted the process).
    Status convert(ColumnPtr& src_col, MutableColumnPtr& dst_col) override {
        // NOTE(review): convert_null() is defined outside this block; presumably it handles
        // the nullable wrappers of both columns before the cast below - confirm.
        convert_null(src_col, dst_col);

// Widths 1..8 are staged in an int64_t, widths 9..16 in an int128_t.
#define M(FixedTypeLength, ValueCopyType) \
    case FixedTypeLength:                 \
        return _convert_internal<FixedTypeLength, ValueCopyType>(src_col, dst_col);

#define APPLY_FOR_DECIMALS() \
    M(1, int64_t)            \
    M(2, int64_t)            \
    M(3, int64_t)            \
    M(4, int64_t)            \
    M(5, int64_t)            \
    M(6, int64_t)            \
    M(7, int64_t)            \
    M(8, int64_t)            \
    M(9, int128_t)           \
    M(10, int128_t)          \
    M(11, int128_t)          \
    M(12, int128_t)          \
    M(13, int128_t)          \
    M(14, int128_t)          \
    M(15, int128_t)          \
    M(16, int128_t)

        switch (_type_length) {
            APPLY_FOR_DECIMALS()
        default:
            // The width comes from file metadata, so an unexpected value must surface as a
            // query error rather than terminate the whole process.
            return Status::InternalError(
                    "Unsupported fixed length {} for parquet decimal, expected 1 to 16 bytes",
                    _type_length);
        }

#undef APPLY_FOR_DECIMALS
#undef M
    }

    /// Width-specialised conversion loop.
    ///
    /// @tparam fixed_type_length byte width of every encoded value.
    /// @tparam ValueCopyType     signed integer used to stage one value; must be at least
    ///                           `fixed_type_length` bytes wide.
    template <int fixed_type_length, typename ValueCopyType>
    Status _convert_internal(ColumnPtr& src_col, MutableColumnPtr& dst_col) {
        static_assert(fixed_type_length >= 1 &&
                              static_cast<size_t>(fixed_type_length) <= sizeof(ValueCopyType),
                      "fixed_type_length must fit into ValueCopyType");
        // Low-order bits of the staged integer that do not belong to the encoded value.
        constexpr size_t unused_bits = (sizeof(ValueCopyType) - fixed_type_length) * 8;

        const size_t rows = src_col->size();
        [[maybe_unused]] const DecimalScaleParams& scale_params = _convert_params->decimal_scale;

        const auto* src_strings = static_cast<const ColumnString*>(src_col.get());
        const auto* buf = src_strings->get_chars().data();
        const auto& offset = src_strings->get_offsets();

        const size_t start_idx = dst_col->size();
        dst_col->resize(start_idx + rows);
        auto& data = static_cast<ColumnDecimal<DecimalType>*>(dst_col.get())->get_data();

        // A signed index keeps `offset[i - 1]` at -1 for the first row, exactly as before,
        // while avoiding the int/size_t comparison of the previous loop.
        const auto row_count = static_cast<int64_t>(rows);
        for (int64_t i = 0; i < row_count; ++i) {
            const size_t len = offset[i] - offset[i - 1];
            if (len == 0) {
                // Empty source entry: the destination slot keeps whatever resize() produced.
                continue;
            }

            // When Decimal in parquet is stored in byte arrays, binary and fixed,
            // the unscaled number must be encoded as two's complement using big-endian
            // byte order.
            ValueCopyType value = 0;
            // For performance a full sizeof(value) bytes are copied instead of
            // fixed_type_length, so up to sizeof(value) - 1 bytes past the end of the
            // source value are read.
            // NOTE(review): this is only safe if the source chars buffer (`buf`) is
            // right-padded; the earlier comment attributed the padding to
            // `ColumnDecimal::Container`, which is the destination - confirm that the
            // `ColumnString` chars container provides the 15-byte pad.
            memcpy(reinterpret_cast<char*>(&value), buf + offset[i - 1], sizeof(value));
            value = BitUtil::big_endian_to_host(value);
            // The encoded bytes now sit in the high-order end; shifting right drops the
            // over-read bytes. This relies on `>>` of a signed value being an arithmetic
            // shift (guaranteed from C++20, implementation-defined before) to keep the sign.
            value = value >> unused_bits;

            if constexpr (ScaleType == DecimalScaleParams::SCALE_UP) {
                value *= scale_params.scale_factor;
            } else if constexpr (ScaleType == DecimalScaleParams::SCALE_DOWN) {
                value /= scale_params.scale_factor;
            } else if constexpr (ScaleType == DecimalScaleParams::NO_SCALE) {
                // do nothing
            } else {
                LOG(FATAL) << "__builtin_unreachable";
                __builtin_unreachable();
            }

            auto& v = reinterpret_cast<DecimalType&>(data[start_idx + i]);
            v = static_cast<DecimalType>(value);
        }
        return Status::OK();
    }

private:
    // Byte width of every encoded value, fixed at construction.
    int32_t _type_length;
};
template <typename DecimalType, typename ValueCopyType, DecimalScaleParams::ScaleType ScaleType>
class StringToDecimal : public ColumnConvert {
public:
@ -497,8 +584,11 @@ public:
inline Status get_converter(tparquet::Type::type parquet_physical_type, PrimitiveType show_type,
std::shared_ptr<const IDataType> dst_data_type,
std::unique_ptr<ColumnConvert>* converter,
ConvertParams* convert_params) {
std::unique_ptr<ColumnConvert>* converter, FieldSchema* field_schema,
cctz::time_zone* ctz) {
std::unique_ptr<ParquetConvert::ConvertParams> convert_params =
std::make_unique<ParquetConvert::ConvertParams>();
convert_params->init(field_schema, ctz);
auto dst_type = remove_nullable(dst_data_type)->get_type_id();
switch (dst_type) {
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE, PHYSICAL_TYPE) \
@ -592,34 +682,18 @@ inline Status get_converter(tparquet::Type::type parquet_physical_type, Primitiv
DecimalScaleParams& scale_params = convert_params->decimal_scale; \
if (tparquet::Type::FIXED_LEN_BYTE_ARRAY == parquet_physical_type) { \
size_t string_length = convert_params->field_schema->parquet_schema.type_length; \
if (string_length <= 8) { \
if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { \
*converter = \
std::make_unique<StringToDecimal<DECIMAL_TYPE, int64_t, \
DecimalScaleParams::SCALE_UP>>(); \
} else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { \
*converter = \
std::make_unique<StringToDecimal<DECIMAL_TYPE, int64_t, \
DecimalScaleParams::SCALE_DOWN>>(); \
} else { \
*converter = \
std::make_unique<StringToDecimal<DECIMAL_TYPE, int64_t, \
DecimalScaleParams::NO_SCALE>>(); \
} \
} else if (string_length <= 16) { \
if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { \
*converter = \
std::make_unique<StringToDecimal<DECIMAL_TYPE, int128_t, \
DecimalScaleParams::SCALE_UP>>(); \
} else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { \
*converter = \
std::make_unique<StringToDecimal<DECIMAL_TYPE, int128_t, \
DecimalScaleParams::SCALE_DOWN>>(); \
} else { \
*converter = \
std::make_unique<StringToDecimal<DECIMAL_TYPE, int128_t, \
DecimalScaleParams::NO_SCALE>>(); \
} \
if (scale_params.scale_type == DecimalScaleParams::SCALE_UP) { \
*converter = std::make_unique< \
FixedStringToDecimal<DECIMAL_TYPE, DecimalScaleParams::SCALE_UP>>( \
string_length); \
} else if (scale_params.scale_type == DecimalScaleParams::SCALE_DOWN) { \
*converter = std::make_unique< \
FixedStringToDecimal<DECIMAL_TYPE, DecimalScaleParams::SCALE_DOWN>>( \
string_length); \
} else { \
*converter = std::make_unique< \
FixedStringToDecimal<DECIMAL_TYPE, DecimalScaleParams::NO_SCALE>>( \
string_length); \
} \
} else if (tparquet::Type::BYTE_ARRAY == parquet_physical_type) { \
convert_params->init_decimal_converter<PRIMARY_TYPE>(dst_data_type); \
@ -674,7 +748,7 @@ inline Status get_converter(tparquet::Type::type parquet_physical_type, Primitiv
tparquet::to_string(parquet_physical_type),
getTypeName(dst_type));
}
(*converter)->_convert_params = convert_params;
(*converter)->_convert_params = std::move(convert_params);
return Status::OK();
}

View File

@ -25,7 +25,6 @@
#include <algorithm>
#include <utility>
#include "parquet_column_convert.h"
#include "runtime/define_primitive_type.h"
#include "schema_desc.h"
#include "util/runtime_profile.h"
@ -575,13 +574,12 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
} while (false);
if (need_convert) {
std::unique_ptr<ParquetConvert::ColumnConvert> converter;
ParquetConvert::ConvertParams convert_params;
convert_params.init(_field_schema, _ctz);
RETURN_IF_ERROR(ParquetConvert::get_converter(parquet_physical_type, show_type, type,
&converter, &convert_params));
if (_converter == nullptr) {
RETURN_IF_ERROR(ParquetConvert::get_converter(parquet_physical_type, show_type, type,
&_converter, _field_schema, _ctz));
}
auto x = doris_column->assume_mutable();
RETURN_IF_ERROR(converter->convert(src_column, x));
RETURN_IF_ERROR(_converter->convert(src_column, x));
}
return Status::OK();

View File

@ -29,6 +29,7 @@
#include "io/fs/buffered_reader.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "parquet_column_convert.h"
#include "vec/columns/columns_number.h"
#include "vec/data_types/data_type.h"
#include "vec/exec/format/parquet/parquet_common.h"
@ -183,6 +184,7 @@ private:
std::unique_ptr<ColumnChunkReader> _chunk_reader;
std::vector<level_t> _rep_levels;
std::vector<level_t> _def_levels;
std::unique_ptr<ParquetConvert::ColumnConvert> _converter = nullptr;
Status _skip_values(size_t num_values);
Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type,
@ -288,4 +290,4 @@ private:
std::vector<std::unique_ptr<ParquetColumnReader>> _child_readers;
};
}; // namespace doris::vectorized
}; // namespace doris::vectorized

View File

@ -266,10 +266,8 @@ static Status get_column_values(io::FileReaderSPtr file_reader, tparquet::Column
}
if (need_convert) {
std::unique_ptr<ParquetConvert::ColumnConvert> converter;
ParquetConvert::ConvertParams convert_params;
convert_params.init(field_schema, &ctz);
RETURN_IF_ERROR(ParquetConvert::get_converter(parquet_physical_type, show_type, data_type,
&converter, &convert_params));
&converter, field_schema, &ctz));
auto x = doris_column->assume_mutable();
RETURN_IF_ERROR(converter->convert(src_column, x));
}