[feature-wip](parquet-reader) parquet physical type to doris logical type (#11769)

Two improvements are included:
1. Translate the parquet physical type into the corresponding Doris logical type.
2. Decode a parquet column chunk into a Doris ColumnPtr, with unit tests showing how to use the related APIs.
Author: Ashin Gau
Date: 2022-08-15 16:08:11 +08:00 (committed by GitHub)
parent 600254855c
commit 0b9bfd15b7
5 changed files with 258 additions and 153 deletions
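Note: taken together, the new APIs let a caller decode a parquet column chunk straight into a Doris column. A minimal sketch of the flow, mirroring `get_column_values` in the unit test below (error handling trimmed; `next_page()` is assumed from hunk lines the viewer elides):

```cpp
#include "io/buffered_reader.h"
#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"

using namespace doris::vectorized;

// Sketch: decode one page of a parquet column chunk into a Doris column.
static Status decode_one_page(FileReader* file_reader, tparquet::ColumnChunk* column_chunk,
                              FieldSchema* field_schema, ColumnPtr& doris_column,
                              DataTypePtr& data_type) {
    const auto& meta = column_chunk->meta_data;
    size_t start_offset = meta.__isset.dictionary_page_offset ? meta.dictionary_page_offset
                                                              : meta.data_page_offset;
    BufferedFileStreamReader stream_reader(file_reader, start_offset, meta.total_compressed_size);
    ColumnChunkReader chunk_reader(&stream_reader, column_chunk, field_schema);
    RETURN_IF_ERROR(chunk_reader.init());           // type length is now inferred internally
    RETURN_IF_ERROR(chunk_reader.next_page());      // assumed helper: advance to the first data page
    RETURN_IF_ERROR(chunk_reader.load_page_data()); // decompress and attach the right decoder
    // data_type carries the Doris logical type; the decoder dispatches on it.
    return chunk_reader.decode_values(doris_column, data_type, chunk_reader.num_values());
}
```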

be/src/vec/exec/format/parquet/parquet_common.cpp (View File)

@@ -18,34 +18,36 @@
#include "parquet_common.h"
#include "util/coding.h"
#include "vec/data_types/data_type_nullable.h"
namespace doris::vectorized {
Status Decoder::getDecoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
std::unique_ptr<Decoder>& decoder) {
#define FOR_LOGICAL_NUMERIC_TYPES(M) \
M(TypeIndex::Int32, Int32) \
M(TypeIndex::UInt32, UInt32) \
M(TypeIndex::Int64, Int64) \
M(TypeIndex::UInt64, UInt64) \
M(TypeIndex::Float32, Float32) \
M(TypeIndex::Float64, Float64)
Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
std::unique_ptr<Decoder>& decoder) {
switch (encoding) {
case tparquet::Encoding::PLAIN:
switch (type) {
case tparquet::Type::BOOLEAN:
decoder.reset(new BoolPlainDecoder());
break;
case tparquet::Type::INT32:
decoder.reset(new PlainDecoder<Int32>());
break;
case tparquet::Type::INT64:
decoder.reset(new PlainDecoder<Int64>());
break;
case tparquet::Type::FLOAT:
decoder.reset(new PlainDecoder<Float32>());
break;
case tparquet::Type::DOUBLE:
decoder.reset(new PlainDecoder<Float64>());
break;
case tparquet::Type::BYTE_ARRAY:
decoder.reset(new BAPlainDecoder());
decoder.reset(new ByteArrayPlainDecoder());
break;
case tparquet::Type::INT32:
case tparquet::Type::INT64:
case tparquet::Type::INT96:
case tparquet::Type::FLOAT:
case tparquet::Type::DOUBLE:
case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
decoder.reset(new FixedLengthBAPlainDecoder());
decoder.reset(new PlainDecoder(type));
break;
default:
return Status::InternalError("Unsupported plain type {} in parquet decoder",
@@ -60,34 +62,28 @@ Status Decoder::getDecoder(tparquet::Type::type type, tparquet::Encoding::type e
return Status::OK();
}
Status Decoder::decode_values(ColumnPtr& doris_column, size_t num_values) {
Status Decoder::decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values) {
CHECK(doris_column->is_nullable());
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(doris_column)).mutate().get());
MutableColumnPtr data_column = nullable_column->get_nested_column_ptr();
return _decode_values(data_column, num_values);
return decode_values(data_column, data_type, num_values);
}
Status FixedLengthBAPlainDecoder::decode_values(Slice& slice, size_t num_values) {
Status PlainDecoder::decode_values(Slice& slice, size_t num_values) {
size_t to_read_bytes = _type_length * num_values;
if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data decoder");
}
// insert '\0' into the end of each binary
if (UNLIKELY(to_read_bytes + num_values > slice.size)) {
if (UNLIKELY(to_read_bytes > slice.size)) {
return Status::IOError("Slice does not have enough space to write out the decoding data");
}
uint32_t slice_offset = 0;
for (int i = 0; i < num_values; ++i) {
memcpy(slice.data + slice_offset, _data->data + _offset, _type_length);
slice_offset += _type_length + 1;
slice.data[slice_offset - 1] = '\0';
_offset += _type_length;
}
memcpy(slice.data, _data->data + _offset, to_read_bytes);
_offset += to_read_bytes;
return Status::OK();
}
Status FixedLengthBAPlainDecoder::skip_values(size_t num_values) {
Status PlainDecoder::skip_values(size_t num_values) {
_offset += _type_length * num_values;
if (UNLIKELY(_offset > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data decoder");
@@ -95,23 +91,43 @@ Status FixedLengthBAPlainDecoder::skip_values(size_t num_values) {
return Status::OK();
}
Status FixedLengthBAPlainDecoder::_decode_values(MutableColumnPtr& doris_column,
size_t num_values) {
Status PlainDecoder::_decode_short_int(MutableColumnPtr& doris_column, size_t num_values,
size_t real_length) {
if (UNLIKELY(_offset + _type_length * num_values > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data decoder");
}
auto& column_chars_t = assert_cast<ColumnString&>(*doris_column).get_chars();
auto& column_offsets = assert_cast<ColumnString&>(*doris_column).get_offsets();
for (int i = 0; i < num_values; ++i) {
column_chars_t.insert(_data->data + _offset, _data->data + _offset + _type_length);
column_chars_t.emplace_back('\0');
column_offsets.emplace_back(column_chars_t.size());
doris_column->insert_data(_data->data + _offset, real_length);
_offset += _type_length;
}
return Status::OK();
}
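Note: an aside on `_decode_short_int` above. Parquet has no 1- or 2-byte physical integer, so Doris TINYINT/SMALLINT columns arrive as 4-byte INT32 values; copying only the low `real_length` bytes of each value into the column is equivalent to a narrowing cast on a little-endian host (which the BE assumes). A self-contained sketch of that byte-level truncation, not Doris code:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

int main() {
    // A TINYINT is stored physically as INT32 in the parquet page (little-endian).
    int32_t physical = -7; // bytes: F9 FF FF FF
    int8_t logical = 0;
    // Copying the low byte is the same as a narrowing cast on little-endian hosts.
    std::memcpy(&logical, &physical, sizeof(logical));
    assert(logical == -7);
    return 0;
}
```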
Status BAPlainDecoder::decode_values(Slice& slice, size_t num_values) {
Status PlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) {
TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
switch (logical_type) {
case TypeIndex::Int8:
case TypeIndex::UInt8:
return _decode_short_int(doris_column, num_values, 1);
case TypeIndex::Int16:
case TypeIndex::UInt16:
return _decode_short_int(doris_column, num_values, 2);
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
case NUMERIC_TYPE: \
return _decode_numeric<CPP_NUMERIC_TYPE>(doris_column, num_values);
FOR_LOGICAL_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
default:
break;
}
return Status::InvalidArgument("Can't decode parquet physical type {} to doris logical type {}",
tparquet::to_string(_physical_type),
getTypeName(data_type->get_type_id()));
}
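Note: `FOR_LOGICAL_NUMERIC_TYPES`/`DISPATCH` above is a classic X-macro. Each `M(TypeIndex, CppType)` entry expands into one `case` that forwards to the templated `_decode_numeric<T>`, so the type list and the switch stay in a single place. The pattern in isolation (a self-contained sketch, not Doris code):

```cpp
#include <cstdint>
#include <cstring>
#include <iostream>

// Stand-in for Doris's TypeIndex.
enum class TypeIndex { Int32, Int64, Float64 };

// One entry per (logical type, C++ type) pair, mirroring FOR_LOGICAL_NUMERIC_TYPES.
#define FOR_NUMERIC_TYPES(M)      \
    M(TypeIndex::Int32, int32_t)  \
    M(TypeIndex::Int64, int64_t)  \
    M(TypeIndex::Float64, double)

template <typename T>
void decode_numeric(const char* raw) {
    T value;
    std::memcpy(&value, raw, sizeof(T));
    std::cout << value << '\n';
}

void dispatch(TypeIndex type, const char* raw) {
    switch (type) {
// Each DISPATCH expansion becomes one `case` forwarding to the template.
#define DISPATCH(LOGICAL, CPP) \
    case LOGICAL:              \
        return decode_numeric<CPP>(raw);
        FOR_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
    default:
        break;
    }
}

int main() {
    int64_t v = 42;
    dispatch(TypeIndex::Int64, reinterpret_cast<const char*>(&v)); // prints 42
    return 0;
}
```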
Status ByteArrayPlainDecoder::decode_values(Slice& slice, size_t num_values) {
uint32_t slice_offset = 0;
for (int i = 0; i < num_values; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
@@ -131,7 +147,7 @@ Status BAPlainDecoder::decode_values(Slice& slice, size_t num_values) {
return Status::OK();
}
Status BAPlainDecoder::skip_values(size_t num_values) {
Status ByteArrayPlainDecoder::skip_values(size_t num_values) {
for (int i = 0; i < num_values; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
@@ -147,9 +163,8 @@ Status BAPlainDecoder::skip_values(size_t num_values) {
return Status::OK();
}
Status BAPlainDecoder::_decode_values(MutableColumnPtr& doris_column, size_t num_values) {
auto& column_chars_t = assert_cast<ColumnString&>(*doris_column).get_chars();
auto& column_offsets = assert_cast<ColumnString&>(*doris_column).get_offsets();
Status ByteArrayPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) {
for (int i = 0; i < num_values; ++i) {
if (UNLIKELY(_offset + 4 > _data->size)) {
return Status::IOError("Can't read byte array length from plain decoder");
@@ -160,9 +175,7 @@ Status BAPlainDecoder::_decode_values(MutableColumnPtr& doris_column, size_t num
if (UNLIKELY(_offset + length > _data->size)) {
return Status::IOError("Can't read enough bytes in plain decoder");
}
column_chars_t.insert(_data->data + _offset, _data->data + _offset + length);
column_chars_t.emplace_back('\0');
column_offsets.emplace_back(column_chars_t.size());
doris_column->insert_data(_data->data + _offset, length);
_offset += length;
}
return Status::OK();
@@ -203,7 +216,8 @@ Status BoolPlainDecoder::skip_values(size_t num_values) {
return Status::OK();
}
Status BoolPlainDecoder::_decode_values(MutableColumnPtr& doris_column, size_t num_values) {
Status BoolPlainDecoder::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) {
auto& column_data = static_cast<ColumnVector<UInt8>&>(*doris_column).get_data();
bool value;
for (int i = 0; i < num_values; ++i) {

be/src/vec/exec/format/parquet/parquet_common.h (View File)

@@ -25,6 +25,7 @@
#include "vec/columns/column_array.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/data_types/data_type.h"
namespace doris::vectorized {
@@ -35,8 +36,8 @@ public:
Decoder() = default;
virtual ~Decoder() = default;
static Status getDecoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
std::unique_ptr<Decoder>& decoder);
static Status get_decoder(tparquet::Type::type type, tparquet::Encoding::type encoding,
std::unique_ptr<Decoder>& decoder);
// The type with fixed length
void set_type_length(int32_t type_length) { _type_length = type_length; }
@@ -48,88 +49,63 @@ public:
}
// Write the decoded values batch to doris's column
Status decode_values(ColumnPtr& doris_column, size_t num_values);
Status decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values);
virtual Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) = 0;
virtual Status decode_values(Slice& slice, size_t num_values) = 0;
virtual Status skip_values(size_t num_values) = 0;
protected:
virtual Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) = 0;
int32_t _type_length;
Slice* _data = nullptr;
uint32_t _offset = 0;
};
template <typename T>
class PlainDecoder final : public Decoder {
public:
PlainDecoder() = default;
PlainDecoder(tparquet::Type::type physical_type) : _physical_type(physical_type) {}
~PlainDecoder() override = default;
Status decode_values(Slice& slice, size_t num_values) override {
size_t to_read_bytes = TYPE_LENGTH * num_values;
if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data decoder");
}
if (UNLIKELY(to_read_bytes > slice.size)) {
return Status::IOError(
"Slice does not have enough space to write out the decoding data");
}
memcpy(slice.data, _data->data + _offset, to_read_bytes);
_offset += to_read_bytes;
return Status::OK();
}
Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) override;
Status skip_values(size_t num_values) override {
_offset += TYPE_LENGTH * num_values;
if (UNLIKELY(_offset > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data decoder");
}
return Status::OK();
}
Status decode_values(Slice& slice, size_t num_values) override;
Status skip_values(size_t num_values) override;
protected:
enum { TYPE_LENGTH = sizeof(T) };
Status _decode_short_int(MutableColumnPtr& doris_column, size_t num_values, size_t real_length);
Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) override {
size_t to_read_bytes = TYPE_LENGTH * num_values;
template <typename Numeric>
Status _decode_numeric(MutableColumnPtr& doris_column, size_t num_values) {
size_t to_read_bytes = _type_length * num_values;
if (UNLIKELY(_offset + to_read_bytes > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data decoder");
}
auto& column_data = static_cast<ColumnVector<T>&>(*doris_column).get_data();
const auto* raw_data = reinterpret_cast<const T*>(_data->data + _offset);
auto& column_data = static_cast<ColumnVector<Numeric>&>(*doris_column).get_data();
const auto* raw_data = reinterpret_cast<const Numeric*>(_data->data + _offset);
column_data.insert(raw_data, raw_data + num_values);
_offset += to_read_bytes;
return Status::OK();
}
tparquet::Type::type _physical_type;
};
class FixedLengthBAPlainDecoder final : public Decoder {
class ByteArrayPlainDecoder final : public Decoder {
public:
FixedLengthBAPlainDecoder() = default;
~FixedLengthBAPlainDecoder() override = default;
ByteArrayPlainDecoder() = default;
~ByteArrayPlainDecoder() override = default;
Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) override;
Status decode_values(Slice& slice, size_t num_values) override;
Status skip_values(size_t num_values) override;
protected:
Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) override;
};
class BAPlainDecoder final : public Decoder {
public:
BAPlainDecoder() = default;
~BAPlainDecoder() override = default;
Status decode_values(Slice& slice, size_t num_values) override;
Status skip_values(size_t num_values) override;
protected:
Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) override;
};
/// Decoder for bit-packed boolean-encoded values.
@@ -147,6 +123,9 @@ public:
_offset = 0;
}
Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) override;
Status decode_values(Slice& slice, size_t num_values) override;
Status skip_values(size_t num_values) override;
@@ -167,8 +146,6 @@ protected:
return true;
}
Status _decode_values(MutableColumnPtr& doris_column, size_t num_values) override;
/// A buffer to store unpacked values. Must be a multiple of 32 size to use the
/// batch-oriented interface of BatchedBitReader. We use uint8_t instead of bool because
/// bit unpacking is only supported for unsigned integers. The values are converted to
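Note: for context on this buffer, PLAIN-encoded parquet booleans are bit-packed eight to a byte, least-significant bit first, which is why BoolPlainDecoder unpacks through BatchedBitReader instead of reading whole bytes. A minimal sketch of the bit layout; it reproduces the F,T,F,T,F,F,T,F prefix asserted in the unit test below:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Parquet PLAIN booleans: bit-packed, least-significant bit first.
bool get_bit(const uint8_t* data, size_t i) {
    return (data[i / 8] >> (i % 8)) & 1;
}

int main() {
    uint8_t packed = 0b01001010; // rows 0..7 = F,T,F,T,F,F,T,F
    assert(!get_bit(&packed, 0));
    assert(get_bit(&packed, 1));
    assert(get_bit(&packed, 3));
    assert(get_bit(&packed, 6));
    assert(!get_bit(&packed, 7));
    return 0;
}
```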

be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp (View File)

@@ -20,13 +20,14 @@
namespace doris::vectorized {
ColumnChunkReader::ColumnChunkReader(BufferedStreamReader* reader,
tparquet::ColumnChunk* column_chunk, FieldSchema* fieldSchema)
: _max_rep_level(fieldSchema->repetition_level),
_max_def_level(fieldSchema->definition_level),
tparquet::ColumnChunk* column_chunk, FieldSchema* field_schema)
: _field_schema(field_schema),
_max_rep_level(field_schema->repetition_level),
_max_def_level(field_schema->definition_level),
_stream_reader(reader),
_metadata(column_chunk->meta_data) {}
Status ColumnChunkReader::init(size_t type_length) {
Status ColumnChunkReader::init() {
size_t start_offset = _metadata.__isset.dictionary_page_offset
? _metadata.dictionary_page_offset
: _metadata.data_page_offset;
@@ -41,8 +42,6 @@ Status ColumnChunkReader::init(size_t type_length) {
// get the block compression codec
RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, _block_compress_codec));
// -1 means unfixed length type
_type_length = type_length;
return Status::OK();
}
@@ -91,14 +90,13 @@ Status ColumnChunkReader::load_page_data() {
_page_decoder = _decoders[static_cast<int>(encoding)].get();
} else {
std::unique_ptr<Decoder> page_decoder;
Decoder::getDecoder(_metadata.type, encoding, page_decoder);
Decoder::get_decoder(_metadata.type, encoding, page_decoder);
_decoders[static_cast<int>(encoding)] = std::move(page_decoder);
_page_decoder = _decoders[static_cast<int>(encoding)].get();
}
_page_decoder->set_data(&_page_data);
if (_type_length > 0) {
_page_decoder->set_type_length(_type_length);
}
// Set type length
_page_decoder->set_type_length(_get_type_length());
return Status::OK();
}
@@ -138,12 +136,22 @@ size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) {
return _def_level_decoder.get_levels(levels, n);
}
Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, size_t num_values) {
Status ColumnChunkReader::decode_values(ColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) {
if (UNLIKELY(_num_values < num_values)) {
return Status::IOError("Decode too many values in current page");
}
_num_values -= num_values;
return _page_decoder->decode_values(doris_column, num_values);
return _page_decoder->decode_values(doris_column, data_type, num_values);
}
Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
size_t num_values) {
if (UNLIKELY(_num_values < num_values)) {
return Status::IOError("Decode too many values in current page");
}
_num_values -= num_values;
return _page_decoder->decode_values(doris_column, data_type, num_values);
}
Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) {
@@ -153,4 +161,21 @@ Status ColumnChunkReader::decode_values(Slice& slice, size_t num_values) {
_num_values -= num_values;
return _page_decoder->decode_values(slice, num_values);
}
int32_t ColumnChunkReader::_get_type_length() {
switch (_field_schema->physical_type) {
case tparquet::Type::INT32:
case tparquet::Type::FLOAT:
return 4;
case tparquet::Type::INT64:
case tparquet::Type::DOUBLE:
return 8;
case tparquet::Type::INT96:
return 12;
case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
return _field_schema->parquet_schema.type_length;
default:
return -1;
}
}
} // namespace doris::vectorized
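Note: the widths in `_get_type_length` follow directly from the parquet physical types: INT32/FLOAT are 4 bytes, INT64/DOUBLE are 8, INT96 is the legacy 12-byte timestamp layout, FIXED_LEN_BYTE_ARRAY takes its width from the schema, and -1 marks variable-length types (BYTE_ARRAY, bit-packed BOOLEAN). A compile-time sanity sketch of those widths (platform assumptions noted in comments):

```cpp
#include <cstdint>

// Widths on the platforms the BE targets (IEEE-754 float/double assumed).
static_assert(sizeof(int32_t) == 4 && sizeof(float) == 4, "INT32 / FLOAT are 4 bytes");
static_assert(sizeof(int64_t) == 8 && sizeof(double) == 8, "INT64 / DOUBLE are 8 bytes");

// INT96 is the deprecated timestamp layout: 8 bytes of nanoseconds-of-day
// followed by a 4-byte Julian day number.
struct Int96 {
    uint32_t parts[3];
};
static_assert(sizeof(Int96) == 12, "INT96 is always 12 bytes");
```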

be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h (View File)

@@ -59,13 +59,11 @@ namespace doris::vectorized {
class ColumnChunkReader {
public:
ColumnChunkReader(BufferedStreamReader* reader, tparquet::ColumnChunk* column_chunk,
FieldSchema* fieldSchema);
FieldSchema* field_schema);
~ColumnChunkReader() = default;
// Initialize chunk reader, will generate the decoder and codec.
// We can set the type_length if the length of the column type is fixed;
// if not set, the decoder will try to infer the type_length.
Status init(size_t type_length = -1);
Status init();
// Whether the chunk reader has a more page to read.
bool has_next_page() { return _page_reader->has_next_page(); }
@@ -86,8 +84,11 @@ public:
// Load page data into the underlying container,
// and initialize the repetition and definition level decoder for current page data.
Status load_page_data();
// The remaining number of values in current page. Decreased when reading or skipping.
// The remaining number of values in current page (including null values). Decreased when reading or skipping.
uint32_t num_values() const { return _num_values; };
// Null values are not decoded from the data pages; after analyzing null values from the
// definition levels, the caller should call dec_num_values() to keep the count consistent.
void dec_num_values(uint32_t dec_num) { _num_values -= dec_num; };
// Get the raw data of current page.
Slice& get_page_data() { return _page_data; }
@@ -97,7 +98,8 @@ public:
size_t get_def_levels(level_t* levels, size_t n);
// Decode values in current page into doris column.
Status decode_values(ColumnPtr& doris_column, size_t num_values);
Status decode_values(ColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values);
Status decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, size_t num_values);
// For test: decode values in current page into a slice.
Status decode_values(Slice& slice, size_t num_values);
@@ -109,7 +111,9 @@ public:
private:
Status _decode_dict_page();
void _reserve_decompress_buf(size_t size);
int32_t _get_type_length();
FieldSchema* _field_schema;
level_t _max_rep_level;
level_t _max_def_level;
@@ -131,7 +135,6 @@ private:
// Map: encoding -> Decoder
// Plain or Dictionary encoding. If the dictionary grows too big, the encoding will fall back to the plain encoding
std::unordered_map<int, std::unique_ptr<Decoder>> _decoders;
size_t _type_length = -1;
};
} // namespace doris::vectorized
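Note: the `num_values()`/`dec_num_values()` contract implies a read loop like the following. A hedged sketch: it assumes a flat nullable column where definition level 0 means null, and that `decode_values` materializes only the non-null values, as the unit test does:

```cpp
#include <vector>

#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"

using namespace doris::vectorized;

// Sketch: reconcile the page's value count with the nulls found in def levels.
static Status read_page_values(ColumnChunkReader& chunk_reader, ColumnPtr& doris_column,
                               DataTypePtr& data_type, size_t rows) {
    std::vector<level_t> defs(rows);
    chunk_reader.get_def_levels(defs.data(), rows);
    size_t null_count = 0;
    for (level_t def : defs) {
        if (def == 0) ++null_count; // def level 0 == null for a flat nullable column
    }
    // num_values() counts nulls too, but nulls occupy no space in the page data,
    // so subtract them before decoding the remaining values.
    chunk_reader.dec_num_values(null_count);
    return chunk_reader.decode_values(doris_column, data_type, rows - null_count);
}
```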

be/test/vec/exec/parquet/parquet_thrift_test.cpp (View File)

@@ -22,10 +22,15 @@
#include <string>
#include "exec/schema_scanner.h"
#include "io/buffered_reader.h"
#include "io/file_reader.h"
#include "io/local_file_reader.h"
#include "runtime/string_value.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/exec/format/parquet/parquet_thrift_util.h"
#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
#include "vec/exec/format/parquet/vparquet_file_metadata.h"
@@ -125,7 +130,8 @@ TEST_F(ParquetThriftReaderTest, complex_nested_file) {
}
static Status get_column_values(FileReader* file_reader, tparquet::ColumnChunk* column_chunk,
FieldSchema* field_schema, Slice& slice) {
FieldSchema* field_schema, ColumnPtr& doris_column,
DataTypePtr& data_type) {
tparquet::ColumnMetaData chunk_meta = column_chunk->meta_data;
size_t start_offset = chunk_meta.__isset.dictionary_page_offset
? chunk_meta.dictionary_page_offset
@@ -141,7 +147,35 @@ static Status get_column_values(FileReader* file_reader, tparquet::ColumnChunk*
// load page data into underlying container
chunk_reader.load_page_data();
// decode page data
return chunk_reader.decode_values(slice, chunk_reader.num_values());
return chunk_reader.decode_values(doris_column, data_type, chunk_reader.num_values());
}
static void create_block(std::unique_ptr<vectorized::Block>& block) {
// Currently supported column types:
SchemaScanner::ColumnDesc column_descs[] = {
{"tinyint_col", TYPE_TINYINT, sizeof(int8_t), true},
{"smallint_col", TYPE_SMALLINT, sizeof(int16_t), true},
{"int_col", TYPE_INT, sizeof(int32_t), true},
{"bigint_col", TYPE_BIGINT, sizeof(int64_t), true},
{"boolean_col", TYPE_BOOLEAN, sizeof(bool), true},
{"float_col", TYPE_FLOAT, sizeof(float_t), true},
{"double_col", TYPE_DOUBLE, sizeof(double_t), true},
{"string_col", TYPE_STRING, sizeof(StringValue), true}};
SchemaScanner schema_scanner(column_descs,
sizeof(column_descs) / sizeof(SchemaScanner::ColumnDesc));
ObjectPool object_pool;
SchemaScannerParam param;
schema_scanner.init(&param, &object_pool);
auto tuple_slots = const_cast<TupleDescriptor*>(schema_scanner.tuple_desc())->slots();
block.reset(new vectorized::Block());
for (const auto& slot_desc : tuple_slots) {
auto is_nullable = slot_desc->is_nullable();
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(slot_desc->type(),
is_nullable);
MutableColumnPtr data_column = data_type->create_column();
block->insert(
ColumnWithTypeAndName(std::move(data_column), data_type, slot_desc->col_name()));
}
}
TEST_F(ParquetThriftReaderTest, type_decoder) {
@@ -164,6 +198,7 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
* `date_col` date, // 13
* `list_string` array<string>) // 14
*/
LocalFileReader reader("./be/test/exec/test_data/parquet_scanner/type-decoder.parquet", 0);
/*
* Data in type-decoder.parquet:
@@ -181,6 +216,8 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
auto st = reader.open();
EXPECT_TRUE(st.ok());
std::unique_ptr<vectorized::Block> block;
create_block(block);
std::shared_ptr<FileMetaData> metaData;
parse_thrift_footer(&reader, metaData);
tparquet::FileMetaData t_metadata = metaData->to_thrift_metadata();
@@ -190,51 +227,98 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
// the physical_type of tinyint_col, smallint_col and int_col is INT32 for all three;
// they are distinguished by converted_type (in FieldSchema.parquet_schema.converted_type)
for (int col_idx = 0; col_idx < 3; ++col_idx) {
char data[4 * rows];
Slice slice(data, 4 * rows);
get_column_values(&reader, &t_metadata.row_groups[0].columns[col_idx],
const_cast<FieldSchema*>(schema_descriptor.get_column(col_idx)), slice);
auto out_data = reinterpret_cast<int32_t*>(data);
{
auto& column_name_with_type = block->get_by_position(0);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[0],
const_cast<FieldSchema*>(schema_descriptor.get_column(0)), data_column,
data_type);
int int_sum = 0;
for (int i = 0; i < rows; ++i) {
int_sum += out_data[i];
int_sum += (int8_t)data_column->get64(i);
}
ASSERT_EQ(int_sum, 5);
}
// `bigint_col` bigint, // 3
{
char data[8 * rows];
Slice slice(data, 8 * rows);
get_column_values(&reader, &t_metadata.row_groups[0].columns[3],
const_cast<FieldSchema*>(schema_descriptor.get_column(3)), slice);
auto out_data = reinterpret_cast<int64_t*>(data);
auto& column_name_with_type = block->get_by_position(1);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[1],
const_cast<FieldSchema*>(schema_descriptor.get_column(1)), data_column,
data_type);
int int_sum = 0;
for (int i = 0; i < rows; ++i) {
int_sum += out_data[i];
int_sum += (int16_t)data_column->get64(i);
}
ASSERT_EQ(int_sum, 5);
}
{
auto& column_name_with_type = block->get_by_position(2);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[2],
const_cast<FieldSchema*>(schema_descriptor.get_column(2)), data_column,
data_type);
int int_sum = 0;
for (int i = 0; i < rows; ++i) {
int_sum += (int32_t)data_column->get64(i);
}
ASSERT_EQ(int_sum, 5);
}
{
auto& column_name_with_type = block->get_by_position(3);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[3],
const_cast<FieldSchema*>(schema_descriptor.get_column(3)), data_column,
data_type);
int64_t int_sum = 0;
for (int i = 0; i < rows; ++i) {
int_sum += (int64_t)data_column->get64(i);
}
ASSERT_EQ(int_sum, 5);
}
// `boolean_col` boolean, // 4
{
char data[1 * rows];
Slice slice(data, 1 * rows);
auto& column_name_with_type = block->get_by_position(4);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[4],
const_cast<FieldSchema*>(schema_descriptor.get_column(4)), slice);
auto out_data = reinterpret_cast<bool*>(data);
ASSERT_FALSE(out_data[0]);
ASSERT_TRUE(out_data[1]);
ASSERT_FALSE(out_data[2]);
ASSERT_TRUE(out_data[3]);
ASSERT_FALSE(out_data[4]);
ASSERT_FALSE(out_data[5]);
ASSERT_TRUE(out_data[6]);
ASSERT_FALSE(out_data[7]);
ASSERT_FALSE(out_data[8]);
ASSERT_FALSE(out_data[9]);
const_cast<FieldSchema*>(schema_descriptor.get_column(4)), data_column,
data_type);
ASSERT_FALSE(static_cast<bool>(data_column->get64(0)));
ASSERT_TRUE(static_cast<bool>(data_column->get64(1)));
ASSERT_FALSE(static_cast<bool>(data_column->get64(2)));
ASSERT_TRUE(static_cast<bool>(data_column->get64(3)));
ASSERT_FALSE(static_cast<bool>(data_column->get64(4)));
ASSERT_FALSE(static_cast<bool>(data_column->get64(5)));
ASSERT_TRUE(static_cast<bool>(data_column->get64(6)));
ASSERT_FALSE(static_cast<bool>(data_column->get64(7)));
ASSERT_FALSE(static_cast<bool>(data_column->get64(8)));
ASSERT_FALSE(static_cast<bool>(data_column->get64(9)));
}
// `double_col` double, // 6
{
auto& column_name_with_type = block->get_by_position(6);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
get_column_values(&reader, &t_metadata.row_groups[0].columns[6],
const_cast<FieldSchema*>(schema_descriptor.get_column(6)), data_column,
data_type);
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(data_column)).mutate().get());
MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
ASSERT_EQ(nested_column->get_float64(0), -1.14);
ASSERT_EQ(nested_column->get_float64(1), 2.14);
ASSERT_EQ(nested_column->get_float64(2), -3.14);
ASSERT_EQ(nested_column->get_float64(3), 4.14);
}
// `string_col` string, // 7
{
auto& column_name_with_type = block->get_by_position(7);
auto& data_column = column_name_with_type.column;
auto& data_type = column_name_with_type.type;
tparquet::ColumnChunk column_chunk = t_metadata.row_groups[0].columns[7];
tparquet::ColumnMetaData chunk_meta = column_chunk.meta_data;
size_t start_offset = chunk_meta.__isset.dictionary_page_offset
@@ -242,7 +326,6 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
: chunk_meta.data_page_offset;
size_t chunk_size = chunk_meta.total_compressed_size;
BufferedFileStreamReader stream_reader(&reader, start_offset, chunk_size);
ColumnChunkReader chunk_reader(&stream_reader, &column_chunk,
const_cast<FieldSchema*>(schema_descriptor.get_column(7)));
// initialize chunk reader
@@ -252,8 +335,6 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
// load page data into underlying container
chunk_reader.load_page_data();
char data[50 * rows];
Slice slice(data, 50 * rows);
level_t defs[rows];
// Analyze null string
chunk_reader.get_def_levels(defs, rows);
@@ -261,9 +342,14 @@ TEST_F(ParquetThriftReaderTest, type_decoder) {
ASSERT_EQ(defs[3], 0);
ASSERT_EQ(defs[7], 0);
chunk_reader.decode_values(slice, 7);
ASSERT_STREQ("s-row0", slice.data);
ASSERT_STREQ("s-row2", slice.data + 7);
chunk_reader.decode_values(data_column, data_type, 7);
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(data_column)).mutate().get());
MutableColumnPtr nested_column = nullable_column->get_nested_column_ptr();
auto row0 = nested_column->get_data_at(0).data;
auto row2 = nested_column->get_data_at(1).data;
ASSERT_STREQ("s-row0", row0);
ASSERT_STREQ("s-row2", row2);
}
}
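Note on the assertions above: `get64()` returns the stored integer widened to `UInt64`, so the test casts the result back to the column's narrow type to recover the logical value. A self-contained sketch of that round-trip (the exact widening behavior of the column implementation is an assumption here):

```cpp
#include <cassert>
#include <cstdint>

int main() {
    int8_t original = -3;
    // get64() on a tinyint column effectively widens the value to 64 bits.
    uint64_t widened = static_cast<uint64_t>(static_cast<int64_t>(original));
    // A two's-complement narrowing cast recovers the original tinyint.
    assert(static_cast<int8_t>(widened) == -3);
    return 0;
}
```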