diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index b5177fcbec..a98d678fde 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "common/status.h"
+#include "runtime/types.h"
 
 namespace doris::vectorized {
 
diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp b/be/src/vec/exec/format/parquet/schema_desc.cpp
index 7f6f36f023..b0d449f604 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -17,6 +17,8 @@
 
 #include "schema_desc.h"
 
+#include "common/logging.h"
+
 namespace doris::vectorized {
 
 static bool is_group_node(const tparquet::SchemaElement& schema) {
@@ -150,6 +152,90 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic
     physical_field->physical_type = physical_schema.type;
     _physical_fields.push_back(physical_field);
     physical_field->physical_column_index = _physical_fields.size() - 1;
+    if (physical_schema.__isset.logicalType) {
+        physical_field->type = convert_to_doris_type(physical_schema.logicalType);
+    } else if (physical_schema.__isset.converted_type) {
+        physical_field->type = convert_to_doris_type(physical_schema.converted_type);
+    }
+}
+
+// Maps a parquet LogicalType annotation to a Doris TypeDescriptor.
+// Unsupported annotations are logged and yield a default-constructed descriptor.
+TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logicalType) {
+    TypeDescriptor type;
+    if (logicalType.__isset.STRING) {
+        type.type = TYPE_STRING;
+    } else if (logicalType.__isset.DECIMAL) {
+        type.type = TYPE_DECIMALV2;
+        type.precision = 27;
+        type.scale = 9;
+    } else if (logicalType.__isset.DATE) {
+        type.type = TYPE_DATEV2;
+    } else if (logicalType.__isset.INTEGER) {
+        if (logicalType.INTEGER.isSigned) {
+            if (logicalType.INTEGER.bitWidth <= 32) {
+                type.type = TYPE_INT;
+            } else {
+                type.type = TYPE_BIGINT;
+            }
+        } else {
+            if (logicalType.INTEGER.bitWidth <= 16) {
+                type.type = TYPE_INT;
+            } else {
+                type.type = TYPE_BIGINT;
+            }
+        }
+    } else if (logicalType.__isset.TIME) {
+        type.type = TYPE_TIMEV2;
+    } else if (logicalType.__isset.TIMESTAMP) {
+        type.type = TYPE_DATETIMEV2;
+    } else {
+        LOG(WARNING) << "Not supported parquet LogicalType";
+    }
+    return type;
+}
+
+// Maps a parquet ConvertedType value to a Doris TypeDescriptor.
+// Unsupported values are logged and yield a default-constructed descriptor.
+TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::ConvertedType::type convertedType) {
+    TypeDescriptor type;
+    switch (convertedType) {
+    case tparquet::ConvertedType::type::UTF8:
+        type.type = TYPE_STRING;
+        break;
+    case tparquet::ConvertedType::type::DECIMAL:
+        type.type = TYPE_DECIMALV2;
+        type.precision = 27;
+        type.scale = 9;
+        break;
+    case tparquet::ConvertedType::type::DATE:
+        type.type = TYPE_DATEV2;
+        break;
+    case tparquet::ConvertedType::type::TIME_MILLIS:
+    case tparquet::ConvertedType::type::TIME_MICROS:
+        type.type = TYPE_TIMEV2;
+        break;
+    case tparquet::ConvertedType::type::TIMESTAMP_MILLIS:
+    case tparquet::ConvertedType::type::TIMESTAMP_MICROS:
+        type.type = TYPE_DATETIMEV2;
+        break;
+    case tparquet::ConvertedType::type::UINT_8:
+    case tparquet::ConvertedType::type::UINT_16:
+    case tparquet::ConvertedType::type::INT_8:
+    case tparquet::ConvertedType::type::INT_16:
+    case tparquet::ConvertedType::type::INT_32:
+        type.type = TYPE_INT;
+        break;
+    case tparquet::ConvertedType::type::UINT_32:
+    case tparquet::ConvertedType::type::UINT_64:
+    case tparquet::ConvertedType::type::INT_64:
+        type.type = TYPE_BIGINT;
+        break;
+    default:
+        LOG(WARNING) << "Not supported parquet ConvertedType: " << convertedType;
+        break;
+    }
+    return type;
 }
 
 Status FieldDescriptor::parse_group_field(const std::vector& t_schemas,
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h
index bfd7931422..12db2b7011 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include
+
 #include
 
 #include
@@ -76,6 +78,10 @@ private:
     Status parse_node_field(const std::vector& t_schemas, size_t curr_pos,
                             FieldSchema* node_field);
 
+    TypeDescriptor convert_to_doris_type(tparquet::LogicalType logicalType);
+
+    TypeDescriptor convert_to_doris_type(tparquet::ConvertedType::type convertedType);
+
 public:
     FieldDescriptor() = default;
     ~FieldDescriptor() = default;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 5a0181dd21..e8c59de8d5 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -82,36 +82,11 @@ Status ParquetReader::_init_read_columns() {
 std::unordered_map ParquetReader::get_name_to_type() {
     std::unordered_map map;
     auto schema_desc = _file_metadata->schema();
-    for (auto& it : _map_column) {
-        TypeDescriptor type;
-        if (it.first == "p_partkey") {
-            type.type = TYPE_INT;
-        } else if (it.first == "p_name") {
-            type.type = TYPE_VARCHAR;
-            type.len = 55;
-        } else if (it.first == "p_mfgr") {
-            type.type = TYPE_VARCHAR;
-            type.len = 25;
-        } else if (it.first == "p_brand") {
-            type.type = TYPE_VARCHAR;
-            type.len = 10;
-        } else if (it.first == "p_type") {
-            type.type = TYPE_VARCHAR;
-            type.len = 25;
-        } else if (it.first == "p_size") {
-            type.type = TYPE_INT;
-        } else if (it.first == "p_container") {
-            type.type = TYPE_VARCHAR;
-            type.len = 10;
-        } else if (it.first == "p_retailprice") {
-            type.type = TYPE_DECIMALV2;
-            type.precision = 27;
-            type.scale = 9;
-        } else if (it.first == "p_comment") {
-            type.type = TYPE_VARCHAR;
-            type.len = 23;
-        }
-        map.emplace(it.first, type);
+    std::unordered_set column_names;
+    schema_desc.get_column_names(&column_names);
+    for (const auto& name : column_names) {
+        auto field = schema_desc.get_column(name);
+        map.emplace(name, field->type);
     }
     return map;
 }