[feature-wip](file-scanner)Get column type from parquet schema (#12833)
Get schema from the parquet reader. The new VFileScanner needs to obtain the file schema (a column-name-to-type map) from the parquet file while processing a load job; this PR sets the type information for parquet columns.
This commit is contained in:
@ -18,6 +18,7 @@
|
||||
#pragma once
|
||||
|
||||
#include "common/status.h"
|
||||
#include "runtime/types.h"
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
|
||||
@ -17,6 +17,8 @@
|
||||
|
||||
#include "schema_desc.h"
|
||||
|
||||
#include "common/logging.h"
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
static bool is_group_node(const tparquet::SchemaElement& schema) {
|
||||
@ -150,6 +152,84 @@ void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physic
|
||||
physical_field->physical_type = physical_schema.type;
|
||||
_physical_fields.push_back(physical_field);
|
||||
physical_field->physical_column_index = _physical_fields.size() - 1;
|
||||
if (physical_schema.__isset.logicalType) {
|
||||
physical_field->type = convert_to_doris_type(physical_schema.logicalType);
|
||||
} else if (physical_schema.__isset.converted_type) {
|
||||
physical_field->type = convert_to_doris_type(physical_schema.converted_type);
|
||||
}
|
||||
}
|
||||
|
||||
/// Map a parquet LogicalType annotation to the corresponding Doris type.
/// DECIMAL is pinned to DECIMALV2 with the fixed (27, 9) precision/scale.
/// Unannotated or unrecognized logical types fall through with a warning and
/// return a default-constructed TypeDescriptor.
TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logicalType) {
    TypeDescriptor result;
    if (logicalType.__isset.STRING) {
        result.type = TYPE_STRING;
        return result;
    }
    if (logicalType.__isset.DECIMAL) {
        result.type = TYPE_DECIMALV2;
        result.precision = 27;
        result.scale = 9;
        return result;
    }
    if (logicalType.__isset.DATE) {
        result.type = TYPE_DATEV2;
        return result;
    }
    if (logicalType.__isset.INTEGER) {
        const auto& intType = logicalType.INTEGER;
        if (intType.isSigned) {
            // Signed 8/16/32-bit integers fit in INT; 64-bit needs BIGINT.
            result.type = (intType.bitWidth <= 32) ? TYPE_INT : TYPE_BIGINT;
        } else {
            // Unsigned values need one extra bit, so only widths up to 16
            // fit in INT; wider unsigned ints are widened to BIGINT.
            result.type = (intType.bitWidth <= 16) ? TYPE_INT : TYPE_BIGINT;
        }
        return result;
    }
    if (logicalType.__isset.TIME) {
        result.type = TYPE_TIMEV2;
        return result;
    }
    if (logicalType.__isset.TIMESTAMP) {
        result.type = TYPE_DATETIMEV2;
        return result;
    }
    LOG(WARNING) << "Not supported parquet LogicalType";
    return result;
}
|
||||
|
||||
/// Map a (legacy) parquet ConvertedType annotation to the corresponding Doris
/// type. DECIMAL is pinned to DECIMALV2 with the fixed (27, 9)
/// precision/scale. Unrecognized converted types log a warning and return a
/// default-constructed TypeDescriptor.
TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::ConvertedType::type convertedType) {
    TypeDescriptor type;
    switch (convertedType) {
    case tparquet::ConvertedType::type::UTF8:
        type.type = TYPE_STRING;
        // BUGFIX: this break was missing, so UTF8 columns fell through into
        // the DECIMAL case and were typed as DECIMALV2.
        break;
    case tparquet::ConvertedType::type::DECIMAL:
        type.type = TYPE_DECIMALV2;
        type.precision = 27;
        type.scale = 9;
        break;
    case tparquet::ConvertedType::type::DATE:
        type.type = TYPE_DATEV2;
        break;
    case tparquet::ConvertedType::type::TIME_MILLIS:
    case tparquet::ConvertedType::type::TIME_MICROS:
        type.type = TYPE_TIMEV2;
        break;
    case tparquet::ConvertedType::type::TIMESTAMP_MILLIS:
    case tparquet::ConvertedType::type::TIMESTAMP_MICROS:
        type.type = TYPE_DATETIMEV2;
        break;
    case tparquet::ConvertedType::type::UINT_8:
    case tparquet::ConvertedType::type::UINT_16:
    case tparquet::ConvertedType::type::INT_8:
    case tparquet::ConvertedType::type::INT_16:
    case tparquet::ConvertedType::type::INT_32:
        type.type = TYPE_INT;
        // BUGFIX: this break was missing, so all of the INT_32-and-narrower
        // cases fell through and were typed as BIGINT.
        break;
    case tparquet::ConvertedType::type::UINT_32:
    case tparquet::ConvertedType::type::UINT_64:
    case tparquet::ConvertedType::type::INT_64:
        type.type = TYPE_BIGINT;
        break;
    default:
        LOG(WARNING) << "Not supported parquet ConvertedType: " << convertedType;
        break;
    }
    return type;
}
|
||||
|
||||
Status FieldDescriptor::parse_group_field(const std::vector<tparquet::SchemaElement>& t_schemas,
|
||||
|
||||
@ -17,6 +17,8 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <gen_cpp/parquet_types.h>
|
||||
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
@ -76,6 +78,10 @@ private:
|
||||
Status parse_node_field(const std::vector<tparquet::SchemaElement>& t_schemas, size_t curr_pos,
|
||||
FieldSchema* node_field);
|
||||
|
||||
TypeDescriptor convert_to_doris_type(tparquet::LogicalType logicalType);
|
||||
|
||||
TypeDescriptor convert_to_doris_type(tparquet::ConvertedType::type convertedType);
|
||||
|
||||
public:
|
||||
FieldDescriptor() = default;
|
||||
~FieldDescriptor() = default;
|
||||
|
||||
@ -82,36 +82,11 @@ Status ParquetReader::_init_read_columns() {
|
||||
std::unordered_map<std::string, TypeDescriptor> ParquetReader::get_name_to_type() {
|
||||
std::unordered_map<std::string, TypeDescriptor> map;
|
||||
auto schema_desc = _file_metadata->schema();
|
||||
for (auto& it : _map_column) {
|
||||
TypeDescriptor type;
|
||||
if (it.first == "p_partkey") {
|
||||
type.type = TYPE_INT;
|
||||
} else if (it.first == "p_name") {
|
||||
type.type = TYPE_VARCHAR;
|
||||
type.len = 55;
|
||||
} else if (it.first == "p_mfgr") {
|
||||
type.type = TYPE_VARCHAR;
|
||||
type.len = 25;
|
||||
} else if (it.first == "p_brand") {
|
||||
type.type = TYPE_VARCHAR;
|
||||
type.len = 10;
|
||||
} else if (it.first == "p_type") {
|
||||
type.type = TYPE_VARCHAR;
|
||||
type.len = 25;
|
||||
} else if (it.first == "p_size") {
|
||||
type.type = TYPE_INT;
|
||||
} else if (it.first == "p_container") {
|
||||
type.type = TYPE_VARCHAR;
|
||||
type.len = 10;
|
||||
} else if (it.first == "p_retailprice") {
|
||||
type.type = TYPE_DECIMALV2;
|
||||
type.precision = 27;
|
||||
type.scale = 9;
|
||||
} else if (it.first == "p_comment") {
|
||||
type.type = TYPE_VARCHAR;
|
||||
type.len = 23;
|
||||
}
|
||||
map.emplace(it.first, type);
|
||||
std::unordered_set<std::string> column_names;
|
||||
schema_desc.get_column_names(&column_names);
|
||||
for (auto name : column_names) {
|
||||
auto field = schema_desc.get_column(name);
|
||||
map.emplace(name, field->type);
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user