[feature](insert) implement hive table sink plan (#31765) (#32386)

from #31765
This commit is contained in:
slothever
2024-03-18 22:49:30 +08:00
committed by GitHub
parent ef2151ae66
commit 711c0cd55c
42 changed files with 1279 additions and 515 deletions

View File

@ -21,6 +21,7 @@ import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropDbStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
@ -176,6 +177,8 @@ public interface CatalogIf<T extends DatabaseIf> {
return log;
}
TableName getTableNameByTableId(Long tableId);
// Return a copy of the collection of all databases.
Collection<DatabaseIf<? extends TableIf>> getAllDbs();
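These two additions let any catalog resolve a table id back to a fully qualified name, which the group-commit insert path in the parser (below) relies on. A minimal usage sketch, assuming a hypothetical tableId in scope; a null result means the id matches no table in the current catalog:

CatalogIf<?> catalog = Env.getCurrentEnv().getCurrentCatalog();
TableName name = catalog.getTableNameByTableId(tableId);
if (name != null) {
    // e.g. prints "db1.t1"
    System.out.println(name.getDb() + "." + name.getTbl());
}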

View File

@ -21,6 +21,7 @@ import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.DropDbStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
@ -395,6 +396,16 @@ public abstract class ExternalCatalog
}
}
public TableName getTableNameByTableId(Long tableId) {
for (DatabaseIf<?> db : idToDb.values()) {
TableIf table = db.getTableNullable(tableId);
if (table != null) {
return new TableName(getName(), db.getFullName(), table.getName());
}
}
return null;
}
@Override
public String getResource() {
return catalogProperty.getResource();

View File

@ -307,7 +307,7 @@ public class InternalCatalog implements CatalogIf<Database> {
for (Database db : fullNameToDb.values()) {
Table table = db.getTableNullable(tableId);
if (table != null) {
return new TableName("", db.getFullName(), table.getName());
return new TableName(INTERNAL_CATALOG_NAME, db.getFullName(), table.getName());
}
}
return null;

View File

@ -51,6 +51,8 @@ public interface HMSCachedClient {
List<String> listPartitionNames(String dbName, String tblName);
List<Partition> listPartitions(String dbName, String tblName);
List<String> listPartitionNames(String dbName, String tblName, long maxListPartitionNum);
Partition getPartition(String dbName, String tblName, List<String> partitionValues);
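The cached-client interface gains two partition lookups used when planning the sink. A hedged sketch; the database, table, and partition values are illustrative, and client is any HMSCachedClient implementation:

// list every partition of db1.sales
List<Partition> allParts = client.listPartitions("db1", "sales");
// fetch one partition by its partition-column values, e.g. dt=2024-03-18
Partition part = client.getPartition("db1", "sales", ImmutableList.of("2024-03-18"));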

View File

@ -125,17 +125,13 @@ public class HiveMetadataOps implements ExternalMetadataOps {
}
try {
Map<String, String> props = stmt.getExtProperties();
String inputFormat = props.getOrDefault("input_format", Config.hive_default_input_format);
String outputFormat = props.getOrDefault("output_format", Config.hive_default_output_format);
String serDe = props.getOrDefault("serde", Config.hive_default_serde);
String fileFormat = props.getOrDefault("file_format", Config.hive_default_file_format);
HiveTableMetadata catalogTable = HiveTableMetadata.of(dbName,
tblName,
stmt.getColumns(),
parsePartitionKeys(props),
props,
inputFormat,
outputFormat,
serDe);
fileFormat);
client.createTable(catalogTable, stmt.isSetIfNotExists());
db.setUnInitialized(true);

View File

@ -30,9 +30,7 @@ public class HiveTableMetadata implements TableMetadata {
private String tableName;
private List<Column> columns;
private List<FieldSchema> partitionKeys;
private String inputFormat;
private String outputFormat;
private String serDe;
private String fileFormat;
private Map<String, String> properties;
// private String viewSql;
@ -41,16 +39,12 @@ public class HiveTableMetadata implements TableMetadata {
List<Column> columns,
List<FieldSchema> partitionKeys,
Map<String, String> props,
String inputFormat,
String outputFormat,
String serDe) {
String fileFormat) {
this.dbName = dbName;
this.tableName = tblName;
this.columns = columns;
this.partitionKeys = partitionKeys;
this.inputFormat = inputFormat;
this.outputFormat = outputFormat;
this.serDe = serDe;
this.fileFormat = fileFormat;
this.properties = props;
}
@ -77,16 +71,8 @@ public class HiveTableMetadata implements TableMetadata {
return partitionKeys;
}
public String getInputFormat() {
return inputFormat;
}
public String getOutputFormat() {
return outputFormat;
}
public String getSerDe() {
return serDe;
public String getFileFormat() {
return fileFormat;
}
public static HiveTableMetadata of(String dbName,
@ -94,9 +80,7 @@ public class HiveTableMetadata implements TableMetadata {
List<Column> columns,
List<FieldSchema> partitionKeys,
Map<String, String> props,
String inputFormat,
String outputFormat, String serDe) {
return new HiveTableMetadata(dbName, tblName, columns, partitionKeys, props,
inputFormat, outputFormat, serDe);
String fileFormat) {
return new HiveTableMetadata(dbName, tblName, columns, partitionKeys, props, fileFormat);
}
}
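With the input/output/serde triple collapsed into a single file format, creating table metadata becomes simpler. A construction sketch of the revised factory call; the columns and properties are illustrative:

List<Column> cols = ImmutableList.of(new Column("id", Type.INT), new Column("dt", Type.STRING));
List<FieldSchema> partKeys = ImmutableList.of(new FieldSchema("dt", "string", null));
HiveTableMetadata meta = HiveTableMetadata.of("db1", "t1", cols, partKeys,
        ImmutableMap.of("file_format", "parquet"), "parquet");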

View File

@ -123,6 +123,10 @@ public class PostgreSQLJdbcHMSCachedClient extends JdbcHMSCachedClient {
return listPartitionNames(dbName, tblName, (long) -1);
}
public List<Partition> listPartitions(String dbName, String tblName) {
return getPartitionsByNames(dbName, tblName, ImmutableList.of());
}
@Override
public List<String> listPartitionNames(String dbName, String tblName, long maxListPartitionNum) {
String sql = String.format("SELECT \"PART_NAME\" from \"PARTITIONS\" WHERE \"TBL_ID\" = ("
@ -173,15 +177,26 @@ public class PostgreSQLJdbcHMSCachedClient extends JdbcHMSCachedClient {
private List<Partition> getPartitionsByNames(String dbName, String tblName, List<String> partitionNames) {
List<String> partitionNamesWithQuote = partitionNames.stream().map(partitionName -> "'" + partitionName + "'")
.collect(Collectors.toList());
String partitionNamesString = Joiner.on(", ").join(partitionNamesWithQuote);
String sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\","
+ " \"PARTITIONS\".\"LAST_ACCESS_TIME\","
+ " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\""
+ " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\""
+ " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
+ " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s'"
+ " AND \"PART_NAME\" in (%s);",
dbName, tblName, partitionNamesString);
String sql;
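// PostgreSQL rejects an empty list inside IN (), so when no partition names are
// given we drop the IN filter and return every partition of the table.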
if (partitionNamesWithQuote.isEmpty()) {
sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\","
+ " \"PARTITIONS\".\"LAST_ACCESS_TIME\","
+ " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\""
+ " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\""
+ " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
+ " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s';",
dbName, tblName);
} else {
String partitionNamesString = Joiner.on(", ").join(partitionNamesWithQuote);
sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\","
+ " \"PARTITIONS\".\"LAST_ACCESS_TIME\","
+ " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\""
+ " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\""
+ " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
+ " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s'"
+ " AND \"PART_NAME\" in (%s);",
dbName, tblName, partitionNamesString);
}
if (LOG.isDebugEnabled()) {
LOG.debug("getPartitionsByNames exec sql: {}", sql);
}

View File

@ -201,9 +201,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
// table.setRetention(0);
String location = hiveTable.getProperties().get("external_location");
table.setSd(toHiveStorageDesc(hiveTable.getColumns(),
hiveTable.getInputFormat(),
hiveTable.getOutputFormat(),
hiveTable.getSerDe(),
hiveTable.getFileFormat(),
location));
table.setPartitionKeys(hiveTable.getPartitionKeys());
// table.setViewOriginalText(hiveTable.getViewSql());
@ -213,15 +211,10 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
return table;
}
private static StorageDescriptor toHiveStorageDesc(List<Column> columns, String inputFormat, String outputFormat,
String serDe, String location) {
private static StorageDescriptor toHiveStorageDesc(List<Column> columns, String fileFormat, String location) {
StorageDescriptor sd = new StorageDescriptor();
sd.setCols(toHiveColumns(columns));
SerDeInfo serDeInfo = new SerDeInfo();
serDeInfo.setSerializationLib(serDe);
sd.setSerdeInfo(serDeInfo);
sd.setInputFormat(inputFormat);
sd.setOutputFormat(outputFormat);
setFileFormat(fileFormat, sd);
if (StringUtils.isNotEmpty(location)) {
sd.setLocation(location);
}
@ -231,6 +224,28 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
return sd;
}
private static void setFileFormat(String fileFormat, StorageDescriptor sd) {
String inputFormat;
String outputFormat;
String serDe;
if (fileFormat.equalsIgnoreCase("orc")) {
inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
outputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
serDe = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
} else if (fileFormat.equalsIgnoreCase("parquet")) {
inputFormat = "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat";
outputFormat = "'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat";
serDe = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe";
} else {
throw new IllegalArgumentException("Creating table with an unsupported file format: " + fileFormat);
}
SerDeInfo serDeInfo = new SerDeInfo();
serDeInfo.setSerializationLib(serDe);
sd.setSerdeInfo(serDeInfo);
sd.setInputFormat(inputFormat);
sd.setOutputFormat(outputFormat);
}
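// In effect, a single file_format property expands into the classic Hive triple:
//   "orc"     -> OrcInputFormat / OrcOutputFormat / OrcSerde
//   "parquet" -> MapredParquetInputFormat / MapredParquetOutputFormat / ParquetHiveSerDe
// Any other value fails fast with an IllegalArgumentException.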
private static List<FieldSchema> toHiveColumns(List<Column> columns) {
List<FieldSchema> result = new ArrayList<>();
for (Column column : columns) {
@ -297,6 +312,19 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
return listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM);
}
public List<Partition> listPartitions(String dbName, String tblName) {
try (ThriftHMSClient client = getClient()) {
try {
return ugiDoAs(() -> client.client.listPartitions(dbName, tblName, MAX_LIST_PARTITION_NUM));
} catch (Exception e) {
client.setThrowable(e);
throw e;
}
} catch (Exception e) {
throw new HMSClientException("failed to check if table %s in db %s exists", e, tblName, dbName);
}
}
@Override
public List<String> listPartitionNames(String dbName, String tblName, long maxListPartitionNum) {
// list all partitions when the limit is greater than the short maximum

View File

@ -0,0 +1,149 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.analyzer;
import org.apache.doris.nereids.exceptions.UnboundException;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.UnboundLogicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.BlockFuncDepsPropagation;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* Represents a Hive table sink plan node that has not been bound.
*/
public class UnboundHiveTableSink<CHILD_TYPE extends Plan> extends UnboundLogicalSink<CHILD_TYPE>
implements Unbound, Sink, BlockFuncDepsPropagation {
private final List<String> hints;
private final List<String> partitions;
public UnboundHiveTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, DMLCommandType.NONE,
Optional.empty(), Optional.empty(), child);
}
/**
* constructor
*/
public UnboundHiveTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions,
DMLCommandType dmlCommandType,
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties,
CHILD_TYPE child) {
super(nameParts, PlanType.LOGICAL_UNBOUND_HIVE_TABLE_SINK, ImmutableList.of(), groupExpression,
logicalProperties, colNames, dmlCommandType, child);
this.hints = Utils.copyRequiredList(hints);
this.partitions = Utils.copyRequiredList(partitions);
}
public List<String> getColNames() {
return colNames;
}
public List<String> getPartitions() {
return partitions;
}
public List<String> getHints() {
return hints;
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
"UnboundHiveTableSink only accepts one child");
return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, Optional.empty(), children.get(0));
}
@Override
public UnboundHiveTableSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
throw new UnboundException("could not call withOutputExprs on UnboundHiveTableSink");
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitUnboundHiveTableSink(this, context);
}
@Override
public List<? extends Expression> getExpressions() {
throw new UnsupportedOperationException(this.getClass().getSimpleName() + " does not support getExpressions()");
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UnboundHiveTableSink<?> that = (UnboundHiveTableSink<?>) o;
return Objects.equals(nameParts, that.nameParts)
&& Objects.equals(colNames, that.colNames)
&& Objects.equals(hints, that.hints)
&& Objects.equals(partitions, that.partitions);
}
@Override
public int hashCode() {
return Objects.hash(nameParts, colNames, hints, partitions);
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, logicalProperties, children.get(0));
}
@Override
public LogicalProperties computeLogicalProperties() {
return UnboundLogicalProperties.INSTANCE;
}
@Override
public List<Slot> computeOutput() {
throw new UnboundException("output");
}
}

View File

@ -29,7 +29,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
@ -43,11 +43,8 @@ import java.util.Optional;
/**
* Represents an OLAP table sink plan node that has not been bound.
*/
public class UnboundTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE>
public class UnboundTableSink<CHILD_TYPE extends Plan> extends UnboundLogicalSink<CHILD_TYPE>
implements Unbound, Sink, BlockFuncDepsPropagation {
private final List<String> nameParts;
private final List<String> colNames;
private final List<String> hints;
private final boolean temporaryPartition;
private final List<String> partitions;
@ -60,31 +57,6 @@ public class UnboundTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD
false, DMLCommandType.NONE, Optional.empty(), Optional.empty(), child);
}
public UnboundTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
boolean temporaryPartition, List<String> partitions, CHILD_TYPE child) {
this(nameParts, colNames, hints, temporaryPartition, partitions,
false, DMLCommandType.NONE, Optional.empty(), Optional.empty(), child);
}
public UnboundTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, boolean isPartialUpdate, CHILD_TYPE child) {
this(nameParts, colNames, hints, false, partitions, isPartialUpdate, DMLCommandType.NONE,
Optional.empty(), Optional.empty(), child);
}
public UnboundTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
boolean temporaryPartition, List<String> partitions, boolean isPartialUpdate, CHILD_TYPE child) {
this(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, DMLCommandType.NONE,
Optional.empty(), Optional.empty(), child);
}
public UnboundTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
boolean temporaryPartition, List<String> partitions,
boolean isPartialUpdate, DMLCommandType dmlCommandType, CHILD_TYPE child) {
this(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, dmlCommandType,
Optional.empty(), Optional.empty(), child);
}
/**
* constructor
*/
@ -93,9 +65,8 @@ public class UnboundTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD
boolean isPartialUpdate, DMLCommandType dmlCommandType,
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
CHILD_TYPE child) {
super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, logicalProperties, child);
this.nameParts = Utils.copyRequiredList(nameParts);
this.colNames = Utils.copyRequiredList(colNames);
super(nameParts, PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression,
logicalProperties, colNames, dmlCommandType, child);
this.hints = Utils.copyRequiredList(hints);
this.temporaryPartition = temporaryPartition;
this.partitions = Utils.copyRequiredList(partitions);
@ -103,14 +74,6 @@ public class UnboundTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD
this.dmlCommandType = dmlCommandType;
}
public List<String> getColNames() {
return colNames;
}
public List<String> getNameParts() {
return nameParts;
}
public boolean isTemporaryPartition() {
return temporaryPartition;
}
@ -127,10 +90,6 @@ public class UnboundTableSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD
return isPartialUpdate;
}
public DMLCommandType getDMLCommandType() {
return dmlCommandType;
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "UnboundOlapTableSink only accepts one child");

View File

@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.analyzer;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.qe.ConnectContext;
import java.util.List;
import java.util.Optional;
/**
* Create unbound table sink
*/
public class UnboundTableSinkCreator {
/**
* create unbound sink without DML command
*/
public static LogicalSink<? extends Plan> createUnboundTableSink(List<String> nameParts,
List<String> colNames, List<String> hints, List<String> partitions, Plan query)
throws UserException {
String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0);
CatalogIf<?> curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
if (curCatalog instanceof InternalCatalog) {
return new UnboundTableSink<>(nameParts, colNames, hints, partitions, query);
} else if (curCatalog instanceof HMSExternalCatalog) {
return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, query);
}
throw new UserException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported.");
}
/**
* create unbound sink for DML plan
*/
public static LogicalSink<? extends Plan> createUnboundTableSink(List<String> nameParts,
List<String> colNames, List<String> hints, boolean temporaryPartition, List<String> partitions,
boolean isPartialUpdate, DMLCommandType dmlCommandType, LogicalPlan plan) {
String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0);
CatalogIf<?> curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
if (curCatalog instanceof InternalCatalog) {
return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions,
isPartialUpdate, dmlCommandType, Optional.empty(),
Optional.empty(), plan);
} else if (curCatalog instanceof HMSExternalCatalog) {
return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
}
throw new RuntimeException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported.");
}
}
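A usage sketch of the factory; the name parts are illustrative and query is an already-built child plan (the first overload declares throws UserException):

List<String> nameParts = ImmutableList.of("hive_catalog", "db1", "t1");
LogicalSink<? extends Plan> sink = UnboundTableSinkCreator.createUnboundTableSink(
        nameParts, ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), query);
// internal catalog -> UnboundTableSink, HMS catalog -> UnboundHiveTableSink,
// any other catalog -> UserException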

View File

@ -71,6 +71,8 @@ import org.apache.doris.nereids.properties.DistributionSpecHash;
import org.apache.doris.nereids.properties.DistributionSpecReplicated;
import org.apache.doris.nereids.properties.DistributionSpecStorageAny;
import org.apache.doris.nereids.properties.DistributionSpecStorageGather;
import org.apache.doris.nereids.properties.DistributionSpecTableSinkHashPartitioned;
import org.apache.doris.nereids.properties.DistributionSpecTableSinkRandomPartitioned;
import org.apache.doris.nereids.properties.DistributionSpecTabletIdShuffle;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.properties.PhysicalProperties;
@ -163,6 +165,7 @@ import org.apache.doris.planner.ExceptNode;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.HashJoinNode;
import org.apache.doris.planner.HashJoinNode.DistributionMode;
import org.apache.doris.planner.HiveTableSink;
import org.apache.doris.planner.IntersectNode;
import org.apache.doris.planner.JoinNodeBase;
import org.apache.doris.planner.MultiCastDataSink;
@ -432,6 +435,20 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
public PlanFragment visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan> hiveTableSink,
PlanTranslatorContext context) {
PlanFragment rootFragment = hiveTableSink.child().accept(this, context);
rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
TupleDescriptor hiveTuple = context.generateTupleDesc();
List<Column> targetTableColumns = hiveTableSink.getTargetTable().getFullSchema();
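// materialize one slot per target-table column so the Hive sink sees the full schema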
for (Column column : targetTableColumns) {
SlotDescriptor slotDesc = context.addSlotDesc(hiveTuple);
slotDesc.setIsMaterialized(true);
slotDesc.setType(column.getType());
slotDesc.setColumn(column);
slotDesc.setIsNullable(column.isAllowNull());
slotDesc.setAutoInc(column.isAutoInc());
}
HiveTableSink sink = new HiveTableSink(hiveTableSink.getTargetTable());
rootFragment.setSink(sink);
return rootFragment;
}
@ -2568,6 +2585,19 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
return new DataPartition(partitionType, partitionExprs);
} else if (distributionSpec instanceof DistributionSpecTabletIdShuffle) {
return DataPartition.TABLET_ID;
} else if (distributionSpec instanceof DistributionSpecTableSinkHashPartitioned) {
DistributionSpecTableSinkHashPartitioned partitionSpecHash =
(DistributionSpecTableSinkHashPartitioned) distributionSpec;
List<Expr> partitionExprs = Lists.newArrayList();
List<ExprId> partitionExprIds = partitionSpecHash.getOutputColExprIds();
for (ExprId partitionExprId : partitionExprIds) {
if (childOutputIds.contains(partitionExprId)) {
partitionExprs.add(context.findSlotRef(partitionExprId));
}
}
return new DataPartition(TPartitionType.TABLE_SINK_HASH_PARTITIONED, partitionExprs);
} else if (distributionSpec instanceof DistributionSpecTableSinkRandomPartitioned) {
return new DataPartition(TPartitionType.TABLE_SINK_RANDOM_PARTITIONED);
} else {
throw new RuntimeException("Unknown DistributionSpec: " + distributionSpec);
}

View File

@ -205,7 +205,7 @@ import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundStar;
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
import org.apache.doris.nereids.analyzer.UnboundVariable;
import org.apache.doris.nereids.analyzer.UnboundVariable.VariableType;
import org.apache.doris.nereids.exceptions.AnalysisException;
@ -421,6 +421,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
import org.apache.doris.nereids.trees.plans.logical.LogicalSelectHint;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
@ -511,10 +512,11 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
boolean isOverwrite = ctx.INTO() == null;
ImmutableList.Builder<String> tableName = ImmutableList.builder();
if (null != ctx.tableName) {
tableName.addAll(visitMultipartIdentifier(ctx.tableName));
List<String> nameParts = visitMultipartIdentifier(ctx.tableName);
tableName.addAll(nameParts);
} else if (null != ctx.tableId) {
// process group commit insert table command sent by BE
TableName name = Env.getCurrentEnv().getInternalCatalog()
TableName name = Env.getCurrentEnv().getCurrentCatalog()
.getTableNameByTableId(Long.valueOf(ctx.tableId.getText()));
tableName.add(name.getDb());
tableName.add(name.getTbl());
@ -526,7 +528,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
// TODO visit partitionSpecCtx
Pair<Boolean, List<String>> partitionSpec = visitPartitionSpec(ctx.partitionSpec());
LogicalPlan plan = visitQuery(ctx.query());
UnboundTableSink<?> sink = new UnboundTableSink<>(
LogicalSink<?> sink = UnboundTableSinkCreator.createUnboundTableSink(
tableName.build(),
colNames,
ImmutableList.of(),

View File

@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.properties;
import org.apache.doris.nereids.trees.expressions.ExprId;
import java.util.List;
/**
* Used to shuffle data by partition keys before the sink.
*/
public class DistributionSpecTableSinkHashPartitioned extends DistributionSpec {
public static final DistributionSpecTableSinkHashPartitioned INSTANCE =
new DistributionSpecTableSinkHashPartitioned();
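// ExprIds of the sink's partition columns; the plan translator turns them into
// the expressions of a TABLE_SINK_HASH_PARTITIONED DataPartition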
private List<ExprId> outputColExprIds;
public DistributionSpecTableSinkHashPartitioned() {
super();
}
public List<ExprId> getOutputColExprIds() {
return outputColExprIds;
}
public void setOutputColExprIds(List<ExprId> outputColExprIds) {
this.outputColExprIds = outputColExprIds;
}
@Override
public boolean satisfy(DistributionSpec other) {
return other instanceof DistributionSpecTableSinkHashPartitioned;
}
}

View File

@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.properties;
/**
* Used for round-robin distribution by the data sink.
*/
public class DistributionSpecTableSinkRandomPartitioned extends DistributionSpec {
public static final DistributionSpecTableSinkRandomPartitioned INSTANCE =
new DistributionSpecTableSinkRandomPartitioned();
private DistributionSpecTableSinkRandomPartitioned() {
super();
}
@Override
public boolean satisfy(DistributionSpec other) {
return other instanceof DistributionSpecTableSinkRandomPartitioned;
}
}

View File

@ -49,6 +49,9 @@ public class PhysicalProperties {
public static PhysicalProperties TABLET_ID_SHUFFLE
= new PhysicalProperties(DistributionSpecTabletIdShuffle.INSTANCE);
public static PhysicalProperties SINK_RANDOM_PARTITIONED
= new PhysicalProperties(DistributionSpecTableSinkRandomPartitioned.INSTANCE);
private final OrderSpec orderSpec;
private final DistributionSpec distributionSpec;

View File

@ -38,6 +38,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeRes
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
@ -129,6 +130,16 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
return null;
}
@Override
public Void visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan> hiveTableSink, PlanContext context) {
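// with strict-consistency DML disabled, any child distribution is acceptable;
// otherwise request the distribution the Hive sink requires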
if (connectContext != null && !connectContext.getSessionVariable().enableStrictConsistencyDml) {
addRequestPropertyToChildren(PhysicalProperties.ANY);
} else {
addRequestPropertyToChildren(hiveTableSink.getRequirePhysicalProperties());
}
return null;
}
@Override
public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, PlanContext context) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);

View File

@ -30,7 +30,7 @@ public enum RuleType {
// **** make sure BINDING_UNBOUND_LOGICAL_PLAN is the lowest priority in the rewrite rules. ****
BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_EXTERNAL_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_FILE(RuleTypeClass.REWRITE),
BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE),

View File

@ -27,11 +27,15 @@ import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.pattern.MatchingContext;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
@ -47,6 +51,7 @@ import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@ -61,10 +66,12 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
@ -75,265 +82,7 @@ public class BindSink implements AnalysisRuleFactory {
@Override
public List<Rule> buildRules() {
return ImmutableList.of(
RuleType.BINDING_INSERT_TARGET_TABLE.build(unboundTableSink().thenApply(ctx -> {
UnboundTableSink<?> sink = ctx.root;
Pair<Database, OlapTable> pair = bind(ctx.cascadesContext, sink);
Database database = pair.first;
OlapTable table = pair.second;
boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS;
LogicalPlan child = ((LogicalPlan) sink.child());
boolean childHasSeqCol = child.getOutput().stream()
.anyMatch(slot -> slot.getName().equals(Column.SEQUENCE_COL));
boolean needExtraSeqCol = isPartialUpdate && !childHasSeqCol && table.hasSequenceCol()
&& table.getSequenceMapCol() != null
&& sink.getColNames().contains(table.getSequenceMapCol());
Pair<List<Column>, Integer> bindColumnsResult =
bindTargetColumns(table, sink.getColNames(), childHasSeqCol, needExtraSeqCol);
List<Column> bindColumns = bindColumnsResult.first;
int extraColumnsNum = bindColumnsResult.second;
LogicalOlapTableSink<?> boundSink = new LogicalOlapTableSink<>(
database,
table,
bindColumns,
bindPartitionIds(table, sink.getPartitions(), sink.isTemporaryPartition()),
child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()),
isPartialUpdate,
sink.getDMLCommandType(),
child);
// we need to insert all the columns of the target table
// although some columns are not mentioned,
// so we add a project to supply the default values.
if (boundSink.getCols().size() != child.getOutput().size() + extraColumnsNum) {
throw new AnalysisException("insert into cols should be corresponding to the query output");
}
try {
// For Unique Key table with sequence column (which default value is not CURRENT_TIMESTAMP),
// user MUST specify the sequence column while inserting data
//
// case1: create table by `function_column.sequence_col`
// a) insert with column list, must include the sequence map column
// b) insert without column list, already contains the column, don't need to check
// case2: create table by `function_column.sequence_type`
// a) insert with column list, must include the hidden column __DORIS_SEQUENCE_COL__
// b) insert without column list, don't include the hidden column __DORIS_SEQUENCE_COL__
// by default, will fail.
if (table.hasSequenceCol()) {
boolean haveInputSeqCol = false;
Optional<Column> seqColInTable = Optional.empty();
if (table.getSequenceMapCol() != null) {
if (!sink.getColNames().isEmpty()) {
if (sink.getColNames().stream()
.anyMatch(c -> c.equalsIgnoreCase(table.getSequenceMapCol()))) {
haveInputSeqCol = true; // case1.a
}
} else {
haveInputSeqCol = true; // case1.b
}
seqColInTable = table.getFullSchema().stream()
.filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol()))
.findFirst();
} else {
if (!sink.getColNames().isEmpty()) {
if (sink.getColNames().stream()
.anyMatch(c -> c.equalsIgnoreCase(Column.SEQUENCE_COL))) {
haveInputSeqCol = true; // case2.a
} // else case2.b
}
}
// Don't require user to provide sequence column for partial updates,
// including the following cases:
// 1. it's a load job with `partial_columns=true`
// 2. UPDATE and DELETE, planner will automatically add these hidden columns
if (!haveInputSeqCol && !isPartialUpdate && (
boundSink.getDmlCommandType() != DMLCommandType.UPDATE
&& boundSink.getDmlCommandType() != DMLCommandType.DELETE)) {
if (!seqColInTable.isPresent() || seqColInTable.get().getDefaultValue() == null
|| !seqColInTable.get().getDefaultValue()
.equalsIgnoreCase(DefaultValue.CURRENT_TIMESTAMP)) {
throw new org.apache.doris.common.AnalysisException("Table " + table.getName()
+ " has sequence column, need to specify the sequence column");
}
}
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
// we need to insert all the columns of the target table
// although some columns are not mentioned,
// so we add a project to supply the default values.
Map<Column, NamedExpression> columnToChildOutput = Maps.newHashMap();
for (int i = 0; i < child.getOutput().size(); ++i) {
columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i));
}
Map<String, NamedExpression> columnToOutput = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
NereidsParser expressionParser = new NereidsParser();
// generate slots not mentioned in sql, mv slots and shaded slots.
for (Column column : boundSink.getTargetTable().getFullSchema()) {
if (column.isMaterializedViewColumn()) {
List<SlotRef> refs = column.getRefColumns();
// now we have to replace the column with slots.
Preconditions.checkArgument(refs != null,
"mv column %s 's ref column cannot be null", column);
Expression parsedExpression = expressionParser.parseExpression(
column.getDefineExpr().toSqlWithoutTbl());
Expression boundSlotExpression = SlotReplacer.INSTANCE
.replace(parsedExpression, columnToOutput);
// the boundSlotExpression is an expression whose slots are bound but function
// may not be bound, we have to bind it again.
// for example: to_bitmap.
Expression boundExpression = FunctionBinder.INSTANCE.rewrite(
boundSlotExpression, new ExpressionRewriteContext(ctx.cascadesContext));
if (boundExpression instanceof Alias) {
boundExpression = ((Alias) boundExpression).child();
}
NamedExpression slot = new Alias(boundExpression, column.getDefineExpr().toSqlWithoutTbl());
columnToOutput.put(column.getName(), slot);
} else if (columnToChildOutput.containsKey(column)
// do not process explicitly use DEFAULT value here:
// insert into table t values(DEFAULT)
&& !(columnToChildOutput.get(column) instanceof DefaultValueSlot)) {
columnToOutput.put(column.getName(), columnToChildOutput.get(column));
} else {
if (table.hasSequenceCol()
&& column.getName().equals(Column.SEQUENCE_COL)
&& table.getSequenceMapCol() != null) {
Optional<Column> seqCol = table.getFullSchema().stream()
.filter(col -> col.getName().equals(table.getSequenceMapCol()))
.findFirst();
if (!seqCol.isPresent()) {
throw new AnalysisException("sequence column is not contained in"
+ " target table " + table.getName());
}
if (columnToOutput.get(seqCol.get().getName()) != null) {
// should generate a different exprId for the sequence column
NamedExpression seqColumn = columnToOutput.get(seqCol.get().getName());
if (seqColumn instanceof Alias) {
seqColumn = new Alias(((Alias) seqColumn).child(), column.getName());
} else {
seqColumn = new Alias(seqColumn, column.getName());
}
columnToOutput.put(column.getName(), seqColumn);
}
} else if (isPartialUpdate) {
// If the current load is a partial update, the values of unmentioned
// columns will be filled in SegmentWriter. And the output of sink node
// should not contain these unmentioned columns, so we just skip them.
// But if the column has 'on update value', we should unconditionally
// update the value of the column to the current timestamp whenever there
// is an update on the row
if (column.hasOnUpdateDefaultValue()) {
Expression defaultValueExpression = FunctionBinder.INSTANCE.rewrite(
new NereidsParser().parseExpression(
column.getOnUpdateDefaultValueExpr().toSqlWithoutTbl()),
new ExpressionRewriteContext(ctx.cascadesContext));
columnToOutput.put(column.getName(),
new Alias(defaultValueExpression, column.getName()));
} else {
continue;
}
} else if (column.getDefaultValue() == null) {
// throw exception if explicitly use Default value but no default value present
// insert into table t values(DEFAULT)
if (columnToChildOutput.get(column) instanceof DefaultValueSlot) {
throw new AnalysisException("Column has no default value,"
+ " column=" + column.getName());
}
// Otherwise, the unmentioned columns should be filled with default values
// or null values
columnToOutput.put(column.getName(), new Alias(
new NullLiteral(DataType.fromCatalogType(column.getType())),
column.getName()
));
} else {
try {
// it comes from the original planner, if default value expression is
// null, we use the literal string of the default value, or it may be
// default value function, like CURRENT_TIMESTAMP.
if (column.getDefaultValueExpr() == null) {
columnToOutput.put(column.getName(),
new Alias(Literal.of(column.getDefaultValue())
.checkedCastTo(DataType.fromCatalogType(column.getType())),
column.getName()));
} else {
Expression defaultValueExpression = FunctionBinder.INSTANCE.rewrite(
new NereidsParser().parseExpression(
column.getDefaultValueExpr().toSqlWithoutTbl()),
new ExpressionRewriteContext(ctx.cascadesContext));
if (defaultValueExpression instanceof Alias) {
defaultValueExpression = ((Alias) defaultValueExpression).child();
}
columnToOutput.put(column.getName(),
new Alias(defaultValueExpression, column.getName()));
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
}
}
}
List<NamedExpression> fullOutputExprs = ImmutableList.copyOf(columnToOutput.values());
if (child instanceof LogicalOneRowRelation) {
// remove default value slot in one row relation
child = ((LogicalOneRowRelation) child).withProjects(((LogicalOneRowRelation) child)
.getProjects().stream()
.filter(p -> !(p instanceof DefaultValueSlot))
.collect(ImmutableList.toImmutableList()));
}
LogicalProject<?> fullOutputProject = new LogicalProject<>(fullOutputExprs, child);
// add cast project
List<NamedExpression> castExprs = Lists.newArrayList();
for (int i = 0; i < table.getFullSchema().size(); ++i) {
Column col = table.getFullSchema().get(i);
NamedExpression expr = columnToOutput.get(col.getName());
if (expr == null) {
// If `expr` is null, it means that the current load is a partial update
// and `col` should not be contained in the output of the sink node so
// we skip it.
continue;
}
DataType inputType = expr.getDataType();
DataType targetType = DataType.fromCatalogType(table.getFullSchema().get(i).getType());
Expression castExpr = expr;
// TODO move string like type logic into TypeCoercionUtils#castIfNotSameType
if (isSourceAndTargetStringLikeType(inputType, targetType) && !inputType.equals(targetType)) {
int sourceLength = ((CharacterType) inputType).getLen();
int targetLength = ((CharacterType) targetType).getLen();
if (sourceLength == targetLength) {
castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType);
} else if (sourceLength > targetLength && targetLength >= 0) {
castExpr = new Substring(castExpr, Literal.of(1), Literal.of(targetLength));
} else if (targetType.isStringType()) {
castExpr = new Cast(castExpr, StringType.INSTANCE);
}
} else {
castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType);
}
if (castExpr instanceof NamedExpression) {
castExprs.add(((NamedExpression) castExpr));
} else {
castExprs.add(new Alias(castExpr));
}
}
if (!castExprs.equals(fullOutputExprs)) {
fullOutputProject = new LogicalProject<Plan>(castExprs, fullOutputProject);
}
return boundSink.withChildAndUpdateOutput(fullOutputProject);
})),
RuleType.BINDING_INSERT_TARGET_TABLE.build(unboundTableSink().thenApply(this::bindOlapTableSink)),
RuleType.BINDING_INSERT_FILE.build(
logicalFileSink().when(s -> s.getOutputExprs().isEmpty())
.then(fileSink -> fileSink.withOutputExprs(
@ -342,20 +91,316 @@ public class BindSink implements AnalysisRuleFactory {
.collect(ImmutableList.toImmutableList())))
),
// TODO: bind hive target table
RuleType.BINDING_INSERT_TARGET_EXTERNAL_TABLE.build(
logicalHiveTableSink().when(s -> s.getOutputExprs().isEmpty())
.then(hiveTableSink -> hiveTableSink.withOutputExprs(
hiveTableSink.child().getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList())))
)
RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink))
);
}
private Plan bindOlapTableSink(MatchingContext<UnboundTableSink<Plan>> ctx) {
UnboundTableSink<?> sink = ctx.root;
Pair<Database, OlapTable> pair = bind(ctx.cascadesContext, sink);
Database database = pair.first;
OlapTable table = pair.second;
boolean isPartialUpdate = sink.isPartialUpdate() && table.getKeysType() == KeysType.UNIQUE_KEYS;
LogicalPlan child = ((LogicalPlan) sink.child());
boolean childHasSeqCol = child.getOutput().stream()
.anyMatch(slot -> slot.getName().equals(Column.SEQUENCE_COL));
boolean needExtraSeqCol = isPartialUpdate && !childHasSeqCol && table.hasSequenceCol()
&& table.getSequenceMapCol() != null
&& sink.getColNames().contains(table.getSequenceMapCol());
Pair<List<Column>, Integer> bindColumnsResult =
bindTargetColumns(table, sink.getColNames(), childHasSeqCol, needExtraSeqCol);
List<Column> bindColumns = bindColumnsResult.first;
int extraColumnsNum = bindColumnsResult.second;
LogicalOlapTableSink<?> boundSink = new LogicalOlapTableSink<>(
database,
table,
bindColumns,
bindPartitionIds(table, sink.getPartitions(), sink.isTemporaryPartition()),
child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()),
isPartialUpdate,
sink.getDMLCommandType(),
child);
// we need to insert all the columns of the target table
// although some columns are not mentioned,
// so we add a project to supply the default values.
if (boundSink.getCols().size() != child.getOutput().size() + extraColumnsNum) {
throw new AnalysisException("insert into cols should be corresponding to the query output");
}
try {
// For Unique Key table with sequence column (which default value is not CURRENT_TIMESTAMP),
// user MUST specify the sequence column while inserting data
//
// case1: create table by `function_column.sequence_col`
// a) insert with column list, must include the sequence map column
// b) insert without column list, already contains the column, don't need to check
// case2: create table by `function_column.sequence_type`
// a) insert with column list, must include the hidden column __DORIS_SEQUENCE_COL__
// b) insert without column list, don't include the hidden column __DORIS_SEQUENCE_COL__
// by default, will fail.
if (table.hasSequenceCol()) {
boolean haveInputSeqCol = false;
Optional<Column> seqColInTable = Optional.empty();
if (table.getSequenceMapCol() != null) {
if (!sink.getColNames().isEmpty()) {
if (sink.getColNames().stream()
.anyMatch(c -> c.equalsIgnoreCase(table.getSequenceMapCol()))) {
haveInputSeqCol = true; // case1.a
}
} else {
haveInputSeqCol = true; // case1.b
}
seqColInTable = table.getFullSchema().stream()
.filter(col -> col.getName().equalsIgnoreCase(table.getSequenceMapCol()))
.findFirst();
} else {
if (!sink.getColNames().isEmpty()) {
if (sink.getColNames().stream()
.anyMatch(c -> c.equalsIgnoreCase(Column.SEQUENCE_COL))) {
haveInputSeqCol = true; // case2.a
} // else case2.b
}
}
// Don't require user to provide sequence column for partial updates,
// including the following cases:
// 1. it's a load job with `partial_columns=true`
// 2. UPDATE and DELETE, planner will automatically add these hidden columns
if (!haveInputSeqCol && !isPartialUpdate && (
boundSink.getDmlCommandType() != DMLCommandType.UPDATE
&& boundSink.getDmlCommandType() != DMLCommandType.DELETE)) {
if (!seqColInTable.isPresent() || seqColInTable.get().getDefaultValue() == null
|| !seqColInTable.get().getDefaultValue()
.equalsIgnoreCase(DefaultValue.CURRENT_TIMESTAMP)) {
throw new org.apache.doris.common.AnalysisException("Table " + table.getName()
+ " has sequence column, need to specify the sequence column");
}
}
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
// we need to insert all the columns of the target table
// although some columns are not mentioned,
// so we add a project to supply the default values.
Map<Column, NamedExpression> columnToChildOutput = Maps.newHashMap();
for (int i = 0; i < child.getOutput().size(); ++i) {
columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i));
}
Map<String, NamedExpression> columnToOutput = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
NereidsParser expressionParser = new NereidsParser();
// generate slots not mentioned in sql, mv slots and shaded slots.
for (Column column : boundSink.getTargetTable().getFullSchema()) {
if (column.isMaterializedViewColumn()) {
List<SlotRef> refs = column.getRefColumns();
// now we have to replace the column with slots.
Preconditions.checkArgument(refs != null,
"mv column %s 's ref column cannot be null", column);
Expression parsedExpression = expressionParser.parseExpression(
column.getDefineExpr().toSqlWithoutTbl());
Expression boundSlotExpression = SlotReplacer.INSTANCE
.replace(parsedExpression, columnToOutput);
// the boundSlotExpression is an expression whose slots are bound but function
// may not be bound, we have to bind it again.
// for example: to_bitmap.
Expression boundExpression = FunctionBinder.INSTANCE.rewrite(
boundSlotExpression, new ExpressionRewriteContext(ctx.cascadesContext));
if (boundExpression instanceof Alias) {
boundExpression = ((Alias) boundExpression).child();
}
NamedExpression slot = new Alias(boundExpression, column.getDefineExpr().toSqlWithoutTbl());
columnToOutput.put(column.getName(), slot);
} else if (columnToChildOutput.containsKey(column)
// do not process explicitly use DEFAULT value here:
// insert into table t values(DEFAULT)
&& !(columnToChildOutput.get(column) instanceof DefaultValueSlot)) {
columnToOutput.put(column.getName(), columnToChildOutput.get(column));
} else {
if (table.hasSequenceCol()
&& column.getName().equals(Column.SEQUENCE_COL)
&& table.getSequenceMapCol() != null) {
Optional<Column> seqCol = table.getFullSchema().stream()
.filter(col -> col.getName().equals(table.getSequenceMapCol()))
.findFirst();
if (!seqCol.isPresent()) {
throw new AnalysisException("sequence column is not contained in"
+ " target table " + table.getName());
}
if (columnToOutput.get(seqCol.get().getName()) != null) {
// should generate a different exprId for the sequence column
NamedExpression seqColumn = columnToOutput.get(seqCol.get().getName());
if (seqColumn instanceof Alias) {
seqColumn = new Alias(((Alias) seqColumn).child(), column.getName());
} else {
seqColumn = new Alias(seqColumn, column.getName());
}
columnToOutput.put(column.getName(), seqColumn);
}
} else if (isPartialUpdate) {
// If the current load is a partial update, the values of unmentioned
// columns will be filled in SegmentWriter. And the output of sink node
// should not contain these unmentioned columns, so we just skip them.
// But if the column has 'on update value', we should unconditionally
// update the value of the column to the current timestamp whenever there
// is an update on the row
if (column.hasOnUpdateDefaultValue()) {
Expression defaultValueExpression = FunctionBinder.INSTANCE.rewrite(
new NereidsParser().parseExpression(
column.getOnUpdateDefaultValueExpr().toSqlWithoutTbl()),
new ExpressionRewriteContext(ctx.cascadesContext));
columnToOutput.put(column.getName(),
new Alias(defaultValueExpression, column.getName()));
} else {
continue;
}
} else if (column.getDefaultValue() == null) {
// throw exception if explicitly use Default value but no default value present
// insert into table t values(DEFAULT)
if (columnToChildOutput.get(column) instanceof DefaultValueSlot) {
throw new AnalysisException("Column has no default value,"
+ " column=" + column.getName());
}
// Otherwise, the unmentioned columns should be filled with default values
// or null values
columnToOutput.put(column.getName(), new Alias(
new NullLiteral(DataType.fromCatalogType(column.getType())),
column.getName()
));
} else {
try {
// it comes from the original planner, if default value expression is
// null, we use the literal string of the default value, or it may be
// default value function, like CURRENT_TIMESTAMP.
if (column.getDefaultValueExpr() == null) {
columnToOutput.put(column.getName(),
new Alias(Literal.of(column.getDefaultValue())
.checkedCastTo(DataType.fromCatalogType(column.getType())),
column.getName()));
} else {
Expression defaultValueExpression = FunctionBinder.INSTANCE.rewrite(
new NereidsParser().parseExpression(
column.getDefaultValueExpr().toSqlWithoutTbl()),
new ExpressionRewriteContext(ctx.cascadesContext));
if (defaultValueExpression instanceof Alias) {
defaultValueExpression = ((Alias) defaultValueExpression).child();
}
columnToOutput.put(column.getName(),
new Alias(defaultValueExpression, column.getName()));
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
}
}
}
List<NamedExpression> fullOutputExprs = ImmutableList.copyOf(columnToOutput.values());
if (child instanceof LogicalOneRowRelation) {
// remove default value slot in one row relation
child = ((LogicalOneRowRelation) child).withProjects(((LogicalOneRowRelation) child)
.getProjects().stream()
.filter(p -> !(p instanceof DefaultValueSlot))
.collect(ImmutableList.toImmutableList()));
}
LogicalProject<?> fullOutputProject = new LogicalProject<>(fullOutputExprs, child);
// add cast project
List<NamedExpression> castExprs = Lists.newArrayList();
for (int i = 0; i < table.getFullSchema().size(); ++i) {
Column col = table.getFullSchema().get(i);
NamedExpression expr = columnToOutput.get(col.getName());
if (expr == null) {
// If `expr` is null, it means that the current load is a partial update
// and `col` should not be contained in the output of the sink node so
// we skip it.
continue;
}
DataType inputType = expr.getDataType();
DataType targetType = DataType.fromCatalogType(table.getFullSchema().get(i).getType());
Expression castExpr = expr;
// TODO move string like type logic into TypeCoercionUtils#castIfNotSameType
if (isSourceAndTargetStringLikeType(inputType, targetType) && !inputType.equals(targetType)) {
int sourceLength = ((CharacterType) inputType).getLen();
int targetLength = ((CharacterType) targetType).getLen();
if (sourceLength == targetLength) {
castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType);
} else if (sourceLength > targetLength && targetLength >= 0) {
castExpr = new Substring(castExpr, Literal.of(1), Literal.of(targetLength));
} else if (targetType.isStringType()) {
castExpr = new Cast(castExpr, StringType.INSTANCE);
}
} else {
castExpr = TypeCoercionUtils.castIfNotSameType(castExpr, targetType);
}
if (castExpr instanceof NamedExpression) {
castExprs.add(((NamedExpression) castExpr));
} else {
castExprs.add(new Alias(castExpr));
}
}
if (!castExprs.equals(fullOutputExprs)) {
fullOutputProject = new LogicalProject<Plan>(castExprs, fullOutputProject);
}
return boundSink.withChildAndUpdateOutput(fullOutputProject);
}
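// Note on the string-like cast above (editorial sketch): writing a CHAR(5)
// expression into a CHAR(3) column becomes Substring(expr, 1, 3); equal
// lengths go through TypeCoercionUtils.castIfNotSameType; an unbounded
// STRING target (targetLength < 0) gets a plain Cast to StringType.INSTANCE.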
private Plan bindHiveTableSink(MatchingContext<UnboundHiveTableSink<Plan>> ctx) {
UnboundHiveTableSink<?> sink = ctx.root;
Pair<HMSExternalDatabase, HMSExternalTable> pair = bind(ctx.cascadesContext, sink);
HMSExternalDatabase database = pair.first;
HMSExternalTable table = pair.second;
LogicalPlan child = ((LogicalPlan) sink.child());
List<Column> bindColumns;
if (sink.getColNames().isEmpty()) {
bindColumns = table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList());
} else {
bindColumns = sink.getColNames().stream().map(cn -> {
Column column = table.getColumn(cn);
if (column == null) {
throw new AnalysisException(String.format("column %s is not found in table %s",
cn, table.getName()));
}
return column;
}).collect(ImmutableList.toImmutableList());
}
Set<String> hivePartitionKeys = table.getRemoteTable()
.getPartitionKeys().stream()
.map(FieldSchema::getName)
.collect(Collectors.toSet());
LogicalHiveTableSink<?> boundSink = new LogicalHiveTableSink<>(
database,
table,
bindColumns,
hivePartitionKeys,
child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()),
sink.getDMLCommandType(),
Optional.empty(),
Optional.empty(),
child);
// we need to insert into all the columns of the target table
if (boundSink.getCols().size() != child.getOutput().size()) {
throw new AnalysisException("insert into columns should correspond to the query output");
}
return boundSink;
}
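// Binding sketch (editorial; the query text is hypothetical): for
//   INSERT INTO hive_catalog.db.t (c1, c2) SELECT ...
// bindColumns resolves c1/c2 against the target schema, hivePartitionKeys is
// collected from the remote table's partition FieldSchemas, and the final
// size check rejects queries whose output width differs from the bound cols.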
private Pair<Database, OlapTable> bind(CascadesContext cascadesContext, UnboundTableSink<? extends Plan> sink) {
List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
sink.getNameParts());
Pair<DatabaseIf, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier,
Pair<DatabaseIf<?>, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier,
cascadesContext.getConnectContext().getEnv());
if (!(pair.second instanceof OlapTable)) {
try {
@ -368,6 +413,18 @@ public class BindSink implements AnalysisRuleFactory {
return Pair.of(((Database) pair.first), (OlapTable) pair.second);
}
private Pair<HMSExternalDatabase, HMSExternalTable> bind(CascadesContext cascadesContext,
UnboundHiveTableSink<? extends Plan> sink) {
List<String> tableQualifier = RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
sink.getNameParts());
Pair<DatabaseIf<?>, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier,
cascadesContext.getConnectContext().getEnv());
if (pair.second instanceof HMSExternalTable) {
return Pair.of(((HMSExternalDatabase) pair.first), (HMSExternalTable) pair.second);
}
throw new AnalysisException("the target table of insert into is not a Hive table");
}
private List<Long> bindPartitionIds(OlapTable table, List<String> partitions, boolean temp) {
return partitions.isEmpty()
? ImmutableList.of()

View File

@ -37,13 +37,13 @@ public class LogicalHiveTableSinkToPhysicalHiveTableSink extends OneImplementati
sink.getDatabase(),
sink.getTargetTable(),
sink.getCols(),
sink.getPartitionIds(),
sink.getOutputExprs(),
Optional.empty(),
sink.getLogicalProperties(),
null,
null,
sink.child());
sink.child(),
sink.getHivePartitionKeys());
}).toRule(RuleType.LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE);
}
}

View File

@ -46,8 +46,10 @@ public enum PlanType {
// logical sinks
LOGICAL_FILE_SINK,
LOGICAL_OLAP_TABLE_SINK,
LOGICAL_HIVE_TABLE_SINK,
LOGICAL_RESULT_SINK,
LOGICAL_UNBOUND_OLAP_TABLE_SINK,
LOGICAL_UNBOUND_HIVE_TABLE_SINK,
LOGICAL_UNBOUND_RESULT_SINK,
// logical others

View File

@ -25,7 +25,7 @@ import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
import org.apache.doris.nereids.annotation.Developing;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
@ -153,8 +153,8 @@ public class CreateTableCommand extends Command implements ForwardWithSync {
throw new AnalysisException(e.getMessage(), e.getCause());
}
query = new UnboundTableSink<>(createTableInfo.getTableNameParts(), ImmutableList.of(), ImmutableList.of(),
ImmutableList.of(), query);
query = UnboundTableSinkCreator.createUnboundTableSink(createTableInfo.getTableNameParts(),
ImmutableList.of(), ImmutableList.of(), ImmutableList.of(), query);
try {
new InsertIntoTableCommand(query, Optional.empty(), Optional.empty()).run(ctx, executor);
if (ctx.getState().getStateType() == MysqlStateType.ERR) {

View File

@ -21,7 +21,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
@ -115,7 +115,7 @@ public class DeleteFromUsingCommand extends Command implements ForwardWithSync,
&& cols.size() < targetTable.getColumns().size();
// make UnboundTableSink
return new UnboundTableSink<>(nameParts, cols, ImmutableList.of(),
return UnboundTableSinkCreator.createUnboundTableSink(nameParts, cols, ImmutableList.of(),
isTempPart, partitions, isPartialUpdate, DMLCommandType.DELETE, logicalQuery);
}

View File

@ -37,7 +37,7 @@ import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundStar;
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
@ -236,7 +236,7 @@ public class LoadCommand extends Command implements ForwardWithSync {
checkAndAddSequenceCol(olapTable, dataDesc, sinkCols, selectLists);
boolean isPartialUpdate = olapTable.getEnableUniqueKeyMergeOnWrite()
&& sinkCols.size() < olapTable.getColumns().size();
return new UnboundTableSink<>(dataDesc.getNameParts(), sinkCols, ImmutableList.of(),
return UnboundTableSinkCreator.createUnboundTableSink(dataDesc.getNameParts(), sinkCols, ImmutableList.of(),
false, dataDesc.getPartitionNames(), isPartialUpdate, DMLCommandType.LOAD, tvfLogicalPlan);
}

View File

@ -24,7 +24,7 @@ import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.rules.analysis.SlotBinder;
@ -189,8 +189,8 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab
logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery));
}
// make UnboundTableSink
return new UnboundTableSink<>(nameParts, isPartialUpdate ? partialUpdateColNames : ImmutableList.of(),
ImmutableList.of(),
return UnboundTableSinkCreator.createUnboundTableSink(nameParts,
isPartialUpdate ? partialUpdateColNames : ImmutableList.of(), ImmutableList.of(),
false, ImmutableList.of(), isPartialUpdate, DMLCommandType.UPDATE, logicalQuery);
}

View File

@ -24,9 +24,10 @@ import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.UserException;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.GreaterThanEqual;
@ -42,6 +43,7 @@ import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.RelationUtil;
@ -76,7 +78,7 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
* @return command
*/
public static UpdateMvByPartitionCommand from(MTMV mv, Set<Long> partitionIds,
Map<TableIf, String> tableWithPartKey) {
Map<TableIf, String> tableWithPartKey) throws UserException {
NereidsParser parser = new NereidsParser();
Map<TableIf, Set<Expression>> predicates =
constructTableWithPredicates(mv, partitionIds, tableWithPartKey);
@ -86,9 +88,8 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
if (plan instanceof Sink) {
plan = plan.child(0);
}
UnboundTableSink<? extends Plan> sink =
new UnboundTableSink<>(mv.getFullQualifiers(), ImmutableList.of(), ImmutableList.of(),
parts, plan);
LogicalSink<? extends Plan> sink = UnboundTableSinkCreator.createUnboundTableSink(mv.getFullQualifiers(),
ImmutableList.of(), ImmutableList.of(), parts, plan);
return new UpdateMvByPartitionCommand(sink);
}

View File

@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
/**
* Insert command context for Hive tables.
*/
public class HiveInsertCommandContext extends InsertCommandContext {
private boolean overwrite = true;
public boolean isOverwrite() {
return overwrite;
}
public void setOverwrite(boolean overwrite) {
this.overwrite = overwrite;
}
}
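// Usage sketch (editorial; the wiring mirrors InsertOverwriteTableCommand
// further below, the surrounding variables are assumed):
//   HiveInsertCommandContext ctx = new HiveInsertCommandContext();
//   ctx.setOverwrite(false); // plain INSERT INTO; the field defaults to true
//   new InsertIntoTableCommand(sink, labelName, Optional.of(ctx))
//           .run(connectContext, executor);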

View File

@ -17,20 +17,24 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.HiveTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -43,6 +47,7 @@ public class HiveInsertExecutor extends AbstractInsertExecutor {
private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class);
private static final long INVALID_TXN_ID = -1L;
private long txnId = INVALID_TXN_ID;
private TransactionStatus txnStatus = TransactionStatus.ABORTED;
/**
* constructor
@ -59,20 +64,15 @@ public class HiveInsertExecutor extends AbstractInsertExecutor {
@Override
public void beginTransaction() {
// TODO: use hive txn rather than internal txn
}
@Override
protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
HiveTableSink hiveTableSink = (HiveTableSink) sink;
// PhysicalHiveTableSink physicalHiveTableSink = (PhysicalHiveTableSink) physicalSink;
PhysicalHiveTableSink<? extends Plan> physicalHiveSink = (PhysicalHiveTableSink<? extends Plan>) physicalSink;
try {
hiveTableSink.init();
hiveTableSink.complete(new Analyzer(Env.getCurrentEnv(), ctx));
TransactionState state = Env.getCurrentGlobalTransactionMgr().getTransactionState(database.getId(), txnId);
if (state == null) {
throw new AnalysisException("txn does not exist: " + txnId);
}
hiveTableSink.bindDataSink(physicalHiveSink.getCols(), insertCtx);
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
}
@ -80,21 +80,36 @@ public class HiveInsertExecutor extends AbstractInsertExecutor {
@Override
protected void beforeExec() {
// check params
}
@Override
protected void onComplete() throws UserException {
if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier());
} else {
txnStatus = TransactionStatus.COMMITTED;
}
}
@Override
protected void onFail(Throwable t) {
errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
String queryId = DebugUtil.printId(ctx.queryId());
// if any throwable is thrown during the insert operation, we should abort this txn first
LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t);
if (txnId != INVALID_TXN_ID) {
LOG.warn("insert [{}] with query id {} abort txn {} failed", labelName, queryId, txnId);
StringBuilder sb = new StringBuilder(t.getMessage());
if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
sb.append(". url: ").append(coordinator.getTrackingUrl());
}
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString());
}
}
@Override
protected void afterExec(StmtExecutor executor) {
// TODO: set THivePartitionUpdate
}
}
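// Lifecycle sketch (editorial assumption): AbstractInsertExecutor drives the
// hooks roughly as beginTransaction -> finalizeSink (which calls bindDataSink
// on the HiveTableSink) -> beforeExec -> execution -> onComplete (marks
// txnStatus COMMITTED on success) or onFail (reports the error message plus
// the coordinator tracking url).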

View File

@ -141,9 +141,8 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
if (ctx.getMysqlChannel() != null) {
ctx.getMysqlChannel().reset();
}
Optional<PhysicalOlapTableSink<?>> plan = (planner.getPhysicalPlan()
.<Set<PhysicalOlapTableSink<?>>>collect(PhysicalSink.class::isInstance)).stream()
Optional<PhysicalSink<?>> plan = (planner.getPhysicalPlan()
.<Set<PhysicalSink<?>>>collect(PhysicalSink.class::isInstance)).stream()
.findAny();
Preconditions.checkArgument(plan.isPresent(), "insert into command must contain target table");
PhysicalSink physicalSink = plan.get();
@ -165,6 +164,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
} else if (physicalSink instanceof PhysicalHiveTableSink) {
HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf;
insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner, insertCtx);
// set hive query options
} else {
// TODO: support other table types
throw new AnalysisException("insert into command only support olap table");

View File

@ -24,10 +24,13 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.InternalDatabaseUtil;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.insertoverwrite.InsertOverwriteUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.TreeNode;
@ -37,6 +40,7 @@ import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
@ -95,8 +99,8 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
}
TableIf targetTableIf = InsertUtils.getTargetTable(logicalQuery, ctx);
if (!(targetTableIf instanceof OlapTable)) {
throw new AnalysisException("insert into overwrite only support OLAP table."
if (!(targetTableIf instanceof OlapTable || targetTableIf instanceof HMSExternalTable)) {
throw new AnalysisException("insert into overwrite only support OLAP and HMS table."
+ " But current table type is " + targetTableIf.getType());
}
this.logicalQuery = (LogicalPlan) InsertUtils.normalizePlan(logicalQuery, targetTableIf);
@ -156,20 +160,40 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
*/
private void insertInto(ConnectContext ctx, StmtExecutor executor, List<String> tempPartitionNames)
throws Exception {
UnboundTableSink<?> sink = (UnboundTableSink<?>) logicalQuery;
UnboundTableSink<?> copySink = new UnboundTableSink<>(
sink.getNameParts(),
sink.getColNames(),
sink.getHints(),
true,
tempPartitionNames,
sink.isPartialUpdate(),
sink.getDMLCommandType(),
(LogicalPlan) (sink.child(0)));
// for overwrite situation, we disable auto create partition.
OlapInsertCommandContext insertCtx = new OlapInsertCommandContext();
insertCtx.setAllowAutoPartition(false);
InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(copySink, labelName, Optional.of(insertCtx));
UnboundLogicalSink<?> copySink;
InsertCommandContext insertCtx;
if (logicalQuery instanceof UnboundTableSink) {
UnboundTableSink<?> sink = (UnboundTableSink<?>) logicalQuery;
copySink = (UnboundLogicalSink<?>) UnboundTableSinkCreator.createUnboundTableSink(
sink.getNameParts(),
sink.getColNames(),
sink.getHints(),
true,
tempPartitionNames,
sink.isPartialUpdate(),
sink.getDMLCommandType(),
(LogicalPlan) (sink.child(0)));
// for overwrite situation, we disable auto create partition.
insertCtx = new OlapInsertCommandContext();
((OlapInsertCommandContext) insertCtx).setAllowAutoPartition(false);
} else if (logicalQuery instanceof UnboundHiveTableSink) {
UnboundHiveTableSink<?> sink = (UnboundHiveTableSink<?>) logicalQuery;
copySink = (UnboundLogicalSink<?>) UnboundTableSinkCreator.createUnboundTableSink(
sink.getNameParts(),
sink.getColNames(),
sink.getHints(),
false,
sink.getPartitions(),
false,
sink.getDMLCommandType(),
(LogicalPlan) (sink.child(0)));
insertCtx = new HiveInsertCommandContext();
((HiveInsertCommandContext) insertCtx).setOverwrite(false);
} else {
throw new RuntimeException("Current catalog does not support insert overwrite yet.");
}
InsertIntoTableCommand insertCommand =
new InsertIntoTableCommand(copySink, labelName, Optional.of(insertCtx));
insertCommand.run(ctx, executor);
if (ctx.getState().getStateType() == MysqlStateType.ERR) {
String errMsg = Strings.emptyToNull(ctx.getState().getErrorMessage());

View File

@ -27,6 +27,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
@ -46,6 +47,7 @@ import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.nereids.util.TypeCoercionUtils;
@ -228,39 +230,39 @@ public class InsertUtils {
* normalize plan so that it can be processed correctly by nereids
*/
public static Plan normalizePlan(Plan plan, TableIf table) {
UnboundTableSink<? extends Plan> unboundTableSink = (UnboundTableSink<? extends Plan>) plan;
if (table instanceof OlapTable && ((OlapTable) table).getKeysType() == KeysType.UNIQUE_KEYS
&& unboundTableSink.isPartialUpdate()) {
// check the necessary conditions for partial updates
OlapTable olapTable = (OlapTable) table;
if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("Partial update is only allowed on "
+ "unique table with merge-on-write enabled.");
}
if (unboundTableSink.getDMLCommandType() == DMLCommandType.INSERT) {
if (unboundTableSink.getColNames().isEmpty()) {
throw new AnalysisException("You must explicitly specify the columns to be updated when "
+ "updating partial columns using the INSERT statement.");
}
for (Column col : olapTable.getFullSchema()) {
Optional<String> insertCol = unboundTableSink.getColNames().stream()
.filter(c -> c.equalsIgnoreCase(col.getName())).findFirst();
if (col.isKey() && !insertCol.isPresent()) {
throw new AnalysisException("Partial update should include all key columns, missing: "
+ col.getName());
}
}
}
}
UnboundLogicalSink<? extends Plan> unboundLogicalSink = (UnboundLogicalSink<? extends Plan>) plan;
if (table instanceof HMSExternalTable) {
// TODO: check HMSExternalTable
HMSExternalTable hiveTable = (HMSExternalTable) table;
if (hiveTable.isView()) {
throw new AnalysisException("View is not support in hive external table.");
}
}
Plan query = unboundTableSink.child();
if (table instanceof OlapTable && ((OlapTable) table).getKeysType() == KeysType.UNIQUE_KEYS) {
if (unboundLogicalSink instanceof UnboundTableSink
&& ((UnboundTableSink<? extends Plan>) unboundLogicalSink).isPartialUpdate()) {
// check the necessary conditions for partial updates
OlapTable olapTable = (OlapTable) table;
if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("Partial update is only allowed on "
+ "unique table with merge-on-write enabled.");
}
if (unboundLogicalSink.getDMLCommandType() == DMLCommandType.INSERT) {
if (unboundLogicalSink.getColNames().isEmpty()) {
throw new AnalysisException("You must explicitly specify the columns to be updated when "
+ "updating partial columns using the INSERT statement.");
}
for (Column col : olapTable.getFullSchema()) {
Optional<String> insertCol = unboundLogicalSink.getColNames().stream()
.filter(c -> c.equalsIgnoreCase(col.getName())).findFirst();
if (col.isKey() && !insertCol.isPresent()) {
throw new AnalysisException("Partial update should include all key columns, missing: "
+ col.getName());
}
}
}
}
}
Plan query = unboundLogicalSink.child();
if (!(query instanceof LogicalInlineTable)) {
return plan;
}
@ -271,28 +273,28 @@ public class InsertUtils {
for (List<NamedExpression> values : logicalInlineTable.getConstantExprsList()) {
ImmutableList.Builder<NamedExpression> constantExprs = ImmutableList.builder();
if (values.isEmpty()) {
if (CollectionUtils.isNotEmpty(unboundTableSink.getColNames())) {
if (CollectionUtils.isNotEmpty(unboundLogicalSink.getColNames())) {
throw new AnalysisException("value list should not be empty if columns are specified");
}
for (Column column : columns) {
constantExprs.add(generateDefaultExpression(column));
}
} else {
if (CollectionUtils.isNotEmpty(unboundTableSink.getColNames())) {
if (values.size() != unboundTableSink.getColNames().size()) {
if (CollectionUtils.isNotEmpty(unboundLogicalSink.getColNames())) {
if (values.size() != unboundLogicalSink.getColNames().size()) {
throw new AnalysisException("Column count doesn't match value count");
}
for (int i = 0; i < values.size(); i++) {
Column sameNameColumn = null;
for (Column column : table.getBaseSchema(true)) {
if (unboundTableSink.getColNames().get(i).equalsIgnoreCase(column.getName())) {
if (unboundLogicalSink.getColNames().get(i).equalsIgnoreCase(column.getName())) {
sameNameColumn = column;
break;
}
}
if (sameNameColumn == null) {
throw new AnalysisException("Unknown column '"
+ unboundTableSink.getColNames().get(i) + "' in target table.");
+ unboundLogicalSink.getColNames().get(i) + "' in target table.");
}
if (values.get(i) instanceof DefaultValueSlot) {
constantExprs.add(generateDefaultExpression(sameNameColumn));
@ -340,11 +342,15 @@ public class InsertUtils {
* get target table from names.
*/
public static TableIf getTargetTable(Plan plan, ConnectContext ctx) {
if (!(plan instanceof UnboundTableSink)) {
throw new AnalysisException("the root of plan should be UnboundTableSink"
UnboundLogicalSink<? extends Plan> unboundTableSink;
if (plan instanceof UnboundTableSink) {
unboundTableSink = (UnboundTableSink<? extends Plan>) plan;
} else if (plan instanceof UnboundHiveTableSink) {
unboundTableSink = (UnboundHiveTableSink<? extends Plan>) plan;
} else {
throw new AnalysisException("the root of plan should be UnboundTableSink or UnboundHiveTableSink"
+ " but it is " + plan.getType());
}
UnboundTableSink<? extends Plan> unboundTableSink = (UnboundTableSink<? extends Plan>) plan;
List<String> tableQualifier = RelationUtil.getQualifierName(ctx, unboundTableSink.getNameParts());
return RelationUtil.getDbAndTable(tableQualifier, ctx.getEnv()).second;
}

View File

@ -37,6 +37,7 @@ import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/**
* logical hive table sink for insert command
@ -47,41 +48,46 @@ public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
private final HMSExternalDatabase database;
private final HMSExternalTable targetTable;
private final List<Column> cols;
private final List<Long> partitionIds;
private final Set<String> hivePartitionKeys;
private final DMLCommandType dmlCommandType;
/**
* constructor
*/
public LogicalHiveTableSink(HMSExternalDatabase database, HMSExternalTable targetTable, List<Column> cols,
List<Long> partitionIds, List<NamedExpression> outputExprs,
DMLCommandType dmlCommandType, Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child);
public LogicalHiveTableSink(HMSExternalDatabase database,
HMSExternalTable targetTable,
List<Column> cols,
Set<String> hivePartitionKeys,
List<NamedExpression> outputExprs,
DMLCommandType dmlCommandType,
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties,
CHILD_TYPE child) {
super(PlanType.LOGICAL_HIVE_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child);
this.database = Objects.requireNonNull(database, "database != null in LogicalHiveTableSink");
this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalHiveTableSink");
this.cols = Utils.copyRequiredList(cols);
this.dmlCommandType = dmlCommandType;
this.partitionIds = Utils.copyRequiredList(partitionIds);
this.hivePartitionKeys = hivePartitionKeys;
}
public Plan withChildAndUpdateOutput(Plan child) {
List<NamedExpression> output = child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList());
return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, output,
return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, output,
dmlCommandType, Optional.empty(), Optional.empty(), child);
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "LogicalHiveTableSink only accepts one child");
return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, outputExprs,
dmlCommandType, Optional.empty(), Optional.empty(), children.get(0));
}
public LogicalHiveTableSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, outputExprs,
dmlCommandType, Optional.empty(), Optional.empty(), child());
}
@ -97,8 +103,8 @@ public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
return cols;
}
public List<Long> getPartitionIds() {
return partitionIds;
public Set<String> getHivePartitionKeys() {
return hivePartitionKeys;
}
public DMLCommandType getDmlCommandType() {
@ -119,13 +125,12 @@ public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
LogicalHiveTableSink<?> that = (LogicalHiveTableSink<?>) o;
return dmlCommandType == that.dmlCommandType
&& Objects.equals(database, that.database)
&& Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols)
&& Objects.equals(partitionIds, that.partitionIds);
&& Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), database, targetTable, cols, partitionIds, dmlCommandType);
return Objects.hash(super.hashCode(), database, targetTable, cols, dmlCommandType);
}
@Override
@ -135,7 +140,7 @@ public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
"database", database.getFullName(),
"targetTable", targetTable.getName(),
"cols", cols,
"partitionIds", partitionIds,
"hivePartitionKeys", hivePartitionKeys,
"dmlCommandType", dmlCommandType
);
}
@ -147,14 +152,14 @@ public class LogicalHiveTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, outputExprs,
dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new LogicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
return new LogicalHiveTableSink<>(database, targetTable, cols, hivePartitionKeys, outputExprs,
dmlCommandType, groupExpression, logicalProperties, children.get(0));
}
}

View File

@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.util.Utils;
import java.util.List;
import java.util.Optional;
/** abstract unbound logical sink */
public abstract class UnboundLogicalSink<CHILD_TYPE extends Plan> extends LogicalSink<CHILD_TYPE> {
protected final List<String> nameParts;
protected final List<String> colNames;
protected final DMLCommandType dmlCommandType;
public UnboundLogicalSink(List<String> nameParts,
PlanType type,
List<NamedExpression> outputExprs,
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties,
List<String> colNames,
DMLCommandType dmlCommandType,
CHILD_TYPE child) {
super(type, outputExprs, groupExpression, logicalProperties, child);
this.colNames = Utils.copyRequiredList(colNames);
this.dmlCommandType = dmlCommandType;
this.nameParts = Utils.copyRequiredList(nameParts);
}
public DMLCommandType getDMLCommandType() {
return dmlCommandType;
}
public List<String> getColNames() {
return colNames;
}
public List<String> getNameParts() {
return nameParts;
}
}

View File

@ -21,8 +21,10 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DistributionSpecTableSinkHashPartitioned;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.Plan;
@ -33,10 +35,14 @@ import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.Statistics;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/** physical hive table sink */
public class PhysicalHiveTableSink<CHILD_TYPE extends Plan> extends PhysicalSink<CHILD_TYPE> implements Sink {
@ -44,7 +50,7 @@ public class PhysicalHiveTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
private final HMSExternalDatabase database;
private final HMSExternalTable targetTable;
private final List<Column> cols;
private final List<Long> partitionIds;
private final Set<String> hivePartitionKeys;
/**
* constructor
@ -52,13 +58,13 @@ public class PhysicalHiveTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
public PhysicalHiveTableSink(HMSExternalDatabase database,
HMSExternalTable targetTable,
List<Column> cols,
List<Long> partitionIds,
List<NamedExpression> outputExprs,
Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties,
CHILD_TYPE child) {
this(database, targetTable, cols, partitionIds, outputExprs, groupExpression, logicalProperties,
PhysicalProperties.GATHER, null, child);
CHILD_TYPE child,
Set<String> hivePartitionKeys) {
this(database, targetTable, cols, outputExprs, groupExpression, logicalProperties,
PhysicalProperties.GATHER, null, child, hivePartitionKeys);
}
/**
@ -67,25 +73,37 @@ public class PhysicalHiveTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
public PhysicalHiveTableSink(HMSExternalDatabase database,
HMSExternalTable targetTable,
List<Column> cols,
List<Long> partitionIds,
List<NamedExpression> outputExprs,
Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties,
PhysicalProperties physicalProperties,
Statistics statistics,
CHILD_TYPE child) {
CHILD_TYPE child,
Set<String> hivePartitionKeys) {
super(PlanType.PHYSICAL_HIVE_TABLE_SINK, outputExprs, groupExpression,
logicalProperties, physicalProperties, statistics, child);
this.database = Objects.requireNonNull(database, "database != null in PhysicalHiveTableSink");
this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalHiveTableSink");
this.cols = Utils.copyRequiredList(cols);
this.partitionIds = Utils.copyRequiredList(partitionIds);
this.hivePartitionKeys = hivePartitionKeys;
}
public HMSExternalDatabase getDatabase() {
return database;
}
public HMSExternalTable getTargetTable() {
return targetTable;
}
public List<Column> getCols() {
return cols;
}
@Override
public Plan withChildren(List<Plan> children) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs, groupExpression,
getLogicalProperties(), physicalProperties, statistics, children.get(0));
return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs, groupExpression,
getLogicalProperties(), physicalProperties, statistics, children.get(0), hivePartitionKeys);
}
@Override
@ -100,20 +118,48 @@ public class PhysicalHiveTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
groupExpression, getLogicalProperties(), child());
return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs,
groupExpression, getLogicalProperties(), child(), hivePartitionKeys);
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
groupExpression, logicalProperties.get(), children.get(0));
return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs,
groupExpression, logicalProperties.get(), children.get(0), hivePartitionKeys);
}
@Override
public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs,
groupExpression, getLogicalProperties(), physicalProperties, statistics, child(), hivePartitionKeys);
}
/**
* get the physical properties required by this sink
*/
@Override
public PhysicalProperties getRequirePhysicalProperties() {
Set<String> hivePartitionKeys = targetTable.getRemoteTable()
.getPartitionKeys().stream()
.map(FieldSchema::getName)
.collect(Collectors.toSet());
if (!hivePartitionKeys.isEmpty()) {
List<Integer> columnIdx = new ArrayList<>();
List<Column> fullSchema = targetTable.getFullSchema();
for (int i = 0; i < fullSchema.size(); i++) {
Column column = fullSchema.get(i);
if (hivePartitionKeys.contains(column.getName())) {
columnIdx.add(i);
}
}
List<ExprId> exprIds = columnIdx.stream()
.map(idx -> child().getOutput().get(idx).getExprId())
.collect(Collectors.toList());
DistributionSpecTableSinkHashPartitioned shuffleInfo = new DistributionSpecTableSinkHashPartitioned();
shuffleInfo.setOutputColExprIds(exprIds);
return new PhysicalProperties(shuffleInfo);
}
return PhysicalProperties.SINK_RANDOM_PARTITIONED;
}
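// Example of the shuffle decision above (editorial sketch): for a target
// table partitioned by (dt, region), columnIdx holds the schema indexes of
// dt/region, exprIds picks the child's output ExprIds at those indexes, and
// the sink requires a TABLE_SINK_HASH_PARTITIONED distribution on them; an
// unpartitioned table falls back to SINK_RANDOM_PARTITIONED.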
}

View File

@ -39,11 +39,11 @@ public abstract class PhysicalSink<CHILD_TYPE extends Plan> extends PhysicalUnar
protected final List<NamedExpression> outputExprs;
public PhysicalSink(PlanType type,
List<NamedExpression> outputExprs,
Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties,
@Nullable PhysicalProperties physicalProperties,
Statistics statistics, CHILD_TYPE child) {
List<NamedExpression> outputExprs,
Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties,
@Nullable PhysicalProperties physicalProperties,
Statistics statistics, CHILD_TYPE child) {
super(type, groupExpression, logicalProperties, physicalProperties, statistics, child);
this.outputExprs = ImmutableList.copyOf(Objects.requireNonNull(outputExprs, "outputExprs should not be null"));
}

View File

@ -42,4 +42,6 @@ public abstract class PhysicalTableSink<CHILD_TYPE extends Plan> extends Physica
Statistics statistics, CHILD_TYPE child) {
super(type, outputExprs, groupExpression, logicalProperties, physicalProperties, statistics, child);
}
public abstract PhysicalProperties getRequirePhysicalProperties();
}

View File

@ -17,6 +17,7 @@
package org.apache.doris.nereids.trees.plans.visitor;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.trees.plans.Plan;
@ -56,6 +57,10 @@ public interface SinkVisitor<R, C> {
return visitLogicalSink(unboundTableSink, context);
}
default R visitUnboundHiveTableSink(UnboundHiveTableSink<? extends Plan> unboundTableSink, C context) {
return visitLogicalSink(unboundTableSink, context);
}
default R visitUnboundResultSink(UnboundResultSink<? extends Plan> unboundResultSink, C context) {
return visitLogicalSink(unboundResultSink, context);
}

View File

@ -77,7 +77,7 @@ public class RelationUtil {
/**
* get database and table
*/
public static Pair<DatabaseIf, TableIf> getDbAndTable(List<String> qualifierName, Env env) {
public static Pair<DatabaseIf<?>, TableIf> getDbAndTable(List<String> qualifierName, Env env) {
String catalogName = qualifierName.get(0);
String dbName = qualifierName.get(1);
String tableName = qualifierName.get(2);

View File

@ -58,6 +58,7 @@ public class DataPartition {
Preconditions.checkState(!exprs.isEmpty());
Preconditions.checkState(type == TPartitionType.HASH_PARTITIONED
|| type == TPartitionType.RANGE_PARTITIONED
|| type == TPartitionType.TABLE_SINK_HASH_PARTITIONED
|| type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED);
this.type = type;
this.partitionExprs = ImmutableList.copyOf(exprs);
@ -66,6 +67,7 @@ public class DataPartition {
public DataPartition(TPartitionType type) {
Preconditions.checkState(type == TPartitionType.UNPARTITIONED
|| type == TPartitionType.RANDOM
|| type == TPartitionType.TABLE_SINK_RANDOM_PARTITIONED
|| type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED);
this.type = type;
this.partitionExprs = ImmutableList.of();

View File

@ -20,17 +20,44 @@
package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.THiveBucket;
import org.apache.doris.thrift.THiveColumn;
import org.apache.doris.thrift.THiveColumnType;
import org.apache.doris.thrift.THiveLocationParams;
import org.apache.doris.thrift.THivePartition;
import org.apache.doris.thrift.THiveTableSink;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
public class HiveTableSink extends DataSink {
private HMSExternalTable targetTable;
protected TDataSink tDataSink;
public HiveTableSink(HMSExternalTable table) {
public HiveTableSink(HMSExternalTable targetTable) {
super();
this.targetTable = targetTable;
}
@Override
@ -59,9 +86,146 @@ public class HiveTableSink extends DataSink {
return DataPartition.RANDOM;
}
public void init() {
/**
* check sink params and generate thrift data sink to BE
* @param insertCols target table columns
* @param insertCtx insert info context
* @throws AnalysisException if the table's file format is unsupported
*/
public void bindDataSink(List<Column> insertCols, Optional<InsertCommandContext> insertCtx)
throws AnalysisException {
THiveTableSink tSink = new THiveTableSink();
tSink.setDbName(targetTable.getDbName());
tSink.setTableName(targetTable.getName());
Set<String> partNames = new HashSet<>(targetTable.getPartitionColumnNames());
Set<String> colNames = targetTable.getColumns()
.stream().map(Column::getName)
.collect(Collectors.toSet());
colNames.removeAll(partNames);
List<THiveColumn> targetColumns = new ArrayList<>();
for (Column col : insertCols) {
if (partNames.contains(col.getName())) {
THiveColumn tHiveColumn = new THiveColumn();
tHiveColumn.setName(col.getName());
tHiveColumn.setDataType(col.getType().toThrift());
tHiveColumn.setColumnType(THiveColumnType.PARTITION_KEY);
targetColumns.add(tHiveColumn);
} else if (colNames.contains(col.getName())) {
THiveColumn tHiveColumn = new THiveColumn();
tHiveColumn.setName(col.getName());
tHiveColumn.setDataType(col.getType().toThrift());
tHiveColumn.setColumnType(THiveColumnType.REGULAR);
targetColumns.add(tHiveColumn);
}
}
tSink.setColumns(targetColumns);
setPartitionValues(tSink);
StorageDescriptor sd = targetTable.getRemoteTable().getSd();
THiveBucket bucketInfo = new THiveBucket();
bucketInfo.setBucketedBy(sd.getBucketCols());
bucketInfo.setBucketCount(sd.getNumBuckets());
tSink.setBucketInfo(bucketInfo);
TFileFormatType formatType = getFileFormatType(sd);
tSink.setFileFormat(formatType);
setCompressType(tSink, formatType);
THiveLocationParams locationParams = new THiveLocationParams();
String location = sd.getLocation();
String writeTempPath = createTempPath(location);
locationParams.setWritePath(writeTempPath);
locationParams.setTargetPath(location);
locationParams.setFileType(LocationPath.getTFileTypeForBE(location));
tSink.setLocation(locationParams);
tSink.setHadoopConfig(targetTable.getHadoopProperties());
if (insertCtx.isPresent()) {
HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
tSink.setOverwrite(context.isOverwrite());
}
tDataSink = new TDataSink(getDataSinkType());
tDataSink.setHiveTableSink(tSink);
}
public void complete(Analyzer analyzer) {
private String createTempPath(String location) {
String user = ConnectContext.get().getUserIdentity().getUser();
return location + "/.doris_staging/" + user + "/" + UUID.randomUUID().toString().replace("-", "");
}
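// Example (editorial; location and user are hypothetical): for location
// "hdfs://ns1/warehouse/t" and user "root" the staging write path looks like
//   hdfs://ns1/warehouse/t/.doris_staging/root/5f3a9c... (UUID, dashes removed)
// while the target path stays the table location itself.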
private void setCompressType(THiveTableSink tSink, TFileFormatType formatType) {
String compressType;
switch (formatType) {
case FORMAT_ORC:
compressType = targetTable.getRemoteTable().getParameters().get("orc.compress");
break;
case FORMAT_PARQUET:
compressType = targetTable.getRemoteTable().getParameters().get("parquet.compression");
break;
default:
compressType = "uncompressed";
break;
}
if ("snappy".equalsIgnoreCase(compressType)) {
tSink.setCompressionType(TFileCompressType.SNAPPYBLOCK);
} else if ("lz4".equalsIgnoreCase(compressType)) {
tSink.setCompressionType(TFileCompressType.LZ4BLOCK);
} else if ("lzo".equalsIgnoreCase(compressType)) {
tSink.setCompressionType(TFileCompressType.LZO);
} else if ("zlib".equalsIgnoreCase(compressType)) {
tSink.setCompressionType(TFileCompressType.ZLIB);
} else if ("zstd".equalsIgnoreCase(compressType)) {
tSink.setCompressionType(TFileCompressType.ZSTD);
} else if ("uncompressed".equalsIgnoreCase(compressType)) {
tSink.setCompressionType(TFileCompressType.PLAIN);
} else {
// fall back to plain for unrecognized parquet or orc compress types
tSink.setCompressionType(TFileCompressType.PLAIN);
}
}
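// Mapping sketch (editorial): a Hive table created with
//   TBLPROPERTIES ('orc.compress' = 'SNAPPY')      -> SNAPPYBLOCK
//   TBLPROPERTIES ('parquet.compression' = 'zstd') -> ZSTD
// and a missing or unrecognized value falls back to TFileCompressType.PLAIN.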
private void setPartitionValues(THiveTableSink tSink) throws AnalysisException {
List<THivePartition> partitions = new ArrayList<>();
List<org.apache.hadoop.hive.metastore.api.Partition> hivePartitions =
((HMSExternalCatalog) targetTable.getCatalog())
.getClient().listPartitions(targetTable.getDbName(), targetTable.getName());
for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) {
THivePartition hivePartition = new THivePartition();
StorageDescriptor sd = partition.getSd();
hivePartition.setFileFormat(getFileFormatType(sd));
hivePartition.setValues(partition.getValues());
THiveLocationParams locationParams = new THiveLocationParams();
String location = sd.getLocation();
// pass the same write path and target path to the partition
locationParams.setWritePath(location);
locationParams.setTargetPath(location);
locationParams.setFileType(LocationPath.getTFileTypeForBE(location));
hivePartition.setLocation(locationParams);
partitions.add(hivePartition);
}
tSink.setPartitions(partitions);
}
private TFileFormatType getFileFormatType(StorageDescriptor sd) throws AnalysisException {
TFileFormatType fileFormatType;
if (sd.getInputFormat().toLowerCase().contains("orc")) {
fileFormatType = TFileFormatType.FORMAT_ORC;
} else if (sd.getInputFormat().toLowerCase().contains("parquet")) {
fileFormatType = TFileFormatType.FORMAT_PARQUET;
} else if (sd.getInputFormat().toLowerCase().contains("text")) {
fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
} else {
throw new AnalysisException("Unsupported input format type: " + sd.getInputFormat());
}
return fileFormatType;
}
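// Detection sketch (editorial): the check matches on the input format class
// name, e.g.
//   org.apache.hadoop.hive.ql.io.orc.OrcInputFormat -> FORMAT_ORC
//   org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat -> FORMAT_PARQUET
//   org.apache.hadoop.mapred.TextInputFormat -> FORMAT_CSV_PLAIN
// other formats (e.g. RCFile, Avro) raise the AnalysisException above.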
protected TDataSinkType getDataSinkType() {
return TDataSinkType.HIVE_TABLE_SINK;
}
}