[deprecated](external) remove deprecated hudi and iceberg external table (#27456)
Creating Hudi and Iceberg external tables has been disallowed since v1.2, and all of these features are covered by the Hudi/Iceberg catalogs, so the code should be removed in v2.1. This PR mainly:
1. Removes the code for Hudi/Iceberg external tables.
2. Removes the code for the Iceberg database.
3. Disallows creation of Hive external tables.
4. Disables ODBC, MySQL, and Broker external tables by default, and adds the FE config `disable_odbc_mysql_broker_table` to control it (see the illustrative snippet below).
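For context, a minimal sketch of the migration path this PR points users to. The catalog properties below follow the Doris multi-catalog documentation, and the fe.conf key mirrors the `Config.enable_odbc_mysql_broker_table` field checked in this diff; exact names can differ between versions, so treat this as illustrative rather than authoritative.

```sql
-- Illustrative only: instead of the removed `ENGINE = ICEBERG/HUDI` external tables,
-- map the source through a catalog (property names assumed from the multi-catalog docs).
CREATE CATALOG iceberg_hms PROPERTIES (
    "type" = "iceberg",
    "iceberg.catalog.type" = "hms",
    "hive.metastore.uris" = "thrift://127.0.0.1:9083"
);

-- The deprecated ODBC/MySQL/Broker engines stay disabled unless the FE config flag
-- checked in this diff is turned on, e.g. in fe.conf:
--   enable_odbc_mysql_broker_table = true
```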
@@ -94,9 +94,9 @@ public class CreateResourceStmt extends DdlStmt {
if (resourceType == ResourceType.SPARK && !isExternal) {
throw new AnalysisException("Spark is external resource");
}
if (resourceType == ResourceType.ODBC_CATALOG && !Config.enable_odbc_table) {
if (resourceType == ResourceType.ODBC_CATALOG && !Config.enable_odbc_mysql_broker_table) {
throw new AnalysisException("ODBC table is deprecated, use JDBC instead. Or you can set "
+ "`enable_odbc_table=true` in fe.conf to enable ODBC again.");
+ "`enable_odbc_table=true` in fe.conf to enable ODBC again.");
}
}
@@ -91,14 +91,11 @@ public class CreateTableStmt extends DdlStmt {
static {
engineNames = Sets.newHashSet();
engineNames.add("olap");
engineNames.add("jdbc");
engineNames.add("elasticsearch");
engineNames.add("odbc");
engineNames.add("mysql");
engineNames.add("broker");
engineNames.add("elasticsearch");
engineNames.add("hive");
engineNames.add("iceberg");
engineNames.add("hudi");
engineNames.add("jdbc");
}

// if auto bucket auto bucket enable, rewrite distribution bucket num &&
@@ -200,22 +197,6 @@ public class CreateTableStmt extends DdlStmt {
this.rollupAlterClauseList = (rollupAlterClauseList == null) ? Lists.newArrayList() : rollupAlterClauseList;
}

// This is for iceberg/hudi table, which has no column schema
public CreateTableStmt(boolean ifNotExists,
boolean isExternal,
TableName tableName,
String engineName,
Map<String, String> properties,
String comment) {
this.ifNotExists = ifNotExists;
this.isExternal = isExternal;
this.tableName = tableName;
this.engineName = engineName;
this.properties = properties;
this.columnDefs = Lists.newArrayList();
this.comment = Strings.nullToEmpty(comment);
}

// for Nereids
public CreateTableStmt(boolean ifNotExists,
boolean isExternal,
@@ -481,7 +462,7 @@ public class CreateTableStmt extends DdlStmt {
}

// analyze column def
if (!(engineName.equals("iceberg") || engineName.equals("hudi") || engineName.equals("elasticsearch"))
if (!(engineName.equals("elasticsearch"))
&& (columnDefs == null || columnDefs.isEmpty())) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLE_MUST_HAVE_COLUMNS);
}
@@ -662,11 +643,7 @@ public class CreateTableStmt extends DdlStmt {

if (engineName.equals("mysql") || engineName.equals("odbc") || engineName.equals("broker")
|| engineName.equals("elasticsearch") || engineName.equals("hive")
|| engineName.equals("iceberg") || engineName.equals("hudi") || engineName.equals("jdbc")) {
if (engineName.equals("odbc") && !Config.enable_odbc_table) {
throw new AnalysisException("ODBC table is deprecated, use JDBC instead. Or you can set "
+ "`enable_odbc_table=true` in fe.conf to enable ODBC again.");
}
|| engineName.equals("jdbc")) {
if (!isExternal) {
// this is for compatibility
isExternal = true;
@@ -679,10 +656,13 @@ public class CreateTableStmt extends DdlStmt {
}
}

if (Config.disable_iceberg_hudi_table && (engineName.equals("iceberg") || engineName.equals("hudi"))) {
if (!Config.enable_odbc_mysql_broker_table && (engineName.equals("odbc")
|| engineName.equals("mysql") || engineName.equals("broker"))) {
throw new AnalysisException(
"iceberg and hudi table is no longer supported. Use multi catalog feature instead."
+ ". Or you can temporarily set 'disable_iceberg_hudi_table=false'"
"odbc, mysql and broker table is no longer supported."
+ " For odbc and mysql external table, use jdbc table or jdbc catalog instead."
+ " For broker table, use table valued function instead."
+ ". Or you can temporarily set 'disable_odbc_mysql_broker_table=false'"
+ " in fe.conf to reopen this feature.");
}
}
@@ -17,23 +17,16 @@

package org.apache.doris.catalog;

import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.external.iceberg.IcebergCatalog;
import org.apache.doris.external.iceberg.IcebergCatalogMgr;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
@@ -44,16 +37,10 @@ import java.util.Map;
* such as `checkAndBuildIcebergProperty` to check and build it.
*/
public class DatabaseProperty implements Writable {
private static final Logger LOG = LogManager.getLogger(DatabaseProperty.class);

public static final String ICEBERG_PROPERTY_PREFIX = "iceberg";

@SerializedName(value = "properties")
private Map<String, String> properties = Maps.newHashMap();

// the following variables are built from "properties"
private IcebergProperty icebergProperty = new IcebergProperty(Maps.newHashMap());

public DatabaseProperty() {

}
@@ -78,30 +65,6 @@ public class DatabaseProperty implements Writable {
return properties;
}

public IcebergProperty getIcebergProperty() {
return icebergProperty;
}

public DatabaseProperty checkAndBuildProperties() throws DdlException {
Map<String, String> icebergProperties = new HashMap<>();
for (Map.Entry<String, String> entry : this.properties.entrySet()) {
if (entry.getKey().startsWith(ICEBERG_PROPERTY_PREFIX)) {
if (Config.disable_iceberg_hudi_table) {
throw new DdlException(
"database for iceberg is no longer supported. Use multi catalog feature instead."
+ ". Or you can temporarily set 'disable_iceberg_hudi_table=false'"
+ " in fe.conf to reopen this feature.");
} else {
icebergProperties.put(entry.getKey(), entry.getValue());
}
}
}
if (icebergProperties.size() > 0) {
checkAndBuildIcebergProperty(icebergProperties);
}
return this;
}

public BinlogConfig getBinlogConfig() {
BinlogConfig binlogConfig = new BinlogConfig();
binlogConfig.mergeFromProperties(properties);
@@ -112,17 +75,6 @@ public class DatabaseProperty implements Writable {
properties.putAll(newProperties);
}

private void checkAndBuildIcebergProperty(Map<String, String> properties) throws DdlException {
IcebergCatalogMgr.validateProperties(properties, false);
icebergProperty = new IcebergProperty(properties);
String icebergDb = icebergProperty.getDatabase();
IcebergCatalog icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty);
// check database exists
if (!icebergCatalog.databaseExists(icebergDb)) {
throw new DdlException("Database [" + icebergDb + "] dose not exist in Iceberg.");
}
}

@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
@@ -139,7 +139,6 @@ import org.apache.doris.deploy.impl.AmbariDeployManager;
import org.apache.doris.deploy.impl.K8sDeployManager;
import org.apache.doris.deploy.impl.LocalFileDeployManager;
import org.apache.doris.external.elasticsearch.EsRepository;
import org.apache.doris.external.iceberg.IcebergTableCreationRecordMgr;
import org.apache.doris.ha.BDBHA;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.ha.HAProtocol;
@@ -1570,7 +1569,6 @@ public class Env {
}
streamLoadRecordMgr.start();
tabletLoadIndexRecorderMgr.start();
getInternalCatalog().getIcebergTableCreationRecordMgr().start();
new InternalSchemaInitializer().start();
if (Config.enable_hms_events_incremental_sync) {
metastoreEventsProcessor.start();
@@ -3397,18 +3395,6 @@ public class Env {
sb.append("\"table\" = \"").append(hiveTable.getHiveTable()).append("\",\n");
sb.append(new PrintableMap<>(hiveTable.getHiveProperties(), " = ", true, true, hidePassword).toString());
sb.append("\n)");
} else if (table.getType() == TableType.ICEBERG) {
IcebergTable icebergTable = (IcebergTable) table;

addTableComment(icebergTable, sb);

// properties
sb.append("\nPROPERTIES (\n");
sb.append("\"iceberg.database\" = \"").append(icebergTable.getIcebergDb()).append("\",\n");
sb.append("\"iceberg.table\" = \"").append(icebergTable.getIcebergTbl()).append("\",\n");
sb.append(new PrintableMap<>(icebergTable.getIcebergProperties(),
" = ", true, true, hidePassword).toString());
sb.append("\n)");
} else if (table.getType() == TableType.JDBC) {
JdbcTable jdbcTable = (JdbcTable) table;
addTableComment(jdbcTable, sb);
@@ -3795,10 +3781,6 @@ public class Env {
return tabletLoadIndexRecorderMgr;
}

public IcebergTableCreationRecordMgr getIcebergTableCreationRecordMgr() {
return getInternalCatalog().getIcebergTableCreationRecordMgr();
}

public MasterTaskExecutor getPendingLoadTaskScheduler() {
return pendingLoadTaskScheduler;
}
@@ -29,30 +29,20 @@ import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.RemoteFiles;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExprOpcode;

import com.aliyun.datalake.metastore.common.DataLakeConfig;
import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -60,11 +50,9 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -94,7 +82,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -175,123 +162,6 @@ public class HiveMetaStoreClientHelper {
return metaStoreClient;
}

/**
* Get data files of partitions in hive table, filter by partition predicate.
*
* @param hiveTable
* @param hivePartitionPredicate
* @param fileStatuses
* @param remoteHiveTbl
* @return
* @throws DdlException
*/
public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate,
List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl, StorageBackend.StorageType type)
throws DdlException {
RemoteFileSystem fs = FileSystemFactory.get("HiveMetaStore", type, hiveTable.getHiveProperties());
List<RemoteFiles> remoteLocationsList = new ArrayList<>();
try {
if (remoteHiveTbl.getPartitionKeys().size() > 0) {
String metaStoreUris = hiveTable.getHiveProperties().get(HMSProperties.HIVE_METASTORE_URIS);
// hive partitioned table, get file iterator from table partition sd info
List<Partition> hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl,
hivePartitionPredicate);
for (Partition p : hivePartitions) {
String location = normalizeS3LikeSchema(p.getSd().getLocation());
remoteLocationsList.add(fs.listLocatedFiles(location));
}
} else {
// hive non-partitioned table, get file iterator from table sd info
String location = normalizeS3LikeSchema(remoteHiveTbl.getSd().getLocation());
remoteLocationsList.add(fs.listLocatedFiles(location));
}
return getAllFileStatus(fileStatuses, remoteLocationsList, fs);
} catch (UserException e) {
throw new DdlException(e.getMessage(), e);
}
}

public static String normalizeS3LikeSchema(String location) {
String[] objectStorages = Config.s3_compatible_object_storages.split(",");
for (String objectStorage : objectStorages) {
if (location.startsWith(objectStorage + "://")) {
location = location.replaceFirst(objectStorage, "s3");
break;
}
}
return location;
}

private static String getAllFileStatus(List<TBrokerFileStatus> fileStatuses,
List<RemoteFiles> remoteLocationsList, RemoteFileSystem fs)
throws UserException {
String hdfsUrl = "";
Queue<RemoteFiles> queue = Queues.newArrayDeque(remoteLocationsList);
while (queue.peek() != null) {
RemoteFiles locs = queue.poll();
try {
for (RemoteFile fileLocation : locs.files()) {
Path filePath = fileLocation.getPath();
// hdfs://host:port/path/to/partition/file_name
String fullUri = filePath.toString();
if (fileLocation.isDirectory()) {
// recursive visit the directory to get the file path.
queue.add(fs.listLocatedFiles(fullUri));
continue;
}
TBrokerFileStatus brokerFileStatus = new TBrokerFileStatus();
brokerFileStatus.setIsDir(fileLocation.isDirectory());
brokerFileStatus.setIsSplitable(true);
brokerFileStatus.setSize(fileLocation.getSize());
brokerFileStatus.setModificationTime(fileLocation.getModificationTime());
// filePath.toUri().getPath() = "/path/to/partition/file_name"
// eg: /home/work/dev/hive/apache-hive-2.3.7-bin/data/warehouse
// + /dae.db/customer/state=CA/city=SanJose/000000_0
// fullUri: Backend need full s3 path (with s3://bucket at the beginning) to read the data on s3.
// path = "s3://bucket/path/to/partition/file_name"
// eg: s3://hive-s3-test/region/region.tbl
String path = fs.needFullPath() ? fullUri : filePath.toUri().getPath();
brokerFileStatus.setPath(path);
fileStatuses.add(brokerFileStatus);
if (StringUtils.isEmpty(hdfsUrl)) {
// hdfs://host:port
hdfsUrl = fullUri.replace(path, "");
}
}
} catch (UserException e) {
LOG.warn("List HDFS file IOException: {}", e.getMessage());
throw new DdlException("List HDFS file failed. Error: " + e.getMessage());
}
}
return hdfsUrl;
}

/**
* list partitions from hiveMetaStore.
*
* @param metaStoreUris hiveMetaStore uris
* @param remoteHiveTbl Hive table
* @param hivePartitionPredicate filter when list partitions
* @return a list of hive partitions
* @throws DdlException when connect hiveMetaStore failed.
*/
public static List<Partition> getHivePartitions(String metaStoreUris, Table remoteHiveTbl,
ExprNodeGenericFuncDesc hivePartitionPredicate) throws DdlException {
List<Partition> hivePartitions = new ArrayList<>();
IMetaStoreClient client = getClient(metaStoreUris);
try {
client.listPartitionsByExpr(remoteHiveTbl.getDbName(), remoteHiveTbl.getTableName(),
SerializationUtilities.serializeExpressionToKryo(hivePartitionPredicate),
null, (short) -1, hivePartitions);
} catch (TException e) {
LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
throw new DdlException("Connect hive metastore failed: " + e.getMessage());
} finally {
client.close();
}
return hivePartitions;
}

public static Table getTable(HiveTable hiveTable) throws DdlException {
IMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HMSProperties.HIVE_METASTORE_URIS));
Table table;
@@ -1,266 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.catalog;

import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.Util;
import org.apache.doris.external.iceberg.IcebergCatalog;
import org.apache.doris.external.iceberg.IcebergCatalogMgr;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TIcebergTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* External Iceberg table
*/
public class IcebergTable extends Table {
private static final Logger LOG = LogManager.getLogger(IcebergTable.class);

// remote Iceberg database name
private String icebergDb;
// remote Iceberg table name
private String icebergTbl;
// remote Iceberg table location
private String location;
// Iceberg table file format
private String fileFormat;
// Iceberg storage type
private StorageBackend.StorageType storageType;
// Iceberg remote host uri
private String hostUri;
// location analyze flag
private boolean isAnalyzed = false;
private Map<String, String> icebergProperties = Maps.newHashMap();

private org.apache.iceberg.Table icebergTable;

private final byte[] loadLock = new byte[0];
private final AtomicBoolean isLoaded = new AtomicBoolean(false);

public IcebergTable() {
super(TableType.ICEBERG);
}

public IcebergTable(long id, String tableName, List<Column> fullSchema, IcebergProperty icebergProperty,
org.apache.iceberg.Table icebergTable) {
super(id, tableName, TableType.ICEBERG, fullSchema);
this.icebergDb = icebergProperty.getDatabase();
this.icebergTbl = icebergProperty.getTable();

icebergProperties.put(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS, icebergProperty.getHiveMetastoreUris());
icebergProperties.put(IcebergProperty.ICEBERG_CATALOG_TYPE, icebergProperty.getCatalogType());
icebergProperties.putAll(icebergProperty.getDfsProperties());
this.icebergTable = icebergTable;
}

public String getIcebergDbTable() {
return String.format("%s.%s", icebergDb, icebergTbl);
}

public String getIcebergDb() {
return icebergDb;
}

public String getIcebergTbl() {
return icebergTbl;
}

public Map<String, String> getIcebergProperties() {
return icebergProperties;
}

private void getLocation() throws UserException {
if (Strings.isNullOrEmpty(location)) {
try {
getTable();
} catch (Exception e) {
throw new UserException("Failed to get table: " + name + ",error: " + e.getMessage());
}
location = icebergTable.location();
}
analyzeLocation();
}

private void analyzeLocation() throws UserException {
if (isAnalyzed) {
return;
}
String[] strings = StringUtils.split(location, "/");

// analyze storage type
String storagePrefix = strings[0].split(":")[0];
if (Util.isS3CompatibleStorageSchema(storagePrefix)) {
this.storageType = StorageBackend.StorageType.S3;
} else if (storagePrefix.equalsIgnoreCase("hdfs")) {
this.storageType = StorageBackend.StorageType.HDFS;
} else {
throw new UserException("Not supported storage type: " + storagePrefix);
}

// analyze host uri
// eg: hdfs://host:port
// s3://host:port
String host = strings[1];
this.hostUri = storagePrefix + "://" + host;
this.isAnalyzed = true;
}

public String getHostUri() throws UserException {
if (!isAnalyzed) {
getLocation();
}
return hostUri;
}

public StorageBackend.StorageType getStorageType() throws UserException {
if (!isAnalyzed) {
getLocation();
}
return storageType;
}

public String getFileFormat() throws UserException {
if (Strings.isNullOrEmpty(fileFormat)) {
try {
getTable();
} catch (Exception e) {
throw new UserException("Failed to get table: " + name + ",error: " + e.getMessage());
}
fileFormat = icebergTable.properties().get(TableProperties.DEFAULT_FILE_FORMAT);
}
return fileFormat;
}

public Schema getIcebergSchema() {
return icebergTable.schema();
}

private org.apache.iceberg.Table getTable() throws Exception {
if (isLoaded.get()) {
Preconditions.checkNotNull(icebergTable);
return icebergTable;
}
synchronized (loadLock) {
if (icebergTable != null) {
return icebergTable;
}

IcebergProperty icebergProperty = getIcebergProperty();
IcebergCatalog icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty);
try {
this.icebergTable = icebergCatalog.loadTable(TableIdentifier.of(icebergDb, icebergTbl));
LOG.info("finished to load iceberg table: {}", name);
} catch (Exception e) {
LOG.warn("failed to load iceberg table {} from {}", name, icebergProperty.getHiveMetastoreUris(), e);
throw e;
}

isLoaded.set(true);
return icebergTable;
}
}

private IcebergProperty getIcebergProperty() {
Map<String, String> properties = Maps.newHashMap(icebergProperties);
properties.put(IcebergProperty.ICEBERG_DATABASE, icebergDb);
properties.put(IcebergProperty.ICEBERG_TABLE, icebergTbl);
return new IcebergProperty(properties);
}

/**
* Get iceberg data file by file system table location and iceberg predicates
* @throws Exception
*/
public List<TBrokerFileStatus> getIcebergDataFiles(List<Expression> predicates) throws Exception {
org.apache.iceberg.Table table = getTable();
TableScan scan = table.newScan();
for (Expression predicate : predicates) {
scan = scan.filter(predicate);
}
List<TBrokerFileStatus> relatedFiles = Lists.newArrayList();
for (FileScanTask task : scan.planFiles()) {
Path path = Paths.get(task.file().path().toString());
String relativePath = "/" + path.subpath(2, path.getNameCount());
relatedFiles.add(new TBrokerFileStatus(relativePath, false, task.file().fileSizeInBytes(), false));
}
return relatedFiles;
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);

Text.writeString(out, icebergDb);
Text.writeString(out, icebergTbl);

out.writeInt(icebergProperties.size());
for (Map.Entry<String, String> entry : icebergProperties.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
}

@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);

icebergDb = Text.readString(in);
icebergTbl = Text.readString(in);
int size = in.readInt();
for (int i = 0; i < size; i++) {
String key = Text.readString(in);
String value = Text.readString(in);
icebergProperties.put(key, value);
}
}

@Override
public TTableDescriptor toThrift() {
TIcebergTable tIcebergTable = new TIcebergTable(getIcebergDb(), getIcebergTbl(), getIcebergProperties());
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_TABLE,
fullSchema.size(), 0, getName(), "");
tTableDescriptor.setIcebergTable(tIcebergTable);
return tTableDescriptor;
}
}
@@ -17,12 +17,9 @@

package org.apache.doris.catalog;

import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.RefreshCatalogStmt;
import org.apache.doris.analysis.RefreshDbStmt;
import org.apache.doris.analysis.RefreshTableStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ThreadPoolManager;
@@ -30,7 +27,6 @@ import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.qe.DdlExecutor;

import com.google.common.collect.Maps;
@@ -64,13 +60,8 @@ public class RefreshManager {
throw new DdlException("Catalog " + catalogName + " doesn't exist.");
}

if (catalog.getName().equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
// Process internal catalog iceberg external table refresh.
refreshInternalCtlIcebergTable(stmt, env);
} else {
// Process external catalog table refresh
env.getCatalogMgr().refreshExternalTable(dbName, tableName, catalogName, false);
}
// Process external catalog table refresh
env.getCatalogMgr().refreshExternalTable(dbName, tableName, catalogName, false);
LOG.info("Successfully refresh table: {} from db: {}", tableName, dbName);
}

@@ -84,42 +75,11 @@ public class RefreshManager {
throw new DdlException("Catalog " + catalogName + " doesn't exist.");
}

if (catalog.getName().equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
// Process internal catalog iceberg external db refresh.
refreshInternalCtlIcebergDb(dbName, env);
} else {
// Process external catalog db refresh
refreshExternalCtlDb(dbName, catalog, stmt.isInvalidCache());
}
// Process external catalog db refresh
refreshExternalCtlDb(dbName, catalog, stmt.isInvalidCache());
LOG.info("Successfully refresh db: {}", dbName);
}

private void refreshInternalCtlIcebergDb(String dbName, Env env) throws DdlException {
Database db = env.getInternalCatalog().getDbOrDdlException(dbName);

// 0. build iceberg property
// Since we have only persisted database properties with key-value format in DatabaseProperty,
// we build IcebergProperty here, before checking database type.
db.getDbProperties().checkAndBuildProperties();
// 1. check database type
if (!db.getDbProperties().getIcebergProperty().isExist()) {
throw new DdlException("Only support refresh Iceberg database.");
}

// 2. only drop iceberg table in the database
// Current database may have other types of table, which is not allowed to drop.
for (Table table : db.getTables()) {
if (table instanceof IcebergTable) {
DropTableStmt dropTableStmt =
new DropTableStmt(true, new TableName(null, dbName, table.getName()), true);
env.dropTable(dropTableStmt);
}
}

// 3. register iceberg database to recreate iceberg table
env.getIcebergTableCreationRecordMgr().registerDb(db);
}

private void refreshExternalCtlDb(String dbName, CatalogIf catalog, boolean invalidCache) throws DdlException {
if (!(catalog instanceof ExternalCatalog)) {
throw new DdlException("Only support refresh ExternalCatalog Database");
@@ -137,29 +97,6 @@ public class RefreshManager {
Env.getCurrentEnv().getEditLog().logRefreshExternalDb(log);
}

private void refreshInternalCtlIcebergTable(RefreshTableStmt stmt, Env env) throws UserException {
// 0. check table type
Database db = env.getInternalCatalog().getDbOrDdlException(stmt.getDbName());
Table table = db.getTableNullable(stmt.getTblName());
if (!(table instanceof IcebergTable)) {
throw new DdlException("Only support refresh Iceberg table.");
}

// 1. get iceberg properties
Map<String, String> icebergProperties = ((IcebergTable) table).getIcebergProperties();
icebergProperties.put(IcebergProperty.ICEBERG_TABLE, ((IcebergTable) table).getIcebergTbl());
icebergProperties.put(IcebergProperty.ICEBERG_DATABASE, ((IcebergTable) table).getIcebergDb());

// 2. drop old table
DropTableStmt dropTableStmt = new DropTableStmt(true, stmt.getTableName(), true);
env.dropTable(dropTableStmt);

// 3. create new table
CreateTableStmt createTableStmt = new CreateTableStmt(true, true,
stmt.getTableName(), "ICEBERG", icebergProperties, "");
env.createTable(createTableStmt);
}

public void addToRefreshMap(long catalogId, Integer[] sec) {
refreshMap.put(catalogId, sec);
}
@@ -26,7 +26,6 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.QueryableReentrantReadWriteLock;
import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.external.hudi.HudiTable;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
@@ -377,10 +376,6 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
table = new EsTable();
} else if (type == TableType.HIVE) {
table = new HiveTable();
} else if (type == TableType.ICEBERG) {
table = new IcebergTable();
} else if (type == TableType.HUDI) {
table = new HudiTable();
} else if (type == TableType.JDBC) {
table = new JdbcTable();
} else {
@@ -69,7 +69,6 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.HiveTable;
import org.apache.doris.catalog.IcebergTable;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.InfoSchemaDb;
import org.apache.doris.catalog.JdbcTable;
@@ -135,8 +134,6 @@ import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.external.elasticsearch.EsRepository;
import org.apache.doris.external.iceberg.IcebergCatalogMgr;
import org.apache.doris.external.iceberg.IcebergTableCreationRecordMgr;
import org.apache.doris.persist.AlterDatabasePropertyInfo;
import org.apache.doris.persist.AutoIncrementIdUpdateLog;
import org.apache.doris.persist.ColocatePersistInfo;
@@ -212,8 +209,6 @@ public class InternalCatalog implements CatalogIf<Database> {
// Add transient to fix gson issue.
@Getter
private transient EsRepository esRepository = new EsRepository();
@Getter
private IcebergTableCreationRecordMgr icebergTableCreationRecordMgr = new IcebergTableCreationRecordMgr();

public InternalCatalog() {
// create internal databases
@@ -420,7 +415,7 @@ public class InternalCatalog implements CatalogIf<Database> {
Database db = new Database(id, fullDbName);
db.setClusterName(SystemInfoService.DEFAULT_CLUSTER);
// check and analyze database properties before create database
db.setDbProperties(new DatabaseProperty(properties).checkAndBuildProperties());
db.setDbProperties(new DatabaseProperty(properties));

if (!tryLock(false)) {
throw new DdlException("Failed to acquire catalog lock. Try again");
@@ -441,11 +436,6 @@ public class InternalCatalog implements CatalogIf<Database> {
unlock();
}
LOG.info("createDb dbName = " + fullDbName + ", id = " + id);

// create tables in iceberg database
if (db.getDbProperties().getIcebergProperty().isExist()) {
icebergTableCreationRecordMgr.registerDb(db);
}
}

/**
@@ -557,10 +547,6 @@ public class InternalCatalog implements CatalogIf<Database> {
}

public void unprotectDropDb(Database db, boolean isForeDrop, boolean isReplay, long recycleTime) {
// drop Iceberg database table creation records
if (db.getDbProperties().getIcebergProperty().isExist()) {
icebergTableCreationRecordMgr.deregisterDb(db);
}
for (Table table : db.getTables()) {
unprotectDropTable(db, table, isForeDrop, isReplay, recycleTime);
}
@@ -943,9 +929,6 @@ public class InternalCatalog implements CatalogIf<Database> {
// drop all temp partitions of this table, so that there is no temp partitions in recycle bin,
// which make things easier.
((OlapTable) table).dropAllTempPartitions();
} else if (table.getType() == TableType.ICEBERG) {
// drop Iceberg database table creation record
icebergTableCreationRecordMgr.deregisterTable(db, (IcebergTable) table);
} else if (table.getType() == TableType.MATERIALIZED_VIEW) {
Env.getCurrentEnv().getMtmvService().deregisterMTMV((MTMV) table);
}
@@ -1123,9 +1106,6 @@ public class InternalCatalog implements CatalogIf<Database> {
} else if (engineName.equalsIgnoreCase("hive")) {
createHiveTable(db, stmt);
return;
} else if (engineName.equalsIgnoreCase("iceberg")) {
IcebergCatalogMgr.createIcebergTable(db, stmt);
return;
} else if (engineName.equalsIgnoreCase("jdbc")) {
createJdbcTable(db, stmt);
return;
@@ -1,26 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.external;

/**
* Used to describe the data information that ExternalScanNode needs to read external catalogs.
* For example, for hive, the ExternalScanRange may save the file info which need to be read,
* such as file path, file format, start and offset, etc.
*/
public class ExternalScanRange {
}
@@ -1,110 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.external.hudi;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.io.Text;
import org.apache.doris.thrift.THudiTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;

import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;

@Deprecated
public class HudiTable extends Table {
private static final Logger LOG = LogManager.getLogger(HudiTable.class);

public static final String HUDI_DATABASE = "hudi.database";
public static final String HUDI_TABLE = "hudi.table";

// table properties of this hudi table
private Map<String, String> tableProperties = Maps.newHashMap();
// remote Hudi database name in hive metastore
private String hmsDatabaseName;
// remote Hudi table name in hive metastore
private String hmsTableName;

public HudiTable() {
super(TableType.HUDI);
}

/**
* Generate a Hudi Table with id, name, schema, properties.
*
* @param id table id
* @param tableName table name
* @param fullSchema table's schema
* @param tableProperties table's properties
*/
public HudiTable(long id, String tableName, List<Column> fullSchema, Map<String, String> tableProperties) {
super(id, tableName, TableType.HUDI, fullSchema);
this.tableProperties = tableProperties;
this.hmsDatabaseName = tableProperties.get(HUDI_DATABASE);
this.hmsTableName = tableProperties.get(HUDI_TABLE);
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);

Text.writeString(out, hmsDatabaseName);
Text.writeString(out, hmsTableName);

out.writeInt(tableProperties.size());
for (Map.Entry<String, String> entry : tableProperties.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
}

@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);

hmsDatabaseName = Text.readString(in);
hmsTableName = Text.readString(in);
int size = in.readInt();
for (int i = 0; i < size; i++) {
String key = Text.readString(in);
String value = Text.readString(in);
tableProperties.put(key, value);
}
}

@Override
public TTableDescriptor toThrift() {
THudiTable thriftHudiTable = new THudiTable();
thriftHudiTable.setDbName(hmsDatabaseName);
thriftHudiTable.setTableName(hmsTableName);
thriftHudiTable.setProperties(tableProperties);

TTableDescriptor thriftTableDescriptor = new TTableDescriptor(getId(), TTableType.BROKER_TABLE,
fullSchema.size(), 0, getName(), "");
thriftTableDescriptor.setHudiTable(thriftHudiTable);
return thriftTableDescriptor;
}
}
@@ -1,32 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.external.iceberg;

/**
* Exception class for Iceberg in Doris
*/
public class DorisIcebergException extends RuntimeException {

public DorisIcebergException(String message) {
super(message);
}

public DorisIcebergException(String message, Throwable cause) {
super(message, cause);
}
}
@@ -1,91 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.external.iceberg;

import org.apache.doris.catalog.IcebergProperty;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* HiveCatalog of Iceberg
*/
public class HiveCatalog implements IcebergCatalog {
private static final Logger LOG = LogManager.getLogger(HiveCatalog.class);

private org.apache.iceberg.hive.HiveCatalog hiveCatalog;

public HiveCatalog() {
hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
}

@Override
public void initialize(IcebergProperty icebergProperty) {
// set hadoop conf
Configuration conf = new HdfsConfiguration();
for (Map.Entry<String, String> entry : icebergProperty.getDfsProperties().entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
hiveCatalog.setConf(conf);
// initialize hive catalog
Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put("uri", icebergProperty.getHiveMetastoreUris());
hiveCatalog.initialize("hive", catalogProperties);
}

@Override
public boolean tableExists(TableIdentifier tableIdentifier) {
return hiveCatalog.tableExists(tableIdentifier);
}

@Override
public Table loadTable(TableIdentifier tableIdentifier) throws DorisIcebergException {
try {
return hiveCatalog.loadTable(tableIdentifier);
} catch (Exception e) {
LOG.warn("Failed to load table[{}] from database[{}], with error: {}",
tableIdentifier.name(), tableIdentifier.namespace(), e.getMessage());
throw new DorisIcebergException(String.format("Failed to load table[%s] from database[%s]",
tableIdentifier.name(), tableIdentifier.namespace()), e);
}
}

@Override
public List<TableIdentifier> listTables(String db) throws DorisIcebergException {
try {
return hiveCatalog.listTables(Namespace.of(db));
} catch (Exception e) {
LOG.warn("Failed to list table in database[{}], with error: {}", db, e.getMessage());
throw new DorisIcebergException(String.format("Failed to list table in database[%s]", db), e);
}
}

@Override
public boolean databaseExists(String db) {
return hiveCatalog.namespaceExists(Namespace.of(db));
}
}
@@ -1,67 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.external.iceberg;

import org.apache.doris.catalog.IcebergProperty;

import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;

import java.util.List;

/**
* A Catalog API for iceberg table and namespace.
*/
public interface IcebergCatalog {
/**
* Initialize a catalog given a map of catalog properties.
* @param icebergProperty
*/
default void initialize(IcebergProperty icebergProperty) {
}

/**
* Check whether table exists.
* @param tableIdentifier
*/
default boolean tableExists(TableIdentifier tableIdentifier) {
return false;
}

/**
* Load a table
* @param tableIdentifier
*/
Table loadTable(TableIdentifier tableIdentifier) throws DorisIcebergException;

/**
* Return all the identifiers under this db.
* @param db
*/
List<TableIdentifier> listTables(String db) throws DorisIcebergException;

/**
* Checks whether the database exists.
*
* @param db
*/
default boolean databaseExists(String db) {
return false;
}
}
@ -1,220 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.external.iceberg;
|
||||
|
||||
import org.apache.doris.analysis.CreateTableStmt;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.IcebergProperty;
|
||||
import org.apache.doris.catalog.IcebergTable;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.SystemIdGenerator;
|
||||
import org.apache.doris.external.iceberg.util.IcebergUtils;
|
||||
|
||||
import com.google.common.base.Enums;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.iceberg.catalog.TableIdentifier;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* Iceberg catalog manager
|
||||
*/
|
||||
public class IcebergCatalogMgr {
|
||||
private static final Logger LOG = LogManager.getLogger(IcebergCatalogMgr.class);
|
||||
|
||||
private static final String PROPERTY_MISSING_MSG = "Iceberg %s is null. "
|
||||
+ "Please add properties('%s'='xxx') when create iceberg database.";
|
||||
|
||||
// hive metastore uri -> iceberg catalog
|
||||
// used to cache iceberg catalogs
|
||||
private static final ConcurrentHashMap<String, IcebergCatalog> metastoreUriToCatalog = new ConcurrentHashMap();
|
||||
|
||||
// TODO:(qjl) We'll support more types of Iceberg catalog.
|
||||
public enum CatalogType {
|
||||
HIVE_CATALOG
|
||||
}
|
||||
|
||||
public static IcebergCatalog getCatalog(IcebergProperty icebergProperty) throws DdlException {
|
||||
String uri = icebergProperty.getHiveMetastoreUris();
|
||||
if (!metastoreUriToCatalog.containsKey(uri)) {
|
||||
metastoreUriToCatalog.put(uri, createCatalog(icebergProperty));
|
||||
}
|
||||
return metastoreUriToCatalog.get(uri);
|
||||
}
|
||||
|
||||
private static IcebergCatalog createCatalog(IcebergProperty icebergProperty) throws DdlException {
|
||||
CatalogType type = CatalogType.valueOf(icebergProperty.getCatalogType());
|
||||
IcebergCatalog catalog;
|
||||
switch (type) {
|
||||
case HIVE_CATALOG:
|
||||
catalog = new HiveCatalog();
|
||||
break;
|
||||
default:
|
||||
throw new DdlException("Unsupported catalog type: " + type);
|
||||
}
|
||||
catalog.initialize(icebergProperty);
|
||||
return catalog;
|
||||
}
|
||||
|
||||
    public static void validateProperties(Map<String, String> properties, boolean isTable) throws DdlException {
        if (properties.size() == 0) {
            throw new DdlException("Please set properties of iceberg, "
                    + "they are: iceberg.database and 'iceberg.hive.metastore.uris'");
        }

        Map<String, String> copiedProps = Maps.newHashMap(properties);
        String icebergDb = copiedProps.get(IcebergProperty.ICEBERG_DATABASE);
        if (Strings.isNullOrEmpty(icebergDb)) {
            throw new DdlException(String.format(PROPERTY_MISSING_MSG,
                    IcebergProperty.ICEBERG_DATABASE, IcebergProperty.ICEBERG_DATABASE));
        }
        copiedProps.remove(IcebergProperty.ICEBERG_DATABASE);

        // check hive properties
        // hive.metastore.uris
        String hiveMetastoreUris = copiedProps.get(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS);
        if (Strings.isNullOrEmpty(hiveMetastoreUris)) {
            throw new DdlException(String.format(PROPERTY_MISSING_MSG,
                    IcebergProperty.ICEBERG_HIVE_METASTORE_URIS, IcebergProperty.ICEBERG_HIVE_METASTORE_URIS));
        }
        copiedProps.remove(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS);

        // check iceberg catalog type
        String icebergCatalogType = copiedProps.get(IcebergProperty.ICEBERG_CATALOG_TYPE);
        if (Strings.isNullOrEmpty(icebergCatalogType)) {
            icebergCatalogType = IcebergCatalogMgr.CatalogType.HIVE_CATALOG.name();
            properties.put(IcebergProperty.ICEBERG_CATALOG_TYPE, icebergCatalogType);
        } else {
            copiedProps.remove(IcebergProperty.ICEBERG_CATALOG_TYPE);
        }

        if (!Enums.getIfPresent(IcebergCatalogMgr.CatalogType.class, icebergCatalogType).isPresent()) {
            throw new DdlException("Unknown catalog type: " + icebergCatalogType
                    + ". Current only support HiveCatalog.");
        }

        // only check table property when it's an iceberg table
        if (isTable) {
            String icebergTbl = copiedProps.get(IcebergProperty.ICEBERG_TABLE);
            if (Strings.isNullOrEmpty(icebergTbl)) {
                throw new DdlException(String.format(PROPERTY_MISSING_MSG,
                        IcebergProperty.ICEBERG_TABLE, IcebergProperty.ICEBERG_TABLE));
            }
            copiedProps.remove(IcebergProperty.ICEBERG_TABLE);
        }

        if (!copiedProps.isEmpty()) {
            Iterator<Map.Entry<String, String>> iter = copiedProps.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<String, String> entry = iter.next();
                if (entry.getKey().startsWith(IcebergProperty.ICEBERG_HDFS_PREFIX)) {
                    iter.remove();
                }
            }
        }

        if (!copiedProps.isEmpty()) {
            throw new DdlException("Unknown table properties: " + copiedProps.toString());
        }
    }

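    // A minimal sketch (not part of the original file) of the smallest property map that
    // passes validateProperties(..., true). Only the IcebergProperty keys come from the
    // code above; the literal values are made up for illustration.
    private static void exampleOfMinimalTableProperties() throws DdlException {
        Map<String, String> props = Maps.newHashMap();
        props.put(IcebergProperty.ICEBERG_DATABASE, "iceberg_db");
        props.put(IcebergProperty.ICEBERG_TABLE, "iceberg_tbl");
        props.put(IcebergProperty.ICEBERG_HIVE_METASTORE_URIS, "thrift://127.0.0.1:9083");
        // iceberg.catalog.type is optional; validateProperties defaults it to HIVE_CATALOG.
        validateProperties(props, true);
    }
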
    /**
     * Get Doris IcebergTable from remote Iceberg by database and table
     * @param tableId table id in Doris
     * @param tableName table name in Doris
     * @param icebergProperty Iceberg property
     * @param identifier Iceberg table identifier
     * @param isTable whether the target is a table
     * @return IcebergTable in Doris
     * @throws DdlException
     */
    public static IcebergTable getTableFromIceberg(long tableId, String tableName, IcebergProperty icebergProperty,
                                                   TableIdentifier identifier,
                                                   boolean isTable) throws DdlException {
        IcebergCatalog icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty);

        if (isTable && !icebergCatalog.tableExists(identifier)) {
            throw new DdlException(String.format("Table [%s] does not exist in Iceberg.", identifier.toString()));
        }

        // get iceberg table schema
        org.apache.iceberg.Table icebergTable = icebergCatalog.loadTable(identifier);

        // convert iceberg table schema to Doris's
        List<Column> columns = IcebergUtils.createSchemaFromIcebergSchema(icebergTable.schema());

        // create new iceberg table in doris
        IcebergTable table = new IcebergTable(tableId, tableName, columns, icebergProperty, icebergTable);

        return table;
    }

    /**
     * Create an iceberg table in Doris.
     *
     * 1. check table existence in Iceberg
     * 2. get table schema from Iceberg
     * 3. convert Iceberg table schema to Doris table schema
     * 4. create the associated table in Doris
     *
     * @param db
     * @param stmt
     * @throws DdlException
     */
    public static void createIcebergTable(Database db, CreateTableStmt stmt) throws DdlException {
        String tableName = stmt.getTableName();
        Map<String, String> properties = stmt.getProperties();

        // validate iceberg table properties
        validateProperties(properties, true);
        IcebergProperty icebergProperty = new IcebergProperty(properties);

        String icebergDb = icebergProperty.getDatabase();
        String icebergTbl = icebergProperty.getTable();

        // create iceberg table struct
        // 1. Column defs already set in the CREATE statement: just create the table.
        // 2. No column defs in the CREATE statement: get them from the remote Iceberg schema.
        IcebergTable table;
        long tableId = SystemIdGenerator.getNextId();
        if (stmt.getColumns().size() > 0) {
            // set column def in CREATE TABLE
            table = new IcebergTable(tableId, tableName, stmt.getColumns(), icebergProperty, null);
        } else {
            // get column def from remote Iceberg
            table = getTableFromIceberg(tableId, tableName, icebergProperty,
                    TableIdentifier.of(icebergDb, icebergTbl), true);
        }

        // check whether the iceberg table already exists in the doris database
        if (!db.createTableWithLock(table, false, stmt.isSetIfNotExists()).first) {
            ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
        }
        LOG.info("successfully create table[{}-{}]", tableName, table.getId());
    }
}
@ -1,88 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.external.iceberg;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.List;

/**
 * Represents a record of automatic Iceberg table creation in an Iceberg database.
 */
public class IcebergTableCreationRecord {
    private static final Logger LOG = LogManager.getLogger(IcebergTableCreationRecord.class);

    private long dbId;
    private long tableId;
    private String db;
    private String table;
    private String status;
    private String createTime;
    private String errorMsg;

    public IcebergTableCreationRecord(long dbId, long tableId, String db, String table, String status,
                                      String createTime, String errorMsg) {
        this.dbId = dbId;
        this.tableId = tableId;
        this.db = db;
        this.table = table;
        this.status = status;
        this.createTime = createTime;
        this.errorMsg = errorMsg;
    }

    public List<Comparable> getTableCreationRecord() {
        List<Comparable> record = new ArrayList<>();
        record.add(this.db);
        record.add(this.table);
        record.add(this.status);
        record.add(this.createTime);
        record.add(this.errorMsg);
        return record;
    }

    public long getDbId() {
        return dbId;
    }

    public long getTableId() {
        return tableId;
    }

    public String getDb() {
        return db;
    }

    public String getTable() {
        return table;
    }

    public String getStatus() {
        return status;
    }

    public String getCreateTime() {
        return createTime;
    }

    public String getErrorMsg() {
        return errorMsg;
    }
}
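// Illustration (not from the patch): each record renders as one row of the table creation
// show output, in the column order produced by getTableCreationRecord(). The literal
// values below are made up; "success" and "fail" are the status strings used by the manager.
IcebergTableCreationRecord r = new IcebergTableCreationRecord(
        10001L, 20001L, "iceberg_db", "iceberg_tbl", "success", "2023-11-01 10:00:00", "");
List<Comparable> row = r.getTableCreationRecord();   // [db, table, status, create time, error msg]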
@ -1,279 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.external.iceberg;

import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.IcebergProperty;
import org.apache.doris.catalog.IcebergTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.SystemIdGenerator;
import org.apache.doris.common.property.PropertySchema;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;

import com.google.common.collect.Maps;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Manager for the records of automatic Iceberg table creation.
 * Used to create iceberg tables and to serve the table creation records for show statements.
 */
public class IcebergTableCreationRecordMgr extends MasterDaemon {
    private static final Logger LOG = LogManager.getLogger(IcebergTableCreationRecordMgr.class);

    private static final String SUCCESS = "success";
    private static final String FAIL = "fail";

    // Iceberg databases, used to list remote iceberg tables
    // dbId -> database
    private final Map<Long, Database> icebergDbs = new ConcurrentHashMap<>();
    // database -> table identifier -> properties
    // used to create table
    private final Map<Database, Map<TableIdentifier, IcebergProperty>> dbToTableIdentifiers = Maps.newConcurrentMap();
    // table creation records, used for show stmt
    // dbId -> tableId -> create msg
    private final Map<Long, Map<Long, IcebergTableCreationRecord>> dbToTableToCreationRecord = Maps.newConcurrentMap();

    private final Queue<IcebergTableCreationRecord> tableCreationRecordQueue
            = new PriorityQueue<>(new TableCreationComparator());
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public IcebergTableCreationRecordMgr() {
        super("iceberg_table_creation_record_mgr", Config.iceberg_table_creation_interval_second * 1000);
    }

    public void registerDb(Database db) throws DdlException {
        long dbId = db.getId();
        icebergDbs.put(dbId, db);
        LOG.info("Register a new Iceberg database[{}-{}]", dbId, db.getFullName());
    }

    private void registerTable(Database db, TableIdentifier identifier, IcebergProperty icebergProperty) {
        if (dbToTableIdentifiers.containsKey(db)) {
            dbToTableIdentifiers.get(db).put(identifier, icebergProperty);
        } else {
            Map<TableIdentifier, IcebergProperty> identifierToProperties = Maps.newConcurrentMap();
            identifierToProperties.put(identifier, icebergProperty);
            dbToTableIdentifiers.put(db, identifierToProperties);
        }
        LOG.info("Register a new table[{}] to database[{}]", identifier.name(), db.getFullName());
    }

    public void deregisterDb(Database db) {
        icebergDbs.remove(db.getId());
        dbToTableIdentifiers.remove(db);
        dbToTableToCreationRecord.remove(db.getId());
        LOG.info("Deregister database[{}-{}]", db.getFullName(), db.getId());
    }

    public void deregisterTable(Database db, IcebergTable table) {
        if (dbToTableIdentifiers.containsKey(db)) {
            TableIdentifier identifier = TableIdentifier.of(table.getIcebergDb(), table.getIcebergTbl());
            Map<TableIdentifier, IcebergProperty> identifierToProperties = dbToTableIdentifiers.get(db);
            identifierToProperties.remove(identifier);
        }
        if (dbToTableToCreationRecord.containsKey(db.getId())) {
            Map<Long, IcebergTableCreationRecord> recordMap = dbToTableToCreationRecord.get(db.getId());
            recordMap.remove(table.getId());
        }
        LOG.info("Deregister table[{}-{}] from database[{}-{}]", table.getName(),
                table.getId(), db.getFullName(), db.getId());
    }

    // remove already created tables or failed tables
    private void removeDuplicateTables() {
        for (Map.Entry<Long, Map<Long, IcebergTableCreationRecord>> entry : dbToTableToCreationRecord.entrySet()) {
            Env.getCurrentInternalCatalog().getDb(entry.getKey()).ifPresent(db -> {
                if (dbToTableIdentifiers.containsKey(db)) {
                    for (Map.Entry<Long, IcebergTableCreationRecord> innerEntry : entry.getValue().entrySet()) {
                        String tableName = innerEntry.getValue().getTable();
                        String icebergDbName = db.getDbProperties().getIcebergProperty().getDatabase();
                        TableIdentifier identifier = TableIdentifier.of(icebergDbName, tableName);
                        dbToTableIdentifiers.get(db).remove(identifier);
                    }
                }
            });
        }
    }

    @Override
    protected void runAfterCatalogReady() {
        PropertySchema.DateProperty prop =
                new PropertySchema.DateProperty("key", TimeUtils.DATETIME_FORMAT);
        // list iceberg tables in dbs
        // When listing table is done, remove database from icebergDbs.
        for (Iterator<Map.Entry<Long, Database>> it = icebergDbs.entrySet().iterator(); it.hasNext(); it.remove()) {
            Map.Entry<Long, Database> entry = it.next();
            Database db = entry.getValue();
            IcebergProperty icebergProperty = db.getDbProperties().getIcebergProperty();
            IcebergCatalog icebergCatalog = null;
            try {
                icebergCatalog = IcebergCatalogMgr.getCatalog(icebergProperty);
            } catch (DdlException e) {
                addTableCreationRecord(db.getId(), -1, db.getFullName(), "", FAIL,
                        prop.writeTimeFormat(new Date(System.currentTimeMillis())), e.getMessage());
                LOG.warn("Failed get Iceberg catalog, hive.metastore.uris[{}], error: {}",
                        icebergProperty.getHiveMetastoreUris(), e.getMessage());
            }
            List<TableIdentifier> icebergTables = null;
            try {
                icebergTables = icebergCatalog.listTables(icebergProperty.getDatabase());
            } catch (Exception e) {
                addTableCreationRecord(db.getId(), -1, db.getFullName(), "", FAIL,
                        prop.writeTimeFormat(new Date(System.currentTimeMillis())), e.getMessage());
                LOG.warn("Failed list remote Iceberg database, hive.metastore.uris[{}], database[{}], error: {}",
                        icebergProperty.getHiveMetastoreUris(), icebergProperty.getDatabase(), e.getMessage());
            }
            for (TableIdentifier identifier : icebergTables) {
                IcebergProperty tableProperties = new IcebergProperty(icebergProperty);
                tableProperties.setTable(identifier.name());
                registerTable(db, identifier, tableProperties);
            }
        }

        // create table in Doris
        for (Map.Entry<Database, Map<TableIdentifier, IcebergProperty>> entry : dbToTableIdentifiers.entrySet()) {
            Database db = entry.getKey();
            for (Map.Entry<TableIdentifier, IcebergProperty> innerEntry : entry.getValue().entrySet()) {
                TableIdentifier identifier = innerEntry.getKey();
                IcebergProperty icebergProperty = innerEntry.getValue();
                long tableId = SystemIdGenerator.getNextId();
                try {
                    // get doris table from iceberg
                    IcebergTable table = IcebergCatalogMgr.getTableFromIceberg(tableId, identifier.name(),
                            icebergProperty, identifier, false);
                    // check whether the iceberg table already exists in the doris database
                    if (!db.createTableWithLock(table, false, false).first) {
                        ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, table.getName());
                    }
                    addTableCreationRecord(db.getId(), tableId, db.getFullName(), table.getName(), SUCCESS,
                            prop.writeTimeFormat(new Date(System.currentTimeMillis())), "");
                    LOG.info("Successfully create table[{}-{}]", table.getName(), tableId);
                } catch (Exception e) {
                    addTableCreationRecord(db.getId(), tableId, db.getFullName(), identifier.name(), FAIL,
                            prop.writeTimeFormat(new Date(System.currentTimeMillis())), e.getMessage());
                    LOG.warn("Failed create table[{}], error: {}", identifier.name(), e.getMessage());
                }
            }
        }
        removeDuplicateTables();
    }

    private void addTableCreationRecord(long dbId, long tableId, String db, String table, String status,
                                        String createTime, String errorMsg) {
        writeLock();
        try {
            while (isQueueFull()) {
                IcebergTableCreationRecord record = tableCreationRecordQueue.poll();
                if (record != null) {
                    Map<Long, IcebergTableCreationRecord> tableRecords
                            = dbToTableToCreationRecord.get(record.getDbId());
                    Iterator<Map.Entry<Long, IcebergTableCreationRecord>> tableRecordsIterator
                            = tableRecords.entrySet().iterator();
                    while (tableRecordsIterator.hasNext()) {
                        long t = tableRecordsIterator.next().getKey();
                        if (t == record.getTableId()) {
                            tableRecordsIterator.remove();
                            break;
                        }
                    }
                }
            }

            IcebergTableCreationRecord record = new IcebergTableCreationRecord(dbId, tableId, db, table, status,
                    createTime, errorMsg);
            tableCreationRecordQueue.offer(record);

            if (!dbToTableToCreationRecord.containsKey(dbId)) {
                dbToTableToCreationRecord.put(dbId, new ConcurrentHashMap<>());
            }
            Map<Long, IcebergTableCreationRecord> tableToRecord = dbToTableToCreationRecord.get(dbId);
            if (!tableToRecord.containsKey(tableId)) {
                tableToRecord.put(tableId, record);
            }
        } finally {
            writeUnlock();
        }
    }

    public List<IcebergTableCreationRecord> getTableCreationRecordByDbId(long dbId) {
        List<IcebergTableCreationRecord> records = new ArrayList<>();

        readLock();
        try {
            if (!dbToTableToCreationRecord.containsKey(dbId)) {
                return records;
            }
            Map<Long, IcebergTableCreationRecord> tableToRecords = dbToTableToCreationRecord.get(dbId);
            for (Map.Entry<Long, IcebergTableCreationRecord> entry : tableToRecords.entrySet()) {
                records.add(entry.getValue());
            }

            return records;
        } finally {
            readUnlock();
        }
    }

    class TableCreationComparator implements Comparator<IcebergTableCreationRecord> {
        @Override
        public int compare(IcebergTableCreationRecord r1, IcebergTableCreationRecord r2) {
            return r1.getCreateTime().compareTo(r2.getCreateTime());
        }
    }

    public boolean isQueueFull() {
        return tableCreationRecordQueue.size() >= 2000;
    }

    private void readLock() {
        lock.readLock().lock();
    }

    private void readUnlock() {
        lock.readLock().unlock();
    }

    private void writeLock() {
        lock.writeLock().lock();
    }

    private void writeUnlock() {
        lock.writeLock().unlock();
    }
}
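// Design note (not from the patch): tableCreationRecordQueue orders records by the
// createTime string, so eviction in addTableCreationRecord drops the oldest record once
// the queue holds 2000 entries. This relies on TimeUtils.DATETIME_FORMAT producing
// lexicographically sortable timestamps. An equivalent comparator, for illustration only:
Comparator<IcebergTableCreationRecord> byCreateTime =
        Comparator.comparing(IcebergTableCreationRecord::getCreateTime);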
@ -21,7 +21,6 @@ package org.apache.doris.external.iceberg.util;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.ColumnDef;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.DecimalLiteral;
@ -34,39 +33,18 @@ import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.analysis.Subquery;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.thrift.TExprOpcode;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.transforms.PartitionSpecVisitor;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Iceberg utils
@ -81,147 +59,6 @@ public class IcebergUtils {
    };
    static long MILLIS_TO_NANO_TIME = 1000;

    /**
     * Create Iceberg schema from Doris ColumnDef.
     *
     * @param columnDefs columns for creating the iceberg table
     * @return Iceberg schema
     * @throws UserException if the create table statement contains an aggregate type
     */
    public static Schema createIcebergSchema(List<ColumnDef> columnDefs) throws UserException {
        columnIdThreadLocal.set(1);
        List<Types.NestedField> nestedFields = Lists.newArrayList();
        for (ColumnDef columnDef : columnDefs) {
            columnDef.analyze(false);
            if (columnDef.getAggregateType() != null) {
                throw new DdlException("Do not support aggregation column: " + columnDef.getName());
            }
            boolean isNullable = columnDef.isAllowNull();
            org.apache.iceberg.types.Type icebergType = convertDorisToIceberg(columnDef.getType());
            if (isNullable) {
                nestedFields.add(
                        Types.NestedField.optional(nextId(), columnDef.getName(), icebergType, columnDef.getComment()));
            } else {
                nestedFields.add(
                        Types.NestedField.required(nextId(), columnDef.getName(), icebergType, columnDef.getComment()));
            }
        }
        return new Schema(nestedFields);
    }

    public static List<Column> createSchemaFromIcebergSchema(Schema schema) throws DdlException {
        List<Column> columns = Lists.newArrayList();
        for (Types.NestedField nestedField : schema.columns()) {
            try {
                columns.add(nestedFieldToColumn(nestedField));
            } catch (UnsupportedOperationException e) {
                if (Config.iceberg_table_creation_strict_mode) {
                    throw e;
                }
                LOG.warn("Unsupported data type in Doris, ignore column[{}], with error: {}",
                        nestedField.name(), e.getMessage());
                continue;
            }
        }
        return columns;
    }

    public static Column nestedFieldToColumn(Types.NestedField field) {
        Type type = convertIcebergToDoris(field.type());
        return new Column(field.name(), type, true, null, field.isOptional(), null, field.doc());
    }

    /**
     * Get the iceberg table schema id to name mapping.
     *
     * @param schema iceberg table schema
     * @return id to name mapping
     */
    public static Map<Integer, String> getIdToName(Schema schema) {
        Map<Integer, String> idToName = new HashMap<>();
        for (Types.NestedField nestedField : schema.columns()) {
            idToName.put(nestedField.fieldId(), nestedField.name());
        }
        return idToName;
    }

    public static List<String> getIdentityPartitionField(PartitionSpec spec) {
        return PartitionSpecVisitor.visit(spec,
                new PartitionSpecVisitor<String>() {
                    @Override
                    public String identity(String sourceName, int sourceId) {
                        return sourceName;
                    }

                    @Override
                    public String bucket(String sourceName, int sourceId, int numBuckets) {
                        return null;
                    }

                    @Override
                    public String truncate(String sourceName, int sourceId, int width) {
                        return null;
                    }

                    @Override
                    public String year(String sourceName, int sourceId) {
                        return null;
                    }

                    @Override
                    public String month(String sourceName, int sourceId) {
                        return null;
                    }

                    @Override
                    public String day(String sourceName, int sourceId) {
                        return null;
                    }

                    @Override
                    public String hour(String sourceName, int sourceId) {
                        return null;
                    }

                    @Override
                    public String alwaysNull(int fieldId, String sourceName, int sourceId) {
                        return null;
                    }

                    @Override
                    public String unknown(int fieldId, String sourceName, int sourceId, String transform) {
                        return null;
                    }
                }
        ).stream().filter(Objects::nonNull).collect(Collectors.toList());
    }

    /**
     * Convert a {@link org.apache.iceberg.types.Type} to a {@link Type doris type}.
     *
     * @param type an Iceberg Type
     * @return the equivalent doris type
     * @throws IllegalArgumentException if the type cannot be converted to doris
     */
    public static Type convertIcebergToDoris(org.apache.iceberg.types.Type type) {
        return TypeUtil.visit(type, new TypeToDorisType());
    }

    /**
     * Convert a doris {@link Type struct} to a {@link org.apache.iceberg.types.Type} with new field ids.
     * <p>
     * This conversion assigns fresh ids.
     * <p>
     * Some data types are represented as the same doris type. These are converted to a default type.
     *
     * @param type a doris Type
     * @return the equivalent Type
     * @throws IllegalArgumentException if the type cannot be converted
     */
    public static org.apache.iceberg.types.Type convertDorisToIceberg(Type type) {
        return DorisTypeVisitor.visit(type, new DorisTypeToType());
    }

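    // A tiny round-trip sketch (not part of the original class) of the two converters above.
    // It assumes the usual mapping of Iceberg INT to Doris INT; illustrative only.
    private static void exampleOfTypeRoundTrip() {
        org.apache.iceberg.types.Type icebergInt = Types.IntegerType.get();
        Type dorisType = convertIcebergToDoris(icebergInt);
        org.apache.iceberg.types.Type backToIceberg = convertDorisToIceberg(dorisType);
    }
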
    public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
        if (expr == null) {
            return null;
@ -397,45 +234,4 @@ public class IcebergUtils {
        }
        return slotRef;
    }

    private static int findWidth(IntLiteral literal) {
        Preconditions.checkArgument(literal.getValue() > 0 && literal.getValue() < Integer.MAX_VALUE,
                "Unsupported width " + literal.getValue());
        return (int) literal.getValue();
    }

    public static int nextId() {
        int nextId = columnIdThreadLocal.get();
        columnIdThreadLocal.set(nextId + 1);
        return nextId;
    }

    public static Set<String> getAllDataFilesPath(org.apache.iceberg.Table table, TableOperations ops) {
        org.apache.iceberg.Table dataFilesTable = MetadataTableUtils.createMetadataTableInstance(
                ops, table.name(), table.name(), MetadataTableType.ALL_DATA_FILES);

        Set<String> dataFilesPath = Sets.newHashSet();
        TableScan tableScan = dataFilesTable.newScan();
        List<CombinedScanTask> tasks = Lists.newArrayList(tableScan.planTasks());
        tasks.forEach(task ->
                task.files().forEach(fileScanTask -> {
                    Lists.newArrayList(fileScanTask.asDataTask().rows())
                            .forEach(row -> dataFilesPath.add(row.get(1, String.class)));
                })
        );

        return dataFilesPath;
    }

    public static PartitionSpec buildPartitionSpec(Schema schema, List<String> partitionNames) {
        if (partitionNames == null || partitionNames.isEmpty()) {
            return null;
        }
        PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
        for (String partitionName : partitionNames) {
            builder.identity(partitionName);
        }
        return builder.build();
    }
}
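// Illustration (not from the patch) of buildPartitionSpec above: an identity partition on a
// hypothetical column "dt"; a null or empty name list makes the helper return null.
Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "dt", Types.StringType.get()));
PartitionSpec spec = IcebergUtils.buildPartitionSpec(schema, Lists.newArrayList("dt"));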
@ -50,14 +50,7 @@ public interface FileSystem {

    Status makeDir(String remotePath);

    default RemoteFiles listLocatedFiles(String remotePath) throws UserException {
        return listLocatedFiles(remotePath, false, false);
    }

    // Get the located status of files and directories, not only files
    default RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException {
        throw new UserException("Not support to listLocations.");
    }
    RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException;

    // List files in remotePath
    // The remote file name will only contain the file name (not the full path)
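// listLocatedFiles(remotePath, onlyFiles, recursive) is now declared without a default body,
// so every FileSystem implementation must provide it, as the LocalFileSystem hunk below does.
// A minimal sketch for a hypothetical backend that cannot support the call:
@Override
public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive)
        throws UserException {
    throw new UserException("Not support to listLocations.");
}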
@ -59,6 +59,11 @@ public class LocalFileSystem implements FileSystem {
        throw new UnsupportedOperationException("Unsupported operation on local file system.");
    }

    @Override
    public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) {
        throw new UnsupportedOperationException("Unsupported operation on local file system.");
    }

    @Override
    public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
        throw new UnsupportedOperationException("Unsupported operation on local file system.");
@ -45,10 +45,6 @@ public class RemoteFile {
        this(name, null, isFile, !isFile, size, blockSize, modificationTime, null);
    }

    public RemoteFile(Path path, boolean isDirectory, long size, long blockSize, BlockLocation[] blockLocations) {
        this(path.getName(), path, !isDirectory, isDirectory, size, blockSize, 0, blockLocations);
    }

    public RemoteFile(Path path, boolean isDirectory, long size, long blockSize, long modificationTime,
            BlockLocation[] blockLocations) {
        this(path.getName(), path, !isDirectory, isDirectory, size, blockSize, modificationTime, blockLocations);
@ -179,7 +179,6 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.external.iceberg.IcebergTableCreationRecord;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.load.DeleteHandler;
import org.apache.doris.load.ExportJobState;
@ -2495,21 +2494,8 @@ public class ShowExecutor {

    private void handleShowTableCreation() throws AnalysisException {
        ShowTableCreationStmt showStmt = (ShowTableCreationStmt) stmt;
        String dbName = showStmt.getDbName();
        DatabaseIf db = ctx.getCurrentCatalog().getDbOrAnalysisException(dbName);

        List<IcebergTableCreationRecord> records = ctx.getEnv().getIcebergTableCreationRecordMgr()
                .getTableCreationRecordByDbId(db.getId());

        List<List<Comparable>> rowSet = Lists.newArrayList();
        for (IcebergTableCreationRecord record : records) {
            List<Comparable> row = record.getTableCreationRecord();
            // like predicate
            if (Strings.isNullOrEmpty(showStmt.getWild()) || showStmt.like(record.getTable())) {
                rowSet.add(row);
            }
        }

        // sort rows by the fourth column (Create Time) asc
        ListComparator<List<Comparable>> comparator = null;
        OrderByPair orderByPair = new OrderByPair(3, false);
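// Not shown in this truncated hunk (assumed continuation): the removed handler would next
// build the comparator from orderByPair and order rowSet by create time, roughly:
//     comparator = new ListComparator<>(orderByPair);
//     rowSet.sort(comparator);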