diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index d4b83ccb51..3b13de1a15 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2216,21 +2216,10 @@ public class Config extends ConfigBase {
             "Enable external table DDL"})
     public static boolean enable_external_ddl = false;
 
     @ConfField(mutable = true, masterOnly = true, description = {
-            "Hive创建外部表默认指定的input format",
-            "Default hive input format for creating table."})
-    public static String hive_default_input_format = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
-
-    @ConfField(mutable = true, masterOnly = true, description = {
-            "Hive创建外部表默认指定的output format",
-            "Default hive output format for creating table."})
-    public static String hive_default_output_format = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
-
-    @ConfField(mutable = true, masterOnly = true, description = {
-            "Hive创建外部表默认指定的SerDe类",
-            "Default hive serde class for creating table."})
-    public static String hive_default_serde = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
+            "Hive创建外部表默认指定的文件格式",
+            "Default hive file format for creating table."})
+    public static String hive_default_file_format = "orc";
 
     @ConfField
     public static int statistics_sql_parallel_exec_instance_num = 1;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
index ffae9420ed..44dba04157 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.DropDbStmt;
 import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
@@ -176,6 +177,8 @@ public interface CatalogIf<T extends DatabaseIf> {
         return log;
     }
 
+    TableName getTableNameByTableId(Long tableId);
+
     // Return a copy of all db collection.
     Collection<DatabaseIf<? extends TableIf>> getAllDbs();
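CatalogIf#getTableNameByTableId lets callers resolve a table id against any catalog, which the group-commit INSERT path in LogicalPlanBuilder (further below) relies on now that it goes through getCurrentCatalog() instead of the internal catalog. A minimal sketch of the lookup; the id is hypothetical, and the method returns null when no database in the catalog contains it:

    CatalogIf catalog = Env.getCurrentEnv().getCurrentCatalog();
    TableName name = catalog.getTableNameByTableId(12345L); // hypothetical table id
    if (name != null) {
        // InternalCatalog now fills in INTERNAL_CATALOG_NAME instead of ""
        System.out.println(name.getCtl() + "." + name.getDb() + "." + name.getTbl());
    }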
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index aa6ef1f14e..d9af555711 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.CreateDbStmt;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.DropDbStmt;
 import org.apache.doris.analysis.DropTableStmt;
+import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
@@ -395,6 +396,16 @@ public abstract class ExternalCatalog
         }
     }
 
+    public TableName getTableNameByTableId(Long tableId) {
+        for (DatabaseIf db : idToDb.values()) {
+            TableIf table = db.getTableNullable(tableId);
+            if (table != null) {
+                return new TableName(getName(), db.getFullName(), table.getName());
+            }
+        }
+        return null;
+    }
+
     @Override
     public String getResource() {
         return catalogProperty.getResource();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 3057de0a38..afb3d73f35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -307,7 +307,7 @@ public class InternalCatalog implements CatalogIf<Database> {
         for (Database db : fullNameToDb.values()) {
             Table table = db.getTableNullable(tableId);
             if (table != null) {
-                return new TableName("", db.getFullName(), table.getName());
+                return new TableName(INTERNAL_CATALOG_NAME, db.getFullName(), table.getName());
             }
         }
         return null;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
index b55ed7bdf0..d8daeb155c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
@@ -51,6 +51,8 @@ public interface HMSCachedClient {
 
     List<String> listPartitionNames(String dbName, String tblName);
 
+    List<Partition> listPartitions(String dbName, String tblName);
+
     List<String> listPartitionNames(String dbName, String tblName, long maxListPartitionNum);
 
     Partition getPartition(String dbName, String tblName, List<String> partitionValues);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 886d6d76fa..091cd51b23 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -125,17 +125,13 @@ public class HiveMetadataOps implements ExternalMetadataOps {
         }
         try {
             Map<String, String> props = stmt.getExtProperties();
-            String inputFormat = props.getOrDefault("input_format", Config.hive_default_input_format);
-            String outputFormat = props.getOrDefault("output_format", Config.hive_default_output_format);
-            String serDe = props.getOrDefault("serde", Config.hive_default_serde);
+            String fileFormat = props.getOrDefault("file_format", Config.hive_default_file_format);
             HiveTableMetadata catalogTable = HiveTableMetadata.of(dbName,
                     tblName,
                     stmt.getColumns(),
                     parsePartitionKeys(props),
                     props,
-                    inputFormat,
-                    outputFormat,
-                    serDe);
+                    fileFormat);
             client.createTable(catalogTable, stmt.isSetIfNotExists());
             db.setUnInitialized(true);
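HiveMetadataOps now reads a single file_format property, falling back to the new hive_default_file_format config ("orc"). A sketch of the precedence, with an illustrative property map standing in for CREATE TABLE ... PROPERTIES('file_format'='parquet'):

    Map<String, String> props = new HashMap<>();
    props.put("file_format", "parquet"); // explicit table property wins
    String fileFormat = props.getOrDefault("file_format", Config.hive_default_file_format);
    // fileFormat == "parquet"; with no property set it falls back to "orc"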
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java
index 8edd303318..fde0a2d4d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTableMetadata.java
@@ -30,9 +30,7 @@ public class HiveTableMetadata implements TableMetadata {
     private String tableName;
     private List<Column> columns;
     private List<FieldSchema> partitionKeys;
-    private String inputFormat;
-    private String outputFormat;
-    private String serDe;
+    private String fileFormat;
     private Map<String, String> properties;
     // private String viewSql;
 
@@ -41,16 +39,12 @@ public class HiveTableMetadata implements TableMetadata {
                              List<Column> columns,
                              List<FieldSchema> partitionKeys,
                              Map<String, String> props,
-                             String inputFormat,
-                             String outputFormat,
-                             String serDe) {
+                             String fileFormat) {
         this.dbName = dbName;
         this.tableName = tblName;
         this.columns = columns;
         this.partitionKeys = partitionKeys;
-        this.inputFormat = inputFormat;
-        this.outputFormat = outputFormat;
-        this.serDe = serDe;
+        this.fileFormat = fileFormat;
         this.properties = props;
     }
 
@@ -77,16 +71,8 @@ public class HiveTableMetadata implements TableMetadata {
         return partitionKeys;
     }
 
-    public String getInputFormat() {
-        return inputFormat;
-    }
-
-    public String getOutputFormat() {
-        return outputFormat;
-    }
-
-    public String getSerDe() {
-        return serDe;
+    public String getFileFormat() {
+        return fileFormat;
     }
 
     public static HiveTableMetadata of(String dbName,
@@ -94,9 +80,7 @@ public class HiveTableMetadata implements TableMetadata {
                                        List<Column> columns,
                                        List<FieldSchema> partitionKeys,
                                        Map<String, String> props,
-                                       String inputFormat,
-                                       String outputFormat,
-                                       String serDe) {
-        return new HiveTableMetadata(dbName, tblName, columns, partitionKeys, props,
-                inputFormat, outputFormat, serDe);
+                                       String fileFormat) {
+        return new HiveTableMetadata(dbName, tblName, columns, partitionKeys, props, fileFormat);
     }
 }
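The factory method now takes the logical format name in place of the input format/output format/SerDe triple. A sketch of building metadata for an unpartitioned two-column table (all names illustrative):

    List<Column> cols = ImmutableList.of(
            new Column("id", PrimitiveType.INT),
            new Column("name", PrimitiveType.STRING));
    HiveTableMetadata meta = HiveTableMetadata.of(
            "demo_db", "demo_tbl",
            cols,
            ImmutableList.of(),  // no partition keys (FieldSchema list)
            ImmutableMap.of(),   // no extra table properties
            "parquet");          // replaces inputFormat/outputFormat/serDe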
- + " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\"" - + " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\"" - + " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\"" - + " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s'" - + " AND \"PART_NAME\" in (%s);", - dbName, tblName, partitionNamesString); + String sql; + if (partitionNamesWithQuote.isEmpty()) { + sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\"," + + " \"PARTITIONS\".\"LAST_ACCESS_TIME\"," + + " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\"" + + " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\"" + + " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\"" + + " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s';", + dbName, tblName); + } else { + String partitionNamesString = Joiner.on(", ").join(partitionNamesWithQuote); + sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\"," + + " \"PARTITIONS\".\"LAST_ACCESS_TIME\"," + + " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\"" + + " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\"" + + " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\"" + + " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s'" + + " AND \"PART_NAME\" in (%s);", + dbName, tblName, partitionNamesString); + } if (LOG.isDebugEnabled()) { LOG.debug("getPartitionsByNames exec sql: {}", sql); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java index cb5328395c..d4f63c5a8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java @@ -201,9 +201,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient { // table.setRetention(0); String location = hiveTable.getProperties().get("external_location"); table.setSd(toHiveStorageDesc(hiveTable.getColumns(), - hiveTable.getInputFormat(), - hiveTable.getOutputFormat(), - hiveTable.getSerDe(), + hiveTable.getFileFormat(), location)); table.setPartitionKeys(hiveTable.getPartitionKeys()); // table.setViewOriginalText(hiveTable.getViewSql()); @@ -213,15 +211,10 @@ public class ThriftHMSCachedClient implements HMSCachedClient { return table; } - private static StorageDescriptor toHiveStorageDesc(List columns, String inputFormat, String outputFormat, - String serDe, String location) { + private static StorageDescriptor toHiveStorageDesc(List columns, String fileFormat, String location) { StorageDescriptor sd = new StorageDescriptor(); sd.setCols(toHiveColumns(columns)); - SerDeInfo serDeInfo = new SerDeInfo(); - serDeInfo.setSerializationLib(serDe); - sd.setSerdeInfo(serDeInfo); - sd.setInputFormat(inputFormat); - sd.setOutputFormat(outputFormat); + setFileFormat(fileFormat, sd); if (StringUtils.isNotEmpty(location)) { sd.setLocation(location); } @@ -231,6 +224,28 @@ public class ThriftHMSCachedClient implements HMSCachedClient { return sd; } + private static void setFileFormat(String fileFormat, StorageDescriptor sd) { + String inputFormat; + String outputFormat; + String serDe; + if (fileFormat.equalsIgnoreCase("orc")) { + inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; + outputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"; + serDe = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"; + } else if 
(fileFormat.equalsIgnoreCase("parquet")) { + inputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"; + outputFormat = "'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"; + serDe = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"; + } else { + throw new IllegalArgumentException("Creating table with an unsupported file format: " + fileFormat); + } + SerDeInfo serDeInfo = new SerDeInfo(); + serDeInfo.setSerializationLib(serDe); + sd.setSerdeInfo(serDeInfo); + sd.setInputFormat(inputFormat); + sd.setOutputFormat(outputFormat); + } + private static List toHiveColumns(List columns) { List result = new ArrayList<>(); for (Column column : columns) { @@ -297,6 +312,19 @@ public class ThriftHMSCachedClient implements HMSCachedClient { return listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM); } + public List listPartitions(String dbName, String tblName) { + try (ThriftHMSClient client = getClient()) { + try { + return ugiDoAs(() -> client.client.listPartitions(dbName, tblName, MAX_LIST_PARTITION_NUM)); + } catch (Exception e) { + client.setThrowable(e); + throw e; + } + } catch (Exception e) { + throw new HMSClientException("failed to check if table %s in db %s exists", e, tblName, dbName); + } + } + @Override public List listPartitionNames(String dbName, String tblName, long maxListPartitionNum) { // list all parts when the limit is greater than the short maximum diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundHiveTableSink.java new file mode 100644 index 0000000000..0b56c2b681 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundHiveTableSink.java @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundHiveTableSink.java
new file mode 100644
index 0000000000..0b56c2b681
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundHiveTableSink.java
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.analyzer;
+
+import org.apache.doris.nereids.exceptions.UnboundException;
+import org.apache.doris.nereids.memo.GroupExpression;
+import org.apache.doris.nereids.properties.LogicalProperties;
+import org.apache.doris.nereids.properties.UnboundLogicalProperties;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.BlockFuncDepsPropagation;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.PlanType;
+import org.apache.doris.nereids.trees.plans.algebra.Sink;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
+import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
+import org.apache.doris.nereids.util.Utils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Represents a Hive table sink plan node that has not been bound.
+ */
+public class UnboundHiveTableSink<CHILD_TYPE extends Plan> extends UnboundLogicalSink<CHILD_TYPE>
+        implements Unbound, Sink, BlockFuncDepsPropagation {
+    private final List<String> hints;
+    private final List<String> partitions;
+
+    public UnboundHiveTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
+                                List<String> partitions, CHILD_TYPE child) {
+        this(nameParts, colNames, hints, partitions, DMLCommandType.NONE,
+                Optional.empty(), Optional.empty(), child);
+    }
+
+    /**
+     * constructor
+     */
+    public UnboundHiveTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
+                                List<String> partitions,
+                                DMLCommandType dmlCommandType,
+                                Optional<GroupExpression> groupExpression,
+                                Optional<LogicalProperties> logicalProperties,
+                                CHILD_TYPE child) {
+        super(nameParts, PlanType.LOGICAL_UNBOUND_HIVE_TABLE_SINK, ImmutableList.of(), groupExpression,
+                logicalProperties, colNames, dmlCommandType, child);
+        this.hints = Utils.copyRequiredList(hints);
+        this.partitions = Utils.copyRequiredList(partitions);
+    }
+
+    public List<String> getColNames() {
+        return colNames;
+    }
+
+    public List<String> getPartitions() {
+        return partitions;
+    }
+
+    public List<String> getHints() {
+        return hints;
+    }
+
+    @Override
+    public Plan withChildren(List<Plan> children) {
+        Preconditions.checkArgument(children.size() == 1,
+                "UnboundHiveTableSink only accepts one child");
+        return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
+                dmlCommandType, groupExpression, Optional.empty(), children.get(0));
+    }
+
+    @Override
+    public UnboundHiveTableSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
+        throw new UnboundException("could not call withOutputExprs on UnboundHiveTableSink");
+    }
+
+    @Override
+    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+        return visitor.visitUnboundHiveTableSink(this, context);
+    }
+
+    @Override
+    public List<? extends Expression> getExpressions() {
+        throw new UnsupportedOperationException(this.getClass().getSimpleName() + " don't support getExpression()");
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        UnboundHiveTableSink<?> that = (UnboundHiveTableSink<?>) o;
+        return Objects.equals(nameParts, that.nameParts)
+                && Objects.equals(colNames, that.colNames)
+                && Objects.equals(hints, that.hints)
+                && Objects.equals(partitions, that.partitions);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(nameParts, colNames, hints, partitions);
+    }
+
+    @Override
+    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
+        return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
+                dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child());
+    }
+
+    @Override
+    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
+            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
+        return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
+                dmlCommandType, groupExpression, logicalProperties, children.get(0));
+    }
+
+    @Override
+    public LogicalProperties computeLogicalProperties() {
+        return UnboundLogicalProperties.INSTANCE;
+    }
+
+    @Override
+    public List<Slot> computeOutput() {
+        throw new UnboundException("output");
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java
index 89d67dc376..2a4416686c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java
@@ -29,7 +29,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.algebra.Sink;
 import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
-import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
+import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.util.Utils;
 
@@ -43,11 +43,8 @@ import java.util.Optional;
 /**
  * Represent an olap table sink plan node that has not been bound.
  */
-public class UnboundTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE>
+public class UnboundTableSink<CHILD_TYPE extends Plan> extends UnboundLogicalSink<CHILD_TYPE>
         implements Unbound, Sink, BlockFuncDepsPropagation {
-
-    private final List<String> nameParts;
-    private final List<String> colNames;
     private final List<String> hints;
     private final boolean temporaryPartition;
     private final List<String> partitions;
@@ -60,31 +57,6 @@ public class UnboundTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE>
-    public UnboundTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
-            boolean temporaryPartition, List<String> partitions, CHILD_TYPE child) {
-        this(nameParts, colNames, hints, temporaryPartition, partitions,
-                false, DMLCommandType.NONE, Optional.empty(), Optional.empty(), child);
-    }
-
-    public UnboundTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
-            List<String> partitions, boolean isPartialUpdate, CHILD_TYPE child) {
-        this(nameParts, colNames, hints, false, partitions, isPartialUpdate, DMLCommandType.NONE,
-                Optional.empty(), Optional.empty(), child);
-    }
-
-    public UnboundTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
-            boolean temporaryPartition, List<String> partitions, boolean isPartialUpdate, CHILD_TYPE child) {
-        this(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, DMLCommandType.NONE,
-                Optional.empty(), Optional.empty(), child);
-    }
-
-    public UnboundTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
-            boolean temporaryPartition, List<String> partitions,
-            boolean isPartialUpdate, DMLCommandType dmlCommandType, CHILD_TYPE child) {
-        this(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, dmlCommandType,
-                Optional.empty(), Optional.empty(), child);
-    }
-
     /**
      * constructor
      */
@@ -93,9 +65,8 @@ public class UnboundTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE>
             Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
             CHILD_TYPE child) {
-        super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, logicalProperties, child);
-        this.nameParts = Utils.copyRequiredList(nameParts);
-        this.colNames = Utils.copyRequiredList(colNames);
+        super(nameParts, PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression,
+                logicalProperties, colNames, dmlCommandType, child);
         this.hints = Utils.copyRequiredList(hints);
         this.temporaryPartition = temporaryPartition;
         this.partitions = Utils.copyRequiredList(partitions);
@@ -103,14 +74,6 @@ public class UnboundTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE>
-    public List<String> getColNames() {
-        return colNames;
-    }
-
-    public List<String> getNameParts() {
-        return nameParts;
-    }
-
     public boolean isTemporaryPartition() {
         return temporaryPartition;
     }
@@ -127,10 +90,6 @@ public class UnboundTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE>
     @Override
     public Plan withChildren(List<Plan> children) {
         Preconditions.checkArgument(children.size() == 1, "UnboundOlapTableSink only accepts one child");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
new file mode 100644
index 0000000000..335d2f5803
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.analyzer;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Create unbound table sink
+ */
+public class UnboundTableSinkCreator {
+
+    /**
+     * create unbound sink without DML command
+     */
+    public static LogicalSink<? extends Plan> createUnboundTableSink(List<String> nameParts,
+            List<String> colNames, List<String> hints, List<String> partitions, Plan query)
+            throws UserException {
+        String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0);
+        CatalogIf curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
+        if (curCatalog instanceof InternalCatalog) {
+            return new UnboundTableSink<>(nameParts, colNames, hints, partitions, query);
+        } else if (curCatalog instanceof HMSExternalCatalog) {
+            return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, query);
+        }
+        throw new UserException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported.");
+    }
+
+    /**
+     * create unbound sink for DML plan
+     */
+    public static LogicalSink<? extends Plan> createUnboundTableSink(List<String> nameParts,
+            List<String> colNames, List<String> hints, boolean temporaryPartition, List<String> partitions,
+            boolean isPartialUpdate, DMLCommandType dmlCommandType, LogicalPlan plan) {
+        String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0);
+        CatalogIf curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
+        if (curCatalog instanceof InternalCatalog) {
+            return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions,
+                    isPartialUpdate, dmlCommandType, Optional.empty(),
+                    Optional.empty(), plan);
+        } else if (curCatalog instanceof HMSExternalCatalog) {
+            return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
+                    dmlCommandType, Optional.empty(), Optional.empty(), plan);
+        }
+        throw new RuntimeException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported.");
+    }
+}
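The creator keys the sink type off the target catalog, so one INSERT syntax fans out to different logical sinks. A hedged sketch (name parts hypothetical; assumes a ConnectContext and a prepared child plan named query):

    LogicalSink<? extends Plan> sink = UnboundTableSinkCreator.createUnboundTableSink(
            ImmutableList.of("hive", "demo_db", "demo_tbl"), // nameParts: an HMS catalog target
            ImmutableList.of(), // colNames: default to all columns
            ImmutableList.of(), // hints
            ImmutableList.of(), // partitions
            query);             // throws UserException for unsupported catalogs
    // sink instanceof UnboundHiveTableSink -> true; an internal target yields UnboundTableSink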
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 37d0e25f5a..2b17f31633 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -71,6 +71,8 @@ import org.apache.doris.nereids.properties.DistributionSpecHash;
 import org.apache.doris.nereids.properties.DistributionSpecReplicated;
 import org.apache.doris.nereids.properties.DistributionSpecStorageAny;
 import org.apache.doris.nereids.properties.DistributionSpecStorageGather;
+import org.apache.doris.nereids.properties.DistributionSpecTableSinkHashPartitioned;
+import org.apache.doris.nereids.properties.DistributionSpecTableSinkRandomPartitioned;
 import org.apache.doris.nereids.properties.DistributionSpecTabletIdShuffle;
 import org.apache.doris.nereids.properties.OrderKey;
 import org.apache.doris.nereids.properties.PhysicalProperties;
@@ -163,6 +165,7 @@ import org.apache.doris.planner.ExceptNode;
 import org.apache.doris.planner.ExchangeNode;
 import org.apache.doris.planner.HashJoinNode;
 import org.apache.doris.planner.HashJoinNode.DistributionMode;
+import org.apache.doris.planner.HiveTableSink;
 import org.apache.doris.planner.IntersectNode;
 import org.apache.doris.planner.JoinNodeBase;
 import org.apache.doris.planner.MultiCastDataSink;
@@ -432,6 +435,20 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, PlanTranslatorContext> {
     public PlanFragment visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan> hiveTableSink,
             PlanTranslatorContext context) {
         PlanFragment rootFragment = hiveTableSink.child().accept(this, context);
+        rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
+
+        TupleDescriptor hiveTuple = context.generateTupleDesc();
+        List<Column> targetTableColumns = hiveTableSink.getTargetTable().getFullSchema();
+        for (Column column : targetTableColumns) {
+            SlotDescriptor slotDesc = context.addSlotDesc(hiveTuple);
+            slotDesc.setIsMaterialized(true);
+            slotDesc.setType(column.getType());
+            slotDesc.setColumn(column);
+            slotDesc.setIsNullable(column.isAllowNull());
+            slotDesc.setAutoInc(column.isAutoInc());
+        }
+        HiveTableSink sink = new HiveTableSink(hiveTableSink.getTargetTable());
+        rootFragment.setSink(sink);
         return rootFragment;
     }
 
@@ -2568,6 +2585,19 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, PlanTranslatorContext> {
+        } else if (distributionSpec instanceof DistributionSpecTableSinkHashPartitioned) {
+            DistributionSpecTableSinkHashPartitioned partitionSpecHash =
+                    (DistributionSpecTableSinkHashPartitioned) distributionSpec;
+            List<Expr> partitionExprs = Lists.newArrayList();
+            List<ExprId> partitionExprIds = partitionSpecHash.getOutputColExprIds();
+            for (ExprId partitionExprId : partitionExprIds) {
+                if (childOutputIds.contains(partitionExprId)) {
+                    partitionExprs.add(context.findSlotRef(partitionExprId));
+                }
+            }
+            return new DataPartition(TPartitionType.TABLE_SINK_HASH_PARTITIONED, partitionExprs);
+        } else if (distributionSpec instanceof DistributionSpecTableSinkRandomPartitioned) {
+            return new DataPartition(TPartitionType.TABLE_SINK_RANDOM_PARTITIONED);
         } else {
             throw new RuntimeException("Unknown DistributionSpec: " + distributionSpec);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index a5f16fc87e..4fe38f409b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -205,7 +205,7 @@ import org.apache.doris.nereids.analyzer.UnboundResultSink;
 import org.apache.doris.nereids.analyzer.UnboundSlot;
 import org.apache.doris.nereids.analyzer.UnboundStar;
 import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
-import org.apache.doris.nereids.analyzer.UnboundTableSink;
+import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
 import org.apache.doris.nereids.analyzer.UnboundVariable;
 import org.apache.doris.nereids.analyzer.UnboundVariable.VariableType;
 import org.apache.doris.nereids.exceptions.AnalysisException;
@@ -421,6 +421,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
 import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSelectHint;
+import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
 import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
@@ -511,10 +512,11 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
         boolean isOverwrite = ctx.INTO() == null;
         ImmutableList.Builder<String> tableName = ImmutableList.builder();
         if (null != ctx.tableName) {
-            tableName.addAll(visitMultipartIdentifier(ctx.tableName));
+            List<String> nameParts = visitMultipartIdentifier(ctx.tableName);
+            tableName.addAll(nameParts);
         } else if (null != ctx.tableId) {
             // process group commit insert table command send by be
-            TableName name = Env.getCurrentEnv().getInternalCatalog()
+            TableName name = Env.getCurrentEnv().getCurrentCatalog()
                     .getTableNameByTableId(Long.valueOf(ctx.tableId.getText()));
             tableName.add(name.getDb());
             tableName.add(name.getTbl());
@@ -526,7 +528,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
         // TODO visit partitionSpecCtx
         Pair<Boolean, List<String>> partitionSpec = visitPartitionSpec(ctx.partitionSpec());
         LogicalPlan plan = visitQuery(ctx.query());
-        UnboundTableSink<?> sink = new UnboundTableSink<>(
+        LogicalSink<? extends Plan> sink = UnboundTableSinkCreator.createUnboundTableSink(
                 tableName.build(),
                 colNames,
                 ImmutableList.of(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTableSinkHashPartitioned.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTableSinkHashPartitioned.java
new file mode 100644
index 0000000000..4333bd956e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTableSinkHashPartitioned.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+import org.apache.doris.nereids.trees.expressions.ExprId;
+
+import java.util.List;
+
+/**
+ * Used to shuffle data by partition keys before the sink.
+ */
+public class DistributionSpecTableSinkHashPartitioned extends DistributionSpec {
+
+    public static final DistributionSpecTableSinkHashPartitioned INSTANCE =
+            new DistributionSpecTableSinkHashPartitioned();
+
+    private List<ExprId> outputColExprIds;
+
+    public DistributionSpecTableSinkHashPartitioned() {
+        super();
+    }
+
+    public List<ExprId> getOutputColExprIds() {
+        return outputColExprIds;
+    }
+
+    public void setOutputColExprIds(List<ExprId> outputColExprIds) {
+        this.outputColExprIds = outputColExprIds;
+    }
+
+    @Override
+    public boolean satisfy(DistributionSpec other) {
+        return other instanceof DistributionSpecTableSinkHashPartitioned;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTableSinkRandomPartitioned.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTableSinkRandomPartitioned.java
new file mode 100644
index 0000000000..88f791dabe
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/DistributionSpecTableSinkRandomPartitioned.java
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.properties;
+
+/**
+ * Used for round-robin distribution by the data sink.
+ */
+public class DistributionSpecTableSinkRandomPartitioned extends DistributionSpec {
+
+    public static final DistributionSpecTableSinkRandomPartitioned INSTANCE =
+            new DistributionSpecTableSinkRandomPartitioned();
+
+    private DistributionSpecTableSinkRandomPartitioned() {
+        super();
+    }
+
+    @Override
+    public boolean satisfy(DistributionSpec other) {
+        return other instanceof DistributionSpecTableSinkRandomPartitioned;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
index cc5d7db6a0..81e7190e16 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/PhysicalProperties.java
@@ -49,6 +49,9 @@ public class PhysicalProperties {
     public static PhysicalProperties TABLET_ID_SHUFFLE
             = new PhysicalProperties(DistributionSpecTabletIdShuffle.INSTANCE);
 
+    public static PhysicalProperties SINK_RANDOM_PARTITIONED
+            = new PhysicalProperties(DistributionSpecTableSinkRandomPartitioned.INSTANCE);
+
     private final OrderSpec orderSpec;
 
     private final DistributionSpec distributionSpec;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
index 72df678b12..16086f7e29 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -38,6 +38,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
@@ -129,6 +130,16 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
         return null;
     }
 
+    @Override
+    public Void visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan> hiveTableSink, PlanContext context) {
+        if (connectContext != null && !connectContext.getSessionVariable().enableStrictConsistencyDml) {
+            addRequestPropertyToChildren(PhysicalProperties.ANY);
+        } else {
+            addRequestPropertyToChildren(hiveTableSink.getRequirePhysicalProperties());
+        }
+        return null;
+    }
+
     @Override
     public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, PlanContext context) {
         addRequestPropertyToChildren(PhysicalProperties.GATHER);
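The deriver only pushes the sink's distribution requirement down when strict-consistency DML is enabled; otherwise the child keeps whatever distribution it has. Roughly (a sketch; it assumes getRequirePhysicalProperties chooses hash-by-partition-keys for partitioned targets and SINK_RANDOM_PARTITIONED for unpartitioned ones):

    PhysicalProperties required = connectContext.getSessionVariable().enableStrictConsistencyDml
            ? hiveTableSink.getRequirePhysicalProperties() // e.g. SINK_RANDOM_PARTITIONED
            : PhysicalProperties.ANY;                      // child distribution is acceptable
    addRequestPropertyToChildren(required);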
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 9650a9147b..672f78163c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -30,7 +30,7 @@ public enum RuleType {
     // **** make sure BINDING_UNBOUND_LOGICAL_PLAN is the lowest priority in the rewrite rules. ****
     BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
-    BINDING_INSERT_TARGET_EXTERNAL_TABLE(RuleTypeClass.REWRITE),
+    BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE),
    BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
     BINDING_INSERT_FILE(RuleTypeClass.REWRITE),
     BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
index 06ad6921de..cac10b75ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java
@@ -27,11 +27,15 @@ import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.Pair;
+import org.apache.doris.datasource.hive.HMSExternalDatabase;
+import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
 import org.apache.doris.nereids.analyzer.UnboundSlot;
 import org.apache.doris.nereids.analyzer.UnboundTableSink;
 import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.pattern.MatchingContext;
 import org.apache.doris.nereids.rules.Rule;
 import org.apache.doris.nereids.rules.RuleType;
 import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
@@ -47,6 +51,7 @@ import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
 import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
 import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@@ -61,10 +66,12 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -75,265 +82,7 @@ public class BindSink implements AnalysisRuleFactory {
     @Override
     public List<Rule> buildRules() {
         return ImmutableList.of(
-                RuleType.BINDING_INSERT_TARGET_TABLE.build(unboundTableSink().thenApply(ctx -> {
-                    UnboundTableSink<Plan> sink = ctx.root;
-                    Pair<Database, OlapTable> pair = bind(ctx.cascadesContext, sink);
-                    Database database = pair.first;
-                    OlapTable table = pair.second;
-                    boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS;
-
-                    LogicalPlan child = ((LogicalPlan) sink.child());
-                    boolean childHasSeqCol = child.getOutput().stream()
-                            .anyMatch(slot -> slot.getName().equals(Column.SEQUENCE_COL));
-                    boolean needExtraSeqCol = isPartialUpdate && !childHasSeqCol && table.hasSequenceCol()
-                            && table.getSequenceMapCol() != null
-                            && sink.getColNames().contains(table.getSequenceMapCol());
-                    Pair<List<Column>, Integer> bindColumnsResult =
-                            bindTargetColumns(table, sink.getColNames(), childHasSeqCol, needExtraSeqCol);
-                    List<Column> bindColumns = bindColumnsResult.first;
-                    int extraColumnsNum = bindColumnsResult.second;
-
-                    LogicalOlapTableSink<?> boundSink = new LogicalOlapTableSink<>(
-                            database,
-                            table,
-                            bindColumns,
-                            bindPartitionIds(table, sink.getPartitions(), sink.isTemporaryPartition()),
-                            child.getOutput().stream()
-                                    .map(NamedExpression.class::cast)
-                                    .collect(ImmutableList.toImmutableList()),
-                            isPartialUpdate,
-                            sink.getDMLCommandType(),
-                            child);
-
-                    // we need to insert all the columns of the target table
-                    // although some columns are not mentions.
-                    // so we add a projects to supply the default value.
-                    if (boundSink.getCols().size() != child.getOutput().size() + extraColumnsNum) {
-                        throw new AnalysisException("insert into cols should be corresponding to the query output");
-                    }
-
-                    try {
-                        // For Unique Key table with sequence column (which default value is not CURRENT_TIMESTAMP),
-                        // user MUST specify the sequence column while inserting data
-                        //
-                        // case1: create table by `function_column.sequence_col`
-                        //        a) insert with column list, must include the sequence map column
-                        //        b) insert without column list, already contains the column, don't need to check
-                        // case2: create table by `function_column.sequence_type`
-                        //        a) insert with column list, must include the hidden column __DORIS_SEQUENCE_COL__
-                        //        b) insert without column list, don't include the hidden column __DORIS_SEQUENCE_COL__
-                        //           by default, will fail.
-                        if (table.hasSequenceCol()) {
-                            boolean haveInputSeqCol = false;
-                            Optional<Column> seqColInTable = Optional.empty();
-                            if (table.getSequenceMapCol() != null) {
-                                if (!sink.getColNames().isEmpty()) {
-                                    if (sink.getColNames().stream()
-                                            .anyMatch(c -> c.equalsIgnoreCase(table.getSequenceMapCol()))) {
-                                        haveInputSeqCol = true; // case1.a
-                                    }
-                                } else {
-                                    haveInputSeqCol = true; // case1.b
-                                }
-                                seqColInTable = table.getFullSchema().stream()
-                                        .filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol()))
-                                        .findFirst();
-                            } else {
-                                if (!sink.getColNames().isEmpty()) {
-                                    if (sink.getColNames().stream()
-                                            .anyMatch(c -> c.equalsIgnoreCase(Column.SEQUENCE_COL))) {
-                                        haveInputSeqCol = true; // case2.a
-                                    } // else case2.b
-                                }
-                            }
-
-                            // Don't require user to provide sequence column for partial updates,
-                            // including the following cases:
-                            // 1. it's a load job with `partial_columns=true`
-                            // 2. UPDATE and DELETE, planner will automatically add these hidden columns
-                            if (!haveInputSeqCol && !isPartialUpdate && (
-                                    boundSink.getDmlCommandType() != DMLCommandType.UPDATE
-                                            && boundSink.getDmlCommandType() != DMLCommandType.DELETE)) {
-                                if (!seqColInTable.isPresent() || seqColInTable.get().getDefaultValue() == null
-                                        || !seqColInTable.get().getDefaultValue()
-                                        .equalsIgnoreCase(DefaultValue.CURRENT_TIMESTAMP)) {
-                                    throw new org.apache.doris.common.AnalysisException("Table " + table.getName()
-                                            + " has sequence column, need to specify the sequence column");
-                                }
-                            }
-                        }
-                    } catch (Exception e) {
-                        throw new AnalysisException(e.getMessage(), e.getCause());
-                    }
-
-                    // we need to insert all the columns of the target table
-                    // although some columns are not mentions.
-                    // so we add a projects to supply the default value.
-
-                    Map<Column, NamedExpression> columnToChildOutput = Maps.newHashMap();
-                    for (int i = 0; i < child.getOutput().size(); ++i) {
-                        columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i));
-                    }
-
-                    Map<String, NamedExpression> columnToOutput = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
-                    NereidsParser expressionParser = new NereidsParser();
-
-                    // generate slots not mentioned in sql, mv slots and shaded slots.
-                    for (Column column : boundSink.getTargetTable().getFullSchema()) {
-                        if (column.isMaterializedViewColumn()) {
-                            List<SlotRef> refs = column.getRefColumns();
-                            // now we have to replace the column to slots.
-                            Preconditions.checkArgument(refs != null,
-                                    "mv column %s 's ref column cannot be null", column);
-                            Expression parsedExpression = expressionParser.parseExpression(
-                                    column.getDefineExpr().toSqlWithoutTbl());
-                            Expression boundSlotExpression = SlotReplacer.INSTANCE
-                                    .replace(parsedExpression, columnToOutput);
-                            // the boundSlotExpression is an expression whose slots are bound but function
-                            // may not be bound, we have to bind it again.
-                            // for example: to_bitmap.
-                            Expression boundExpression = FunctionBinder.INSTANCE.rewrite(
-                                    boundSlotExpression, new ExpressionRewriteContext(ctx.cascadesContext));
-                            if (boundExpression instanceof Alias) {
-                                boundExpression = ((Alias) boundExpression).child();
-                            }
-                            NamedExpression slot = new Alias(boundExpression, column.getDefineExpr().toSqlWithoutTbl());
-                            columnToOutput.put(column.getName(), slot);
-                        } else if (columnToChildOutput.containsKey(column)
-                                // do not process explicitly use DEFAULT value here:
-                                // insert into table t values(DEFAULT)
-                                && !(columnToChildOutput.get(column) instanceof DefaultValueSlot)) {
-                            columnToOutput.put(column.getName(), columnToChildOutput.get(column));
-                        } else {
-                            if (table.hasSequenceCol()
-                                    && column.getName().equals(Column.SEQUENCE_COL)
-                                    && table.getSequenceMapCol() != null) {
-                                Optional<Column> seqCol = table.getFullSchema().stream()
-                                        .filter(col -> col.getName().equals(table.getSequenceMapCol()))
-                                        .findFirst();
-                                if (!seqCol.isPresent()) {
-                                    throw new AnalysisException("sequence column is not contained in"
-                                            + " target table " + table.getName());
-                                }
-                                if (columnToOutput.get(seqCol.get().getName()) != null) {
-                                    // should generate diff exprId for seq column
-                                    NamedExpression seqColumn = columnToOutput.get(seqCol.get().getName());
-                                    if (seqColumn instanceof Alias) {
-                                        seqColumn = new Alias(((Alias) seqColumn).child(), column.getName());
-                                    } else {
-                                        seqColumn = new Alias(seqColumn, column.getName());
-                                    }
-                                    columnToOutput.put(column.getName(), seqColumn);
-                                }
-                            } else if (isPartialUpdate) {
-                                // If the current load is a partial update, the values of unmentioned
-                                // columns will be filled in SegmentWriter. And the output of sink node
-                                // should not contain these unmentioned columns, so we just skip them.
-
-                                // But if the column has 'on update value', we should unconditionally
-                                // update the value of the column to the current timestamp whenever there
-                                // is an update on the row
-                                if (column.hasOnUpdateDefaultValue()) {
-                                    Expression defualtValueExpression = FunctionBinder.INSTANCE.rewrite(
-                                            new NereidsParser().parseExpression(
-                                                    column.getOnUpdateDefaultValueExpr().toSqlWithoutTbl()),
-                                            new ExpressionRewriteContext(ctx.cascadesContext));
-                                    columnToOutput.put(column.getName(),
-                                            new Alias(defualtValueExpression, column.getName()));
-                                } else {
-                                    continue;
-                                }
-                            } else if (column.getDefaultValue() == null) {
-                                // throw exception if explicitly use Default value but no default value present
-                                // insert into table t values(DEFAULT)
-                                if (columnToChildOutput.get(column) instanceof DefaultValueSlot) {
-                                    throw new AnalysisException("Column has no default value,"
-                                            + " column=" + column.getName());
-                                }
-                                // Otherwise, the unmentioned columns should be filled with default values
-                                // or null values
-                                columnToOutput.put(column.getName(), new Alias(
-                                        new NullLiteral(DataType.fromCatalogType(column.getType())),
-                                        column.getName()
-                                ));
-                            } else {
-                                try {
-                                    // it comes from the original planner, if default value expression is
-                                    // null, we use the literal string of the default value, or it may be
-                                    // default value function, like CURRENT_TIMESTAMP.
-                                    if (column.getDefaultValueExpr() == null) {
-                                        columnToOutput.put(column.getName(),
-                                                new Alias(Literal.of(column.getDefaultValue())
-                                                        .checkedCastTo(DataType.fromCatalogType(column.getType())),
-                                                        column.getName()));
-                                    } else {
-                                        Expression defualtValueExpression = FunctionBinder.INSTANCE.rewrite(
-                                                new NereidsParser().parseExpression(
-                                                        column.getDefaultValueExpr().toSqlWithoutTbl()),
-                                                new ExpressionRewriteContext(ctx.cascadesContext));
-                                        if (defualtValueExpression instanceof Alias) {
-                                            defualtValueExpression = ((Alias) defualtValueExpression).child();
-                                        }
-                                        columnToOutput.put(column.getName(),
-                                                new Alias(defualtValueExpression, column.getName()));
-                                    }
-                                } catch (Exception e) {
-                                    throw new AnalysisException(e.getMessage(), e.getCause());
-                                }
-                            }
-                        }
-                    }
-                    List<NamedExpression> fullOutputExprs = ImmutableList.copyOf(columnToOutput.values());
-                    if (child instanceof LogicalOneRowRelation) {
-                        // remove default value slot in one row relation
-                        child = ((LogicalOneRowRelation) child).withProjects(((LogicalOneRowRelation) child)
-                                .getProjects().stream()
-                                .filter(p -> !(p instanceof DefaultValueSlot))
-                                .collect(ImmutableList.toImmutableList()));
-                    }
-                    LogicalProject<?> fullOutputProject = new LogicalProject<>(fullOutputExprs, child);
-
-                    // add cast project
-                    List<NamedExpression> castExprs = Lists.newArrayList();
-                    for (int i = 0; i < table.getFullSchema().size(); ++i) {
-                        Column col = table.getFullSchema().get(i);
-                        NamedExpression expr = columnToOutput.get(col.getName());
-                        if (expr == null) {
-                            // If `expr` is null, it means that the current load is a partial update
-                            // and `col` should not be contained in the output of the sink node so
-                            // we skip it.
-                            continue;
-                        }
-                        DataType inputType = expr.getDataType();
-                        DataType targetType = DataType.fromCatalogType(table.getFullSchema().get(i).getType());
-                        Expression castExpr = expr;
-                        // TODO move string like type logic into TypeCoercionUtils#castIfNotSameType
-                        if (isSourceAndTargetStringLikeType(inputType, targetType) && !inputType.equals(targetType)) {
-                            int sourceLength = ((CharacterType) inputType).getLen();
-                            int targetLength = ((CharacterType) targetType).getLen();
-                            if (sourceLength == targetLength) {
-                                castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType);
-                            } else if (sourceLength > targetLength && targetLength >= 0) {
-                                castExpr = new Substring(castExpr, Literal.of(1), Literal.of(targetLength));
-                            } else if (targetType.isStringType()) {
-                                castExpr = new Cast(castExpr, StringType.INSTANCE);
-                            }
-                        } else {
-                            castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType);
-                        }
-                        if (castExpr instanceof NamedExpression) {
-                            castExprs.add(((NamedExpression) castExpr));
-                        } else {
-                            castExprs.add(new Alias(castExpr));
-                        }
-                    }
-                    if (!castExprs.equals(fullOutputExprs)) {
-                        fullOutputProject = new LogicalProject(castExprs, fullOutputProject);
-                    }
-
-                    return boundSink.withChildAndUpdateOutput(fullOutputProject);
-
-                })),
+                RuleType.BINDING_INSERT_TARGET_TABLE.build(unboundTableSink().thenApply(this::bindOlapTableSink)),
                 RuleType.BINDING_INSERT_FILE.build(
                         logicalFileSink().when(s -> s.getOutputExprs().isEmpty())
                                 .then(fileSink -> fileSink.withOutputExprs(
@@ -342,20 +91,316 @@ public class BindSink implements AnalysisRuleFactory {
                                                 .collect(ImmutableList.toImmutableList())))
                 ),
                 // TODO: bind hive taget table
-                RuleType.BINDING_INSERT_TARGET_EXTERNAL_TABLE.build(
-                        logicalHiveTableSink().when(s -> s.getOutputExprs().isEmpty())
-                                .then(hiveTableSink -> hiveTableSink.withOutputExprs(
-                                        hiveTableSink.child().getOutput().stream()
-                                                .map(NamedExpression.class::cast)
-                                                .collect(ImmutableList.toImmutableList())))
-                )
+                RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink))
         );
     }
 
+    private Plan bindOlapTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) {
+        UnboundTableSink<Plan> sink = ctx.root;
+        Pair<Database, OlapTable> pair = bind(ctx.cascadesContext, sink);
+        Database database = pair.first;
+        OlapTable table = pair.second;
+        boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS;
+
+        LogicalPlan child = ((LogicalPlan) sink.child());
+        boolean childHasSeqCol = child.getOutput().stream()
+                .anyMatch(slot -> slot.getName().equals(Column.SEQUENCE_COL));
+        boolean needExtraSeqCol = isPartialUpdate && !childHasSeqCol && table.hasSequenceCol()
+                && table.getSequenceMapCol() != null
+                && sink.getColNames().contains(table.getSequenceMapCol());
+        Pair<List<Column>, Integer> bindColumnsResult =
+                bindTargetColumns(table, sink.getColNames(), childHasSeqCol, needExtraSeqCol);
+        List<Column> bindColumns = bindColumnsResult.first;
+        int extraColumnsNum = bindColumnsResult.second;
+
+        LogicalOlapTableSink<?> boundSink = new LogicalOlapTableSink<>(
+                database,
+                table,
+                bindColumns,
+                bindPartitionIds(table, sink.getPartitions(), sink.isTemporaryPartition()),
+                child.getOutput().stream()
+                        .map(NamedExpression.class::cast)
+                        .collect(ImmutableList.toImmutableList()),
+                isPartialUpdate,
+                sink.getDMLCommandType(),
+                child);
+
+        // we need to insert all the columns of the target table
+        // although some columns are not mentions.
+        // so we add a projects to supply the default value.
+        if (boundSink.getCols().size() != child.getOutput().size() + extraColumnsNum) {
+            throw new AnalysisException("insert into cols should be corresponding to the query output");
+        }
+
+        try {
+            // For Unique Key table with sequence column (which default value is not CURRENT_TIMESTAMP),
+            // user MUST specify the sequence column while inserting data
+            //
+            // case1: create table by `function_column.sequence_col`
+            //        a) insert with column list, must include the sequence map column
+            //        b) insert without column list, already contains the column, don't need to check
+            // case2: create table by `function_column.sequence_type`
+            //        a) insert with column list, must include the hidden column __DORIS_SEQUENCE_COL__
+            //        b) insert without column list, don't include the hidden column __DORIS_SEQUENCE_COL__
+            //           by default, will fail.
+            if (table.hasSequenceCol()) {
+                boolean haveInputSeqCol = false;
+                Optional<Column> seqColInTable = Optional.empty();
+                if (table.getSequenceMapCol() != null) {
+                    if (!sink.getColNames().isEmpty()) {
+                        if (sink.getColNames().stream()
+                                .anyMatch(c -> c.equalsIgnoreCase(table.getSequenceMapCol()))) {
+                            haveInputSeqCol = true; // case1.a
+                        }
+                    } else {
+                        haveInputSeqCol = true; // case1.b
+                    }
+                    seqColInTable = table.getFullSchema().stream()
+                            .filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol()))
+                            .findFirst();
+                } else {
+                    if (!sink.getColNames().isEmpty()) {
+                        if (sink.getColNames().stream()
+                                .anyMatch(c -> c.equalsIgnoreCase(Column.SEQUENCE_COL))) {
+                            haveInputSeqCol = true; // case2.a
+                        } // else case2.b
+                    }
+                }
+
+                // Don't require user to provide sequence column for partial updates,
+                // including the following cases:
+                // 1. it's a load job with `partial_columns=true`
+                // 2. UPDATE and DELETE, planner will automatically add these hidden columns
+                if (!haveInputSeqCol && !isPartialUpdate && (
+                        boundSink.getDmlCommandType() != DMLCommandType.UPDATE
+                                && boundSink.getDmlCommandType() != DMLCommandType.DELETE)) {
+                    if (!seqColInTable.isPresent() || seqColInTable.get().getDefaultValue() == null
+                            || !seqColInTable.get().getDefaultValue()
+                            .equalsIgnoreCase(DefaultValue.CURRENT_TIMESTAMP)) {
+                        throw new org.apache.doris.common.AnalysisException("Table " + table.getName()
+                                + " has sequence column, need to specify the sequence column");
+                    }
+                }
+            }
+        } catch (Exception e) {
+            throw new AnalysisException(e.getMessage(), e.getCause());
+        }
+
+        // we need to insert all the columns of the target table
+        // although some columns are not mentions.
+        // so we add a projects to supply the default value.
+
+        Map<Column, NamedExpression> columnToChildOutput = Maps.newHashMap();
+        for (int i = 0; i < child.getOutput().size(); ++i) {
+            columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i));
+        }
+
+        Map<String, NamedExpression> columnToOutput = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        NereidsParser expressionParser = new NereidsParser();
+
+        // generate slots not mentioned in sql, mv slots and shaded slots.
+        for (Column column : boundSink.getTargetTable().getFullSchema()) {
+            if (column.isMaterializedViewColumn()) {
+                List<SlotRef> refs = column.getRefColumns();
+                // now we have to replace the column to slots.
+                Preconditions.checkArgument(refs != null,
+                        "mv column %s 's ref column cannot be null", column);
+                Expression parsedExpression = expressionParser.parseExpression(
+                        column.getDefineExpr().toSqlWithoutTbl());
+                Expression boundSlotExpression = SlotReplacer.INSTANCE
+                        .replace(parsedExpression, columnToOutput);
+                // the boundSlotExpression is an expression whose slots are bound but function
+                // may not be bound, we have to bind it again.
+                // for example: to_bitmap.
+                Expression boundExpression = FunctionBinder.INSTANCE.rewrite(
+                        boundSlotExpression, new ExpressionRewriteContext(ctx.cascadesContext));
+                if (boundExpression instanceof Alias) {
+                    boundExpression = ((Alias) boundExpression).child();
+                }
+                NamedExpression slot = new Alias(boundExpression, column.getDefineExpr().toSqlWithoutTbl());
+                columnToOutput.put(column.getName(), slot);
+            } else if (columnToChildOutput.containsKey(column)
+                    // do not process explicitly use DEFAULT value here:
+                    // insert into table t values(DEFAULT)
+                    && !(columnToChildOutput.get(column) instanceof DefaultValueSlot)) {
+                columnToOutput.put(column.getName(), columnToChildOutput.get(column));
+            } else {
+                if (table.hasSequenceCol()
+                        && column.getName().equals(Column.SEQUENCE_COL)
+                        && table.getSequenceMapCol() != null) {
+                    Optional<Column> seqCol = table.getFullSchema().stream()
+                            .filter(col -> col.getName().equals(table.getSequenceMapCol()))
+                            .findFirst();
+                    if (!seqCol.isPresent()) {
+                        throw new AnalysisException("sequence column is not contained in"
+                                + " target table " + table.getName());
+                    }
+                    if (columnToOutput.get(seqCol.get().getName()) != null) {
+                        // should generate diff exprId for seq column
+                        NamedExpression seqColumn = columnToOutput.get(seqCol.get().getName());
+                        if (seqColumn instanceof Alias) {
+                            seqColumn = new Alias(((Alias) seqColumn).child(), column.getName());
+                        } else {
+                            seqColumn = new Alias(seqColumn, column.getName());
+                        }
+                        columnToOutput.put(column.getName(), seqColumn);
+                    }
+                } else if (isPartialUpdate) {
+                    // If the current load is a partial update, the values of unmentioned
+                    // columns will be filled in SegmentWriter. And the output of sink node
+                    // should not contain these unmentioned columns, so we just skip them.
+
+                    // But if the column has 'on update value', we should unconditionally
+                    // update the value of the column to the current timestamp whenever there
+                    // is an update on the row
+                    if (column.hasOnUpdateDefaultValue()) {
+                        Expression defualtValueExpression = FunctionBinder.INSTANCE.rewrite(
+                                new NereidsParser().parseExpression(
+                                        column.getOnUpdateDefaultValueExpr().toSqlWithoutTbl()),
+                                new ExpressionRewriteContext(ctx.cascadesContext));
+                        columnToOutput.put(column.getName(),
+                                new Alias(defualtValueExpression, column.getName()));
+                    } else {
+                        continue;
+                    }
+                } else if (column.getDefaultValue() == null) {
+                    // throw exception if explicitly use Default value but no default value present
+                    // insert into table t values(DEFAULT)
+                    if (columnToChildOutput.get(column) instanceof DefaultValueSlot) {
+                        throw new AnalysisException("Column has no default value,"
+                                + " column=" + column.getName());
+                    }
+                    // Otherwise, the unmentioned columns should be filled with default values
+                    // or null values
+                    columnToOutput.put(column.getName(), new Alias(
+                            new NullLiteral(DataType.fromCatalogType(column.getType())),
+                            column.getName()
+                    ));
+                } else {
+                    try {
+                        // it comes from the original planner, if default value expression is
+                        // null, we use the literal string of the default value, or it may be
+                        // default value function, like CURRENT_TIMESTAMP.
+                        if (column.getDefaultValueExpr() == null) {
+                            columnToOutput.put(column.getName(),
+                                    new Alias(Literal.of(column.getDefaultValue())
+                                            .checkedCastTo(DataType.fromCatalogType(column.getType())),
+                                            column.getName()));
+                        } else {
+                            Expression defualtValueExpression = FunctionBinder.INSTANCE.rewrite(
+                                    new NereidsParser().parseExpression(
+                                            column.getDefaultValueExpr().toSqlWithoutTbl()),
+                                    new ExpressionRewriteContext(ctx.cascadesContext));
+                            if (defualtValueExpression instanceof Alias) {
+                                defualtValueExpression = ((Alias) defualtValueExpression).child();
+                            }
+                            columnToOutput.put(column.getName(),
+                                    new Alias(defualtValueExpression, column.getName()));
+                        }
+                    } catch (Exception e) {
+                        throw new AnalysisException(e.getMessage(), e.getCause());
+                    }
+                }
+            }
+        }
+        List<NamedExpression> fullOutputExprs = ImmutableList.copyOf(columnToOutput.values());
+        if (child instanceof LogicalOneRowRelation) {
+            // remove default value slot in one row relation
+            child = ((LogicalOneRowRelation) child).withProjects(((LogicalOneRowRelation) child)
+                    .getProjects().stream()
+                    .filter(p -> !(p instanceof DefaultValueSlot))
+                    .collect(ImmutableList.toImmutableList()));
+        }
+        LogicalProject<?> fullOutputProject = new LogicalProject<>(fullOutputExprs, child);
+
+        // add cast project
+        List<NamedExpression> castExprs = Lists.newArrayList();
+        for (int i = 0; i < table.getFullSchema().size(); ++i) {
+            Column col = table.getFullSchema().get(i);
+            NamedExpression expr = columnToOutput.get(col.getName());
+            if (expr == null) {
+                // If `expr` is null, it means that the current load is a partial update
+                // and `col` should not be contained in the output of the sink node so
+                // we skip it.
+ continue; + } + DataType inputType = expr.getDataType(); + DataType targetType = DataType.fromCatalogType(table.getFullSchema().get(i).getType()); + Expression castExpr = expr; + // TODO move string like type logic into TypeCoercionUtils#castIfNotSameType + if (isSourceAndTargetStringLikeType(inputType, targetType) && !inputType.equals(targetType)) { + int sourceLength = ((CharacterType) inputType).getLen(); + int targetLength = ((CharacterType) targetType).getLen(); + if (sourceLength == targetLength) { + castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType); + } else if (sourceLength > targetLength && targetLength >= 0) { + castExpr = new Substring(castExpr, Literal.of(1), Literal.of(targetLength)); + } else if (targetType.isStringType()) { + castExpr = new Cast(castExpr, StringType.INSTANCE); + } + } else { + castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType); + } + if (castExpr instanceof NamedExpression) { + castExprs.add(((NamedExpression) castExpr)); + } else { + castExprs.add(new Alias(castExpr)); + } + } + if (!castExprs.equals(fullOutputExprs)) { + fullOutputProject = new LogicalProject(castExprs, fullOutputProject); + } + + return boundSink.withChildAndUpdateOutput(fullOutputProject); + } + + private Plan bindHiveTableSink(MatchingContext> ctx) { + UnboundHiveTableSink sink = ctx.root; + Pair pair = bind(ctx.cascadesContext, sink); + HMSExternalDatabase database = pair.first; + HMSExternalTable table = pair.second; + LogicalPlan child = ((LogicalPlan) sink.child()); + + List bindColumns; + if (sink.getColNames().isEmpty()) { + bindColumns = table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList()); + } else { + bindColumns = sink.getColNames().stream().map(cn -> { + Column column = table.getColumn(cn); + if (column == null) { + throw new AnalysisException(String.format("column %s is not found in table %s", + cn, table.getName())); + } + return column; + }).collect(ImmutableList.toImmutableList()); + } + Set hivePartitionKeys = table.getRemoteTable() + .getPartitionKeys().stream() + .map(FieldSchema::getName) + .collect(Collectors.toSet()); + LogicalHiveTableSink boundSink = new LogicalHiveTableSink<>( + database, + table, + bindColumns, + hivePartitionKeys, + child.getOutput().stream() + .map(NamedExpression.class::cast) + .collect(ImmutableList.toImmutableList()), + sink.getDMLCommandType(), + Optional.empty(), + Optional.empty(), + child); + // we need to insert all the columns of the target table + if (boundSink.getCols().size() != child.getOutput().size()) { + throw new AnalysisException("insert into cols should be corresponding to the query output"); + } + return boundSink; + } + private Pair bind(CascadesContext cascadesContext, UnboundTableSink sink) { List tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), sink.getNameParts()); - Pair pair = RelationUtil.getDbAndTable(tableQualifier, + Pair, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier, cascadesContext.getConnectContext().getEnv()); if (!(pair.second instanceof OlapTable)) { try { @@ -368,6 +413,18 @@ public class BindSink implements AnalysisRuleFactory { return Pair.of(((Database) pair.first), (OlapTable) pair.second); } + private Pair bind(CascadesContext cascadesContext, + UnboundHiveTableSink sink) { + List tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(), + sink.getNameParts()); + Pair, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier, + 
cascadesContext.getConnectContext().getEnv()); + if (pair.second instanceof HMSExternalTable) { + return Pair.of(((HMSExternalDatabase) pair.first), (HMSExternalTable) pair.second); + } + throw new AnalysisException("the target table of insert into is not a Hive table"); + } + private List bindPartitionIds(OlapTable table, List partitions, boolean temp) { return partitions.isEmpty() ? ImmutableList.of() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java index 13329a5d55..f212865861 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalHiveTableSinkToPhysicalHiveTableSink.java @@ -37,13 +37,13 @@ public class LogicalHiveTableSinkToPhysicalHiveTableSink extends OneImplementati sink.getDatabase(), sink.getTargetTable(), sink.getCols(), - sink.getPartitionIds(), sink.getOutputExprs(), Optional.empty(), sink.getLogicalProperties(), null, null, - sink.child()); + sink.child(), + sink.getHivePartitionKeys()); }).toRule(RuleType.LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 11a6a7b568..1c493deae0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -46,8 +46,10 @@ public enum PlanType { // logical sinks LOGICAL_FILE_SINK, LOGICAL_OLAP_TABLE_SINK, + LOGICAL_HIVE_TABLE_SINK, LOGICAL_RESULT_SINK, LOGICAL_UNBOUND_OLAP_TABLE_SINK, + LOGICAL_UNBOUND_HIVE_TABLE_SINK, LOGICAL_UNBOUND_RESULT_SINK, // logical others diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java index 10dfdd2c2b..1a6004e939 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java @@ -25,7 +25,7 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.common.ErrorCode; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.analyzer.UnboundResultSink; -import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator; import org.apache.doris.nereids.annotation.Developing; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.properties.PhysicalProperties; @@ -153,8 +153,8 @@ public class CreateTableCommand extends Command implements ForwardWithSync { throw new AnalysisException(e.getMessage(), e.getCause()); } - query = new UnboundTableSink<>(createTableInfo.getTableNameParts(), ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), query); + query = UnboundTableSinkCreator.createUnboundTableSink(createTableInfo.getTableNameParts(), + ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), query); try { new InsertIntoTableCommand(query, Optional.empty(), Optional.empty()).run(ctx, executor); if (ctx.getState().getStateType() == 
MysqlStateType.ERR) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java index 3791b47f14..ff70c75558 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteFromUsingCommand.java @@ -21,7 +21,7 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.OlapTable; import org.apache.doris.nereids.analyzer.UnboundAlias; import org.apache.doris.nereids.analyzer.UnboundSlot; -import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral; @@ -115,7 +115,7 @@ public class DeleteFromUsingCommand extends Command implements ForwardWithSync, && cols.size() < targetTable.getColumns().size(); // make UnboundTableSink - return new UnboundTableSink<>(nameParts, cols, ImmutableList.of(), + return UnboundTableSinkCreator.createUnboundTableSink(nameParts, cols, ImmutableList.of(), isTempPart, partitions, isPartialUpdate, DMLCommandType.DELETE, logicalQuery); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java index 01f2d0f8e8..cea0efc6fe 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java @@ -37,7 +37,7 @@ import org.apache.doris.nereids.analyzer.UnboundAlias; import org.apache.doris.nereids.analyzer.UnboundSlot; import org.apache.doris.nereids.analyzer.UnboundStar; import org.apache.doris.nereids.analyzer.UnboundTVFRelation; -import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator; import org.apache.doris.nereids.trees.expressions.ComparisonPredicate; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; @@ -236,7 +236,7 @@ public class LoadCommand extends Command implements ForwardWithSync { checkAndAddSequenceCol(olapTable, dataDesc, sinkCols, selectLists); boolean isPartialUpdate = olapTable.getEnableUniqueKeyMergeOnWrite() && sinkCols.size() < olapTable.getColumns().size(); - return new UnboundTableSink<>(dataDesc.getNameParts(), sinkCols, ImmutableList.of(), + return UnboundTableSinkCreator.createUnboundTableSink(dataDesc.getNameParts(), sinkCols, ImmutableList.of(), false, dataDesc.getPartitionNames(), isPartialUpdate, DMLCommandType.LOAD, tvfLogicalPlan); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java index 29d3b8d96f..76143c0e80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java @@ -24,7 +24,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import 
org.apache.doris.nereids.analyzer.UnboundAlias; import org.apache.doris.nereids.analyzer.UnboundSlot; -import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.rules.analysis.SlotBinder; @@ -189,8 +189,8 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery)); } // make UnboundTableSink - return new UnboundTableSink<>(nameParts, isPartialUpdate ? partialUpdateColNames : ImmutableList.of(), - ImmutableList.of(), + return UnboundTableSinkCreator.createUnboundTableSink(nameParts, + isPartialUpdate ? partialUpdateColNames : ImmutableList.of(), ImmutableList.of(), false, ImmutableList.of(), isPartialUpdate, DMLCommandType.UPDATE, logicalQuery); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index 8b714b64ed..b456be6e26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -24,9 +24,10 @@ import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; +import org.apache.doris.common.UserException; import org.apache.doris.nereids.analyzer.UnboundRelation; import org.apache.doris.nereids.analyzer.UnboundSlot; -import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator; import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.GreaterThanEqual; @@ -42,6 +43,7 @@ import org.apache.doris.nereids.trees.plans.algebra.Sink; import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalSink; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.RelationUtil; @@ -76,7 +78,7 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { * @return command */ public static UpdateMvByPartitionCommand from(MTMV mv, Set partitionIds, - Map tableWithPartKey) { + Map tableWithPartKey) throws UserException { NereidsParser parser = new NereidsParser(); Map> predicates = constructTableWithPredicates(mv, partitionIds, tableWithPartKey); @@ -86,9 +88,8 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { if (plan instanceof Sink) { plan = plan.child(0); } - UnboundTableSink sink = - new UnboundTableSink<>(mv.getFullQualifiers(), ImmutableList.of(), ImmutableList.of(), - parts, plan); + LogicalSink sink = UnboundTableSinkCreator.createUnboundTableSink(mv.getFullQualifiers(), + ImmutableList.of(), ImmutableList.of(), parts, plan); return new UpdateMvByPartitionCommand(sink); } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java new file mode 100644 index 0000000000..9e4c2bc92a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.insert; + +/** + * For Hive Table + */ +public class HiveInsertCommandContext extends InsertCommandContext { + private boolean overwrite = true; + + public boolean isOverwrite() { + return overwrite; + } + + public void setOverwrite(boolean overwrite) { + this.overwrite = overwrite; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java index f8bc8f2db4..fc6b7a776b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java @@ -17,20 +17,24 @@ package org.apache.doris.nereids.trees.plans.commands.insert; -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.catalog.Env; +import org.apache.doris.common.ErrorCode; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.nereids.NereidsPlanner; import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; import org.apache.doris.planner.DataSink; import org.apache.doris.planner.HiveTableSink; import org.apache.doris.planner.PlanFragment; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.QueryState; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.transaction.TransactionState; +import org.apache.doris.transaction.TransactionStatus; +import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -43,6 +47,7 @@ public class HiveInsertExecutor extends AbstractInsertExecutor { private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class); private static final long INVALID_TXN_ID = -1L; private long txnId = INVALID_TXN_ID; + private TransactionStatus txnStatus = TransactionStatus.ABORTED; /** * constructor @@ -59,20 +64,15 
@@ public class HiveInsertExecutor extends AbstractInsertExecutor { @Override public void beginTransaction() { - + // TODO: use hive txn rather than internal txn } @Override protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) { HiveTableSink hiveTableSink = (HiveTableSink) sink; - // PhysicalHiveTableSink physicalHiveTableSink = (PhysicalHiveTableSink) physicalSink; + PhysicalHiveTableSink physicalHiveSink = (PhysicalHiveTableSink) physicalSink; try { - hiveTableSink.init(); - hiveTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx)); - TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId); - if (state == null) { - throw new AnalysisException("txn does not exist: " + txnId); - } + hiveTableSink.bindDataSink(physicalHiveSink.getCols(), insertCtx); } catch (Exception e) { throw new AnalysisException(e.getMessage(), e); } @@ -80,21 +80,36 @@ public class HiveInsertExecutor extends AbstractInsertExecutor { @Override protected void beforeExec() { - + // check params } @Override protected void onComplete() throws UserException { - + if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) { + LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier()); + } else { + txnStatus = TransactionStatus.COMMITTED; + } } @Override protected void onFail(Throwable t) { - + errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage(); + String queryId = DebugUtil.printId(ctx.queryId()); + // if any throwable being thrown during insert operation, first we should abort this txn + LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t); + if (txnId != INVALID_TXN_ID) { + LOG.warn("insert [{}] with query id {} abort txn {} failed", labelName, queryId, txnId); + StringBuilder sb = new StringBuilder(t.getMessage()); + if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) { + sb.append(". 
url: ").append(coordinator.getTrackingUrl()); + } + ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString()); + } } @Override protected void afterExec(StmtExecutor executor) { - + // TODO: set THivePartitionUpdate } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java index 29d96ae4ad..cf70ccd56d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java @@ -141,9 +141,8 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, if (ctx.getMysqlChannel() != null) { ctx.getMysqlChannel().reset(); } - - Optional> plan = (planner.getPhysicalPlan() - .>>collect(PhysicalSink.class::isInstance)).stream() + Optional> plan = (planner.getPhysicalPlan() + .>>collect(PhysicalSink.class::isInstance)).stream() .findAny(); Preconditions.checkArgument(plan.isPresent(), "insert into command must contain target table"); PhysicalSink physicalSink = plan.get(); @@ -165,6 +164,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, } else if (physicalSink instanceof PhysicalHiveTableSink) { HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf; insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner, insertCtx); + // set hive query options } else { // TODO: support other table types throw new AnalysisException("insert into command only support olap table"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index fffdf06b54..788871c744 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -24,10 +24,13 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; import org.apache.doris.common.util.InternalDatabaseUtil; +import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.insertoverwrite.InsertOverwriteUtil; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundTableSink; +import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.trees.TreeNode; @@ -37,6 +40,7 @@ import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.qe.ConnectContext; @@ -95,8 +99,8 @@ 
public class InsertOverwriteTableCommand extends Command implements ForwardWithS } TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx); - if (!(targetTableIf instanceof OlapTable)) { - throw new AnalysisException("insert into overwrite only support OLAP table." + if (!(targetTableIf instanceof OlapTable || targetTableIf instanceof HMSExternalTable)) { + throw new AnalysisException("insert into overwrite only support OLAP and HMS table." + " But current table type is " + targetTableIf.getType()); } this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf); @@ -156,20 +160,40 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS */ private void insertInto(ConnectContext ctx, StmtExecutor executor, List tempPartitionNames) throws Exception { - UnboundTableSink sink = (UnboundTableSink) logicalQuery; - UnboundTableSink copySink = new UnboundTableSink<>( - sink.getNameParts(), - sink.getColNames(), - sink.getHints(), - true, - tempPartitionNames, - sink.isPartialUpdate(), - sink.getDMLCommandType(), - (LogicalPlan) (sink.child(0))); - // for overwrite situation, we disable auto create partition. - OlapInsertCommandContext insertCtx = new OlapInsertCommandContext(); - insertCtx.setAllowAutoPartition(false); - InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(copySink, labelName, Optional.of(insertCtx)); + UnboundLogicalSink copySink; + InsertCommandContext insertCtx; + if (logicalQuery instanceof UnboundTableSink) { + UnboundTableSink sink = (UnboundTableSink) logicalQuery; + copySink = (UnboundLogicalSink) UnboundTableSinkCreator.createUnboundTableSink( + sink.getNameParts(), + sink.getColNames(), + sink.getHints(), + true, + tempPartitionNames, + sink.isPartialUpdate(), + sink.getDMLCommandType(), + (LogicalPlan) (sink.child(0))); + // for overwrite situation, we disable auto create partition. 
+ insertCtx = new OlapInsertCommandContext(); + ((OlapInsertCommandContext) insertCtx).setAllowAutoPartition(false); + } else if (logicalQuery instanceof UnboundHiveTableSink) { + UnboundHiveTableSink sink = (UnboundHiveTableSink) logicalQuery; + copySink = (UnboundLogicalSink) UnboundTableSinkCreator.createUnboundTableSink( + sink.getNameParts(), + sink.getColNames(), + sink.getHints(), + false, + sink.getPartitions(), + false, + sink.getDMLCommandType(), + (LogicalPlan) (sink.child(0))); + insertCtx = new HiveInsertCommandContext(); + ((HiveInsertCommandContext) insertCtx).setOverwrite(false); + } else { + throw new RuntimeException("Current catalog does not support insert overwrite yet."); + } + InsertIntoTableCommand insertCommand = + new InsertIntoTableCommand(copySink, labelName, Optional.of(insertCtx)); insertCommand.run(ctx, executor); if (ctx.getState().getStateType() == MysqlStateType.ERR) { String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index 007fd48ccf..c1894c50eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.UserException; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.nereids.analyzer.UnboundAlias; +import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundOneRowRelation; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.exceptions.AnalysisException; @@ -46,6 +47,7 @@ import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier; import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink; import org.apache.doris.nereids.types.DataType; import org.apache.doris.nereids.util.RelationUtil; import org.apache.doris.nereids.util.TypeCoercionUtils; @@ -228,39 +230,39 @@ public class InsertUtils { * normalize plan to let it could be process correctly by nereids */ public static Plan normalizePlan(Plan plan, TableIf table) { - UnboundTableSink unboundTableSink = (UnboundTableSink) plan; - - if (table instanceof OlapTable && ((OlapTable) table).getKeysType() == KeysType.UNIQUE_KEYS - && unboundTableSink.isPartialUpdate()) { - // check the necessary conditions for partial updates - OlapTable olapTable = (OlapTable) table; - if (!olapTable.getEnableUniqueKeyMergeOnWrite()) { - throw new AnalysisException("Partial update is only allowed on " - + "unique table with merge-on-write enabled."); - } - if (unboundTableSink.getDMLCommandType() == DMLCommandType.INSERT) { - if (unboundTableSink.getColNames().isEmpty()) { - throw new AnalysisException("You must explicitly specify the columns to be updated when " - + "updating partial columns using the INSERT statement."); - } - for (Column col : olapTable.getFullSchema()) { - Optional insertCol = unboundTableSink.getColNames().stream() - .filter(c -> c.equalsIgnoreCase(col.getName())).findFirst(); - if (col.isKey() 
&& !insertCol.isPresent()) { - throw new AnalysisException("Partial update should include all key columns, missing: " - + col.getName()); - } - } - } - } + UnboundLogicalSink unboundLogicalSink = (UnboundLogicalSink) plan; if (table instanceof HMSExternalTable) { - // TODO: check HMSExternalTable HMSExternalTable hiveTable = (HMSExternalTable) table; if (hiveTable.isView()) { throw new AnalysisException("View is not support in hive external table."); } } - Plan query = unboundTableSink.child(); + if (table instanceof OlapTable && ((OlapTable) table).getKeysType() == KeysType.UNIQUE_KEYS) { + if (unboundLogicalSink instanceof UnboundTableSink + && ((UnboundTableSink) unboundLogicalSink).isPartialUpdate()) { + // check the necessary conditions for partial updates + OlapTable olapTable = (OlapTable) table; + if (!olapTable.getEnableUniqueKeyMergeOnWrite()) { + throw new AnalysisException("Partial update is only allowed on " + + "unique table with merge-on-write enabled."); + } + if (unboundLogicalSink.getDMLCommandType() == DMLCommandType.INSERT) { + if (unboundLogicalSink.getColNames().isEmpty()) { + throw new AnalysisException("You must explicitly specify the columns to be updated when " + + "updating partial columns using the INSERT statement."); + } + for (Column col : olapTable.getFullSchema()) { + Optional insertCol = unboundLogicalSink.getColNames().stream() + .filter(c -> c.equalsIgnoreCase(col.getName())).findFirst(); + if (col.isKey() && !insertCol.isPresent()) { + throw new AnalysisException("Partial update should include all key columns, missing: " + + col.getName()); + } + } + } + } + } + Plan query = unboundLogicalSink.child(); if (!(query instanceof LogicalInlineTable)) { return plan; } @@ -271,28 +273,28 @@ public class InsertUtils { for (List values : logicalInlineTable.getConstantExprsList()) { ImmutableList.Builder constantExprs = ImmutableList.builder(); if (values.isEmpty()) { - if (CollectionUtils.isNotEmpty(unboundTableSink.getColNames())) { + if (CollectionUtils.isNotEmpty(unboundLogicalSink.getColNames())) { throw new AnalysisException("value list should not be empty if columns are specified"); } for (Column column : columns) { constantExprs.add(generateDefaultExpression(column)); } } else { - if (CollectionUtils.isNotEmpty(unboundTableSink.getColNames())) { - if (values.size() != unboundTableSink.getColNames().size()) { + if (CollectionUtils.isNotEmpty(unboundLogicalSink.getColNames())) { + if (values.size() != unboundLogicalSink.getColNames().size()) { throw new AnalysisException("Column count doesn't match value count"); } for (int i = 0; i < values.size(); i++) { Column sameNameColumn = null; for (Column column : table.getBaseSchema(true)) { - if (unboundTableSink.getColNames().get(i).equalsIgnoreCase(column.getName())) { + if (unboundLogicalSink.getColNames().get(i).equalsIgnoreCase(column.getName())) { sameNameColumn = column; break; } } if (sameNameColumn == null) { throw new AnalysisException("Unknown column '" - + unboundTableSink.getColNames().get(i) + "' in target table."); + + unboundLogicalSink.getColNames().get(i) + "' in target table."); } if (values.get(i) instanceof DefaultValueSlot) { constantExprs.add(generateDefaultExpression(sameNameColumn)); @@ -340,11 +342,15 @@ public class InsertUtils { * get target table from names. 
*/ public static TableIf getTargetTable(Plan plan, ConnectContext ctx) { - if (!(plan instanceof UnboundTableSink)) { - throw new AnalysisException("the root of plan should be UnboundTableSink" + UnboundLogicalSink unboundTableSink; + if (plan instanceof UnboundTableSink) { + unboundTableSink = (UnboundTableSink) plan; + } else if (plan instanceof UnboundHiveTableSink) { + unboundTableSink = (UnboundHiveTableSink) plan; + } else { + throw new AnalysisException("the root of plan should be UnboundTableSink or UnboundHiveTableSink" + " but it is " + plan.getType()); } - UnboundTableSink unboundTableSink = (UnboundTableSink) plan; List tableQualifier = RelationUtil.getQualifierName(ctx, unboundTableSink.getNameParts()); return RelationUtil.getDbAndTable(tableQualifier, ctx.getEnv()).second; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java index 9d31a39b3e..5522f81cb5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalHiveTableSink.java @@ -37,6 +37,7 @@ import com.google.common.collect.ImmutableList; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; /** * logical hive table sink for insert command @@ -47,41 +48,46 @@ public class LogicalHiveTableSink extends LogicalSink cols; - private final List partitionIds; + private final Set hivePartitionKeys; private final DMLCommandType dmlCommandType; /** * constructor */ - public LogicalHiveTableSink(HMSExternalDatabase database, HMSExternalTable targetTable, List cols, - List partitionIds, List outputExprs, - DMLCommandType dmlCommandType, Optional groupExpression, - Optional logicalProperties, CHILD_TYPE child) { - super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child); + public LogicalHiveTableSink(HMSExternalDatabase database, + HMSExternalTable targetTable, + List cols, + Set hivePartitionKeys, + List outputExprs, + DMLCommandType dmlCommandType, + Optional groupExpression, + Optional logicalProperties, + CHILD_TYPE child) { + super(PlanType.LOGICAL_HIVE_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child); this.database = Objects.requireNonNull(database, "database != null in LogicalHiveTableSink"); this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalHiveTableSink"); this.cols = Utils.copyRequiredList(cols); this.dmlCommandType = dmlCommandType; - this.partitionIds = Utils.copyRequiredList(partitionIds); + this.hivePartitionKeys = hivePartitionKeys; } public Plan withChildAndUpdateOutput(Plan child) { List output = child.getOutput().stream() .map(NamedExpression.class::cast) .collect(ImmutableList.toImmutableList()); - return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, output, + return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, output, dmlCommandType, Optional.empty(), Optional.empty(), child); } @Override public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "LogicalHiveTableSink only accepts one child"); - return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, + return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, outputExprs, dmlCommandType, 
Optional.empty(), Optional.empty(), children.get(0)); } public LogicalHiveTableSink withOutputExprs(List outputExprs) { - return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, + return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, outputExprs, dmlCommandType, Optional.empty(), Optional.empty(), child()); } @@ -97,8 +103,8 @@ public class LogicalHiveTableSink extends LogicalSink getPartitionIds() { - return partitionIds; + public Set getHivePartitionKeys() { + return hivePartitionKeys; } public DMLCommandType getDmlCommandType() { @@ -119,13 +125,12 @@ public class LogicalHiveTableSink extends LogicalSink that = (LogicalHiveTableSink) o; return dmlCommandType == that.dmlCommandType && Objects.equals(database, that.database) - && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols) - && Objects.equals(partitionIds, that.partitionIds); + && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), database, targetTable, cols, partitionIds, dmlCommandType); + return Objects.hash(super.hashCode(), database, targetTable, cols, dmlCommandType); } @Override @@ -135,7 +140,7 @@ public class LogicalHiveTableSink extends LogicalSink extends LogicalSink groupExpression) { - return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, + return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, outputExprs, dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { - return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, + return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, outputExprs, dmlCommandType, groupExpression, logicalProperties, children.get(0)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/UnboundLogicalSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/UnboundLogicalSink.java new file mode 100644 index 0000000000..c02fa1bac8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/UnboundLogicalSink.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; +import org.apache.doris.nereids.util.Utils; + +import java.util.List; +import java.util.Optional; + +/** abstract logical sink */ +public abstract class UnboundLogicalSink extends LogicalSink { + protected final List nameParts; + protected final List colNames; + protected final DMLCommandType dmlCommandType; + + public UnboundLogicalSink(List nameParts, + PlanType type, + List outputExprs, + Optional groupExpression, + Optional logicalProperties, + List colNames, + DMLCommandType dmlCommandType, + CHILD_TYPE child) { + super(type, outputExprs, groupExpression, logicalProperties, child); + this.colNames = Utils.copyRequiredList(colNames); + this.dmlCommandType = dmlCommandType; + this.nameParts = Utils.copyRequiredList(nameParts); + } + + public DMLCommandType getDMLCommandType() { + return dmlCommandType; + } + + public List getColNames() { + return colNames; + } + + public List getNameParts() { + return nameParts; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java index eee55e3c28..809a91e312 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java @@ -21,8 +21,10 @@ import org.apache.doris.catalog.Column; import org.apache.doris.datasource.hive.HMSExternalDatabase; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DistributionSpecTableSinkHashPartitioned; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.plans.Plan; @@ -33,10 +35,14 @@ import org.apache.doris.nereids.util.Utils; import org.apache.doris.statistics.Statistics; import com.google.common.collect.ImmutableList; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; /** abstract physical hive sink */ public class PhysicalHiveTableSink extends PhysicalSink implements Sink { @@ -44,7 +50,7 @@ public class PhysicalHiveTableSink extends PhysicalSink private final HMSExternalDatabase database; private final HMSExternalTable targetTable; private final List cols; - private final List partitionIds; + private final Set hivePartitionKeys; /** * constructor @@ -52,13 +58,13 @@ public class PhysicalHiveTableSink extends PhysicalSink public PhysicalHiveTableSink(HMSExternalDatabase database, HMSExternalTable targetTable, List cols, - List partitionIds, List outputExprs, Optional groupExpression, LogicalProperties logicalProperties, - 
CHILD_TYPE child) { - this(database, targetTable, cols, partitionIds, outputExprs, groupExpression, logicalProperties, - PhysicalProperties.GATHER, null, child); + CHILD_TYPE child, + Set hivePartitionKeys) { + this(database, targetTable, cols, outputExprs, groupExpression, logicalProperties, + PhysicalProperties.GATHER, null, child, hivePartitionKeys); } /** @@ -67,25 +73,37 @@ public class PhysicalHiveTableSink extends PhysicalSink public PhysicalHiveTableSink(HMSExternalDatabase database, HMSExternalTable targetTable, List cols, - List partitionIds, List outputExprs, Optional groupExpression, LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics, - CHILD_TYPE child) { + CHILD_TYPE child, + Set hivePartitionKeys) { super(PlanType.PHYSICAL_HIVE_TABLE_SINK, outputExprs, groupExpression, logicalProperties, physicalProperties, statistics, child); this.database = Objects.requireNonNull(database, "database != null in PhysicalHiveTableSink"); this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalHiveTableSink"); this.cols = Utils.copyRequiredList(cols); - this.partitionIds = Utils.copyRequiredList(partitionIds); + this.hivePartitionKeys = hivePartitionKeys; + } + + public HMSExternalDatabase getDatabase() { + return database; + } + + public HMSExternalTable getTargetTable() { + return targetTable; + } + + public List getCols() { + return cols; } @Override public Plan withChildren(List children) { - return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, groupExpression, - getLogicalProperties(), physicalProperties, statistics, children.get(0)); + return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs, groupExpression, + getLogicalProperties(), physicalProperties, statistics, children.get(0), hivePartitionKeys); } @Override @@ -100,20 +118,48 @@ public class PhysicalHiveTableSink extends PhysicalSink @Override public Plan withGroupExpression(Optional groupExpression) { - return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - groupExpression, getLogicalProperties(), child()); + return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs, + groupExpression, getLogicalProperties(), child(), hivePartitionKeys); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { - return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - groupExpression, logicalProperties.get(), children.get(0)); + return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs, + groupExpression, logicalProperties.get(), children.get(0), hivePartitionKeys); } @Override public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { - return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs, + groupExpression, getLogicalProperties(), physicalProperties, statistics, child(), hivePartitionKeys); + } + + /** + * get output physical properties + */ + @Override + public PhysicalProperties getRequirePhysicalProperties() { + Set hivePartitionKeys = targetTable.getRemoteTable() + .getPartitionKeys().stream() + .map(FieldSchema::getName) + .collect(Collectors.toSet()); + if 
(!hivePartitionKeys.isEmpty()) { + List columnIdx = new ArrayList<>(); + List fullSchema = targetTable.getFullSchema(); + for (int i = 0; i < fullSchema.size(); i++) { + Column column = fullSchema.get(i); + if (hivePartitionKeys.contains(column.getName())) { + columnIdx.add(i); + } + } + List exprIds = columnIdx.stream() + .map(idx -> child().getOutput().get(idx).getExprId()) + .collect(Collectors.toList()); + DistributionSpecTableSinkHashPartitioned shuffleInfo = new DistributionSpecTableSinkHashPartitioned(); + shuffleInfo.setOutputColExprIds(exprIds); + return new PhysicalProperties(shuffleInfo); + } + return PhysicalProperties.SINK_RANDOM_PARTITIONED; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java index 17cf97106e..ed1d6a3a3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java @@ -39,11 +39,11 @@ public abstract class PhysicalSink extends PhysicalUnar protected final List outputExprs; public PhysicalSink(PlanType type, - List outputExprs, - Optional groupExpression, - LogicalProperties logicalProperties, - @Nullable PhysicalProperties physicalProperties, - Statistics statistics, CHILD_TYPE child) { + List outputExprs, + Optional groupExpression, + LogicalProperties logicalProperties, + @Nullable PhysicalProperties physicalProperties, + Statistics statistics, CHILD_TYPE child) { super(type, groupExpression, logicalProperties, physicalProperties, statistics, child); this.outputExprs = ImmutableList.copyOf(Objects.requireNonNull(outputExprs, "outputExprs should not null")); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java index 5932c4f099..7feb53e24b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTableSink.java @@ -42,4 +42,6 @@ public abstract class PhysicalTableSink extends Physica Statistics statistics, CHILD_TYPE child) { super(type, outputExprs, groupExpression, logicalProperties, physicalProperties, statistics, child); } + + public abstract PhysicalProperties getRequirePhysicalProperties(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java index b88cd910a3..26c9d52698 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/SinkVisitor.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.plans.visitor; +import org.apache.doris.nereids.analyzer.UnboundHiveTableSink; import org.apache.doris.nereids.analyzer.UnboundResultSink; import org.apache.doris.nereids.analyzer.UnboundTableSink; import org.apache.doris.nereids.trees.plans.Plan; @@ -56,6 +57,10 @@ public interface SinkVisitor { return visitLogicalSink(unboundTableSink, context); } + default R visitUnboundHiveTableSink(UnboundHiveTableSink unboundTableSink, C context) { + return visitLogicalSink(unboundTableSink, context); + } + default R visitUnboundResultSink(UnboundResultSink 
unboundResultSink, C context) { return visitLogicalSink(unboundResultSink, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java index 4278f78ec5..a625e4490e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/RelationUtil.java @@ -77,7 +77,7 @@ public class RelationUtil { /** * get database and table */ - public static Pair getDbAndTable(List qualifierName, Env env) { + public static Pair, TableIf> getDbAndTable(List qualifierName, Env env) { String catalogName = qualifierName.get(0); String dbName = qualifierName.get(1); String tableName = qualifierName.get(2); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java index 9c5c375a35..9c6ba83408 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java @@ -58,6 +58,7 @@ public class DataPartition { Preconditions.checkState(!exprs.isEmpty()); Preconditions.checkState(type == TPartitionType.HASH_PARTITIONED || type == TPartitionType.RANGE_PARTITIONED + || type == TPartitionType.TABLE_SINK_HASH_PARTITIONED || type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED); this.type = type; this.partitionExprs = ImmutableList.copyOf(exprs); @@ -66,6 +67,7 @@ public class DataPartition { public DataPartition(TPartitionType type) { Preconditions.checkState(type == TPartitionType.UNPARTITIONED || type == TPartitionType.RANDOM + || type == TPartitionType.TABLE_SINK_RANDOM_PARTITIONED || type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED); this.type = type; this.partitionExprs = ImmutableList.of(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java index 99d0c6b1b0..5dce86b333 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java @@ -20,17 +20,44 @@ package org.apache.doris.planner; -import org.apache.doris.analysis.Analyzer; +import org.apache.doris.catalog.Column; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.LocationPath; +import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalTable; +import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TDataSink; +import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TFileCompressType; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.THiveBucket; +import org.apache.doris.thrift.THiveColumn; +import org.apache.doris.thrift.THiveColumnType; +import org.apache.doris.thrift.THiveLocationParams; +import org.apache.doris.thrift.THivePartition; +import org.apache.doris.thrift.THiveTableSink; + +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index 99d0c6b1b0..5dce86b333 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -20,17 +20,44 @@
 package org.apache.doris.planner;
 
-import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
+import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TDataSink;
+import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.THiveBucket;
+import org.apache.doris.thrift.THiveColumn;
+import org.apache.doris.thrift.THiveColumnType;
+import org.apache.doris.thrift.THiveLocationParams;
+import org.apache.doris.thrift.THivePartition;
+import org.apache.doris.thrift.THiveTableSink;
+
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
 
 public class HiveTableSink extends DataSink {
 
+    private HMSExternalTable targetTable;
     protected TDataSink tDataSink;
 
-    public HiveTableSink(HMSExternalTable table) {
+    public HiveTableSink(HMSExternalTable targetTable) {
         super();
+        this.targetTable = targetTable;
     }
 
     @Override
@@ -59,9 +86,146 @@ public class HiveTableSink extends DataSink {
         return DataPartition.RANDOM;
     }
 
-    public void init() {
+    /**
+     * Check sink params and generate the thrift data sink sent to the BE.
+     * @param insertCols target table columns
+     * @param insertCtx insert info context
+     * @throws AnalysisException if source file format cannot be read
+     */
+    public void bindDataSink(List<Column> insertCols, Optional<InsertCommandContext> insertCtx)
+            throws AnalysisException {
+        THiveTableSink tSink = new THiveTableSink();
+        tSink.setDbName(targetTable.getDbName());
+        tSink.setTableName(targetTable.getName());
+        Set<String> partNames = new HashSet<>(targetTable.getPartitionColumnNames());
+        Set<String> colNames = targetTable.getColumns()
+                .stream().map(Column::getName)
+                .collect(Collectors.toSet());
+        colNames.removeAll(partNames);
+        List<THiveColumn> targetColumns = new ArrayList<>();
+        for (Column col : insertCols) {
+            if (partNames.contains(col.getName())) {
+                THiveColumn tHiveColumn = new THiveColumn();
+                tHiveColumn.setName(col.getName());
+                tHiveColumn.setDataType(col.getType().toThrift());
+                tHiveColumn.setColumnType(THiveColumnType.PARTITION_KEY);
+                targetColumns.add(tHiveColumn);
+            } else if (colNames.contains(col.getName())) {
+                THiveColumn tHiveColumn = new THiveColumn();
+                tHiveColumn.setName(col.getName());
+                tHiveColumn.setDataType(col.getType().toThrift());
+                tHiveColumn.setColumnType(THiveColumnType.REGULAR);
+                targetColumns.add(tHiveColumn);
+            }
+        }
+        tSink.setColumns(targetColumns);
+
+        setPartitionValues(tSink);
+
+        StorageDescriptor sd = targetTable.getRemoteTable().getSd();
+        THiveBucket bucketInfo = new THiveBucket();
+        bucketInfo.setBucketedBy(sd.getBucketCols());
+        bucketInfo.setBucketCount(sd.getNumBuckets());
+        tSink.setBucketInfo(bucketInfo);
+
+        TFileFormatType formatType = getFileFormatType(sd);
+        tSink.setFileFormat(formatType);
+        setCompressType(tSink, formatType);
+
+        THiveLocationParams locationParams = new THiveLocationParams();
+        String location = sd.getLocation();
+
+        String writeTempPath = createTempPath(location);
+        locationParams.setWritePath(writeTempPath);
+        locationParams.setTargetPath(location);
+        locationParams.setFileType(LocationPath.getTFileTypeForBE(location));
+        tSink.setLocation(locationParams);
+
+        tSink.setHadoopConfig(targetTable.getHadoopProperties());
+
+        if (insertCtx.isPresent()) {
+            HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
+            tSink.setOverwrite(context.isOverwrite());
+        }
+        tDataSink = new TDataSink(getDataSinkType());
+        tDataSink.setHiveTableSink(tSink);
     }
 
-    public void complete(Analyzer analyzer) {
+    private String createTempPath(String location) {
+        String user = ConnectContext.get().getUserIdentity().getUser();
+        return location + "/.doris_staging/" + user + "/" + UUID.randomUUID().toString().replace("-", "");
+    }
+
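createTempPath above fully defines the staging layout, so the shape of its output can be checked in isolation; a minimal sketch (the user name and warehouse location here are invented):

```java
import java.util.UUID;

public class StagingPathSketch {
    // Same construction as createTempPath, minus the ConnectContext lookup.
    static String createTempPath(String location, String user) {
        return location + "/.doris_staging/" + user + "/" + UUID.randomUUID().toString().replace("-", "");
    }

    public static void main(String[] args) {
        // e.g. hdfs://nn/warehouse/db.db/tb/.doris_staging/alice/7f3c9be04a2d...
        System.out.println(createTempPath("hdfs://nn/warehouse/db.db/tb", "alice"));
    }
}
```

Presumably the BE writes files under the hidden .doris_staging directory and the commit step moves them to the target path, which would explain why bindDataSink sends both a write path and a target path.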
("snappy".equalsIgnoreCase(compressType)) { + tSink.setCompressionType(TFileCompressType.SNAPPYBLOCK); + } else if ("lz4".equalsIgnoreCase(compressType)) { + tSink.setCompressionType(TFileCompressType.LZ4BLOCK); + } else if ("lzo".equalsIgnoreCase(compressType)) { + tSink.setCompressionType(TFileCompressType.LZO); + } else if ("zlib".equalsIgnoreCase(compressType)) { + tSink.setCompressionType(TFileCompressType.ZLIB); + } else if ("zstd".equalsIgnoreCase(compressType)) { + tSink.setCompressionType(TFileCompressType.ZSTD); + } else if ("uncompressed".equalsIgnoreCase(compressType)) { + tSink.setCompressionType(TFileCompressType.PLAIN); + } else { + // try to use plain type to decompress parquet or orc file + tSink.setCompressionType(TFileCompressType.PLAIN); + } + } + + private void setPartitionValues(THiveTableSink tSink) throws AnalysisException { + List partitions = new ArrayList<>(); + List hivePartitions = + ((HMSExternalCatalog) targetTable.getCatalog()) + .getClient().listPartitions(targetTable.getDbName(), targetTable.getName()); + for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) { + THivePartition hivePartition = new THivePartition(); + StorageDescriptor sd = partition.getSd(); + hivePartition.setFileFormat(getFileFormatType(sd)); + + hivePartition.setValues(partition.getValues()); + THiveLocationParams locationParams = new THiveLocationParams(); + String location = sd.getLocation(); + // pass the same of write path and target path to partition + locationParams.setWritePath(location); + locationParams.setTargetPath(location); + locationParams.setFileType(LocationPath.getTFileTypeForBE(location)); + hivePartition.setLocation(locationParams); + partitions.add(hivePartition); + } + tSink.setPartitions(partitions); + } + + private TFileFormatType getFileFormatType(StorageDescriptor sd) throws AnalysisException { + TFileFormatType fileFormatType; + if (sd.getInputFormat().toLowerCase().contains("orc")) { + fileFormatType = TFileFormatType.FORMAT_ORC; + } else if (sd.getInputFormat().toLowerCase().contains("parquet")) { + fileFormatType = TFileFormatType.FORMAT_PARQUET; + } else if (sd.getInputFormat().toLowerCase().contains("text")) { + fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN; + } else { + throw new AnalysisException("Unsupported input format type: " + sd.getInputFormat()); + } + return fileFormatType; + } + + protected TDataSinkType getDataSinkType() { + return TDataSinkType.HIVE_TABLE_SINK; } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java index 5f1abf12e6..2316e65bf6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java @@ -54,9 +54,7 @@ public class HmsCommitTest { private static final String tbWithoutPartition = "test_tb_without_partition"; private static Path warehousePath; static String dbLocation; - private String inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"; - private String outputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"; - private String serde = "org.apache.hadoop.hive.ql.io.orc.OrcSerde"; + private String fileFormat = "orc"; @BeforeClass public static void beforeClass() throws Throwable { @@ -103,11 +101,11 @@ public class HmsCommitTest { partitionKeys.add(new FieldSchema("c3", "string", "comment")); HiveTableMetadata tableMetadata = new HiveTableMetadata( 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
index 5f1abf12e6..2316e65bf6 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -54,9 +54,7 @@ public class HmsCommitTest {
     private static final String tbWithoutPartition = "test_tb_without_partition";
     private static Path warehousePath;
     static String dbLocation;
-    private String inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
-    private String outputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
-    private String serde = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
+    private String fileFormat = "orc";
 
     @BeforeClass
     public static void beforeClass() throws Throwable {
@@ -103,11 +101,11 @@
         partitionKeys.add(new FieldSchema("c3", "string", "comment"));
         HiveTableMetadata tableMetadata = new HiveTableMetadata(
                 dbName, tbWithPartition, columns, partitionKeys,
-                new HashMap<>(), inputFormat, outputFormat, serde);
+                new HashMap<>(), fileFormat);
         hmsClient.createTable(tableMetadata, true);
         HiveTableMetadata tableMetadata2 = new HiveTableMetadata(
                 dbName, tbWithoutPartition, columns, new ArrayList<>(),
-                new HashMap<>(), inputFormat, outputFormat, serde);
+                new HashMap<>(), fileFormat);
         hmsClient.createTable(tableMetadata2, true);
     }
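With the test changes above, table creation now passes a single file_format string where the input format, output format, and SerDe class used to be spelled out. A sketch of the calling convention, assuming the HiveTableMetadata constructor and HMSCachedClient.createTable overload exercised by the test, with package imports inferred from the file paths in this diff:

```java
import java.util.HashMap;
import java.util.List;

import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HiveTableMetadata;
import org.apache.hadoop.hive.metastore.api.FieldSchema;

// Sketch only: assumes the constructor shape shown in HmsCommitTest and a
// client already connected to a metastore; column/partition lists are built
// by the caller, as in the test.
public class CreateHiveTableSketch {
    static void createOrcTable(HMSCachedClient hmsClient, String dbName, String tblName,
            List<Column> columns, List<FieldSchema> partitionKeys) {
        HiveTableMetadata tableMetadata = new HiveTableMetadata(
                dbName, tblName, columns, partitionKeys,
                new HashMap<>(), "orc"); // one file_format string replaces format/serde triple
        hmsClient.createTable(tableMetadata, true);
    }
}
```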