[feature-wip](catalog) support deltalake catalog step1-metadata (#22493)

This commit is contained in:
zy-kkk
2023-08-29 10:31:37 +08:00
committed by GitHub
parent d8f159728b
commit 5b641ebd40
15 changed files with 313 additions and 11 deletions

View File

@ -574,6 +574,12 @@ under the License.
<version>${paimon.version}</version>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>${delta.version}</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>glue</artifactId>

View File

@ -145,7 +145,7 @@ public interface TableIf {
MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, @Deprecated HUDI, JDBC,
TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE,
ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, PAIMON_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE,
HUDI_EXTERNAL_TABLE;
HUDI_EXTERNAL_TABLE, DELTALAKE_EXTERNAL_TABLE;
public String toEngineName() {
switch (this) {
@ -182,6 +182,8 @@ public interface TableIf {
return "iceberg";
case HUDI_EXTERNAL_TABLE:
return "hudi";
case DELTALAKE_EXTERNAL_TABLE:
return "deltalake";
default:
return null;
}
@ -210,6 +212,7 @@ public interface TableIf {
case ES_EXTERNAL_TABLE:
case ICEBERG_EXTERNAL_TABLE:
case PAIMON_EXTERNAL_TABLE:
case DELTALAKE_EXTERNAL_TABLE:
return "EXTERNAL TABLE";
default:
return null;

View File

@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog.external;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitDatabaseLog;
import org.apache.doris.datasource.deltalake.DeltaLakeExternalCatalog;
public class DeltaLakeExternalDataBase extends HMSExternalDatabase {
public DeltaLakeExternalDataBase(ExternalCatalog extCatalog, long id, String name) {
super(extCatalog, id, name, InitDatabaseLog.Type.DELTALAKE);
}
@Override
protected DeltaLakeExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
return new DeltaLakeExternalTable(tblId, tableName, name, (DeltaLakeExternalCatalog) extCatalog);
}
}

View File

@ -0,0 +1,159 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.deltalake.DeltaLakeExternalCatalog;
import com.google.common.collect.Lists;
import io.delta.standalone.DeltaLog;
import io.delta.standalone.actions.Metadata;
import io.delta.standalone.types.DataType;
import io.delta.standalone.types.StructField;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class DeltaLakeExternalTable extends HMSExternalTable {
public DeltaLakeExternalTable(long id, String name, String dbName,
DeltaLakeExternalCatalog catalog) {
super(id, name, dbName, catalog, TableType.DELTALAKE_EXTERNAL_TABLE);
}
@Override
protected synchronized void makeSureInitialized() {
super.makeSureInitialized();
if (!objectCreated) {
remoteTable = ((HMSExternalCatalog) catalog).getClient().getTable(dbName, name);
if (remoteTable == null) {
dlaType = DLAType.UNKNOWN;
} else {
if (supportedDeltaLakeTable()) {
dlaType = DLAType.DELTALAKE;
} else {
dlaType = DLAType.UNKNOWN;
}
}
objectCreated = true;
}
}
private boolean supportedDeltaLakeTable() {
Map<String, String> parameters = remoteTable.getParameters();
if (parameters == null) {
return false;
}
// Check that the 'spark.sql.sources.provider' parameter exists and has a value of 'delta'
return "delta".equalsIgnoreCase(parameters.get("spark.sql.sources.provider"));
}
@Override
public List<Column> initSchema() {
makeSureInitialized();
List<Column> columns;
List<FieldSchema> schema = ((DeltaLakeExternalCatalog) catalog).getClient().getSchema(dbName, name);
io.delta.standalone.types.StructType deltaSchema = getDeltaTableSchema(this);
List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
for (StructField field : deltaSchema.getFields()) {
String columnName = field.getName();
tmpSchema.add(new Column(columnName, fromDeltaTypeToDorisType(field.getDataType()),
true, null, true, null, "", true, null, -1, null));
}
columns = tmpSchema;
initPartitionColumns(columns);
return columns;
}
private static io.delta.standalone.types.StructType getDeltaTableSchema(DeltaLakeExternalTable table) {
String path = table.getRemoteTable().getSd().getLocation();
Configuration conf = HiveMetaStoreClientHelper.getConfiguration(table);
DeltaLog deltaLog = DeltaLog.forTable(conf, path);
Metadata metadata = deltaLog.snapshot().getMetadata();
io.delta.standalone.types.StructType tableSchema = metadata.getSchema();
return tableSchema;
}
private static Type fromDeltaTypeToDorisType(DataType dataType) {
String typeName = dataType.getTypeName();
switch (typeName) {
case "boolean":
return Type.BOOLEAN;
case "byte":
case "tinyint":
return Type.TINYINT;
case "smallint":
return Type.SMALLINT;
case "integer":
return Type.INT;
case "long":
return Type.BIGINT;
case "float":
return Type.FLOAT;
case "double":
return Type.DOUBLE;
case "date":
return Type.DATEV2;
case "timestamp":
return ScalarType.createDatetimeV2Type(6);
case "string":
return Type.STRING;
case "decimal":
int precision = ((io.delta.standalone.types.DecimalType) dataType).getPrecision();
int scale = ((io.delta.standalone.types.DecimalType) dataType).getScale();
return ScalarType.createDecimalV3Type(precision, scale);
case "array":
io.delta.standalone.types.ArrayType arrayType = (io.delta.standalone.types.ArrayType) dataType;
Type innerType = fromDeltaTypeToDorisType(arrayType.getElementType());
return ArrayType.create(innerType, true);
case "map":
io.delta.standalone.types.MapType mapType = (io.delta.standalone.types.MapType) dataType;
return new MapType(Type.STRING, fromDeltaTypeToDorisType(mapType.getValueType()));
case "struct":
io.delta.standalone.types.StructType deltaStructType = (io.delta.standalone.types.StructType) dataType;
ArrayList<org.apache.doris.catalog.StructField> dorisFields = new ArrayList<>();
for (io.delta.standalone.types.StructField deltaField : deltaStructType.getFields()) {
// Convert the Delta field type to a Doris type
Type dorisFieldType = fromDeltaTypeToDorisType(deltaField.getDataType());
// Create a Doris struct field with the same name and type
org.apache.doris.catalog.StructField dorisField = new org.apache.doris.catalog.StructField(
deltaField.getName(), dorisFieldType);
// Add the Doris field to the list
dorisFields.add(dorisField);
}
// Create a Doris struct type with the converted fields
return new StructType(dorisFields);
case "null":
return Type.NULL;
case "binary":
default:
return Type.UNSUPPORTED;
}
}
}

View File

@ -46,6 +46,10 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> {
super(extCatalog, id, name, InitDatabaseLog.Type.HMS);
}
public HMSExternalDatabase(ExternalCatalog extCatalog, long id, String name, InitDatabaseLog.Type type) {
super(extCatalog, id, name, type);
}
@Override
protected HMSExternalTable getExternalTable(String tableName, long tblId, ExternalCatalog catalog) {
return new HMSExternalTable(tblId, tableName, name, (HMSExternalCatalog) extCatalog);

View File

@ -102,16 +102,16 @@ public class HMSExternalTable extends ExternalTable {
SUPPORTED_HUDI_FILE_FORMATS.add("com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat");
}
private volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
private List<Column> partitionColumns;
protected volatile org.apache.hadoop.hive.metastore.api.Table remoteTable = null;
protected List<Column> partitionColumns;
private DLAType dlaType = DLAType.UNKNOWN;
protected DLAType dlaType = DLAType.UNKNOWN;
// No as precise as row count in TableStats, but better than none.
private long estimatedRowCount = -1;
public enum DLAType {
UNKNOWN, HIVE, HUDI, ICEBERG
UNKNOWN, HIVE, HUDI, ICEBERG, DELTALAKE
}
/**
@ -126,6 +126,10 @@ public class HMSExternalTable extends ExternalTable {
super(id, name, catalog, dbName, TableType.HMS_EXTERNAL_TABLE);
}
public HMSExternalTable(long id, String name, String dbName, HMSExternalCatalog catalog, TableType type) {
super(id, name, catalog, dbName, type);
}
public boolean isSupportedHmsTable() {
makeSureInitialized();
return dlaType != DLAType.UNKNOWN;
@ -465,7 +469,7 @@ public class HMSExternalTable extends ExternalTable {
return tmpSchema;
}
private void initPartitionColumns(List<Column> schema) {
protected void initPartitionColumns(List<Column> schema) {
List<String> partitionKeys = remoteTable.getPartitionKeys().stream().map(FieldSchema::getName)
.collect(Collectors.toList());
partitionColumns = Lists.newArrayListWithCapacity(partitionKeys.size());

View File

@ -27,6 +27,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Resource;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.deltalake.DeltaLakeExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalogFactory;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalCatalogFactory;
@ -130,6 +131,9 @@ public class CatalogFactory {
case "max_compute":
catalog = new MaxComputeExternalCatalog(catalogId, name, resource, props, comment);
break;
case "deltalake":
catalog = new DeltaLakeExternalCatalog(catalogId, name, resource, props, comment);
break;
case "test":
if (!FeConstants.runningUnitTest) {
throw new DdlException("test catalog is only for FE unit test");

View File

@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.external.DeltaLakeExternalDataBase;
import org.apache.doris.catalog.external.EsExternalDatabase;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.ExternalTable;
@ -483,6 +484,8 @@ public abstract class ExternalCatalog
return new TestExternalDatabase(this, dbId, dbName);
case PAIMON:
return new PaimonExternalDatabase(this, dbId, dbName);
case DELTALAKE:
return new DeltaLakeExternalDataBase(this, dbId, dbName);
default:
break;
}

View File

@ -78,6 +78,13 @@ public class HMSExternalCatalog extends ExternalCatalog {
catalogProperty = new CatalogProperty(resource, props);
}
public HMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
String comment, InitCatalogLog.Type type) {
super(catalogId, name, type, comment);
props = PropertyConverter.convertToMetaProperties(props);
catalogProperty = new CatalogProperty(resource, props);
}
@Override
public void checkProperties() throws DdlException {
super.checkProperties();

View File

@ -40,6 +40,7 @@ public class InitCatalogLog implements Writable {
PAIMON,
MAX_COMPUTE,
HUDI,
DELTALAKE,
TEST,
UNKNOWN;
}

View File

@ -40,6 +40,7 @@ public class InitDatabaseLog implements Writable {
MAX_COMPUTE,
HUDI,
PAIMON,
DELTALAKE,
TEST,
UNKNOWN;
}

View File

@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.deltalake;
import org.apache.doris.catalog.external.HMSExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.metastore.api.Table;
import java.util.List;
import java.util.Map;
public class DeltaLakeExternalCatalog extends HMSExternalCatalog {
public DeltaLakeExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
String comment) {
super(catalogId, name, resource, props, comment, InitCatalogLog.Type.DELTALAKE);
}
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
HMSExternalDatabase hmsExternalDatabase = (HMSExternalDatabase) idToDb.get(dbNameToId.get(dbName));
if (hmsExternalDatabase != null && hmsExternalDatabase.isInitialized()) {
List<String> names = Lists.newArrayList();
for (HMSExternalTable table : hmsExternalDatabase.getTables()) {
String tableName = table.getName();
Table tableDetails = client.getTable(dbName, tableName);
Map<String, String> parameters = tableDetails.getParameters();
String provider = parameters.get("spark.sql.sources.provider");
if ("delta".equalsIgnoreCase(provider)) {
names.add(tableName);
}
}
return names;
} else {
List<String> allTableNames = client.getAllTables(getRealTableName(dbName));
List<String> deltaTableNames = Lists.newArrayList();
for (String tableName : allTableNames) {
Table tableDetails = client.getTable(dbName, tableName);
Map<String, String> parameters = tableDetails.getParameters();
String provider = parameters.get("spark.sql.sources.provider");
if ("delta".equalsIgnoreCase(provider)) {
deltaTableNames.add(tableName);
}
}
return deltaTableNames;
}
}
}

View File

@ -44,6 +44,8 @@ import org.apache.doris.catalog.SinglePartitionInfo;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.DeltaLakeExternalDataBase;
import org.apache.doris.catalog.external.DeltaLakeExternalTable;
import org.apache.doris.catalog.external.EsExternalDatabase;
import org.apache.doris.catalog.external.EsExternalTable;
import org.apache.doris.catalog.external.ExternalDatabase;
@ -64,6 +66,7 @@ import org.apache.doris.datasource.EsExternalCatalog;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.MaxComputeExternalCatalog;
import org.apache.doris.datasource.deltalake.DeltaLakeExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergDLFExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergGlueExternalCatalog;
@ -209,7 +212,8 @@ public class GsonUtils {
.registerSubtype(IcebergHadoopExternalCatalog.class, IcebergHadoopExternalCatalog.class.getSimpleName())
.registerSubtype(PaimonExternalCatalog.class, PaimonExternalCatalog.class.getSimpleName())
.registerSubtype(PaimonHMSExternalCatalog.class, PaimonHMSExternalCatalog.class.getSimpleName())
.registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName());
.registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName())
.registerSubtype(DeltaLakeExternalCatalog.class, DeltaLakeExternalCatalog.class.getSimpleName());
// routine load data source
private static RuntimeTypeAdapterFactory<AbstractDataSourceProperties> rdsTypeAdapterFactory =
RuntimeTypeAdapterFactory.of(
@ -228,7 +232,8 @@ public class GsonUtils {
.registerSubtype(JdbcExternalDatabase.class, JdbcExternalDatabase.class.getSimpleName())
.registerSubtype(IcebergExternalDatabase.class, IcebergExternalDatabase.class.getSimpleName())
.registerSubtype(PaimonExternalDatabase.class, PaimonExternalDatabase.class.getSimpleName())
.registerSubtype(MaxComputeExternalDatabase.class, MaxComputeExternalDatabase.class.getSimpleName());
.registerSubtype(MaxComputeExternalDatabase.class, MaxComputeExternalDatabase.class.getSimpleName())
.registerSubtype(DeltaLakeExternalDataBase.class, DeltaLakeExternalDataBase.class.getSimpleName());
private static RuntimeTypeAdapterFactory<TableIf> tblTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
TableIf.class, "clazz").registerSubtype(ExternalTable.class, ExternalTable.class.getSimpleName())
@ -238,7 +243,8 @@ public class GsonUtils {
.registerSubtype(JdbcExternalTable.class, JdbcExternalTable.class.getSimpleName())
.registerSubtype(IcebergExternalTable.class, IcebergExternalTable.class.getSimpleName())
.registerSubtype(PaimonExternalTable.class, PaimonExternalTable.class.getSimpleName())
.registerSubtype(MaxComputeExternalTable.class, MaxComputeExternalTable.class.getSimpleName());
.registerSubtype(MaxComputeExternalTable.class, MaxComputeExternalTable.class.getSimpleName())
.registerSubtype(DeltaLakeExternalTable.class, DeltaLakeExternalTable.class.getSimpleName());
// runtime adapter for class "PartitionInfo"
private static RuntimeTypeAdapterFactory<PartitionInfo> partitionInfoTypeAdapterFactory

View File

@ -2020,7 +2020,7 @@ public class SingleNodePlanner {
scanNode = new HiveScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
break;
default:
throw new UserException("Not supported table type" + table.getType());
throw new UserException("Not supported table type: " + ((HMSExternalTable) table).getDlaType());
}
break;
case ICEBERG_EXTERNAL_TABLE:
@ -2044,7 +2044,7 @@ public class SingleNodePlanner {
scanNode = new TestExternalTableScanNode(ctx.getNextNodeId(), tblRef.getDesc());
break;
default:
throw new UserException("Not supported table type" + tblRef.getTable().getType());
throw new UserException("Not supported table type: " + tblRef.getTable().getType());
}
if (scanNode instanceof OlapScanNode || scanNode instanceof EsScanNode
|| scanNode instanceof FileQueryScanNode) {

View File

@ -273,6 +273,7 @@ under the License.
<!-- Please modify iceberg.version and avro.version together,
you can find avro version info in iceberg mvn repository -->
<iceberg.version>1.1.0</iceberg.version>
<delta.version>3.0.0rc1</delta.version>
<maxcompute.version>0.43.3-public</maxcompute.version>
<arrow.version>9.0.0</arrow.version>
<avro.version>1.11.1</avro.version>