[feature wip](multi catalog)Support iceberg schema evolution. (#15836)

Support iceberg schema evolution for parquet file format.
Iceberg use unique id for each column to support schema evolution.
To support this feature in Doris, FE side need to get the current column id for each column and send the ids to be side.
Be read column id from parquet key_value_metadata, set the changed column name in Block to match the name in parquet file before reading data. And set the name back after reading data.
This commit is contained in:
Jibing-Li
2023-01-20 12:57:36 +08:00
committed by GitHub
parent ab4127d0b2
commit 3ebc98228d
14 changed files with 431 additions and 54 deletions

View File

@ -102,6 +102,8 @@ public:
std::swap(data, new_data);
}
void initialize_index_by_name();
/// References are invalidated after calling functions above.
ColumnWithTypeAndName& get_by_position(size_t position) { return data[position]; }
const ColumnWithTypeAndName& get_by_position(size_t position) const { return data[position]; }
@ -367,7 +369,6 @@ public:
private:
void erase_impl(size_t position);
void initialize_index_by_name();
bool is_column_data_null(const doris::TypeDescriptor& type_desc, const StringRef& data_ref,
const IColumn* column_with_type_and_name, int row);
void deep_copy_slot(void* dst, MemPool* pool, const doris::TypeDescriptor& type_desc,

View File

@ -24,6 +24,7 @@
#include "olap/iterators.h"
#include "parquet_pred_cmp.h"
#include "parquet_thrift_util.h"
#include "rapidjson/document.h"
#include "vec/exprs/vbloom_predicate.h"
#include "vec/exprs/vin_predicate.h"
#include "vec/exprs/vruntimefilter_wrapper.h"
@ -167,21 +168,46 @@ Status ParquetReader::_open_file() {
return Status::OK();
}
// Get iceberg col id to col name map stored in parquet metadata key values.
// This is for iceberg schema evolution.
std::vector<tparquet::KeyValue> ParquetReader::get_metadata_key_values() {
return _t_metadata->key_value_metadata;
}
Status ParquetReader::open() {
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
RETURN_IF_ERROR(_open_file());
_t_metadata = &_file_metadata->to_thrift();
return Status::OK();
}
Status ParquetReader::init_reader(
const std::vector<std::string>& column_names,
const std::vector<std::string>& all_column_names,
const std::vector<std::string>& missing_column_names,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
VExprContext* vconjunct_ctx, bool filter_groups) {
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
RETURN_IF_ERROR(_open_file());
_column_names = &column_names;
_t_metadata = &_file_metadata->to_thrift();
_total_groups = _t_metadata->row_groups.size();
if (_total_groups == 0) {
return Status::EndOfFile("Empty Parquet File");
}
// all_column_names are all the columns required by user sql.
// missing_column_names are the columns required by user sql but not in the parquet file,
// e.g. table added a column after this parquet file was written.
_column_names = &all_column_names;
auto schema_desc = _file_metadata->schema();
for (int i = 0; i < schema_desc.size(); ++i) {
_map_column.emplace(schema_desc.get_column(i)->name, i);
auto name = schema_desc.get_column(i)->name;
// If the column in parquet file is included in all_column_names and not in missing_column_names,
// add it to _map_column, which means the reader should read the data of this column.
// Here to check against missing_column_names is to for the 'Add a column with back to the table
// with the same column name' case. Shouldn't read this column data in this case.
if (find(all_column_names.begin(), all_column_names.end(), name) !=
all_column_names.end() &&
find(missing_column_names.begin(), missing_column_names.end(), name) ==
missing_column_names.end()) {
_map_column.emplace(name, i);
}
}
_colname_to_value_range = colname_to_value_range;
RETURN_IF_ERROR(_init_read_columns());
@ -199,7 +225,12 @@ Status ParquetReader::set_fill_columns(
std::unordered_map<std::string, uint32_t> predicate_columns;
std::function<void(VExpr * expr)> visit_slot = [&](VExpr* expr) {
if (VSlotRef* slot_ref = typeid_cast<VSlotRef*>(expr)) {
predicate_columns.emplace(slot_ref->expr_name(), slot_ref->column_id());
auto expr_name = slot_ref->expr_name();
auto iter = _table_col_to_file_col.find(expr_name);
if (iter != _table_col_to_file_col.end()) {
expr_name = iter->second;
}
predicate_columns.emplace(expr_name, slot_ref->column_id());
if (slot_ref->column_id() == 0) {
_lazy_read_ctx.resize_first_column = false;
}

View File

@ -66,13 +66,11 @@ public:
// for test
void set_file_reader(io::FileReaderSPtr file_reader) { _file_reader = file_reader; }
Status init_reader(const std::vector<std::string>& column_names, bool filter_groups = true) {
// without predicate
return init_reader(column_names, nullptr, nullptr, filter_groups);
}
Status open();
Status init_reader(
const std::vector<std::string>& column_names,
const std::vector<std::string>& all_column_names,
const std::vector<std::string>& missing_column_names,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
VExprContext* vconjunct_ctx, bool filter_groups = true);
@ -103,6 +101,11 @@ public:
partition_columns,
const std::unordered_map<std::string, VExprContext*>& missing_columns) override;
std::vector<tparquet::KeyValue> get_metadata_key_values();
void set_table_to_file_col_map(std::unordered_map<std::string, std::string>& map) {
_table_col_to_file_col = map;
}
private:
struct ParquetProfile {
RuntimeProfile::Counter* filtered_row_groups;
@ -165,6 +168,8 @@ private:
bool _row_group_eof = true;
int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
std::map<std::string, int> _map_column; // column-name <---> column-index
// table column name to file column name map. For iceberg schema evolution.
std::unordered_map<std::string, std::string> _table_col_to_file_col;
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
std::vector<ParquetReadColumn> _read_columns;
RowRange _whole_range = RowRange(0, 0);
@ -189,7 +194,6 @@ private:
ParquetColumnReader::Statistics _column_statistics;
ParquetProfile _parquet_profile;
bool _closed = false;
IOContext* _io_ctx;
};
} // namespace doris::vectorized

View File

@ -59,8 +59,52 @@ IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, Runtim
ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
}
Status IcebergTableReader::init_reader(
std::vector<std::string>& file_col_names,
std::unordered_map<int, std::string>& col_id_name_map,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
VExprContext* vconjunct_ctx) {
ParquetReader* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
_col_id_name_map = col_id_name_map;
_file_col_names = file_col_names;
_colname_to_value_range = colname_to_value_range;
auto parquet_meta_kv = parquet_reader->get_metadata_key_values();
_gen_col_name_maps(parquet_meta_kv);
_gen_file_col_names();
_gen_new_colname_to_value_range();
parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
Status status = parquet_reader->init_reader(_all_required_col_names, _not_in_file_col_names,
&_new_colname_to_value_range, vconjunct_ctx);
return status;
}
Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
return _file_format_reader->get_next_block(block, read_rows, eof);
// To support iceberg schema evolution. We change the column name in block to
// make it match with the column name in parquet file before reading data. and
// Set the name back to table column name before return this block.
if (_has_schema_change) {
for (int i = 0; i < block->columns(); i++) {
ColumnWithTypeAndName& col = block->get_by_position(i);
auto iter = _table_col_to_file_col.find(col.name);
if (iter != _table_col_to_file_col.end()) {
col.name = iter->second;
}
}
block->initialize_index_by_name();
}
auto res = _file_format_reader->get_next_block(block, read_rows, eof);
// Set the name back to table column name before return this block.
if (_has_schema_change) {
for (int i = 0; i < block->columns(); i++) {
ColumnWithTypeAndName& col = block->get_by_position(i);
auto iter = _file_col_to_table_col.find(col.name);
if (iter != _file_col_to_table_col.end()) {
col.name = iter->second;
}
}
block->initialize_index_by_name();
}
return res;
}
Status IcebergTableReader::set_fill_columns(
@ -138,8 +182,8 @@ Status IcebergTableReader::_position_delete(
delete_reader.get_parsed_schema(&delete_file_col_names, &delete_file_col_types);
init_schema = true;
}
create_status =
delete_reader.init_reader(delete_file_col_names, nullptr, nullptr, false);
create_status = delete_reader.init_reader(delete_file_col_names, _not_in_file_col_names,
nullptr, nullptr, false);
if (!create_status.ok()) {
return nullptr;
}
@ -355,4 +399,97 @@ void IcebergTableReader::_sort_delete_rows(std::vector<std::vector<int64_t>*>& d
}
}
/*
* To support schema evolution, Iceberg write the column id to column name map to
* parquet file key_value_metadata.
* This function is to compare the table schema from FE (_col_id_name_map) with
* the schema in key_value_metadata for the current parquet file and generate two maps
* for future use:
* 1. table column name to parquet column name.
* 2. parquet column name to table column name.
* For example, parquet file has a column 'col1',
* after this file was written, iceberg changed the column name to 'col1_new'.
* The two maps would contain:
* 1. col1_new -> col1
* 2. col1 -> col1_new
*/
Status IcebergTableReader::_gen_col_name_maps(std::vector<tparquet::KeyValue> parquet_meta_kv) {
for (int i = 0; i < parquet_meta_kv.size(); ++i) {
tparquet::KeyValue kv = parquet_meta_kv[i];
if (kv.key == "iceberg.schema") {
std::string schema = kv.value;
rapidjson::Document json;
json.Parse(schema.c_str());
if (json.HasMember("fields")) {
rapidjson::Value& fields = json["fields"];
if (fields.IsArray()) {
for (int j = 0; j < fields.Size(); j++) {
rapidjson::Value& e = fields[j];
rapidjson::Value& id = e["id"];
rapidjson::Value& name = e["name"];
std::string name_string = name.GetString();
transform(name_string.begin(), name_string.end(), name_string.begin(),
::tolower);
auto iter = _col_id_name_map.find(id.GetInt());
if (iter != _col_id_name_map.end()) {
_table_col_to_file_col.emplace(iter->second, name_string);
_file_col_to_table_col.emplace(name_string, iter->second);
if (name_string != iter->second) {
_has_schema_change = true;
}
} else {
_has_schema_change = true;
}
}
}
}
break;
}
}
return Status::OK();
}
/*
* Generate _all_required_col_names and _not_in_file_col_names.
*
* _all_required_col_names is all the columns required by user sql.
* If the column name has been modified after the data file was written,
* put the old name in data file to _all_required_col_names.
*
* _not_in_file_col_names is all the columns required by user sql but not in the data file.
* e.g. New columns added after this data file was written.
* The columns added with names used by old dropped columns should consider as a missing column,
* which should be in _not_in_file_col_names.
*/
void IcebergTableReader::_gen_file_col_names() {
_all_required_col_names.clear();
_not_in_file_col_names.clear();
for (int i = 0; i < _file_col_names.size(); ++i) {
auto name = _file_col_names[i];
auto iter = _table_col_to_file_col.find(name);
if (iter == _table_col_to_file_col.end()) {
_all_required_col_names.emplace_back(name);
_not_in_file_col_names.emplace_back(name);
} else {
_all_required_col_names.emplace_back(iter->second);
}
}
}
/*
* Generate _new_colname_to_value_range, by replacing the column name in
* _colname_to_value_range with column name in data file.
*/
void IcebergTableReader::_gen_new_colname_to_value_range() {
for (auto it = _colname_to_value_range->begin(); it != _colname_to_value_range->end(); it++) {
auto iter = _table_col_to_file_col.find(it->first);
if (iter == _table_col_to_file_col.end()) {
_new_colname_to_value_range.emplace(it->first, it->second);
} else {
_new_colname_to_value_range.emplace(iter->second, it->second);
}
}
}
} // namespace doris::vectorized

View File

@ -56,6 +56,12 @@ public:
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;
Status init_reader(
std::vector<std::string>& file_col_names,
std::unordered_map<int, std::string>& col_id_name_map,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
VExprContext* vconjunct_ctx);
enum { DATA, POSITION_DELETE, EQUALITY_DELETE };
private:
@ -81,6 +87,10 @@ private:
PositionDeleteRange _get_range(const ColumnString& file_path_column);
Status _gen_col_name_maps(std::vector<tparquet::KeyValue> parquet_meta_kv);
void _gen_file_col_names();
void _gen_new_colname_to_value_range();
RuntimeProfile* _profile;
RuntimeState* _state;
const TFileScanRangeParams& _params;
@ -88,8 +98,24 @@ private:
KVCache<std::string>& _kv_cache;
IcebergProfile _iceberg_profile;
std::vector<int64_t> _delete_rows;
// col names from _file_slot_descs
std::vector<std::string> _file_col_names;
// file column name to table column name map. For iceberg schema evolution.
std::unordered_map<std::string, std::string> _file_col_to_table_col;
// table column name to file column name map. For iceberg schema evolution.
std::unordered_map<std::string, std::string> _table_col_to_file_col;
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
// copy from _colname_to_value_range with new column name that is in parquet file, to support schema evolution.
std::unordered_map<std::string, ColumnValueRangeType> _new_colname_to_value_range;
// column id to name map. Collect from FE slot descriptor.
std::unordered_map<int, std::string> _col_id_name_map;
// col names in the parquet file
std::vector<std::string> _all_required_col_names;
// col names in table but not in parquet file
std::vector<std::string> _not_in_file_col_names;
IOContext* _io_ctx;
bool _has_schema_change = false;
};
} // namespace vectorized
} // namespace doris

View File

@ -491,20 +491,24 @@ Status VFileScanner::_get_next_reader() {
ParquetReader* parquet_reader = new ParquetReader(
_profile, _params, range, _state->query_options().batch_size,
const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx.get());
RETURN_IF_ERROR(parquet_reader->open());
if (!_is_load && _push_down_expr == nullptr && _vconjunct_ctx != nullptr) {
RETURN_IF_ERROR(_vconjunct_ctx->clone(_state, &_push_down_expr));
_discard_conjuncts();
}
init_status = parquet_reader->init_reader(_file_col_names, _colname_to_value_range,
_push_down_expr);
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "iceberg") {
IcebergTableReader* iceberg_reader =
new IcebergTableReader((GenericReader*)parquet_reader, _profile, _state,
_params, range, _kv_cache, _io_ctx.get());
iceberg_reader->init_reader(_file_col_names, _col_id_name_map,
_colname_to_value_range, _push_down_expr);
RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
_cur_reader.reset((GenericReader*)iceberg_reader);
} else {
std::vector<std::string> place_holder;
init_status = parquet_reader->init_reader(_file_col_names, place_holder,
_colname_to_value_range, _push_down_expr);
_cur_reader.reset((GenericReader*)parquet_reader);
}
break;
@ -646,10 +650,10 @@ Status VFileScanner::_init_expr_ctxes() {
}
if (slot_info.is_file_slot) {
_file_slot_descs.emplace_back(it->second);
auto iti = full_src_index_map.find(slot_id);
_file_slot_index_map.emplace(slot_id, iti->second);
_file_slot_name_map.emplace(it->second->col_name(), iti->second);
_file_col_names.push_back(it->second->col_name());
if (it->second->col_unique_id() > 0) {
_col_id_name_map.emplace(it->second->col_unique_id(), it->second->col_name());
}
} else {
_partition_slot_descs.emplace_back(it->second);
if (_is_load) {

View File

@ -68,12 +68,10 @@ protected:
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
// File source slot descriptors
std::vector<SlotDescriptor*> _file_slot_descs;
// File slot id to index in _file_slot_descs
std::unordered_map<SlotId, int> _file_slot_index_map;
// file col name to index in _file_slot_descs
std::map<std::string, int> _file_slot_name_map;
// col names from _file_slot_descs
std::vector<std::string> _file_col_names;
// column id to name map. Collect from FE slot descriptor.
std::unordered_map<int, std::string> _col_id_name_map;
// Partition source slot descriptors
std::vector<SlotDescriptor*> _partition_slot_descs;

View File

@ -98,6 +98,7 @@ TEST_F(ParquetReaderTest, normal) {
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
std::vector<std::string> column_names;
std::vector<std::string> missing_column_names;
for (int i = 0; i < slot_descs.size(); i++) {
column_names.push_back(slot_descs[i]->col_name());
}
@ -114,7 +115,8 @@ TEST_F(ParquetReaderTest, normal) {
runtime_state.init_mem_trackers();
std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
p_reader->init_reader(column_names, nullptr, nullptr);
p_reader->open();
p_reader->init_reader(column_names, missing_column_names, nullptr, nullptr);
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
partition_columns;
std::unordered_map<std::string, VExprContext*> missing_columns;

View File

@ -32,6 +32,7 @@ import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.backup.BlobStorage;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
@ -43,8 +44,10 @@ import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@ -64,6 +67,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
@ -74,6 +78,7 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@ -813,6 +818,28 @@ public class HiveMetaStoreClientHelper {
}
return output.toString();
}
public static org.apache.iceberg.Table getIcebergTable(HMSExternalTable table) {
String metastoreUri = table.getMetastoreUri();
org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
Configuration conf = getConfiguration(table);
hiveCatalog.setConf(conf);
// initialize hive catalog
Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, metastoreUri);
catalogProperties.put("uri", metastoreUri);
hiveCatalog.initialize("hive", catalogProperties);
return hiveCatalog.loadTable(TableIdentifier.of(table.getDbName(), table.getName()));
}
public static Configuration getConfiguration(HMSExternalTable table) {
Configuration conf = new HdfsConfiguration();
for (Map.Entry<String, String> entry : table.getHadoopProperties().entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
return conf;
}
}

View File

@ -25,6 +25,7 @@ import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
@ -37,12 +38,15 @@ import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* External catalog for hive metastore compatible data sources.
@ -173,6 +177,16 @@ public class HMSExternalCatalog extends ExternalCatalog {
public List<Column> getSchema(String dbName, String tblName) {
makeSureInitialized();
List<FieldSchema> schema = getClient().getSchema(dbName, tblName);
Optional<ExternalDatabase> db = getDb(dbName);
if (db.isPresent()) {
Optional table = db.get().getTable(tblName);
if (table.isPresent()) {
HMSExternalTable hmsTable = (HMSExternalTable) table.get();
if (hmsTable.getDlaType().equals(HMSExternalTable.DLAType.ICEBERG)) {
return getIcebergSchema(hmsTable, schema);
}
}
}
List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
for (FieldSchema field : schema) {
tmpSchema.add(new Column(field.getName(),
@ -182,6 +196,19 @@ public class HMSExternalCatalog extends ExternalCatalog {
return tmpSchema;
}
private List<Column> getIcebergSchema(HMSExternalTable table, List<FieldSchema> hmsSchema) {
Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(table);
Schema schema = icebergTable.schema();
List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
for (FieldSchema field : hmsSchema) {
tmpSchema.add(new Column(field.getName(),
HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
true, null, field.getComment(), true, null,
schema.caseInsensitiveFindField(field.getName()).fieldId()));
}
return tmpSchema;
}
public void setLastSyncedEventId(long lastSyncedEventId) {
this.lastSyncedEventId = lastSyncedEventId;
}

View File

@ -52,8 +52,6 @@ import org.apache.doris.thrift.TFileType;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.FileSplit;
@ -211,14 +209,6 @@ public class HiveScanProvider extends HMSTableScanProvider {
allFiles.addAll(files);
}
protected Configuration getConfiguration() {
Configuration conf = new HdfsConfiguration();
for (Map.Entry<String, String> entry : hmsTable.getHadoopProperties().entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
return conf;
}
public int getTotalPartitionNum() {
return totalPartitionNum;
}

View File

@ -21,7 +21,7 @@ import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
@ -35,7 +35,6 @@ import org.apache.doris.thrift.TIcebergDeleteFileDesc;
import org.apache.doris.thrift.TIcebergFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.iceberg.BaseTable;
@ -46,7 +45,6 @@ import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.types.Conversions;
@ -55,7 +53,6 @@ import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -139,7 +136,7 @@ public class IcebergScanProvider extends HiveScanProvider {
}
}
org.apache.iceberg.Table table = getIcebergTable();
org.apache.iceberg.Table table = HiveMetaStoreClientHelper.getIcebergTable(hmsTable);
TableScan scan = table.newScan();
TableSnapshot tableSnapshot = desc.getRef().getTableSnapshot();
if (tableSnapshot != null) {
@ -221,19 +218,6 @@ public class IcebergScanProvider extends HiveScanProvider {
return filters;
}
private org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException {
org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
Configuration conf = getConfiguration();
hiveCatalog.setConf(conf);
// initialize hive catalog
Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, getMetaStoreUrl());
catalogProperties.put("uri", getMetaStoreUrl());
hiveCatalog.initialize("hive", catalogProperties);
return hiveCatalog.loadTable(TableIdentifier.of(hmsTable.getDbName(), hmsTable.getName()));
}
@Override
public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
return Collections.emptyList();

View File

@ -0,0 +1,79 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !rename1 --
1 orig2_1 orig3_1
2 orig2_2 orig3_2
3 orig2_3 orig3_3
4 orig2_4 rename3_1
5 orig2_5 rename3_2
6 orig2_6 rename3_3
-- !rename2 --
3 orig2_3 orig3_3
4 orig2_4 rename3_1
-- !drop1 --
1 orig3_1
2 orig3_2
3 orig3_3
4 orig3_4
5 orig3_5
6 orig3_6
-- !drop2 --
1 orig3_1
2 orig3_2
3 orig3_3
-- !drop3 --
4 orig3_4
5 orig3_5
6 orig3_6
-- !add1 --
1 orig2_1 orig3_1 \N
2 orig2_2 orig3_2 \N
3 orig2_3 orig3_3 \N
4 orig2_4 orig3_4 add1_1
5 orig2_5 orig3_5 add1_2
6 orig2_6 orig3_6 add1_3
-- !add2 --
2 orig2_2 orig3_2 \N
-- !add3 --
5 orig2_5 orig3_5 add1_2
-- !reorder1 --
1 orig3_1 orig2_1
2 orig3_2 orig2_2
3 orig3_3 orig2_3
4 orig3_4 orig2_4
5 orig3_5 orig2_5
6 orig3_6 orig2_6
-- !reorder2 --
2 orig3_2 orig2_2
-- !reorder3 --
5 orig3_5 orig2_5
-- !readd1 --
1 orig2_1 \N
2 orig2_2 \N
3 orig2_3 \N
4 orig2_4 orig3_4
5 orig2_5 orig3_5
6 orig2_6 orig3_6
-- !readd2 --
1 orig2_1 \N
2 orig2_2 \N
3 orig2_3 \N
4 orig2_4 orig3_4
-- !readd3 --
3 orig2_3 \N
4 orig2_4 orig3_4
5 orig2_5 orig3_5
6 orig2_6 orig3_6

View File

@ -0,0 +1,67 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("iceberg_schema_evolution", "p0") {
def rename1 = """select * from rename_test order by rename_1;"""
def rename2 = """select * from rename_test where rename_1 in (3, 4) order by rename_1;"""
def drop1 = """select * from drop_test order by orig1;"""
def drop2 = """select * from drop_test where orig1<=3 order by orig1;"""
def drop3 = """select * from drop_test where orig1>3 order by orig1;"""
def add1 = """select * from add_test order by orig1;"""
def add2 = """select * from add_test where orig1 = 2;"""
def add3 = """select * from add_test where orig1 = 5;"""
def reorder1 = """select * from reorder_test order by orig1;"""
def reorder2 = """select * from reorder_test where orig1 = 2;"""
def reorder3 = """select * from reorder_test where orig1 = 5;"""
def readd1 = """select * from readd_test order by orig1;"""
def readd2 = """select * from readd_test where orig1<5 order by orig1;"""
def readd3 = """select * from readd_test where orig1>2 order by orig1;"""
String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
String catalog_name = "iceberg_schema_evolution"
sql """drop catalog if exists ${catalog_name};"""
sql """
create catalog if not exists ${catalog_name} properties (
'type'='hms',
'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
);
"""
logger.info("catalog " + catalog_name + " created")
sql """switch ${catalog_name};"""
logger.info("switched to catalog " + catalog_name)
sql """use iceberg_schema_evolution;"""
qt_rename1 rename1
qt_rename2 rename2
qt_drop1 drop1
qt_drop2 drop2
qt_drop3 drop3
qt_add1 add1
qt_add2 add2
qt_add3 add3
qt_reorder1 reorder1
qt_reorder2 reorder2
qt_reorder3 reorder3
qt_readd1 readd1
qt_readd2 readd2
qt_readd3 readd3
}
}