diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp b/be/src/vec/exec/format/parquet/schema_desc.cpp
index 1b830689c0..c9283c6288 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -26,6 +26,7 @@
 #include "common/logging.h"
 #include "runtime/define_primitive_type.h"
 #include "util/slice.h"
+#include "util/string_util.h"
 
 namespace doris::vectorized {
 
@@ -239,6 +240,72 @@ TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& ph
     return type;
 }
 
+// Copy from org.apache.iceberg.avro.AvroSchemaUtil#validAvroName
+static bool is_valid_avro_name(const std::string& name) {
+    int length = name.length();
+    char first = name[0];
+    if (!isalpha(first) && first != '_') {
+        return false;
+    }
+
+    for (int i = 1; i < length; i++) {
+        char character = name[i];
+        if (!isalpha(character) && !isdigit(character) && character != '_') {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize
+static void sanitize_avro_name(std::ostringstream& buf, char character) {
+    if (isdigit(character)) {
+        buf << '_' << character;
+    } else {
+        std::stringstream ss;
+        ss << std::hex << (int)character;
+        std::string hex_str = ss.str();
+        buf << "_x" << doris::to_lower(hex_str);
+    }
+}
+
+// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize
+static std::string sanitize_avro_name(const std::string& name) {
+    std::ostringstream buf;
+    int length = name.length();
+    char first = name[0];
+    if (!isalpha(first) && first != '_') {
+        sanitize_avro_name(buf, first);
+    } else {
+        buf << first;
+    }
+
+    for (int i = 1; i < length; i++) {
+        char character = name[i];
+        if (!isalpha(character) && !isdigit(character) && character != '_') {
+            sanitize_avro_name(buf, character);
+        } else {
+            buf << character;
+        }
+    }
+    return buf.str();
+}
+
+void FieldDescriptor::iceberg_sanitize(const std::vector<std::string>& read_columns) {
+    for (const std::string& col : read_columns) {
+        if (!is_valid_avro_name(col)) {
+            std::string sanitize_name = sanitize_avro_name(col);
+            auto it = _name_to_field.find(sanitize_name);
+            if (it != _name_to_field.end()) {
+                FieldSchema* schema = const_cast<FieldSchema*>(it->second);
+                schema->name = col;
+                _name_to_field.emplace(col, schema);
+                _name_to_field.erase(sanitize_name);
+            }
+        }
+    }
+}
+
 TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logicalType) {
     TypeDescriptor type;
     if (logicalType.__isset.STRING) {
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h
index 8e8f735056..50e526bd73 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -91,6 +91,10 @@ private:
 public:
     TypeDescriptor get_doris_type(const tparquet::SchemaElement& physical_schema);
 
+    // org.apache.iceberg.avro.AvroSchemaUtil#sanitize will encode special characters,
+    // we have to decode these characters
+    void iceberg_sanitize(const std::vector<std::string>& read_columns);
+
     FieldDescriptor() = default;
 
     ~FieldDescriptor() = default;
diff --git a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
index 6f52ef5b4a..5d745a0db6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
+++ b/be/src/vec/exec/format/parquet/vparquet_file_metadata.h
@@ -32,6 +32,9 @@ public:
     Status init_schema();
     const FieldDescriptor& schema() const { return _schema; }
     const tparquet::FileMetaData& to_thrift();
+    void iceberg_sanitize(const std::vector<std::string>& read_columns) {
+        _schema.iceberg_sanitize(read_columns);
+    }
     std::string debug_string() const;
 
 private:
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 6eb139a7b0..9db803789f 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -306,6 +306,12 @@ void ParquetReader::_init_file_description() {
     }
 }
 
+void ParquetReader::iceberg_sanitize(const std::vector<std::string>& read_columns) {
+    if (_file_metadata != nullptr) {
+        _file_metadata->iceberg_sanitize(read_columns);
+    }
+}
+
 Status ParquetReader::init_reader(
         const std::vector<std::string>& all_column_names,
         const std::vector<std::string>& missing_column_names,
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 0e9bf4907f..aa4f0014b2 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -138,6 +138,9 @@ public:
 
     const tparquet::FileMetaData* get_meta_data() const { return _t_metadata; }
 
+    // Only for iceberg reader to sanitize invalid column names
+    void iceberg_sanitize(const std::vector<std::string>& read_columns);
+
     Status set_fill_columns(
             const std::unordered_map<std::string, std::tuple<std::string, SlotDescriptor*>>&
                     partition_columns,
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index fb3b385270..5ad1bdd315 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -128,6 +128,7 @@ Status IcebergTableReader::init_reader(
     _gen_file_col_names();
     _gen_new_colname_to_value_range();
     parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
+    parquet_reader->iceberg_sanitize(_all_required_col_names);
     Status status = parquet_reader->init_reader(
             _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
             conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 7eb93b5984..697859f464 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -467,7 +467,7 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
 
 Status VFileScanner::_fill_columns_from_path(size_t rows) {
     DataTypeSerDe::FormatOptions _text_formatOptions;
-    for (auto& kv : *_partition_columns) {
+    for (auto& kv : _partition_col_descs) {
         auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
         IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
         auto& [value, slot_desc] = kv.second;
@@ -500,7 +500,7 @@ Status VFileScanner::_fill_missing_columns(size_t rows) {
     }
 
     SCOPED_TIMER(_fill_missing_columns_timer);
-    for (auto& kv : *_missing_columns) {
+    for (auto& kv : _missing_col_descs) {
         if (kv.second == nullptr) {
             // no default column, fill with null
             auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
@@ -930,9 +930,8 @@ Status VFileScanner::_get_next_reader() {
 }
 
 Status VFileScanner::_generate_fill_columns() {
-    _partition_columns.reset(
-            new std::unordered_map<std::string, std::tuple<std::string, SlotDescriptor*>>());
-    _missing_columns.reset(new std::unordered_map<std::string, VExprContextSPtr>());
+    _partition_col_descs.clear();
+    _missing_col_descs.clear();
 
     const TFileRangeDesc& range = _ranges.at(_next_range - 1);
     if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
@@ -949,8 +948,8 @@ Status VFileScanner::_generate_fill_columns() {
                 if (size == 4 && memcmp(data, "null", 4) == 0) {
                     data = const_cast<char*>("\\N");
                 }
-                _partition_columns->emplace(slot_desc->col_name(),
-                                            std::make_tuple(data, slot_desc));
+                _partition_col_descs.emplace(slot_desc->col_name(),
+                                             std::make_tuple(data, slot_desc));
             }
         }
     }
@@ -969,16 +968,11 @@ Status VFileScanner::_generate_fill_columns() {
                 return Status::InternalError("failed to find default value expr for slot: {}",
                                              slot_desc->col_name());
             }
-            _missing_columns->emplace(slot_desc->col_name(), it->second);
+            _missing_col_descs.emplace(slot_desc->col_name(), it->second);
         }
     }
 
-    RETURN_IF_ERROR(_cur_reader->set_fill_columns(*_partition_columns, *_missing_columns));
-    if (_cur_reader->fill_all_columns()) {
-        _partition_columns.reset(nullptr);
-        _missing_columns.reset(nullptr);
-    }
-    return Status::OK();
+    return _cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs);
 }
 
 Status VFileScanner::_init_expr_ctxes() {
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 4524abb1fd..eeed2145ee 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -162,9 +162,9 @@ protected:
     std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
     std::unique_ptr<io::IOContext> _io_ctx;
 
-    std::unique_ptr<std::unordered_map<std::string, std::tuple<std::string, SlotDescriptor*>>>
-            _partition_columns;
-    std::unique_ptr<std::unordered_map<std::string, VExprContextSPtr>> _missing_columns;
+    std::unordered_map<std::string, std::tuple<std::string, SlotDescriptor*>>
+            _partition_col_descs;
+    std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;
 
 private:
     RuntimeProfile::Counter* _get_block_timer = nullptr;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index fb1c100024..0243ad12f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -435,7 +435,7 @@
         } else {
             List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
             for (FieldSchema field : schema) {
-                tmpSchema.add(new Column(field.getName(),
+                tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
                         HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
                         true, field.getComment(), true, -1));
             }
@@ -484,7 +484,7 @@
         Schema schema = icebergTable.schema();
         List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
         for (FieldSchema field : hmsSchema) {
-            tmpSchema.add(new Column(field.getName(),
+            tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
                     HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType(),
                             IcebergExternalTable.ICEBERG_DATETIME_SCALE_MS),
                     true, null, true, false, null, field.getComment(), true, null,
@@ -500,7 +500,7 @@
         for (String partitionKey : partitionKeys) {
             // Do not use "getColumn()", which will cause dead loop
             for (Column column : schema) {
-                if (partitionKey.equals(column.getName())) {
+                if (partitionKey.equalsIgnoreCase(column.getName())) {
                     // For partition column, if it is string type, change it to varchar(65535)
                     // to be same as doris managed table.
                     // This is to avoid some unexpected behavior such as different partition pruning result
@@ -524,7 +524,7 @@
                 return getHiveColumnStats(colName);
             case ICEBERG:
                 return StatisticsUtil.getIcebergColumnStats(colName,
-                    Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this));
+                        Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this));
             default:
                 LOG.warn("get column stats for dlaType {} is not supported.", dlaType);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
index bede9b99e4..7398ff19c9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.types.Types;
 
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Optional;
 
 public class IcebergExternalTable extends ExternalTable {
@@ -66,7 +67,7 @@
         List<Types.NestedField> columns = schema.columns();
         List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
         for (Types.NestedField field : columns) {
-            tmpSchema.add(new Column(field.name(),
+            tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
                     icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
                     schema.caseInsensitiveFindField(field.name()).fieldId()));
         }
diff --git a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out
index 9554f1d21f..a51bac0e1b 100644
--- a/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out
+++ b/regression-test/data/external_table_p2/iceberg/test_external_catalog_iceberg_common.out
@@ -1,3 +1,11 @@
 -- This file is automatically generated. You should know what you did if you want to edit this
 -- !q01 --
-599715
\ No newline at end of file
+599715
+
+-- !sanitize_mara --
+MATNR1	3.140	/DSD/SV_CNT_GRP1
+MATNR2	3.240	/DSD/SV_CNT_GRP2
+MATNR4	3.440	/DSD/SV_CNT_GRP4
+MATNR5	3.540	/DSD/SV_CNT_GRP5
+MATNR6	3.640	/DSD/SV_CNT_GRP6
+
diff --git a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common.groovy b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common.groovy
index a035ea6d1b..577a4e6702 100644
--- a/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common.groovy
+++ b/regression-test/suites/external_table_p2/iceberg/test_external_catalog_iceberg_common.groovy
@@ -46,5 +46,8 @@ suite("test_external_catalog_iceberg_common", "p2,external,iceberg,external_remo
         }
         sql """ use `iceberg_catalog`; """
         q01_parquet()
+
+        // test the special characters in table fields
+        qt_sanitize_mara """select MaTnR, NtgEW, `/dsd/Sv_cnt_grP` from sanitize_mara order by mAtNr"""
     }
 }