From 4b95b4e41d4de3d5128d949e5113af37a3d64f0e Mon Sep 17 00:00:00 2001
From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com>
Date: Thu, 22 Sep 2022 09:35:37 +0800
Subject: [PATCH] [feature-wip](file-scanner)Get column type from parquet schema (#12833)

Get schema from parquet reader.
The new VFileScanner needs to get the file schema (column name to type map)
from the parquet file while processing a load job; this PR sets the type
information for parquet columns.
---
Review notes (ignored by `git am`, not part of the commit message):
  * convert_to_doris_type(ConvertedType): added the missing `break;` after the
    UTF8 case (it fell through and reported strings as DECIMALV2) and after the
    INT_32 case (it fell through and reported every narrow integer as BIGINT).
  * get_name_to_type(): iterate the column names by const reference.
  * Added comments on both convert_to_doris_type overloads; hunk and diffstat
    counts were adjusted accordingly.
  * The header names in the schema_desc.h #include hunk were already missing
    from this copy of the patch and are left as found.

 be/src/vec/exec/format/generic_reader.h       |  1 +
 .../vec/exec/format/parquet/schema_desc.cpp   | 91 +++++++++++++++++++
 be/src/vec/exec/format/parquet/schema_desc.h  |  6 ++
 .../exec/format/parquet/vparquet_reader.cpp   | 35 ++------
 4 files changed, 103 insertions(+), 30 deletions(-)

diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index b5177fcbec..a98d678fde 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "common/status.h"
+#include "runtime/types.h"
 
 namespace doris::vectorized {
 
diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp b/be/src/vec/exec/format/parquet/schema_desc.cpp
index 7f6f36f023..b0d449f604 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -17,6 +17,8 @@
 
 #include "schema_desc.h"
 
+#include "common/logging.h"
+
 namespace doris::vectorized {
 
 static bool is_group_node(const tparquet::SchemaElement& schema) {
@@ -150,6 +152,95 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic
     physical_field->physical_type = physical_schema.type;
     _physical_fields.push_back(physical_field);
     physical_field->physical_column_index = _physical_fields.size() - 1;
+    if (physical_schema.__isset.logicalType) {
+        physical_field->type = convert_to_doris_type(physical_schema.logicalType);
+    } else if (physical_schema.__isset.converted_type) {
+        physical_field->type = convert_to_doris_type(physical_schema.converted_type);
+    }
+}
+
+// Map a parquet LogicalType annotation to a Doris TypeDescriptor.
+// DECIMAL is always reported as DECIMALV2(27, 9); the precision and scale carried
+// by the annotation are not consulted. Unsupported annotations log a warning and
+// yield a default-constructed TypeDescriptor.
+TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logicalType) {
+    TypeDescriptor type;
+    if (logicalType.__isset.STRING) {
+        type.type = TYPE_STRING;
+    } else if (logicalType.__isset.DECIMAL) {
+        type.type = TYPE_DECIMALV2;
+        type.precision = 27;
+        type.scale = 9;
+    } else if (logicalType.__isset.DATE) {
+        type.type = TYPE_DATEV2;
+    } else if (logicalType.__isset.INTEGER) {
+        if (logicalType.INTEGER.isSigned) {
+            if (logicalType.INTEGER.bitWidth <= 32) {
+                type.type = TYPE_INT;
+            } else {
+                type.type = TYPE_BIGINT;
+            }
+        } else {
+            // Unsigned widths above 16 bits are reported as BIGINT (presumably so the
+            // whole unsigned range fits a signed Doris integer).
+            if (logicalType.INTEGER.bitWidth <= 16) {
+                type.type = TYPE_INT;
+            } else {
+                type.type = TYPE_BIGINT;
+            }
+        }
+    } else if (logicalType.__isset.TIME) {
+        type.type = TYPE_TIMEV2;
+    } else if (logicalType.__isset.TIMESTAMP) {
+        type.type = TYPE_DATETIMEV2;
+    } else {
+        LOG(WARNING) << "Not supported parquet LogicalType";
+    }
+    return type;
+}
+
+// Map a legacy parquet ConvertedType to a Doris TypeDescriptor, using the same
+// targets as the LogicalType overload. Each group of case labels ends in `break`
+// so that one mapping can never overwrite another by falling through.
+TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::ConvertedType::type convertedType) {
+    TypeDescriptor type;
+    switch (convertedType) {
+    case tparquet::ConvertedType::type::UTF8:
+        type.type = TYPE_STRING;
+        break;
+    case tparquet::ConvertedType::type::DECIMAL:
+        type.type = TYPE_DECIMALV2;
+        type.precision = 27;
+        type.scale = 9;
+        break;
+    case tparquet::ConvertedType::type::DATE:
+        type.type = TYPE_DATEV2;
+        break;
+    case tparquet::ConvertedType::type::TIME_MILLIS:
+    case tparquet::ConvertedType::type::TIME_MICROS:
+        type.type = TYPE_TIMEV2;
+        break;
+    case tparquet::ConvertedType::type::TIMESTAMP_MILLIS:
+    case tparquet::ConvertedType::type::TIMESTAMP_MICROS:
+        type.type = TYPE_DATETIMEV2;
+        break;
+    case tparquet::ConvertedType::type::UINT_8:
+    case tparquet::ConvertedType::type::UINT_16:
+    case tparquet::ConvertedType::type::INT_8:
+    case tparquet::ConvertedType::type::INT_16:
+    case tparquet::ConvertedType::type::INT_32:
+        type.type = TYPE_INT;
+        break;
+    case tparquet::ConvertedType::type::UINT_32:
+    case tparquet::ConvertedType::type::UINT_64:
+    case tparquet::ConvertedType::type::INT_64:
+        type.type = TYPE_BIGINT;
+        break;
+    default:
+        LOG(WARNING) << "Not supported parquet ConvertedType: " << convertedType;
+        break;
+    }
+    return type;
 }
 
 Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElement>& t_schemas,
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h
index bfd7931422..12db2b7011 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include
+
 #include
 #include
 
@@ -76,6 +78,10 @@ private:
     Status parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas, size_t curr_pos,
                             FieldSchema* node_field);
 
+    TypeDescriptor convert_to_doris_type(tparquet::LogicalType logicalType);
+
+    TypeDescriptor convert_to_doris_type(tparquet::ConvertedType::type convertedType);
+
 public:
     FieldDescriptor() = default;
     ~FieldDescriptor() = default;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 5a0181dd21..e8c59de8d5 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -82,36 +82,11 @@ Status ParquetReader::_init_read_columns() {
 std::unordered_map<std::string, TypeDescriptor> ParquetReader::get_name_to_type() {
     std::unordered_map<std::string, TypeDescriptor> map;
     auto schema_desc = _file_metadata->schema();
-    for (auto& it : _map_column) {
-        TypeDescriptor type;
-        if (it.first == "p_partkey") {
-            type.type = TYPE_INT;
-        } else if (it.first == "p_name") {
-            type.type = TYPE_VARCHAR;
-            type.len = 55;
-        } else if (it.first == "p_mfgr") {
-            type.type = TYPE_VARCHAR;
-            type.len = 25;
-        } else if (it.first == "p_brand") {
-            type.type = TYPE_VARCHAR;
-            type.len = 10;
-        } else if (it.first == "p_type") {
-            type.type = TYPE_VARCHAR;
-            type.len = 25;
-        } else if (it.first == "p_size") {
-            type.type = TYPE_INT;
-        } else if (it.first == "p_container") {
-            type.type = TYPE_VARCHAR;
-            type.len = 10;
-        } else if (it.first == "p_retailprice") {
-            type.type = TYPE_DECIMALV2;
-            type.precision = 27;
-            type.scale = 9;
-        } else if (it.first == "p_comment") {
-            type.type = TYPE_VARCHAR;
-            type.len = 23;
-        }
-        map.emplace(it.first, type);
+    std::unordered_set<std::string> column_names;
+    schema_desc.get_column_names(&column_names);
+    for (const auto& name : column_names) {
+        auto field = schema_desc.get_column(name);
+        map.emplace(name, field->type);
     }
     return map;
 }