[fix](iceberg) iceberg uses a custom method to encode special characters in field names (#27108)

Fix two bugs:
1. Missing-column matching was case sensitive; change column names to lower case in the FE for hive/iceberg/hudi.
2. Iceberg uses a custom method to encode special characters in column names; decode the column names in the parquet reader so they match the right columns.
Author: Ashin Gau
Date: 2023-11-17 18:38:55 +08:00 (committed via GitHub)
Parent: f8b61d3d8e
Commit: 52995c528e
12 changed files with 113 additions and 23 deletions
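
For context, Iceberg's org.apache.iceberg.avro.AvroSchemaUtil#sanitize rewrites a field name that is not a valid Avro name: a leading digit is prefixed with '_', and any other invalid character becomes "_x" plus its lowercase hex code. Here is a minimal standalone sketch of that rule; the function restates the logic for illustration only, and main() with its sample names is not part of the patch:

```cpp
#include <cctype>
#include <iostream>
#include <sstream>
#include <string>

// Illustrative restatement of AvroSchemaUtil#sanitize; the patch's real
// implementation is the pair of sanitize_avro_name functions shown below.
static std::string sanitize_avro_name(const std::string& name) {
    std::ostringstream buf;
    for (size_t i = 0; i < name.length(); ++i) {
        unsigned char c = static_cast<unsigned char>(name[i]);
        bool valid = (i == 0) ? (std::isalpha(c) || c == '_')
                              : (std::isalnum(c) || c == '_');
        if (valid) {
            buf << name[i];
        } else if (std::isdigit(c)) {
            // only reachable for a leading digit: "9col" -> "_9col"
            buf << '_' << name[i];
        } else {
            // any other invalid character -> "_x" + lowercase hex code
            std::ostringstream hex;
            hex << std::hex << static_cast<int>(c);
            buf << "_x" << hex.str();
        }
    }
    return buf.str();
}

int main() {
    std::cout << sanitize_avro_name("/dsd/sv_cnt_grp") << '\n'; // _x2fdsd_x2fsv_cnt_grp
    std::cout << sanitize_avro_name("9col") << '\n';            // _9col
}
```

The BE changes below implement the inverse direction: for each column name requested by the query, recompute the sanitized form and map the matching parquet schema entry back to the original name.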

@@ -26,6 +26,7 @@
 #include "common/logging.h"
 #include "runtime/define_primitive_type.h"
 #include "util/slice.h"
+#include "util/string_util.h"

 namespace doris::vectorized {
@@ -239,6 +240,72 @@ TypeDescriptor FieldDescriptor::get_doris_type(const tparquet::SchemaElement& ph
     return type;
 }

+// Copy from org.apache.iceberg.avro.AvroSchemaUtil#validAvroName
+static bool is_valid_avro_name(const std::string& name) {
+    int length = name.length();
+    char first = name[0];
+    if (!isalpha(first) && first != '_') {
+        return false;
+    }
+    for (int i = 1; i < length; i++) {
+        char character = name[i];
+        if (!isalpha(character) && !isdigit(character) && character != '_') {
+            return false;
+        }
+    }
+    return true;
+}
+
+// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize
+static void sanitize_avro_name(std::ostringstream& buf, char character) {
+    if (isdigit(character)) {
+        buf << '_' << character;
+    } else {
+        std::stringstream ss;
+        ss << std::hex << (int)character;
+        std::string hex_str = ss.str();
+        buf << "_x" << doris::to_lower(hex_str);
+    }
+}
+
+// Copy from org.apache.iceberg.avro.AvroSchemaUtil#sanitize
+static std::string sanitize_avro_name(const std::string& name) {
+    std::ostringstream buf;
+    int length = name.length();
+    char first = name[0];
+    if (!isalpha(first) && first != '_') {
+        sanitize_avro_name(buf, first);
+    } else {
+        buf << first;
+    }
+    for (int i = 1; i < length; i++) {
+        char character = name[i];
+        if (!isalpha(character) && !isdigit(character) && character != '_') {
+            sanitize_avro_name(buf, character);
+        } else {
+            buf << character;
+        }
+    }
+    return buf.str();
+}
+
+void FieldDescriptor::iceberg_sanitize(const std::vector<std::string>& read_columns) {
+    for (const std::string& col : read_columns) {
+        if (!is_valid_avro_name(col)) {
+            std::string sanitize_name = sanitize_avro_name(col);
+            auto it = _name_to_field.find(sanitize_name);
+            if (it != _name_to_field.end()) {
+                FieldSchema* schema = const_cast<FieldSchema*>(it->second);
+                schema->name = col;
+                _name_to_field.emplace(col, schema);
+                _name_to_field.erase(sanitize_name);
+            }
+        }
+    }
+}
+
 TypeDescriptor FieldDescriptor::convert_to_doris_type(tparquet::LogicalType logicalType) {
     TypeDescriptor type;
     if (logicalType.__isset.STRING) {

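In isolation, the decode step above is a map rekey: find the field under its sanitized name, rename it, and re-register it under the original name. A minimal sketch of that step under simplified assumptions (FieldSchema, rekey, and the map here are stand-ins for the descriptor's private members):

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Simplified stand-in for the parquet FieldSchema held by the descriptor.
struct FieldSchema {
    std::string name;
};

// The rekey performed by FieldDescriptor::iceberg_sanitize above: move the
// schema entry from its Avro-sanitized key back to the original column name.
static void rekey(std::unordered_map<std::string, FieldSchema*>& name_to_field,
                  const std::string& original, const std::string& sanitized) {
    auto it = name_to_field.find(sanitized);
    if (it == name_to_field.end()) {
        return; // name not present in sanitized form; nothing to decode
    }
    FieldSchema* schema = it->second;
    schema->name = original; // restore the user-visible name
    name_to_field.emplace(original, schema);
    name_to_field.erase(sanitized);
}

int main() {
    FieldSchema field{"_x2fdsd_x2fsv_cnt_grp"};
    std::unordered_map<std::string, FieldSchema*> name_to_field{{field.name, &field}};
    rekey(name_to_field, "/dsd/sv_cnt_grp", "_x2fdsd_x2fsv_cnt_grp");
    assert(name_to_field.count("/dsd/sv_cnt_grp") == 1);
    assert(field.name == "/dsd/sv_cnt_grp");
}
```
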
@@ -91,6 +91,10 @@ private:
 public:
     TypeDescriptor get_doris_type(const tparquet::SchemaElement& physical_schema);

+    // org.apache.iceberg.avro.AvroSchemaUtil#sanitize will encode special characters,
+    // we have to decode these characters
+    void iceberg_sanitize(const std::vector<std::string>& read_columns);
+
     FieldDescriptor() = default;
     ~FieldDescriptor() = default;

@@ -32,6 +32,9 @@ public:
     Status init_schema();
     const FieldDescriptor& schema() const { return _schema; }
     const tparquet::FileMetaData& to_thrift();
+    void iceberg_sanitize(const std::vector<std::string>& read_columns) {
+        _schema.iceberg_sanitize(read_columns);
+    }
     std::string debug_string() const;

 private:

@@ -306,6 +306,12 @@ void ParquetReader::_init_file_description() {
     }
 }

+void ParquetReader::iceberg_sanitize(const std::vector<std::string>& read_columns) {
+    if (_file_metadata != nullptr) {
+        _file_metadata->iceberg_sanitize(read_columns);
+    }
+}
+
 Status ParquetReader::init_reader(
         const std::vector<std::string>& all_column_names,
         const std::vector<std::string>& missing_column_names,

@@ -138,6 +138,9 @@ public:
     const tparquet::FileMetaData* get_meta_data() const { return _t_metadata; }

+    // Only for iceberg reader to sanitize invalid column names
+    void iceberg_sanitize(const std::vector<std::string>& read_columns);
+
     Status set_fill_columns(
             const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
                     partition_columns,

@@ -128,6 +128,7 @@ Status IcebergTableReader::init_reader(
     _gen_file_col_names();
     _gen_new_colname_to_value_range();
     parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
+    parquet_reader->iceberg_sanitize(_all_required_col_names);
     Status status = parquet_reader->init_reader(
             _all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
             conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
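
Note the call order: iceberg_sanitize runs before init_reader, so the reader's column matching already sees the decoded (original) names when it resolves _all_required_col_names against the parquet schema.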

@@ -467,7 +467,7 @@ Status VFileScanner::_cast_to_input_block(Block* block) {
 Status VFileScanner::_fill_columns_from_path(size_t rows) {
     DataTypeSerDe::FormatOptions _text_formatOptions;
-    for (auto& kv : *_partition_columns) {
+    for (auto& kv : _partition_col_descs) {
         auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
         IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
         auto& [value, slot_desc] = kv.second;
@@ -500,7 +500,7 @@ Status VFileScanner::_fill_missing_columns(size_t rows) {
     }

     SCOPED_TIMER(_fill_missing_columns_timer);
-    for (auto& kv : *_missing_columns) {
+    for (auto& kv : _missing_col_descs) {
         if (kv.second == nullptr) {
             // no default column, fill with null
             auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
@@ -930,9 +930,8 @@ Status VFileScanner::_get_next_reader() {
 }

 Status VFileScanner::_generate_fill_columns() {
-    _partition_columns.reset(
-            new std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>());
-    _missing_columns.reset(new std::unordered_map<std::string, VExprContextSPtr>());
+    _partition_col_descs.clear();
+    _missing_col_descs.clear();

     const TFileRangeDesc& range = _ranges.at(_next_range - 1);
     if (range.__isset.columns_from_path && !_partition_slot_descs.empty()) {
@@ -949,8 +948,8 @@
                 if (size == 4 && memcmp(data, "null", 4) == 0) {
                     data = const_cast<char*>("\\N");
                 }
-                _partition_columns->emplace(slot_desc->col_name(),
-                                            std::make_tuple(data, slot_desc));
+                _partition_col_descs.emplace(slot_desc->col_name(),
+                                             std::make_tuple(data, slot_desc));
             }
         }
     }
@@ -969,16 +968,11 @@
                 return Status::InternalError("failed to find default value expr for slot: {}",
                                              slot_desc->col_name());
             }
-            _missing_columns->emplace(slot_desc->col_name(), it->second);
+            _missing_col_descs.emplace(slot_desc->col_name(), it->second);
         }
     }

-    RETURN_IF_ERROR(_cur_reader->set_fill_columns(*_partition_columns, *_missing_columns));
-    if (_cur_reader->fill_all_columns()) {
-        _partition_columns.reset(nullptr);
-        _missing_columns.reset(nullptr);
-    }
-    return Status::OK();
+    return _cur_reader->set_fill_columns(_partition_col_descs, _missing_col_descs);
 }

 Status VFileScanner::_init_expr_ctxes() {

@@ -162,9 +162,9 @@ protected:
     std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
     std::unique_ptr<io::IOContext> _io_ctx;

-    std::unique_ptr<std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>>
-            _partition_columns;
-    std::unique_ptr<std::unordered_map<std::string, VExprContextSPtr>> _missing_columns;
+    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+            _partition_col_descs;
+    std::unordered_map<std::string, VExprContextSPtr> _missing_col_descs;

 private:
     RuntimeProfile::Counter* _get_block_timer = nullptr;

@@ -435,7 +435,7 @@ public class HMSExternalTable extends ExternalTable {
         } else {
             List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
             for (FieldSchema field : schema) {
-                tmpSchema.add(new Column(field.getName(),
+                tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
                         HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
                         true, field.getComment(), true, -1));
             }
@@ -484,7 +484,7 @@ public class HMSExternalTable extends ExternalTable {
         Schema schema = icebergTable.schema();
         List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
         for (FieldSchema field : hmsSchema) {
-            tmpSchema.add(new Column(field.getName(),
+            tmpSchema.add(new Column(field.getName().toLowerCase(Locale.ROOT),
                     HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType(),
                             IcebergExternalTable.ICEBERG_DATETIME_SCALE_MS),
                     true, null, true, false, null, field.getComment(), true, null,
@@ -500,7 +500,7 @@ public class HMSExternalTable extends ExternalTable {
         for (String partitionKey : partitionKeys) {
             // Do not use "getColumn()", which will cause dead loop
             for (Column column : schema) {
-                if (partitionKey.equals(column.getName())) {
+                if (partitionKey.equalsIgnoreCase(column.getName())) {
                     // For partition column, if it is string type, change it to varchar(65535)
                     // to be same as doris managed table.
                     // This is to avoid some unexpected behavior such as different partition pruning result
@@ -524,7 +524,7 @@ public class HMSExternalTable extends ExternalTable {
                 return getHiveColumnStats(colName);
             case ICEBERG:
                 return StatisticsUtil.getIcebergColumnStats(colName,
-                    Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this));
+                        Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(this));
             default:
                 LOG.warn("get column stats for dlaType {} is not supported.", dlaType);
         }

@@ -36,6 +36,7 @@ import org.apache.iceberg.types.Types;

 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Optional;

 public class IcebergExternalTable extends ExternalTable {
@@ -66,7 +67,7 @@ public class IcebergExternalTable extends ExternalTable {
         List<Types.NestedField> columns = schema.columns();
         List<Column> tmpSchema = Lists.newArrayListWithCapacity(columns.size());
         for (Types.NestedField field : columns) {
-            tmpSchema.add(new Column(field.name(),
+            tmpSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
                     icebergTypeToDorisType(field.type()), true, null, true, field.doc(), true,
                     schema.caseInsensitiveFindField(field.name()).fieldId()));
         }
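
Using toLowerCase(Locale.ROOT) keeps the case mapping locale-independent (avoiding, for example, the Turkish dotless-i mapping that the default locale can apply), so the FE-side lower-casing behaves the same on every JVM.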

@@ -1,3 +1,11 @@
 -- This file is automatically generated. You should know what you did if you want to edit this
 -- !q01 --
 599715
+599715
+
+-- !sanitize_mara --
+MATNR1	3.140	/DSD/SV_CNT_GRP1
+MATNR2	3.240	/DSD/SV_CNT_GRP2
+MATNR4	3.440	/DSD/SV_CNT_GRP4
+MATNR5	3.540	/DSD/SV_CNT_GRP5
+MATNR6	3.640	/DSD/SV_CNT_GRP6

@@ -46,5 +46,8 @@ suite("test_external_catalog_iceberg_common", "p2,external,iceberg,external_remo
         }
         sql """ use `iceberg_catalog`; """
         q01_parquet()
+
+        // test the special characters in table fields
+        qt_sanitize_mara """select MaTnR, NtgEW, `/dsd/Sv_cnt_grP` from sanitize_mara order by mAtNr"""
     }
 }
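
The regression query deliberately mixes identifier case (MaTnR, NtgEW, mAtNr) and selects a slash-prefixed column, so it exercises both fixes at once: the FE lower-casing of external-table column names and the BE decode of Avro-sanitized names (by the rule above, `/dsd/sv_cnt_grp` would presumably appear as `_x2fdsd_x2fsv_cnt_grp` in the parquet schema).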