From 3ebc98228dff8299fb6d8167b48fa5baedd38f9a Mon Sep 17 00:00:00 2001
From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com>
Date: Fri, 20 Jan 2023 12:57:36 +0800
Subject: [PATCH] [feature wip](multi catalog)Support iceberg schema evolution. (#15836)

Support iceberg schema evolution for the parquet file format.

Iceberg uses a unique id for each column to support schema evolution. To
support this feature in Doris, the FE side needs to get the current column id
for each column and send the ids to the BE side. The BE reads the column ids
from the parquet key_value_metadata, renames the changed columns in the Block
to match the column names in the parquet file before reading data, and sets
the names back to the table column names after reading.
---
 be/src/vec/core/block.h                        |   3 +-
 .../exec/format/parquet/vparquet_reader.cpp    |  43 +++++-
 .../vec/exec/format/parquet/vparquet_reader.h  |  16 +-
 .../vec/exec/format/table/iceberg_reader.cpp   | 143 +++++++++++++++++-
 be/src/vec/exec/format/table/iceberg_reader.h  |  26 ++++
 be/src/vec/exec/scan/vfile_scanner.cpp         |  14 +-
 be/src/vec/exec/scan/vfile_scanner.h           |   6 +-
 .../vec/exec/parquet/parquet_reader_test.cpp   |   4 +-
 .../catalog/HiveMetaStoreClientHelper.java     |  27 ++++
 .../doris/datasource/HMSExternalCatalog.java   |  27 ++++
 .../planner/external/HiveScanProvider.java     |  10 --
 .../planner/external/IcebergScanProvider.java  |  20 +--
 .../iceberg/iceberg_schema_evolution.out       |  79 ++++++++++
 .../iceberg/iceberg_schema_evolution.groovy    |  67 ++++++++
 14 files changed, 431 insertions(+), 54 deletions(-)
 create mode 100644 regression-test/data/external_catalog_p0/iceberg/iceberg_schema_evolution.out
 create mode 100644 regression-test/suites/external_catalog_p0/iceberg/iceberg_schema_evolution.groovy

diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 4ebc018b5b..86138a63f8 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -102,6 +102,8 @@ public:
         std::swap(data, new_data);
     }
 
+    void initialize_index_by_name();
+
     /// References are invalidated after calling functions above.
     ColumnWithTypeAndName& get_by_position(size_t position) { return data[position]; }
     const ColumnWithTypeAndName& get_by_position(size_t position) const { return data[position]; }
@@ -367,7 +369,6 @@ public:
 
 private:
     void erase_impl(size_t position);
-    void initialize_index_by_name();
     bool is_column_data_null(const doris::TypeDescriptor& type_desc, const StringRef& data_ref,
                              const IColumn* column_with_type_and_name, int row);
     void deep_copy_slot(void* dst, MemPool* pool, const doris::TypeDescriptor& type_desc,
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 0c6bfc085a..1bedf556c0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -24,6 +24,7 @@
 #include "olap/iterators.h"
 #include "parquet_pred_cmp.h"
 #include "parquet_thrift_util.h"
+#include "rapidjson/document.h"
 #include "vec/exprs/vbloom_predicate.h"
 #include "vec/exprs/vin_predicate.h"
 #include "vec/exprs/vruntimefilter_wrapper.h"
@@ -167,21 +168,46 @@ Status ParquetReader::_open_file() {
     return Status::OK();
 }
 
+// Get iceberg col id to col name map stored in parquet metadata key values.
+// This is for iceberg schema evolution.
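The value stored under the "iceberg.schema" key in the footer key/value metadata is the JSON serialization of the Iceberg schema; its "fields" array carries an "id" and a "name" for every column. As a rough, standalone sketch of what _gen_col_name_maps() (added later in this patch) extracts from that payload (KeyValue here is a stand-in for tparquet::KeyValue, parse_iceberg_schema is a hypothetical helper, and error handling is omitted):

#include <algorithm>
#include <cctype>
#include <string>
#include <unordered_map>
#include <vector>

#include "rapidjson/document.h"

// Stand-in for tparquet::KeyValue from the parquet footer.
struct KeyValue {
    std::string key;
    std::string value;
};

// Build the iceberg field id -> column name map from the "iceberg.schema" entry.
std::unordered_map<int, std::string> parse_iceberg_schema(const std::vector<KeyValue>& kvs) {
    std::unordered_map<int, std::string> id_to_name;
    for (const auto& kv : kvs) {
        if (kv.key != "iceberg.schema") {
            continue;
        }
        rapidjson::Document json;
        json.Parse(kv.value.c_str());
        if (!json.HasMember("fields") || !json["fields"].IsArray()) {
            break;
        }
        for (auto& field : json["fields"].GetArray()) {
            std::string name = field["name"].GetString();
            // Doris matches column names case-insensitively, so normalize to lower case.
            std::transform(name.begin(), name.end(), name.begin(), ::tolower);
            id_to_name.emplace(field["id"].GetInt(), name);
        }
        break;
    }
    return id_to_name;
}

Comparing this map with the id -> name map sent by the FE is what turns a rename of col1 to col1_new into the pair of lookups (col1_new -> col1, col1 -> col1_new) used by the reader below.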
+std::vector ParquetReader::get_metadata_key_values() { + return _t_metadata->key_value_metadata; +} + +Status ParquetReader::open() { + SCOPED_RAW_TIMER(&_statistics.parse_meta_time); + RETURN_IF_ERROR(_open_file()); + _t_metadata = &_file_metadata->to_thrift(); + return Status::OK(); +} + Status ParquetReader::init_reader( - const std::vector& column_names, + const std::vector& all_column_names, + const std::vector& missing_column_names, std::unordered_map* colname_to_value_range, VExprContext* vconjunct_ctx, bool filter_groups) { SCOPED_RAW_TIMER(&_statistics.parse_meta_time); - RETURN_IF_ERROR(_open_file()); - _column_names = &column_names; - _t_metadata = &_file_metadata->to_thrift(); _total_groups = _t_metadata->row_groups.size(); if (_total_groups == 0) { return Status::EndOfFile("Empty Parquet File"); } + // all_column_names are all the columns required by user sql. + // missing_column_names are the columns required by user sql but not in the parquet file, + // e.g. table added a column after this parquet file was written. + _column_names = &all_column_names; auto schema_desc = _file_metadata->schema(); for (int i = 0; i < schema_desc.size(); ++i) { - _map_column.emplace(schema_desc.get_column(i)->name, i); + auto name = schema_desc.get_column(i)->name; + // If the column in parquet file is included in all_column_names and not in missing_column_names, + // add it to _map_column, which means the reader should read the data of this column. + // Here to check against missing_column_names is to for the 'Add a column with back to the table + // with the same column name' case. Shouldn't read this column data in this case. + if (find(all_column_names.begin(), all_column_names.end(), name) != + all_column_names.end() && + find(missing_column_names.begin(), missing_column_names.end(), name) == + missing_column_names.end()) { + _map_column.emplace(name, i); + } } _colname_to_value_range = colname_to_value_range; RETURN_IF_ERROR(_init_read_columns()); @@ -199,7 +225,12 @@ Status ParquetReader::set_fill_columns( std::unordered_map predicate_columns; std::function visit_slot = [&](VExpr* expr) { if (VSlotRef* slot_ref = typeid_cast(expr)) { - predicate_columns.emplace(slot_ref->expr_name(), slot_ref->column_id()); + auto expr_name = slot_ref->expr_name(); + auto iter = _table_col_to_file_col.find(expr_name); + if (iter != _table_col_to_file_col.end()) { + expr_name = iter->second; + } + predicate_columns.emplace(expr_name, slot_ref->column_id()); if (slot_ref->column_id() == 0) { _lazy_read_ctx.resize_first_column = false; } diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h index 3caa12d14e..904f528b7b 100644 --- a/be/src/vec/exec/format/parquet/vparquet_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_reader.h @@ -66,13 +66,11 @@ public: // for test void set_file_reader(io::FileReaderSPtr file_reader) { _file_reader = file_reader; } - Status init_reader(const std::vector& column_names, bool filter_groups = true) { - // without predicate - return init_reader(column_names, nullptr, nullptr, filter_groups); - } + Status open(); Status init_reader( - const std::vector& column_names, + const std::vector& all_column_names, + const std::vector& missing_column_names, std::unordered_map* colname_to_value_range, VExprContext* vconjunct_ctx, bool filter_groups = true); @@ -103,6 +101,11 @@ public: partition_columns, const std::unordered_map& missing_columns) override; + std::vector get_metadata_key_values(); + void 
set_table_to_file_col_map(std::unordered_map& map) { + _table_col_to_file_col = map; + } + private: struct ParquetProfile { RuntimeProfile::Counter* filtered_row_groups; @@ -165,6 +168,8 @@ private: bool _row_group_eof = true; int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file std::map _map_column; // column-name <---> column-index + // table column name to file column name map. For iceberg schema evolution. + std::unordered_map _table_col_to_file_col; std::unordered_map* _colname_to_value_range; std::vector _read_columns; RowRange _whole_range = RowRange(0, 0); @@ -189,7 +194,6 @@ private: ParquetColumnReader::Statistics _column_statistics; ParquetProfile _parquet_profile; bool _closed = false; - IOContext* _io_ctx; }; } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp index 712d43324b..4848b0b1c6 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.cpp +++ b/be/src/vec/exec/format/table/iceberg_reader.cpp @@ -59,8 +59,52 @@ IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, Runtim ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile); } +Status IcebergTableReader::init_reader( + std::vector& file_col_names, + std::unordered_map& col_id_name_map, + std::unordered_map* colname_to_value_range, + VExprContext* vconjunct_ctx) { + ParquetReader* parquet_reader = static_cast(_file_format_reader.get()); + _col_id_name_map = col_id_name_map; + _file_col_names = file_col_names; + _colname_to_value_range = colname_to_value_range; + auto parquet_meta_kv = parquet_reader->get_metadata_key_values(); + _gen_col_name_maps(parquet_meta_kv); + _gen_file_col_names(); + _gen_new_colname_to_value_range(); + parquet_reader->set_table_to_file_col_map(_table_col_to_file_col); + Status status = parquet_reader->init_reader(_all_required_col_names, _not_in_file_col_names, + &_new_colname_to_value_range, vconjunct_ctx); + return status; +} + Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { - return _file_format_reader->get_next_block(block, read_rows, eof); + // To support iceberg schema evolution. We change the column name in block to + // make it match with the column name in parquet file before reading data. and + // Set the name back to table column name before return this block. + if (_has_schema_change) { + for (int i = 0; i < block->columns(); i++) { + ColumnWithTypeAndName& col = block->get_by_position(i); + auto iter = _table_col_to_file_col.find(col.name); + if (iter != _table_col_to_file_col.end()) { + col.name = iter->second; + } + } + block->initialize_index_by_name(); + } + auto res = _file_format_reader->get_next_block(block, read_rows, eof); + // Set the name back to table column name before return this block. 
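As a rough illustration of the round-trip performed above (table column name to file column name before the read, file column name back to table column name afterwards), the following self-contained sketch uses stand-in types; BlockStub, ColumnStub and rename_columns are hypothetical, while the real code mutates ColumnWithTypeAndName::name in place on the Block:

#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

struct ColumnStub {              // stand-in for ColumnWithTypeAndName
    std::string name;
};

struct BlockStub {               // stand-in for Block
    std::vector<ColumnStub> columns;
    std::unordered_map<std::string, size_t> index_by_name;

    // Rebuild the name -> position index after columns were renamed.
    void initialize_index_by_name() {
        index_by_name.clear();
        for (size_t i = 0; i < columns.size(); ++i) {
            index_by_name[columns[i].name] = i;
        }
    }
};

// Apply `mapping` (table -> file before the read, file -> table after it) to every
// column name and rebuild the index so later lookups by name still work.
void rename_columns(BlockStub* block,
                    const std::unordered_map<std::string, std::string>& mapping) {
    for (auto& col : block->columns) {
        auto it = mapping.find(col.name);
        if (it != mapping.end()) {
            col.name = it->second;
        }
    }
    block->initialize_index_by_name();
}

This rename pass is also why initialize_index_by_name() is moved from the private to the public section of block.h in this patch: the table reader has to rebuild the Block's name index after each pass.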
+ if (_has_schema_change) { + for (int i = 0; i < block->columns(); i++) { + ColumnWithTypeAndName& col = block->get_by_position(i); + auto iter = _file_col_to_table_col.find(col.name); + if (iter != _file_col_to_table_col.end()) { + col.name = iter->second; + } + } + block->initialize_index_by_name(); + } + return res; } Status IcebergTableReader::set_fill_columns( @@ -138,8 +182,8 @@ Status IcebergTableReader::_position_delete( delete_reader.get_parsed_schema(&delete_file_col_names, &delete_file_col_types); init_schema = true; } - create_status = - delete_reader.init_reader(delete_file_col_names, nullptr, nullptr, false); + create_status = delete_reader.init_reader(delete_file_col_names, _not_in_file_col_names, + nullptr, nullptr, false); if (!create_status.ok()) { return nullptr; } @@ -355,4 +399,97 @@ void IcebergTableReader::_sort_delete_rows(std::vector*>& d } } +/* + * To support schema evolution, Iceberg write the column id to column name map to + * parquet file key_value_metadata. + * This function is to compare the table schema from FE (_col_id_name_map) with + * the schema in key_value_metadata for the current parquet file and generate two maps + * for future use: + * 1. table column name to parquet column name. + * 2. parquet column name to table column name. + * For example, parquet file has a column 'col1', + * after this file was written, iceberg changed the column name to 'col1_new'. + * The two maps would contain: + * 1. col1_new -> col1 + * 2. col1 -> col1_new + */ +Status IcebergTableReader::_gen_col_name_maps(std::vector parquet_meta_kv) { + for (int i = 0; i < parquet_meta_kv.size(); ++i) { + tparquet::KeyValue kv = parquet_meta_kv[i]; + if (kv.key == "iceberg.schema") { + std::string schema = kv.value; + rapidjson::Document json; + json.Parse(schema.c_str()); + + if (json.HasMember("fields")) { + rapidjson::Value& fields = json["fields"]; + if (fields.IsArray()) { + for (int j = 0; j < fields.Size(); j++) { + rapidjson::Value& e = fields[j]; + rapidjson::Value& id = e["id"]; + rapidjson::Value& name = e["name"]; + std::string name_string = name.GetString(); + transform(name_string.begin(), name_string.end(), name_string.begin(), + ::tolower); + auto iter = _col_id_name_map.find(id.GetInt()); + if (iter != _col_id_name_map.end()) { + _table_col_to_file_col.emplace(iter->second, name_string); + _file_col_to_table_col.emplace(name_string, iter->second); + if (name_string != iter->second) { + _has_schema_change = true; + } + } else { + _has_schema_change = true; + } + } + } + } + break; + } + } + return Status::OK(); +} + +/* + * Generate _all_required_col_names and _not_in_file_col_names. + * + * _all_required_col_names is all the columns required by user sql. + * If the column name has been modified after the data file was written, + * put the old name in data file to _all_required_col_names. + * + * _not_in_file_col_names is all the columns required by user sql but not in the data file. + * e.g. New columns added after this data file was written. + * The columns added with names used by old dropped columns should consider as a missing column, + * which should be in _not_in_file_col_names. 
+ */ +void IcebergTableReader::_gen_file_col_names() { + _all_required_col_names.clear(); + _not_in_file_col_names.clear(); + for (int i = 0; i < _file_col_names.size(); ++i) { + auto name = _file_col_names[i]; + auto iter = _table_col_to_file_col.find(name); + if (iter == _table_col_to_file_col.end()) { + _all_required_col_names.emplace_back(name); + _not_in_file_col_names.emplace_back(name); + } else { + _all_required_col_names.emplace_back(iter->second); + } + } +} + +/* + * Generate _new_colname_to_value_range, by replacing the column name in + * _colname_to_value_range with column name in data file. + */ +void IcebergTableReader::_gen_new_colname_to_value_range() { + for (auto it = _colname_to_value_range->begin(); it != _colname_to_value_range->end(); it++) { + auto iter = _table_col_to_file_col.find(it->first); + if (iter == _table_col_to_file_col.end()) { + _new_colname_to_value_range.emplace(it->first, it->second); + } else { + _new_colname_to_value_range.emplace(iter->second, it->second); + } + } +} + } // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h index e4757ed804..767cecb726 100644 --- a/be/src/vec/exec/format/table/iceberg_reader.h +++ b/be/src/vec/exec/format/table/iceberg_reader.h @@ -56,6 +56,12 @@ public: Status get_columns(std::unordered_map* name_to_type, std::unordered_set* missing_cols) override; + Status init_reader( + std::vector& file_col_names, + std::unordered_map& col_id_name_map, + std::unordered_map* colname_to_value_range, + VExprContext* vconjunct_ctx); + enum { DATA, POSITION_DELETE, EQUALITY_DELETE }; private: @@ -81,6 +87,10 @@ private: PositionDeleteRange _get_range(const ColumnString& file_path_column); + Status _gen_col_name_maps(std::vector parquet_meta_kv); + void _gen_file_col_names(); + void _gen_new_colname_to_value_range(); + RuntimeProfile* _profile; RuntimeState* _state; const TFileScanRangeParams& _params; @@ -88,8 +98,24 @@ private: KVCache& _kv_cache; IcebergProfile _iceberg_profile; std::vector _delete_rows; + // col names from _file_slot_descs + std::vector _file_col_names; + // file column name to table column name map. For iceberg schema evolution. + std::unordered_map _file_col_to_table_col; + // table column name to file column name map. For iceberg schema evolution. + std::unordered_map _table_col_to_file_col; + std::unordered_map* _colname_to_value_range; + // copy from _colname_to_value_range with new column name that is in parquet file, to support schema evolution. + std::unordered_map _new_colname_to_value_range; + // column id to name map. Collect from FE slot descriptor. 
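For reference, a condensed sketch of what _gen_file_col_names() and _gen_new_colname_to_value_range() compute, written against plain std types instead of the reader's members; FileColumnPlan, plan_file_columns and remap_ranges are illustrative names, not part of the patch:

#include <string>
#include <unordered_map>
#include <vector>

struct FileColumnPlan {
    std::vector<std::string> all_required;  // names to request from the parquet reader
    std::vector<std::string> not_in_file;   // required by the query but absent from this file
};

// Columns the file knows are requested under their file name; columns it does not
// know (added later, or re-added under a previously dropped name) keep the table name
// and are also reported as missing, so their data is never read from an unrelated column.
FileColumnPlan plan_file_columns(const std::vector<std::string>& table_col_names,
                                 const std::unordered_map<std::string, std::string>& table_to_file) {
    FileColumnPlan plan;
    for (const auto& name : table_col_names) {
        auto it = table_to_file.find(name);
        if (it == table_to_file.end()) {
            plan.all_required.push_back(name);
            plan.not_in_file.push_back(name);
        } else {
            plan.all_required.push_back(it->second);
        }
    }
    return plan;
}

// Predicate ranges arrive keyed by table column name; rewrite the keys to file column
// names so min/max row group filtering keeps working after a rename.
template <typename Range>
std::unordered_map<std::string, Range> remap_ranges(
        const std::unordered_map<std::string, Range>& by_table_name,
        const std::unordered_map<std::string, std::string>& table_to_file) {
    std::unordered_map<std::string, Range> by_file_name;
    for (const auto& [name, range] : by_table_name) {
        auto it = table_to_file.find(name);
        by_file_name.emplace(it == table_to_file.end() ? name : it->second, range);
    }
    return by_file_name;
}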
+ std::unordered_map _col_id_name_map; + // col names in the parquet file + std::vector _all_required_col_names; + // col names in table but not in parquet file + std::vector _not_in_file_col_names; IOContext* _io_ctx; + bool _has_schema_change = false; }; } // namespace vectorized } // namespace doris diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index c6fb71bb58..29a64edbe4 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -491,20 +491,24 @@ Status VFileScanner::_get_next_reader() { ParquetReader* parquet_reader = new ParquetReader( _profile, _params, range, _state->query_options().batch_size, const_cast(&_state->timezone_obj()), _io_ctx.get()); + RETURN_IF_ERROR(parquet_reader->open()); if (!_is_load && _push_down_expr == nullptr && _vconjunct_ctx != nullptr) { RETURN_IF_ERROR(_vconjunct_ctx->clone(_state, &_push_down_expr)); _discard_conjuncts(); } - init_status = parquet_reader->init_reader(_file_col_names, _colname_to_value_range, - _push_down_expr); if (range.__isset.table_format_params && range.table_format_params.table_format_type == "iceberg") { IcebergTableReader* iceberg_reader = new IcebergTableReader((GenericReader*)parquet_reader, _profile, _state, _params, range, _kv_cache, _io_ctx.get()); + iceberg_reader->init_reader(_file_col_names, _col_id_name_map, + _colname_to_value_range, _push_down_expr); RETURN_IF_ERROR(iceberg_reader->init_row_filters(range)); _cur_reader.reset((GenericReader*)iceberg_reader); } else { + std::vector place_holder; + init_status = parquet_reader->init_reader(_file_col_names, place_holder, + _colname_to_value_range, _push_down_expr); _cur_reader.reset((GenericReader*)parquet_reader); } break; @@ -646,10 +650,10 @@ Status VFileScanner::_init_expr_ctxes() { } if (slot_info.is_file_slot) { _file_slot_descs.emplace_back(it->second); - auto iti = full_src_index_map.find(slot_id); - _file_slot_index_map.emplace(slot_id, iti->second); - _file_slot_name_map.emplace(it->second->col_name(), iti->second); _file_col_names.push_back(it->second->col_name()); + if (it->second->col_unique_id() > 0) { + _col_id_name_map.emplace(it->second->col_unique_id(), it->second->col_name()); + } } else { _partition_slot_descs.emplace_back(it->second); if (_is_load) { diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h index a4afca8233..8ba5174733 100644 --- a/be/src/vec/exec/scan/vfile_scanner.h +++ b/be/src/vec/exec/scan/vfile_scanner.h @@ -68,12 +68,10 @@ protected: std::unordered_map* _colname_to_value_range; // File source slot descriptors std::vector _file_slot_descs; - // File slot id to index in _file_slot_descs - std::unordered_map _file_slot_index_map; - // file col name to index in _file_slot_descs - std::map _file_slot_name_map; // col names from _file_slot_descs std::vector _file_col_names; + // column id to name map. Collect from FE slot descriptor. 
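The id -> name map handed to IcebergTableReader::init_reader() is collected in VFileScanner::_init_expr_ctxes() (shown above) from the slot descriptors, whose col_unique_id is filled by the FE with the iceberg field id. A minimal sketch with a stand-in slot type, assuming only slots with a positive unique id take part (matching the col_unique_id() > 0 check above):

#include <string>
#include <unordered_map>
#include <vector>

struct SlotStub {               // stand-in for SlotDescriptor
    int col_unique_id;
    std::string col_name;
};

// Gather the column id -> column name map for the file slots of the scan.
std::unordered_map<int, std::string> collect_col_id_name_map(const std::vector<SlotStub>& file_slots) {
    std::unordered_map<int, std::string> col_id_name_map;
    for (const auto& slot : file_slots) {
        if (slot.col_unique_id > 0) {
            col_id_name_map.emplace(slot.col_unique_id, slot.col_name);
        }
    }
    return col_id_name_map;
}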
+ std::unordered_map _col_id_name_map; // Partition source slot descriptors std::vector _partition_slot_descs; diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp index 6379513e3b..42ff7cf538 100644 --- a/be/test/vec/exec/parquet/parquet_reader_test.cpp +++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp @@ -98,6 +98,7 @@ TEST_F(ParquetReaderTest, normal) { TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz); auto tuple_desc = desc_tbl->get_tuple_descriptor(0); std::vector column_names; + std::vector missing_column_names; for (int i = 0; i < slot_descs.size(); i++) { column_names.push_back(slot_descs[i]->col_name()); } @@ -114,7 +115,8 @@ TEST_F(ParquetReaderTest, normal) { runtime_state.init_mem_trackers(); std::unordered_map colname_to_value_range; - p_reader->init_reader(column_names, nullptr, nullptr); + p_reader->open(); + p_reader->init_reader(column_names, missing_column_names, nullptr, nullptr); std::unordered_map> partition_columns; std::unordered_map missing_columns; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java index 38ab733e7f..adf5dccc0a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java @@ -32,6 +32,7 @@ import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.backup.BlobStorage; +import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; @@ -43,8 +44,10 @@ import com.google.common.base.Strings; import com.google.common.collect.Maps; import com.google.common.collect.Queues; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; @@ -64,6 +67,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -74,6 +78,7 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.Deque; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -813,6 +818,28 @@ public class HiveMetaStoreClientHelper { } return output.toString(); } + + public static org.apache.iceberg.Table getIcebergTable(HMSExternalTable table) { + String metastoreUri = table.getMetastoreUri(); + org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); + Configuration conf = getConfiguration(table); + hiveCatalog.setConf(conf); + // initialize hive catalog + Map catalogProperties = new HashMap<>(); + 
catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, metastoreUri); + catalogProperties.put("uri", metastoreUri); + hiveCatalog.initialize("hive", catalogProperties); + + return hiveCatalog.loadTable(TableIdentifier.of(table.getDbName(), table.getName())); + } + + public static Configuration getConfiguration(HMSExternalTable table) { + Configuration conf = new HdfsConfiguration(); + for (Map.Entry entry : table.getHadoopProperties().entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + return conf; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java index 70ba9f9aa9..967a28c9ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.catalog.external.ExternalDatabase; import org.apache.doris.catalog.external.HMSExternalDatabase; +import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.Config; import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient; import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException; @@ -37,12 +38,15 @@ import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NotificationEventResponse; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Optional; /** * External catalog for hive metastore compatible data sources. 
@@ -173,6 +177,16 @@ public class HMSExternalCatalog extends ExternalCatalog { public List getSchema(String dbName, String tblName) { makeSureInitialized(); List schema = getClient().getSchema(dbName, tblName); + Optional db = getDb(dbName); + if (db.isPresent()) { + Optional table = db.get().getTable(tblName); + if (table.isPresent()) { + HMSExternalTable hmsTable = (HMSExternalTable) table.get(); + if (hmsTable.getDlaType().equals(HMSExternalTable.DLAType.ICEBERG)) { + return getIcebergSchema(hmsTable, schema); + } + } + } List tmpSchema = Lists.newArrayListWithCapacity(schema.size()); for (FieldSchema field : schema) { tmpSchema.add(new Column(field.getName(), @@ -182,6 +196,19 @@ public class HMSExternalCatalog extends ExternalCatalog { return tmpSchema; } + private List getIcebergSchema(HMSExternalTable table, List hmsSchema) { + Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(table); + Schema schema = icebergTable.schema(); + List tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size()); + for (FieldSchema field : hmsSchema) { + tmpSchema.add(new Column(field.getName(), + HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null, + true, null, field.getComment(), true, null, + schema.caseInsensitiveFindField(field.getName()).fieldId())); + } + return tmpSchema; + } + public void setLastSyncedEventId(long lastSyncedEventId) { this.lastSyncedEventId = lastSyncedEventId; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java index 44fa06f0f8..daf238ee8c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java @@ -52,8 +52,6 @@ import org.apache.doris.thrift.TFileType; import com.google.common.base.Joiner; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.mapred.FileSplit; @@ -211,14 +209,6 @@ public class HiveScanProvider extends HMSTableScanProvider { allFiles.addAll(files); } - protected Configuration getConfiguration() { - Configuration conf = new HdfsConfiguration(); - for (Map.Entry entry : hmsTable.getHadoopProperties().entrySet()) { - conf.set(entry.getKey(), entry.getValue()); - } - return conf; - } - public int getTotalPartitionNum() { return totalPartitionNum; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java index 21bfb6670f..4b8bdbee98 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java @@ -21,7 +21,7 @@ import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.TableSnapshot; import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.HMSResource; +import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; @@ -35,7 +35,6 @@ import 
org.apache.doris.thrift.TIcebergDeleteFileDesc; import org.apache.doris.thrift.TIcebergFileDesc; import org.apache.doris.thrift.TTableFormatFileDesc; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.InputSplit; import org.apache.iceberg.BaseTable; @@ -46,7 +45,6 @@ import org.apache.iceberg.HistoryEntry; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.TableProperties; import org.apache.iceberg.TableScan; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NotFoundException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.types.Conversions; @@ -55,7 +53,6 @@ import java.nio.ByteBuffer; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -139,7 +136,7 @@ public class IcebergScanProvider extends HiveScanProvider { } } - org.apache.iceberg.Table table = getIcebergTable(); + org.apache.iceberg.Table table = HiveMetaStoreClientHelper.getIcebergTable(hmsTable); TableScan scan = table.newScan(); TableSnapshot tableSnapshot = desc.getRef().getTableSnapshot(); if (tableSnapshot != null) { @@ -221,19 +218,6 @@ public class IcebergScanProvider extends HiveScanProvider { return filters; } - private org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException { - org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); - Configuration conf = getConfiguration(); - hiveCatalog.setConf(conf); - // initialize hive catalog - Map catalogProperties = new HashMap<>(); - catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, getMetaStoreUrl()); - catalogProperties.put("uri", getMetaStoreUrl()); - hiveCatalog.initialize("hive", catalogProperties); - - return hiveCatalog.loadTable(TableIdentifier.of(hmsTable.getDbName(), hmsTable.getName())); - } - @Override public List getPathPartitionKeys() throws DdlException, MetaNotFoundException { return Collections.emptyList(); diff --git a/regression-test/data/external_catalog_p0/iceberg/iceberg_schema_evolution.out b/regression-test/data/external_catalog_p0/iceberg/iceberg_schema_evolution.out new file mode 100644 index 0000000000..dba805ca6d --- /dev/null +++ b/regression-test/data/external_catalog_p0/iceberg/iceberg_schema_evolution.out @@ -0,0 +1,79 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !rename1 -- +1 orig2_1 orig3_1 +2 orig2_2 orig3_2 +3 orig2_3 orig3_3 +4 orig2_4 rename3_1 +5 orig2_5 rename3_2 +6 orig2_6 rename3_3 + +-- !rename2 -- +3 orig2_3 orig3_3 +4 orig2_4 rename3_1 + +-- !drop1 -- +1 orig3_1 +2 orig3_2 +3 orig3_3 +4 orig3_4 +5 orig3_5 +6 orig3_6 + +-- !drop2 -- +1 orig3_1 +2 orig3_2 +3 orig3_3 + +-- !drop3 -- +4 orig3_4 +5 orig3_5 +6 orig3_6 + +-- !add1 -- +1 orig2_1 orig3_1 \N +2 orig2_2 orig3_2 \N +3 orig2_3 orig3_3 \N +4 orig2_4 orig3_4 add1_1 +5 orig2_5 orig3_5 add1_2 +6 orig2_6 orig3_6 add1_3 + +-- !add2 -- +2 orig2_2 orig3_2 \N + +-- !add3 -- +5 orig2_5 orig3_5 add1_2 + +-- !reorder1 -- +1 orig3_1 orig2_1 +2 orig3_2 orig2_2 +3 orig3_3 orig2_3 +4 orig3_4 orig2_4 +5 orig3_5 orig2_5 +6 orig3_6 orig2_6 + +-- !reorder2 -- +2 orig3_2 orig2_2 + +-- !reorder3 -- +5 orig3_5 orig2_5 + +-- !readd1 -- +1 orig2_1 \N +2 orig2_2 \N +3 orig2_3 \N +4 orig2_4 orig3_4 +5 orig2_5 orig3_5 +6 orig2_6 orig3_6 + +-- !readd2 -- +1 orig2_1 \N +2 orig2_2 \N +3 orig2_3 \N +4 orig2_4 orig3_4 + +-- !readd3 -- +3 orig2_3 \N +4 orig2_4 orig3_4 +5 orig2_5 orig3_5 +6 orig2_6 orig3_6 + diff --git a/regression-test/suites/external_catalog_p0/iceberg/iceberg_schema_evolution.groovy b/regression-test/suites/external_catalog_p0/iceberg/iceberg_schema_evolution.groovy new file mode 100644 index 0000000000..d014cafc03 --- /dev/null +++ b/regression-test/suites/external_catalog_p0/iceberg/iceberg_schema_evolution.groovy @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("iceberg_schema_evolution", "p0") { + def rename1 = """select * from rename_test order by rename_1;""" + def rename2 = """select * from rename_test where rename_1 in (3, 4) order by rename_1;""" + def drop1 = """select * from drop_test order by orig1;""" + def drop2 = """select * from drop_test where orig1<=3 order by orig1;""" + def drop3 = """select * from drop_test where orig1>3 order by orig1;""" + def add1 = """select * from add_test order by orig1;""" + def add2 = """select * from add_test where orig1 = 2;""" + def add3 = """select * from add_test where orig1 = 5;""" + def reorder1 = """select * from reorder_test order by orig1;""" + def reorder2 = """select * from reorder_test where orig1 = 2;""" + def reorder3 = """select * from reorder_test where orig1 = 5;""" + def readd1 = """select * from readd_test order by orig1;""" + def readd2 = """select * from readd_test where orig1<5 order by orig1;""" + def readd3 = """select * from readd_test where orig1>2 order by orig1;""" + + + String enabled = context.config.otherConfigs.get("enableExternalHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost") + String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort") + String catalog_name = "iceberg_schema_evolution" + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + 'type'='hms', + 'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}' + ); + """ + logger.info("catalog " + catalog_name + " created") + sql """switch ${catalog_name};""" + logger.info("switched to catalog " + catalog_name) + sql """use iceberg_schema_evolution;""" + qt_rename1 rename1 + qt_rename2 rename2 + qt_drop1 drop1 + qt_drop2 drop2 + qt_drop3 drop3 + qt_add1 add1 + qt_add2 add2 + qt_add3 add3 + qt_reorder1 reorder1 + qt_reorder2 reorder2 + qt_reorder3 reorder3 + qt_readd1 readd1 + qt_readd2 readd2 + qt_readd3 readd3 + } +} +