[feature-wip](hudi) Step1: Support create hudi external table (#9559)

support create hudi table
support show create table for hudi table

### Design
1. Create a hudi table without schema (recommended)
```sql
    CREATE [EXTERNAL] TABLE table_name
    ENGINE = HUDI
    [COMMENT "comment"]
    PROPERTIES (
    "hudi.database" = "hudi_db_in_hive_metastore",
    "hudi.table" = "hudi_table_in_hive_metastore",
    "hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083"
    );
```

2. create hudi table with schema
```sql
    CREATE [EXTERNAL] TABLE table_name
    [(column_definition1[, column_definition2, ...])]
    ENGINE = HUDI
    [COMMENT "comment"]
    PROPERTIES (
    "hudi.database" = "hudi_db_in_hive_metastore",
    "hudi.table" = "hudi_table_in_hive_metastore",
    "hudi.hive.metastore.uris" = "thrift://127.0.0.1:9083"
    );
```
When creating a hudi table with a schema, the columns must exist in the corresponding table in the hive metastore.
This commit is contained in:
dujl
2022-05-17 11:30:23 +08:00
committed by GitHub
parent bee5c2f8aa
commit 72e0042efb
19 changed files with 782 additions and 10 deletions

View File

@ -87,6 +87,7 @@ public class CreateTableStmt extends DdlStmt {
engineNames.add("elasticsearch");
engineNames.add("hive");
engineNames.add("iceberg");
engineNames.add("hudi");
}
// for backup. set to -1 for normal use
@ -166,7 +167,7 @@ public class CreateTableStmt extends DdlStmt {
this.rollupAlterClauseList = rollupAlterClauseList == null ? new ArrayList<>() : rollupAlterClauseList;
}
// This is for iceberg table, which has no column schema
// This is for iceberg/hudi table, which has no column schema
public CreateTableStmt(boolean ifNotExists,
boolean isExternal,
TableName tableName,
@ -358,7 +359,8 @@ public class CreateTableStmt extends DdlStmt {
}
// analyze column def
if (!engineName.equals("iceberg") && (columnDefs == null || columnDefs.isEmpty())) {
if (!(engineName.equals("iceberg") || engineName.equals("hudi"))
&& (columnDefs == null || columnDefs.isEmpty())) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLE_MUST_HAVE_COLUMNS);
}
// add a hidden column as delete flag for unique table
@ -480,7 +482,8 @@ public class CreateTableStmt extends DdlStmt {
}
if (engineName.equals("mysql") || engineName.equals("odbc") || engineName.equals("broker")
|| engineName.equals("elasticsearch") || engineName.equals("hive") || engineName.equals("iceberg")) {
|| engineName.equals("elasticsearch") || engineName.equals("hive")
|| engineName.equals("iceberg") || engineName.equals("hudi")) {
if (!isExternal) {
// this is for compatibility
isExternal = true;

View File

@ -150,6 +150,9 @@ import org.apache.doris.deploy.impl.AmbariDeployManager;
import org.apache.doris.deploy.impl.K8sDeployManager;
import org.apache.doris.deploy.impl.LocalFileDeployManager;
import org.apache.doris.external.elasticsearch.EsRepository;
import org.apache.doris.external.hudi.HudiProperty;
import org.apache.doris.external.hudi.HudiTable;
import org.apache.doris.external.hudi.HudiUtils;
import org.apache.doris.external.iceberg.IcebergCatalogMgr;
import org.apache.doris.external.iceberg.IcebergTableCreationRecordMgr;
import org.apache.doris.ha.BDBHA;
@ -3076,6 +3079,9 @@ public class Catalog {
} else if (engineName.equalsIgnoreCase("iceberg")) {
IcebergCatalogMgr.createIcebergTable(db, stmt);
return;
} else if (engineName.equalsIgnoreCase("hudi")) {
createHudiTable(db, stmt);
return;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, engineName);
}
@ -4105,6 +4111,44 @@ public class Catalog {
LOG.info("successfully create table[{}-{}]", tableName, tableId);
}
/**
 * Creates an external hudi table in Doris that maps onto an existing hudi table
 * registered in a remote Hive Metastore. Validates the connection properties,
 * confirms the remote table exists and really is a hudi (non-realtime) table,
 * and optionally cross-checks a user-supplied schema against the remote one.
 *
 * @param db   target Doris database
 * @param stmt the CREATE TABLE statement (ENGINE = HUDI)
 * @throws DdlException if properties are invalid, the remote table is missing,
 *         is not a hudi table, is a realtime table, or the name already exists
 */
private void createHudiTable(Database db, CreateTableStmt stmt) throws DdlException {
    String tableName = stmt.getTableName();
    List<Column> columns = stmt.getColumns();
    long tableId = getNextId();
    HudiTable hudiTable = new HudiTable(tableId, tableName, columns, stmt.getProperties());
    hudiTable.setComment(stmt.getComment());
    // check hudi properties in create stmt.
    HudiUtils.validateCreateTable(hudiTable);
    // check hudi table whether exists in hive database
    String metastoreUris = hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS);
    HiveMetaStoreClient hiveMetaStoreClient = HiveMetaStoreClientHelper.getClient(metastoreUris);
    try {
        if (!HiveMetaStoreClientHelper.tableExists(hiveMetaStoreClient,
                hudiTable.getHmsDatabaseName(), hudiTable.getHmsTableName())) {
            throw new DdlException(String.format("Table [%s] does not exist in Hive Metastore.",
                    hudiTable.getHmsTableIdentifer()));
        }
    } finally {
        // Release the metastore connection: the original code leaked this client.
        // NOTE(review): assumes tableExists() does not close the client itself — confirm.
        hiveMetaStoreClient.close();
    }
    org.apache.hadoop.hive.metastore.api.Table hiveTable = HiveMetaStoreClientHelper.getTable(
            hudiTable.getHmsDatabaseName(),
            hudiTable.getHmsTableName(),
            metastoreUris);
    if (!HudiUtils.isHudiTable(hiveTable)) {
        throw new DdlException(String.format("Table [%s] is not a hudi table.", hudiTable.getHmsTableIdentifer()));
    }
    // after support snapshot query for mor, we should remove the check.
    if (HudiUtils.isHudiRealtimeTable(hiveTable)) {
        // BUGFIX: the original format string had no %s placeholder, so the table
        // name argument was silently dropped from the error message.
        throw new DdlException(String.format("Can not support hudi realtime table [%s].",
                hudiTable.getHmsTableName()));
    }
    // check table's schema when user specify the schema
    if (!hudiTable.getFullSchema().isEmpty()) {
        HudiUtils.validateColumns(hudiTable, hiveTable);
    }
    // check hudi table if exists in doris database
    if (!db.createTableWithLock(hudiTable, false, stmt.isSetIfNotExists()).first) {
        ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
    }
    LOG.info("successfully create table[{}-{}]", tableName, tableId);
}
public static void getDdlStmt(Table table, List<String> createTableStmt, List<String> addPartitionStmt,
List<String> createRollupStmt, boolean separatePartition, boolean hidePassword) {
getDdlStmt(null, null, table, createTableStmt, addPartitionStmt, createRollupStmt, separatePartition, hidePassword);
@ -4412,6 +4456,15 @@ public class Catalog {
sb.append("\"iceberg.table\" = \"").append(icebergTable.getIcebergTbl()).append("\",\n");
sb.append(new PrintableMap<>(icebergTable.getIcebergProperties(), " = ", true, true, false).toString());
sb.append("\n)");
} else if (table.getType() == TableType.HUDI) {
HudiTable hudiTable = (HudiTable) table;
if (!Strings.isNullOrEmpty(table.getComment())) {
sb.append("\nCOMMENT \"").append(table.getComment(true)).append("\"");
}
// properties
sb.append("\nPROPERTIES (\n");
sb.append(new PrintableMap<>(hudiTable.getTableProperties(), " = ", true, true, false).toString());
sb.append("\n)");
}
createTableStmt.add(sb.toString());

View File

@ -286,6 +286,29 @@ public class HiveMetaStoreClientHelper {
return table;
}
/**
* Get hive table with dbName and tableName.
*
* @param dbName database name
* @param tableName table name
* @param metaStoreUris hive metastore uris
* @return HiveTable
* @throws DdlException when get table from hive metastore failed.
*/
public static Table getTable(String dbName, String tableName, String metaStoreUris) throws DdlException {
HiveMetaStoreClient client = getClient(metaStoreUris);
Table table;
try {
table = client.getTable(dbName, tableName);
} catch (TException e) {
LOG.warn("Hive metastore thrift exception: {}", e.getMessage());
throw new DdlException("Connect hive metastore failed. Error: " + e.getMessage());
} finally {
client.close();
}
return table;
}
/**
* Convert Doris expr to Hive expr, only for partition column
* @param dorisExpr

View File

@ -25,6 +25,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.external.hudi.HudiTable;
import org.apache.doris.thrift.TTableDescriptor;
import com.google.common.base.Preconditions;
@ -67,7 +68,8 @@ public class Table extends MetaObject implements Writable {
BROKER,
ELASTICSEARCH,
HIVE,
ICEBERG
ICEBERG,
HUDI
}
protected long id;
@ -340,6 +342,8 @@ public class Table extends MetaObject implements Writable {
table = new HiveTable();
} else if (type == TableType.ICEBERG) {
table = new IcebergTable();
} else if (type == TableType.HUDI) {
table = new HudiTable();
} else {
throw new IOException("Unknown table type: " + type.name());
}
@ -432,6 +436,8 @@ public class Table extends MetaObject implements Writable {
return "ElasticSearch";
case HIVE:
return "Hive";
case HUDI:
return "Hudi";
default:
return null;
}
@ -451,6 +457,7 @@ public class Table extends MetaObject implements Writable {
case BROKER:
case ELASTICSEARCH:
case HIVE:
case HUDI:
return "EXTERNAL TABLE";
default:
return null;

View File

@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.hudi;
/**
* Hudi property contains information to connect a remote hudi db or table.
*/
/**
 * Hudi property contains information to connect a remote hudi db or table.
 * Pure constants holder; not meant to be instantiated.
 */
public class HudiProperty {
    // remote hudi database name in hive metastore
    public static final String HUDI_DATABASE = "hudi.database";
    // remote hudi table name in hive metastore
    public static final String HUDI_TABLE = "hudi.table";
    // thrift uri(s) of the hive metastore holding the hudi table
    public static final String HUDI_HIVE_METASTORE_URIS = "hudi.hive.metastore.uris";
    // catalog type (declared here but not read by the visible code)
    public static final String HUDI_CATALOG_TYPE = "hudi.catalog.type";

    // utility class: suppress the implicit public constructor
    private HudiProperty() {
    }
}

View File

@ -0,0 +1,125 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.hudi;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.io.Text;
import org.apache.doris.thrift.THudiTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
 * External Hudi table.
 * Doris stores only the mapping (hms database/table name plus connection
 * properties); the data and authoritative schema live in the remote
 * Hive Metastore table this object points at.
 */
public class HudiTable extends Table {
    private static final Logger LOG = LogManager.getLogger(HudiTable.class);

    // table properties of this hudi table
    private Map<String, String> tableProperties = Maps.newHashMap();
    // remote Hudi database name in hive metastore
    private String hmsDatabaseName;
    // remote Hudi table name in hive metastore
    private String hmsTableName;

    // no-arg constructor used by deserialization (Table.readFields dispatch)
    public HudiTable() {
        super(TableType.HUDI);
    }

    /**
     * Generate a Hudi Table with id, name, schema, properties.
     *
     * @param id table id
     * @param tableName table name
     * @param fullSchema table's schema
     * @param tableProperties table's properties
     */
    public HudiTable(long id, String tableName, List<Column> fullSchema, Map<String, String> tableProperties) {
        super(id, tableName, TableType.HUDI, fullSchema);
        // BUGFIX: the original dereferenced tableProperties unconditionally, so a
        // null map threw a raw NullPointerException here, before
        // HudiUtils.validateCreateTable could report its user-friendly DdlException.
        // Keep the field null in that case so validation sees it and reports properly.
        this.tableProperties = tableProperties;
        if (tableProperties != null) {
            this.hmsDatabaseName = tableProperties.get(HudiProperty.HUDI_DATABASE);
            this.hmsTableName = tableProperties.get(HudiProperty.HUDI_TABLE);
        }
    }

    public String getHmsDatabaseName() {
        return hmsDatabaseName;
    }

    public String getHmsTableName() {
        return hmsTableName;
    }

    public Map<String, String> getTableProperties() {
        return tableProperties;
    }

    // NOTE: method name keeps the original "Identifer" spelling — it is part of
    // the public interface and callers (e.g. Catalog.createHudiTable) use it.
    public String getHmsTableIdentifer() {
        return String.format("%s.%s", hmsDatabaseName, hmsTableName);
    }

    // Serialization format (must stay in sync with readFields):
    // super fields, hms db name, hms table name, property count, then key/value pairs.
    @Override
    public void write(DataOutput out) throws IOException {
        super.write(out);
        Text.writeString(out, hmsDatabaseName);
        Text.writeString(out, hmsTableName);
        out.writeInt(tableProperties.size());
        for (Map.Entry<String, String> entry : tableProperties.entrySet()) {
            Text.writeString(out, entry.getKey());
            Text.writeString(out, entry.getValue());
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        super.readFields(in);
        hmsDatabaseName = Text.readString(in);
        hmsTableName = Text.readString(in);
        int size = in.readInt();
        for (int i = 0; i < size; i++) {
            String key = Text.readString(in);
            String value = Text.readString(in);
            tableProperties.put(key, value);
        }
    }

    // Builds the thrift descriptor sent to backends for scan planning.
    @Override
    public TTableDescriptor toThrift() {
        THudiTable thriftHudiTable = new THudiTable();
        thriftHudiTable.setDbName(getHmsDatabaseName());
        thriftHudiTable.setTableName(getHmsTableName());
        thriftHudiTable.setProperties(getTableProperties());
        TTableDescriptor thriftTableDescriptor = new TTableDescriptor(getId(), TTableType.HUDI_TABLE,
                fullSchema.size(), 0, getName(), "");
        thriftTableDescriptor.setHudiTable(thriftHudiTable);
        return thriftTableDescriptor;
    }
}

View File

@ -0,0 +1,128 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.external.hudi;
import org.apache.doris.common.DdlException;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.hudi.hadoop.HoodieParquetInputFormat;
import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Hudi utils: validation helpers used when creating a hudi external table,
 * plus input-format-based checks to classify a hive metastore table as hudi.
 */
public class HudiUtils {
    private static final String PROPERTY_MISSING_MSG =
            "Hudi table %s is null. Please add properties('%s'='xxx') when create table";

    // utility class: suppress the implicit public constructor
    private HudiUtils() {
    }

    /**
     * Check hudi table properties: hudi.database, hudi.table and
     * hudi.hive.metastore.uris must all be present, and nothing else.
     *
     * @param table the hudi table built from the CREATE TABLE statement
     * @throws DdlException when a required property is missing, or an unknown one is present
     */
    public static void validateCreateTable(HudiTable table) throws DdlException {
        if (table.getTableProperties() == null) {
            throw new DdlException("Please set properties of hudi table, "
                    + "they are: database, table and 'hive.metastore.uris'");
        }
        // work on a copy so the table's own property map is untouched
        Map<String, String> copiedProps = Maps.newHashMap(table.getTableProperties());
        String hiveDb = copiedProps.get(HudiProperty.HUDI_DATABASE);
        if (Strings.isNullOrEmpty(hiveDb)) {
            throw new DdlException(String.format(PROPERTY_MISSING_MSG,
                    HudiProperty.HUDI_DATABASE, HudiProperty.HUDI_DATABASE));
        }
        copiedProps.remove(HudiProperty.HUDI_DATABASE);
        String hiveTable = copiedProps.get(HudiProperty.HUDI_TABLE);
        if (Strings.isNullOrEmpty(hiveTable)) {
            throw new DdlException(String.format(PROPERTY_MISSING_MSG,
                    HudiProperty.HUDI_TABLE, HudiProperty.HUDI_TABLE));
        }
        copiedProps.remove(HudiProperty.HUDI_TABLE);
        // check hive properties
        // hive.metastore.uris
        String hiveMetastoreUris = copiedProps.get(HudiProperty.HUDI_HIVE_METASTORE_URIS);
        if (Strings.isNullOrEmpty(hiveMetastoreUris)) {
            throw new DdlException(String.format(PROPERTY_MISSING_MSG,
                    HudiProperty.HUDI_HIVE_METASTORE_URIS, HudiProperty.HUDI_HIVE_METASTORE_URIS));
        }
        copiedProps.remove(HudiProperty.HUDI_HIVE_METASTORE_URIS);
        // anything left over was not recognized
        if (!copiedProps.isEmpty()) {
            throw new DdlException("Unknown table properties: " + copiedProps.toString());
        }
    }

    /**
     * Check a hiveTable is hudi table or not, by its storage input format.
     *
     * @param hiveTable hive metastore table
     * @return true when hiveTable is hudi table, false when it is not
     */
    public static boolean isHudiTable(org.apache.hadoop.hive.metastore.api.Table hiveTable) {
        String inputFormat = hiveTable.getSd().getInputFormat();
        return HoodieParquetInputFormat.class.getName().equals(inputFormat)
                || HoodieParquetRealtimeInputFormat.class.getName().equals(inputFormat);
    }

    /**
     * Check whether the table is a hudi realtime (merge-on-read) table.
     *
     * @param hiveTable hive metastore table
     * @return true when the table uses the hudi realtime input format
     */
    public static boolean isHudiRealtimeTable(org.apache.hadoop.hive.metastore.api.Table hiveTable) {
        return HoodieParquetRealtimeInputFormat.class.getName().equals(hiveTable.getSd().getInputFormat());
    }

    /**
     * Check if columns of hudi table exist in hive table.
     *
     * @param table hudi table to be checked
     * @param hiveTable the corresponding hive table
     * @throws DdlException when hudi table's column(s) didn't exist in hive table
     */
    public static void validateColumns(HudiTable table,
            org.apache.hadoop.hive.metastore.api.Table hiveTable) throws DdlException {
        // NOTE(review): comparison is case-sensitive; hive column names are usually
        // lower-cased by the metastore — confirm mixed-case Doris columns match.
        Set<String> hudiColumnNames = table.getFullSchema().stream()
                .map(x -> x.getName()).collect(Collectors.toSet());
        Set<String> hiveTableColumnNames = hiveTable.getSd().getCols()
                .stream().map(x -> x.getName()).collect(Collectors.toSet());
        // whatever survives the removal has no counterpart in the hive table
        hudiColumnNames.removeAll(hiveTableColumnNames);
        if (hudiColumnNames.size() > 0) {
            throw new DdlException(String.format("Hudi table's column(s): {%s} didn't exist in hive table. ",
                    String.join(", ", hudiColumnNames)));
        }
    }
}

View File

@ -398,6 +398,7 @@ public class ShowExecutor {
rowSet.add(Lists.newArrayList("HIVE", "YES", "HIVE database which data is in it", "NO", "NO", "NO"));
rowSet.add(Lists.newArrayList("ICEBERG", "YES", "ICEBERG data lake which data is in it", "NO", "NO", "NO"));
rowSet.add(Lists.newArrayList("ODBC", "YES", "ODBC driver which data we can connect", "NO", "NO", "NO"));
rowSet.add(Lists.newArrayList("HUDI", "YES", "HUDI data lake which data is in it", "NO", "NO", "NO"));
// Only success
resultSet = new ShowResultSet(showStmt.getMetaData(), rowSet);

View File

@ -274,4 +274,21 @@ public class CreateTableStmtTest {
"\"iceberg.hive.metastore.uris\" = \"thrift://127.0.0.1:9087\",\n" +
"\"iceberg.table\" = \"test\")", stmt.toString());
}
@Test
public void testCreateHudiTable() throws UserException {
    // minimal valid property set for an ENGINE = HUDI table
    Map<String, String> hudiProperties = new HashMap<>();
    hudiProperties.put("hudi.database", "doris");
    hudiProperties.put("hudi.table", "test");
    hudiProperties.put("hudi.hive.metastore.uris", "thrift://127.0.0.1:9087");
    CreateTableStmt createStmt = new CreateTableStmt(false, true, tblName, "hudi", hudiProperties, "");
    createStmt.analyze(analyzer);
    // schema-less create: toString must round-trip the engine and properties
    String expected = "CREATE EXTERNAL TABLE `testCluster:db1`.`table1` (\n"
            + "\n"
            + ") ENGINE = hudi\n"
            + "PROPERTIES (\"hudi.database\" = \"doris\",\n"
            + "\"hudi.hive.metastore.uris\" = \"thrift://127.0.0.1:9087\",\n"
            + "\"hudi.table\" = \"test\")";
    Assert.assertEquals(expected, createStmt.toString());
}
}