[feature](iceberg) Support writing to Iceberg for 2.1 (#35103) #34257 #33629

bp: #34257 #33629
This commit is contained in:
wuwenchi
2024-05-21 22:46:37 +08:00
committed by GitHub
parent 903ff32021
commit 009ab77c25
43 changed files with 2076 additions and 320 deletions

View File

@ -22,6 +22,7 @@ import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.operations.ExternalMetadataOperations;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.transaction.TransactionManagerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Constants;
@ -51,7 +52,9 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
@Override
protected void initLocalObjectsImpl() {
initCatalog();
metadataOps = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
IcebergMetadataOps ops = ExternalMetadataOperations.newIcebergMetadataOps(this, catalog);
transactionManager = TransactionManagerFactory.createIcebergTransactionManager(ops);
metadataOps = ops;
}
public Catalog getCatalog() {

View File

@ -28,9 +28,14 @@ import org.apache.doris.thrift.TIcebergTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class IcebergExternalTable extends ExternalTable {
@ -83,4 +88,15 @@ public class IcebergExternalTable extends ExternalTable {
makeSureInitialized();
return IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName());
}
// Fetches the Iceberg Table handle for this external table through IcebergUtils,
// keyed by this table's catalog, database name and table name.
// NOTE(review): unlike getRowCount above, this does not call makeSureInitialized()
// first — confirm that callers guarantee the table is already initialized.
public Table getIcebergTable() {
return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
}
@Override
public Set<String> getPartitionNames() {
    // Load the table once and derive the partition names from the current
    // partition spec. (Previously the table was fetched twice: one call whose
    // result was discarded, then a second identical catalog lookup.)
    return getIcebergTable().spec().fields().stream()
            .map(PartitionField::name)
            .collect(Collectors.toSet());
}
}

View File

@ -0,0 +1,186 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/trinodb/trino/blob/438/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
// and modified by Doris
package org.apache.doris.datasource.iceberg;
import org.apache.doris.common.UserException;
import org.apache.doris.thrift.TFileContent;
import org.apache.doris.thrift.TIcebergCommitData;
import org.apache.doris.transaction.Transaction;
import com.google.common.base.VerifyException;
import com.google.common.collect.Lists;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
/**
 * One Doris-side transaction for an INSERT into an Iceberg table.
 * Lifecycle: beginInsert() opens an Iceberg transaction on the target table,
 * updateIcebergCommitData() accumulates the files written by the backends,
 * finishInsert() appends those files inside the transaction, and commit()
 * publishes the new snapshot to other readers.
 */
public class IcebergTransaction implements Transaction {

    private static final Logger LOG = LogManager.getLogger(IcebergTransaction.class);

    private final IcebergMetadataOps ops;
    // Created by beginInsert(); consumed by finishInsert() and commit().
    private org.apache.iceberg.Transaction transaction;
    // Commit data reported by the writers; guarded by synchronized(this) on mutation.
    private final List<TIcebergCommitData> commitDataList = Lists.newArrayList();

    public IcebergTransaction(IcebergMetadataOps ops) {
        this.ops = ops;
    }

    /**
     * Accumulates commit data produced by the writers. Synchronized because
     * several backends may report their commit data concurrently.
     */
    public void updateIcebergCommitData(List<TIcebergCommitData> commitDataList) {
        synchronized (this) {
            this.commitDataList.addAll(commitDataList);
        }
    }

    /**
     * Opens an Iceberg transaction on the target table. Must be called before
     * {@link #finishInsert()} and {@link #commit()}.
     */
    public void beginInsert(String dbName, String tbName) {
        Table icebergTable = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName));
        transaction = icebergTable.newTransaction();
    }

    /**
     * Converts the collected commit data into Iceberg data files and appends them
     * to the open transaction. The resulting snapshot is only visible inside this
     * transaction until {@link #commit()} is called.
     *
     * @throws IllegalStateException if {@link #beginInsert(String, String)} was not called first
     */
    public void finishInsert() {
        if (transaction == null) {
            // fail fast instead of a bare NullPointerException below
            throw new IllegalStateException("finishInsert() must be called after beginInsert()");
        }
        Table icebergTable = transaction.table();
        AppendFiles appendFiles = transaction.newAppend();
        for (CommitTaskData task : convertToCommitTaskData()) {
            DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
                    .withPath(task.getPath())
                    .withFileSizeInBytes(task.getFileSizeInBytes())
                    .withFormat(IcebergUtils.getFileFormat(icebergTable))
                    .withMetrics(task.getMetrics());
            if (icebergTable.spec().isPartitioned()) {
                List<String> partitionValues = task.getPartitionValues()
                        .orElseThrow(() -> new VerifyException("No partition data for partitioned table"));
                builder.withPartitionValues(partitionValues);
            }
            appendFiles.appendFile(builder.build());
        }
        // in appendFiles.commit, it will generate metadata(manifest and snapshot)
        // after appendFiles.commit, in current transaction, you can already see the new snapshot
        appendFiles.commit();
    }

    /**
     * Translates the thrift commit data into {@link CommitTaskData}, building
     * row-count-only {@link Metrics} (the writers report no per-column stats).
     */
    public List<CommitTaskData> convertToCommitTaskData() {
        List<CommitTaskData> commitTaskData = new ArrayList<>();
        for (TIcebergCommitData data : this.commitDataList) {
            commitTaskData.add(new CommitTaskData(
                    data.getFilePath(),
                    data.getFileSize(),
                    new Metrics(
                            data.getRowCount(),
                            // typed factory instead of the raw Collections.EMPTY_MAP constant
                            Collections.emptyMap(),
                            Collections.emptyMap(),
                            Collections.emptyMap(),
                            Collections.emptyMap()
                    ),
                    data.isSetPartitionValues() ? Optional.of(data.getPartitionValues()) : Optional.empty(),
                    convertToFileContent(data.getFileContent()),
                    data.isSetReferencedDataFiles() ? Optional.of(data.getReferencedDataFiles()) : Optional.empty()
            ));
        }
        return commitTaskData;
    }

    private FileContent convertToFileContent(TFileContent content) {
        if (content.equals(TFileContent.DATA)) {
            return FileContent.DATA;
        } else if (content.equals(TFileContent.POSITION_DELETES)) {
            return FileContent.POSITION_DELETES;
        } else {
            // any other thrift value is treated as equality deletes
            return FileContent.EQUALITY_DELETES;
        }
    }

    @Override
    public void commit() throws UserException {
        // Externally readable
        // Manipulate the relevant data so that others can also see the latest table, such as:
        // 1. hadoop: it will change the version number information in 'version-hint.text'
        // 2. hive: it will change the table properties, the most important thing is to revise 'metadata_location'
        // 3. and so on ...
        transaction.commitTransaction();
    }

    @Override
    public void rollback() {
        // NOTE(review): intentionally a no-op; data files already written by the
        // backends are not cleaned up here — confirm orphan-file cleanup is
        // handled elsewhere.
    }

    /** Total row count across all reported commit data. */
    public long getUpdateCnt() {
        // NOTE(review): not synchronized — assumes all writers have finished
        // reporting before this is read; confirm with the caller.
        return commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum();
    }

    /** Immutable description of one data file produced by a writer. */
    public static class CommitTaskData {
        private final String path;
        private final long fileSizeInBytes;
        private final Metrics metrics;
        private final Optional<List<String>> partitionValues;
        private final FileContent content;
        private final Optional<List<String>> referencedDataFiles;

        public CommitTaskData(String path,
                              long fileSizeInBytes,
                              Metrics metrics,
                              Optional<List<String>> partitionValues,
                              FileContent content,
                              Optional<List<String>> referencedDataFiles) {
            this.path = path;
            this.fileSizeInBytes = fileSizeInBytes;
            this.metrics = metrics;
            this.partitionValues = partitionValues;
            this.content = content;
            this.referencedDataFiles = referencedDataFiles;
        }

        public String getPath() {
            return path;
        }

        public long getFileSizeInBytes() {
            return fileSizeInBytes;
        }

        public Metrics getMetrics() {
            return metrics;
        }

        public Optional<List<String>> getPartitionValues() {
            return partitionValues;
        }

        public FileContent getContent() {
            return content;
        }

        public Optional<List<String>> getReferencedDataFiles() {
            return referencedDataFiles;
        }
    }
}

View File

@ -46,6 +46,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.thrift.TExprOpcode;
import com.google.common.collect.Lists;
@ -53,6 +54,7 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.expressions.And;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
@ -61,6 +63,7 @@ import org.apache.iceberg.expressions.Or;
import org.apache.iceberg.expressions.Unbound;
import org.apache.iceberg.types.Type.TypeID;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.LocationUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -90,6 +93,13 @@ public class IcebergUtils {
public static final String TOTAL_POSITION_DELETES = "total-position-deletes";
public static final String TOTAL_EQUALITY_DELETES = "total-equality-deletes";
// nickname in flink and spark
public static final String WRITE_FORMAT = "write-format";
public static final String COMPRESSION_CODEC = "compression-codec";
// nickname in spark
public static final String SPARK_SQL_COMPRESSION_CODEC = "spark.sql.iceberg.compression-codec";
public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
if (expr == null) {
return null;
@ -573,4 +583,51 @@ public class IcebergUtils {
}
return -1;
}
/**
 * Resolves the write file format of an Iceberg table.
 * Precedence: the "write-format" nickname (flink/spark), then Iceberg's
 * "write.format.default" table property, then Iceberg's built-in default.
 */
public static String getFileFormat(Table table) {
    Map<String, String> properties = table.properties();
    // check the candidate keys in priority order
    for (String key : new String[] {WRITE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT}) {
        if (properties.containsKey(key)) {
            return properties.get(key);
        }
    }
    return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
}
/**
 * Resolves the compression codec used when writing to the table.
 * Precedence: the "compression-codec" nickname, then spark's
 * "spark.sql.iceberg.compression-codec" nickname, then the format-specific
 * Iceberg table property (parquet/orc) with its Iceberg default.
 */
public static String getFileCompress(Table table) {
    Map<String, String> properties = table.properties();
    if (properties.containsKey(COMPRESSION_CODEC)) {
        return properties.get(COMPRESSION_CODEC);
    }
    if (properties.containsKey(SPARK_SQL_COMPRESSION_CODEC)) {
        return properties.get(SPARK_SQL_COMPRESSION_CODEC);
    }
    String fileFormat = getFileFormat(table);
    if (fileFormat.equalsIgnoreCase("parquet")) {
        return properties.getOrDefault(
                TableProperties.PARQUET_COMPRESSION, TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
    }
    if (fileFormat.equalsIgnoreCase("orc")) {
        return properties.getOrDefault(
                TableProperties.ORC_COMPRESSION, TableProperties.ORC_COMPRESSION_DEFAULT);
    }
    throw new NotSupportedException("Unsupported file format: " + fileFormat);
}
/**
 * Returns the directory where new data files for the table should be written.
 * Custom location providers are rejected; otherwise the "write.data.path"
 * property wins, then the legacy "write.folder-storage.path", and finally
 * the conventional "&lt;table location&gt;/data".
 */
public static String dataLocation(Table table) {
    Map<String, String> properties = table.properties();
    if (properties.containsKey(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)) {
        throw new NotSupportedException(
                "Table " + table.name() + " specifies " + properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)
                        + " as a location provider. "
                        + "Writing to Iceberg tables with custom location provider is not supported.");
    }
    String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION);
    if (dataLocation != null) {
        return dataLocation;
    }
    dataLocation = properties.get(TableProperties.WRITE_FOLDER_STORAGE_LOCATION);
    if (dataLocation != null) {
        return dataLocation;
    }
    // fall back to the conventional layout under the table root
    return String.format("%s/data", LocationUtil.stripTrailingSlash(table.location()));
}
}

View File

@ -24,11 +24,11 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import java.util.Map;
@ -61,14 +61,7 @@ public class IcebergApiSource implements IcebergSource {
@Override
public String getFileFormat() {
Map<String, String> properties = originTable.properties();
if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) {
return properties.get(TableProperties.DEFAULT_FILE_FORMAT);
}
if (properties.containsKey(FLINK_WRITE_FORMAT)) {
return properties.get(FLINK_WRITE_FORMAT);
}
return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
return IcebergUtils.getFileFormat(originTable);
}
@Override

View File

@ -26,12 +26,11 @@ import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.iceberg.TableProperties;
import java.util.Map;
public class IcebergHMSSource implements IcebergSource {
@ -59,14 +58,7 @@ public class IcebergHMSSource implements IcebergSource {
@Override
public String getFileFormat() throws DdlException, MetaNotFoundException {
Map<String, String> properties = hmsTable.getRemoteTable().getParameters();
if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) {
return properties.get(TableProperties.DEFAULT_FILE_FORMAT);
}
if (properties.containsKey(FLINK_WRITE_FORMAT)) {
return properties.get(FLINK_WRITE_FORMAT);
}
return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
return IcebergUtils.getFileFormat(icebergTable);
}
public org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException {

View File

@ -27,9 +27,6 @@ import org.apache.doris.thrift.TFileAttributes;
public interface IcebergSource {
// compatible with flink, which is "write.format.default" in spark
String FLINK_WRITE_FORMAT = "write-format";
TupleDescriptor getDesc();
org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException;

View File

@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.analyzer;
import org.apache.doris.nereids.exceptions.UnboundException;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.UnboundLogicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.BlockFuncDepsPropagation;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
import org.apache.doris.nereids.util.Utils;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
 * Represent an external table sink plan node that has not been bound.
 * Common base class for unbound external table sinks (e.g. Hive, Iceberg),
 * carrying the insert hints and the target partition names shared by all of them.
 */
public abstract class UnboundBaseExternalTableSink<CHILD_TYPE extends Plan> extends UnboundLogicalSink<CHILD_TYPE>
        implements Unbound, Sink, BlockFuncDepsPropagation {
    // Immutable defensive copies (see constructor); protected + final so subclasses
    // can read them when rebuilding themselves in their with* methods.
    protected final List<String> hints;
    protected final List<String> partitions;

    /**
     * constructor
     */
    public UnboundBaseExternalTableSink(List<String> nameParts,
                                        PlanType type,
                                        List<NamedExpression> outputExprs,
                                        Optional<GroupExpression> groupExpression,
                                        Optional<LogicalProperties> logicalProperties,
                                        List<String> colNames,
                                        DMLCommandType dmlCommandType,
                                        CHILD_TYPE child,
                                        List<String> hints,
                                        List<String> partitions) {
        super(nameParts, type, outputExprs, groupExpression,
                logicalProperties, colNames, dmlCommandType, child);
        this.hints = Utils.copyRequiredList(hints);
        this.partitions = Utils.copyRequiredList(partitions);
    }

    public List<String> getColNames() {
        return colNames;
    }

    public List<String> getPartitions() {
        return partitions;
    }

    public List<String> getHints() {
        return hints;
    }

    @Override
    public UnboundBaseExternalTableSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
        // output expressions are only known once the sink is bound
        throw new UnboundException("could not call withOutputExprs on " + this.getClass().getSimpleName());
    }

    @Override
    public List<? extends Expression> getExpressions() {
        throw new UnsupportedOperationException(this.getClass().getSimpleName() + " don't support getExpression()");
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        // exact-class comparison keeps equality symmetric across different sink subclasses
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        UnboundBaseExternalTableSink<?> that = (UnboundBaseExternalTableSink<?>) o;
        return Objects.equals(nameParts, that.nameParts)
                && Objects.equals(colNames, that.colNames)
                && Objects.equals(hints, that.hints)
                && Objects.equals(partitions, that.partitions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(nameParts, colNames, hints, partitions);
    }

    @Override
    public LogicalProperties computeLogicalProperties() {
        return UnboundLogicalProperties.INSTANCE;
    }

    @Override
    public List<Slot> computeOutput() {
        // output slots are unknown until the sink is bound against the target table
        throw new UnboundException("output");
    }
}

View File

@ -17,36 +17,23 @@
package org.apache.doris.nereids.analyzer;
import org.apache.doris.nereids.exceptions.UnboundException;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.UnboundLogicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.BlockFuncDepsPropagation;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* Represent an hive table sink plan node that has not been bound.
* Represent a hive table sink plan node that has not been bound.
*/
public class UnboundHiveTableSink<CHILD_TYPE extends Plan> extends UnboundLogicalSink<CHILD_TYPE>
implements Unbound, Sink, BlockFuncDepsPropagation {
private final List<String> hints;
private final List<String> partitions;
public class UnboundHiveTableSink<CHILD_TYPE extends Plan> extends UnboundBaseExternalTableSink<CHILD_TYPE> {
public UnboundHiveTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, CHILD_TYPE child) {
@ -57,41 +44,16 @@ public class UnboundHiveTableSink<CHILD_TYPE extends Plan> extends UnboundLogica
/**
* constructor
*/
public UnboundHiveTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
public UnboundHiveTableSink(List<String> nameParts,
List<String> colNames,
List<String> hints,
List<String> partitions,
DMLCommandType dmlCommandType,
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties,
CHILD_TYPE child) {
super(nameParts, PlanType.LOGICAL_UNBOUND_HIVE_TABLE_SINK, ImmutableList.of(), groupExpression,
logicalProperties, colNames, dmlCommandType, child);
this.hints = Utils.copyRequiredList(hints);
this.partitions = Utils.copyRequiredList(partitions);
}
public List<String> getColNames() {
return colNames;
}
public List<String> getPartitions() {
return partitions;
}
public List<String> getHints() {
return hints;
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
"UnboundHiveTableSink only accepts one child");
return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, Optional.empty(), children.get(0));
}
@Override
public UnboundHiveTableSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
throw new UnboundException("could not call withOutputExprs on UnboundHiveTableSink");
logicalProperties, colNames, dmlCommandType, child, hints, partitions);
}
@Override
@ -100,50 +62,23 @@ public class UnboundHiveTableSink<CHILD_TYPE extends Plan> extends UnboundLogica
}
@Override
public List<? extends Expression> getExpressions() {
throw new UnsupportedOperationException(this.getClass().getSimpleName() + " don't support getExpression()");
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UnboundHiveTableSink<?> that = (UnboundHiveTableSink<?>) o;
return Objects.equals(nameParts, that.nameParts)
&& Objects.equals(colNames, that.colNames)
&& Objects.equals(hints, that.hints)
&& Objects.equals(partitions, that.partitions);
}
@Override
public int hashCode() {
return Objects.hash(nameParts, colNames, hints, partitions);
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
"UnboundHiveTableSink only accepts one child");
return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, Optional.empty(), children.get(0));
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child());
dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, logicalProperties, children.get(0));
}
@Override
public LogicalProperties computeLogicalProperties() {
return UnboundLogicalProperties.INSTANCE;
}
@Override
public List<Slot> computeOutput() {
throw new UnboundException("output");
dmlCommandType, groupExpression, logicalProperties, children.get(0));
}
}

View File

@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.analyzer;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Optional;
/**
 * Represent an iceberg table sink plan node that has not been bound.
 */
public class UnboundIcebergTableSink<CHILD_TYPE extends Plan> extends UnboundBaseExternalTableSink<CHILD_TYPE> {
    public UnboundIcebergTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
                                   List<String> partitions, CHILD_TYPE child) {
        this(nameParts, colNames, hints, partitions, DMLCommandType.NONE,
                Optional.empty(), Optional.empty(), child);
    }

    /**
     * constructor
     */
    public UnboundIcebergTableSink(List<String> nameParts,
                                   List<String> colNames,
                                   List<String> hints,
                                   List<String> partitions,
                                   DMLCommandType dmlCommandType,
                                   Optional<GroupExpression> groupExpression,
                                   Optional<LogicalProperties> logicalProperties,
                                   CHILD_TYPE child) {
        // NOTE(review): this reuses LOGICAL_UNBOUND_HIVE_TABLE_SINK, apparently
        // copy-pasted from UnboundHiveTableSink — confirm whether a dedicated
        // LOGICAL_UNBOUND_ICEBERG_TABLE_SINK PlanType should be used here.
        super(nameParts, PlanType.LOGICAL_UNBOUND_HIVE_TABLE_SINK, ImmutableList.of(), groupExpression,
                logicalProperties, colNames, dmlCommandType, child, hints, partitions);
    }

    @Override
    public Plan withChildren(List<Plan> children) {
        // fixed copy-paste error: the message used to name UnboundHiveTableSink
        Preconditions.checkArgument(children.size() == 1,
                "UnboundIcebergTableSink only accepts one child");
        return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
                dmlCommandType, groupExpression, Optional.empty(), children.get(0));
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitUnboundIcebergTableSink(this, context);
    }

    @Override
    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
        return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
                dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child());
    }

    @Override
    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
        return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
                dmlCommandType, groupExpression, logicalProperties, children.get(0));
    }
}

View File

@ -22,6 +22,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.exceptions.ParseException;
import org.apache.doris.nereids.trees.plans.Plan;
@ -53,6 +54,8 @@ public class UnboundTableSinkCreator {
return new UnboundTableSink<>(nameParts, colNames, hints, partitions, query);
} else if (curCatalog instanceof HMSExternalCatalog) {
return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, query);
} else if (curCatalog instanceof IcebergExternalCatalog) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, query);
}
throw new UserException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported.");
}
@ -72,6 +75,9 @@ public class UnboundTableSinkCreator {
} else if (curCatalog instanceof HMSExternalCatalog) {
return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
} else if (curCatalog instanceof IcebergExternalCatalog) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
}
throw new RuntimeException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported.");
}
@ -101,6 +107,9 @@ public class UnboundTableSinkCreator {
} else if (curCatalog instanceof HMSExternalCatalog && !isAutoDetectPartition) {
return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
} else if (curCatalog instanceof IcebergExternalCatalog && !isAutoDetectPartition) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, Optional.empty(), Optional.empty(), plan);
}
throw new AnalysisException(
"Auto overwrite data to " + curCatalog.getClass().getSimpleName() + " is not supported."

View File

@ -121,6 +121,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
@ -166,6 +167,7 @@ import org.apache.doris.planner.GroupCommitBlockSink;
import org.apache.doris.planner.HashJoinNode;
import org.apache.doris.planner.HashJoinNode.DistributionMode;
import org.apache.doris.planner.HiveTableSink;
import org.apache.doris.planner.IcebergTableSink;
import org.apache.doris.planner.IntersectNode;
import org.apache.doris.planner.JoinNodeBase;
import org.apache.doris.planner.MultiCastDataSink;
@ -455,7 +457,28 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
slotDesc.setIsNullable(column.isAllowNull());
slotDesc.setAutoInc(column.isAutoInc());
}
HiveTableSink sink = new HiveTableSink(hiveTableSink.getTargetTable());
HiveTableSink sink = new HiveTableSink((HMSExternalTable) hiveTableSink.getTargetTable());
rootFragment.setSink(sink);
return rootFragment;
}
@Override
public PlanFragment visitPhysicalIcebergTableSink(PhysicalIcebergTableSink<? extends Plan> icebergTableSink,
                                                  PlanTranslatorContext context) {
    PlanFragment rootFragment = icebergTableSink.child().accept(this, context);
    rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
    // Build a tuple describing the target Iceberg table's full schema.
    // (Renamed from the copy-pasted, misleading name "hiveTuple".)
    TupleDescriptor icebergTuple = context.generateTupleDesc();
    List<Column> targetTableColumns = icebergTableSink.getTargetTable().getFullSchema();
    for (Column column : targetTableColumns) {
        SlotDescriptor slotDesc = context.addSlotDesc(icebergTuple);
        slotDesc.setIsMaterialized(true);
        slotDesc.setType(column.getType());
        slotDesc.setColumn(column);
        slotDesc.setIsNullable(column.isAllowNull());
        slotDesc.setAutoInc(column.isAutoInc());
    }
    IcebergTableSink sink = new IcebergTableSink((IcebergExternalTable) icebergTableSink.getTargetTable());
    rootFragment.setSink(sink);
    return rootFragment;
}

View File

@ -25,6 +25,7 @@ import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
@ -59,6 +60,13 @@ public class TurnOffPageCacheForInsertIntoSelect extends PlanPreprocessor {
return tableSink;
}
// INSERT INTO ... SELECT targeting an Iceberg table also turns off the page
// cache, mirroring the other sink visitors in this rule.
@Override
public Plan visitLogicalIcebergTableSink(
LogicalIcebergTableSink<? extends Plan> tableSink, StatementContext context) {
// delegate to the shared session-variable toggle below
turnOffPageCache(context);
return tableSink;
}
private void turnOffPageCache(StatementContext context) {
SessionVariable sessionVariable = context.getConnectContext().getSessionVariable();
// set temporary session value, and then revert value in the 'finally block' of StmtExecutor#execute

View File

@ -39,6 +39,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
@ -140,6 +141,17 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
return null;
}
@Override
public Void visitPhysicalIcebergTableSink(
        PhysicalIcebergTableSink<? extends Plan> icebergTableSink, PlanContext context) {
    // Strict-consistency DML must honor the sink's required distribution;
    // otherwise any child distribution is acceptable.
    boolean strictDml = connectContext == null
            || connectContext.getSessionVariable().enableStrictConsistencyDml;
    if (strictDml) {
        addRequestPropertyToChildren(icebergTableSink.getRequirePhysicalProperties());
    } else {
        addRequestPropertyToChildren(PhysicalProperties.ANY);
    }
    return null;
}
@Override
public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, PlanContext context) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);

View File

@ -66,6 +66,7 @@ import org.apache.doris.nereids.rules.implementation.LogicalFileSinkToPhysicalFi
import org.apache.doris.nereids.rules.implementation.LogicalFilterToPhysicalFilter;
import org.apache.doris.nereids.rules.implementation.LogicalGenerateToPhysicalGenerate;
import org.apache.doris.nereids.rules.implementation.LogicalHiveTableSinkToPhysicalHiveTableSink;
import org.apache.doris.nereids.rules.implementation.LogicalIcebergTableSinkToPhysicalIcebergTableSink;
import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect;
import org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan;
import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin;
@ -191,6 +192,7 @@ public class RuleSet {
.add(new LogicalGenerateToPhysicalGenerate())
.add(new LogicalOlapTableSinkToPhysicalOlapTableSink())
.add(new LogicalHiveTableSinkToPhysicalHiveTableSink())
.add(new LogicalIcebergTableSinkToPhysicalIcebergTableSink())
.add(new LogicalFileSinkToPhysicalFileSink())
.add(new LogicalResultSinkToPhysicalResultSink())
.add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink())

View File

@ -31,6 +31,7 @@ public enum RuleType {
// **** make sure BINDING_UNBOUND_LOGICAL_PLAN is the lowest priority in the rewrite rules. ****
BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_FILE(RuleTypeClass.REWRITE),
BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE),
@ -413,6 +414,7 @@ public enum RuleType {
LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),

View File

@ -29,8 +29,11 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
@ -54,6 +57,7 @@ import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewri
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
@ -102,7 +106,9 @@ public class BindSink implements AnalysisRuleFactory {
})
),
// TODO: bind hive target table
RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink))
RuleType.BINDING_INSERT_HIVE_TABLE.build(unboundHiveTableSink().thenApply(this::bindHiveTableSink)),
RuleType.BINDING_INSERT_ICEBERG_TABLE.build(
unboundIcebergTableSink().thenApply(this::bindIcebergTableSink))
);
}
@ -393,12 +399,53 @@ public class BindSink implements AnalysisRuleFactory {
Column column = table.getColumn(cn);
if (column == null) {
throw new AnalysisException(String.format("column %s is not found in table %s",
cn, table.getName()));
cn, table.getName()));
}
return column;
}).collect(ImmutableList.toImmutableList());
}
LogicalHiveTableSink<?> boundSink = new LogicalHiveTableSink<>(
database,
table,
bindColumns,
child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()),
sink.getDMLCommandType(),
Optional.empty(),
Optional.empty(),
child);
// we need to insert all the columns of the target table
if (boundSink.getCols().size() != child.getOutput().size()) {
throw new AnalysisException("insert into cols should be corresponding to the query output");
}
Map<String, NamedExpression> columnToOutput = getColumnToOutput(ctx, table, false,
boundSink, child);
LogicalProject<?> fullOutputProject = getOutputProjectByCoercion(table.getFullSchema(), child, columnToOutput);
return boundSink.withChildAndUpdateOutput(fullOutputProject);
}
private Plan bindIcebergTableSink(MatchingContext<UnboundIcebergTableSink<Plan>> ctx) {
UnboundIcebergTableSink<?> sink = ctx.root;
Pair<IcebergExternalDatabase, IcebergExternalTable> pair = bind(ctx.cascadesContext, sink);
IcebergExternalDatabase database = pair.first;
IcebergExternalTable table = pair.second;
LogicalPlan child = ((LogicalPlan) sink.child());
List<Column> bindColumns;
if (sink.getColNames().isEmpty()) {
bindColumns = table.getBaseSchema(true).stream().collect(ImmutableList.toImmutableList());
} else {
bindColumns = sink.getColNames().stream().map(cn -> {
Column column = table.getColumn(cn);
if (column == null) {
throw new AnalysisException(String.format("column %s is not found in table %s",
cn, table.getName()));
}
return column;
}).collect(ImmutableList.toImmutableList());
}
LogicalIcebergTableSink<?> boundSink = new LogicalIcebergTableSink<>(
database,
table,
bindColumns,
@ -442,11 +489,26 @@ public class BindSink implements AnalysisRuleFactory {
Pair<DatabaseIf<?>, TableIf> pair = RelationUtil.getDbAndTable(tableQualifier,
cascadesContext.getConnectContext().getEnv());
if (pair.second instanceof HMSExternalTable) {
return Pair.of(((HMSExternalDatabase) pair.first), (HMSExternalTable) pair.second);
HMSExternalTable table = (HMSExternalTable) pair.second;
if (table.getDlaType() == HMSExternalTable.DLAType.HIVE) {
return Pair.of(((HMSExternalDatabase) pair.first), table);
}
}
throw new AnalysisException("the target table of insert into is not a Hive table");
}
/**
 * Resolves the sink's name parts to an iceberg database/table pair.
 *
 * @throws AnalysisException if the resolved target is not an iceberg external table
 */
private Pair<IcebergExternalDatabase, IcebergExternalTable> bind(CascadesContext cascadesContext,
        UnboundIcebergTableSink<? extends Plan> sink) {
    List<String> qualifiedName = RelationUtil.getQualifierName(cascadesContext.getConnectContext(),
            sink.getNameParts());
    Pair<DatabaseIf<?>, TableIf> dbAndTable = RelationUtil.getDbAndTable(qualifiedName,
            cascadesContext.getConnectContext().getEnv());
    // Guard clause: reject anything that is not an iceberg external table.
    if (!(dbAndTable.second instanceof IcebergExternalTable)) {
        throw new AnalysisException("the target table of insert into is not an iceberg table");
    }
    return Pair.of((IcebergExternalDatabase) dbAndTable.first, (IcebergExternalTable) dbAndTable.second);
}
private List<Long> bindPartitionIds(OlapTable table, List<String> partitions, boolean temp) {
return partitions.isEmpty()
? ImmutableList.of()

View File

@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.implementation;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import java.util.Optional;
/**
* Implementation rule that converts logical IcebergTableSink to physical IcebergTableSink.
*/
public class LogicalIcebergTableSinkToPhysicalIcebergTableSink extends OneImplementationRuleFactory {
    @Override
    public Rule build() {
        // Map every matched logical iceberg sink 1:1 onto its physical counterpart,
        // carrying over target table, columns and output expressions unchanged.
        return logicalIcebergTableSink()
                .thenApply(ctx -> {
                    LogicalIcebergTableSink<? extends Plan> logicalSink = ctx.root;
                    return new PhysicalIcebergTableSink<>(
                            logicalSink.getDatabase(),
                            logicalSink.getTargetTable(),
                            logicalSink.getCols(),
                            logicalSink.getOutputExprs(),
                            Optional.empty(),
                            logicalSink.getLogicalProperties(),
                            null,
                            null,
                            logicalSink.child());
                })
                .toRule(RuleType.LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE);
    }
}

View File

@ -48,6 +48,7 @@ public enum PlanType {
LOGICAL_FILE_SINK,
LOGICAL_OLAP_TABLE_SINK,
LOGICAL_HIVE_TABLE_SINK,
LOGICAL_ICEBERG_TABLE_SINK,
LOGICAL_RESULT_SINK,
LOGICAL_UNBOUND_OLAP_TABLE_SINK,
LOGICAL_UNBOUND_HIVE_TABLE_SINK,
@ -100,6 +101,7 @@ public enum PlanType {
PHYSICAL_FILE_SINK,
PHYSICAL_OLAP_TABLE_SINK,
PHYSICAL_HIVE_TABLE_SINK,
PHYSICAL_ICEBERG_TABLE_SINK,
PHYSICAL_RESULT_SINK,
// physical others

View File

@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
/**
* Insert command context shared by external table sinks.
*/
public class BaseExternalTableInsertCommandContext extends InsertCommandContext {
    // Whether this insert should overwrite existing data; read by subclasses.
    protected boolean overwrite = false;

    /** Returns true if this insert is an overwrite operation. */
    public boolean isOverwrite() {
        return overwrite;
    }

    /** Marks this insert as overwrite (or not). */
    public void setOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }
}

View File

@ -0,0 +1,161 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.BaseExternalTableDataSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.transaction.TransactionManager;
import org.apache.doris.transaction.TransactionStatus;
import org.apache.doris.transaction.TransactionType;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Optional;
/**
* Insert executor for base external table
*/
public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExecutor {
    private static final Logger LOG = LogManager.getLogger(BaseExternalTableInsertExecutor.class);
    private static final long INVALID_TXN_ID = -1L;

    // Id of the external transaction; stays INVALID_TXN_ID until beginTransaction() runs.
    protected long txnId = INVALID_TXN_ID;
    // Status reported to the client; only flips to COMMITTED after a successful commit.
    protected TransactionStatus txnStatus = TransactionStatus.ABORTED;
    protected final TransactionManager transactionManager;
    protected final String catalogName;
    protected Optional<SummaryProfile> summaryProfile = Optional.empty();

    /**
     * Constructor.
     *
     * @param ctx current connect context
     * @param table target external table of this insert
     * @param labelName label of the insert job
     * @param planner planner that produced the physical plan
     * @param insertCtx optional per-table insert command context
     */
    public BaseExternalTableInsertExecutor(ConnectContext ctx, ExternalTable table,
                                           String labelName, NereidsPlanner planner,
                                           Optional<InsertCommandContext> insertCtx) {
        super(ctx, table, labelName, planner, insertCtx);
        catalogName = table.getCatalog().getName();
        transactionManager = table.getCatalog().getTransactionManager();
        if (ConnectContext.get().getExecutor() != null) {
            summaryProfile = Optional.of(ConnectContext.get().getExecutor().getSummaryProfile());
        }
    }

    /** Returns the id of the external transaction, or INVALID_TXN_ID if none was begun. */
    public long getTxnId() {
        return txnId;
    }

    /**
     * collect commit infos from BEs
     */
    protected abstract void setCollectCommitInfoFunc();

    /**
     * At this time, FE has successfully collected all commit information from BEs.
     * Before commit this txn, commit information need to be analyzed and processed.
     */
    protected abstract void doBeforeCommit() throws UserException;

    /**
     * The type of the current transaction
     */
    protected abstract TransactionType transactionType();

    @Override
    public void beginTransaction() {
        txnId = transactionManager.begin();
        setCollectCommitInfoFunc();
    }

    @Override
    protected void onComplete() throws UserException {
        if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
            // Query already failed: skip commit, leave txnStatus as ABORTED.
            // NOTE(review): rollback presumably happens via onFail() -- confirm.
            LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier());
        } else {
            doBeforeCommit();
            summaryProfile.ifPresent(profile -> profile.setTransactionBeginTime(transactionType()));
            transactionManager.commit(txnId);
            summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime);
            txnStatus = TransactionStatus.COMMITTED;
            // Refresh cached metadata so follow-up queries see the newly written data.
            Env.getCurrentEnv().getRefreshManager().refreshTable(
                    catalogName,
                    table.getDatabase().getFullName(),
                    table.getName(),
                    true);
        }
    }

    @Override
    protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
        try {
            ((BaseExternalTableDataSink) sink).bindDataSink(insertCtx);
        } catch (Exception e) {
            // Boundary catch: surface any binding problem as an analysis error, keeping the cause.
            throw new AnalysisException(e.getMessage(), e);
        }
    }

    @Override
    protected void onFail(Throwable t) {
        errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
        String queryId = DebugUtil.printId(ctx.queryId());
        // if any throwable being thrown during insert operation, first we should abort this txn
        LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t);
        // BUG FIX: build the client-facing message from errMsg instead of t.getMessage():
        // new StringBuilder((String) null) throws NPE when the throwable has no message,
        // which would also skip the rollback below.
        StringBuilder sb = new StringBuilder(errMsg);
        if (txnId != INVALID_TXN_ID) {
            LOG.warn("insert [{}] with query id {} abort txn {} failed", labelName, queryId, txnId);
            if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
                sb.append(". url: ").append(coordinator.getTrackingUrl());
            }
        }
        // BUG FIX: previously passed t.getMessage() (possibly null) and silently dropped
        // the tracking url appended above; report the composed message instead.
        ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, sb.toString());
        transactionManager.rollback(txnId);
    }

    @Override
    protected void afterExec(StmtExecutor executor) {
        // Compose the result string returned to the client,
        // e.g. {'status':'COMMITTED', 'txnId':'123'}
        StringBuilder sb = new StringBuilder();
        sb.append("{");
        sb.append("'status':'")
                .append(ctx.isTxnModel() ? TransactionStatus.PREPARE.name() : txnStatus.name());
        sb.append("', 'txnId':'").append(txnId).append("'");
        if (!Strings.isNullOrEmpty(errMsg)) {
            sb.append(", 'err':'").append(errMsg).append("'");
        }
        sb.append("}");
        ctx.getState().setOk(loadedRows, 0, sb.toString());
        // set insert result in connection context,
        // so that user can use `show insert result` to get info of the last insert operation.
        ctx.setOrUpdateInsertResult(txnId, labelName, database.getFullName(), table.getName(),
                txnStatus, loadedRows, 0);
        // update it, so that user can get loaded rows in fe.audit.log
        ctx.updateReturnRows((int) loadedRows);
    }
}

View File

@ -20,19 +20,10 @@ package org.apache.doris.nereids.trees.plans.commands.insert;
/**
* For Hive Table
*/
public class HiveInsertCommandContext extends InsertCommandContext {
private boolean overwrite = false;
public class HiveInsertCommandContext extends BaseExternalTableInsertCommandContext {
private String writePath;
private String queryId;
public boolean isOverwrite() {
return overwrite;
}
public void setOverwrite(boolean overwrite) {
this.overwrite = overwrite;
}
public String getWritePath() {
return writePath;
}

View File

@ -17,47 +17,26 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSTransaction;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.HiveTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionManager;
import org.apache.doris.transaction.TransactionStatus;
import org.apache.doris.transaction.TransactionType;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Optional;
/**
* Insert executor for olap table
* Insert executor for hive table
*/
public class HiveInsertExecutor extends AbstractInsertExecutor {
public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
private static final Logger LOG = LogManager.getLogger(HiveInsertExecutor.class);
private static final long INVALID_TXN_ID = -1L;
private long txnId = INVALID_TXN_ID;
private TransactionStatus txnStatus = TransactionStatus.ABORTED;
private final TransactionManager transactionManager;
private final String catalogName;
private Optional<SummaryProfile> summaryProfile = Optional.empty();
/**
* constructor
@ -66,36 +45,14 @@ public class HiveInsertExecutor extends AbstractInsertExecutor {
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx) {
super(ctx, table, labelName, planner, insertCtx);
catalogName = table.getCatalog().getName();
transactionManager = table.getCatalog().getTransactionManager();
if (ConnectContext.get().getExecutor() != null) {
summaryProfile = Optional.of(ConnectContext.get().getExecutor().getSummaryProfile());
}
}
public long getTxnId() {
return txnId;
}
@Override
public void beginTransaction() {
txnId = transactionManager.begin();
public void setCollectCommitInfoFunc() {
HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
coordinator.setHivePartitionUpdateFunc(transaction::updateHivePartitionUpdates);
}
@Override
protected void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink physicalSink) {
HiveTableSink hiveTableSink = (HiveTableSink) sink;
PhysicalHiveTableSink<? extends Plan> physicalHiveSink = (PhysicalHiveTableSink<? extends Plan>) physicalSink;
try {
hiveTableSink.bindDataSink(physicalHiveSink.getCols(), insertCtx);
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e);
}
}
@Override
protected void beforeExec() {
// check params
@ -109,61 +66,16 @@ public class HiveInsertExecutor extends AbstractInsertExecutor {
}
@Override
protected void onComplete() throws UserException {
if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier());
} else {
HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
loadedRows = transaction.getUpdateCnt();
String dbName = ((HMSExternalTable) table).getDbName();
String tbName = table.getName();
transaction.finishInsertTable(dbName, tbName);
summaryProfile.ifPresent(profile -> profile.setTransactionBeginTime(TransactionType.HMS));
transactionManager.commit(txnId);
summaryProfile.ifPresent(SummaryProfile::setTransactionEndTime);
txnStatus = TransactionStatus.COMMITTED;
Env.getCurrentEnv().getRefreshManager().refreshTable(
catalogName,
dbName,
tbName,
true);
}
protected void doBeforeCommit() throws UserException {
HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
loadedRows = transaction.getUpdateCnt();
String dbName = ((HMSExternalTable) table).getDbName();
String tbName = table.getName();
transaction.finishInsertTable(dbName, tbName);
}
@Override
protected void onFail(Throwable t) {
errMsg = t.getMessage() == null ? "unknown reason" : t.getMessage();
String queryId = DebugUtil.printId(ctx.queryId());
// if any throwable being thrown during insert operation, first we should abort this txn
LOG.warn("insert [{}] with query id {} failed", labelName, queryId, t);
StringBuilder sb = new StringBuilder(t.getMessage());
if (txnId != INVALID_TXN_ID) {
LOG.warn("insert [{}] with query id {} abort txn {} failed", labelName, queryId, txnId);
if (!Strings.isNullOrEmpty(coordinator.getTrackingUrl())) {
sb.append(". url: ").append(coordinator.getTrackingUrl());
}
}
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage());
transactionManager.rollback(txnId);
}
@Override
protected void afterExec(StmtExecutor executor) {
StringBuilder sb = new StringBuilder();
sb.append("{");
sb.append("'status':'")
.append(ctx.isTxnModel() ? TransactionStatus.PREPARE.name() : txnStatus.name());
sb.append("', 'txnId':'").append(txnId).append("'");
if (!Strings.isNullOrEmpty(errMsg)) {
sb.append(", 'err':'").append(errMsg).append("'");
}
sb.append("}");
ctx.getState().setOk(loadedRows, 0, sb.toString());
// set insert result in connection context,
// so that user can use `show insert result` to get info of the last insert operation.
ctx.setOrUpdateInsertResult(txnId, labelName, database.getFullName(), table.getName(),
txnStatus, loadedRows, 0);
// update it, so that user can get loaded rows in fe.audit.log
ctx.updateReturnRows((int) loadedRows);
protected TransactionType transactionType() {
return TransactionType.HMS;
}
}

View File

@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergTransaction;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.transaction.TransactionType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Optional;
/**
* Insert executor for iceberg table
*/
public class IcebergInsertExecutor extends BaseExternalTableInsertExecutor {
    private static final Logger LOG = LogManager.getLogger(IcebergInsertExecutor.class);

    /**
     * Builds an insert executor bound to one iceberg target table.
     */
    public IcebergInsertExecutor(ConnectContext ctx, IcebergExternalTable table,
                                 String labelName, NereidsPlanner planner,
                                 Optional<InsertCommandContext> insertCtx) {
        super(ctx, table, labelName, planner, insertCtx);
    }

    @Override
    protected void beforeExec() {
        // Open the iceberg insert on the current transaction before execution starts.
        IcebergTransaction txn = (IcebergTransaction) transactionManager.getTransaction(txnId);
        txn.beginInsert(((IcebergExternalTable) table).getDbName(), table.getName());
    }

    @Override
    public void setCollectCommitInfoFunc() {
        // Route commit data reported by BEs into the iceberg transaction.
        IcebergTransaction txn = (IcebergTransaction) transactionManager.getTransaction(txnId);
        coordinator.setIcebergCommitDataFunc(txn::updateIcebergCommitData);
    }

    @Override
    protected void doBeforeCommit() throws UserException {
        // Record loaded rows and finish the write before the txn manager commits.
        IcebergTransaction txn = (IcebergTransaction) transactionManager.getTransaction(txnId);
        loadedRows = txn.getUpdateCnt();
        txn.finishInsert();
    }

    @Override
    protected TransactionType transactionType() {
        return TransactionType.ICEBERG;
    }
}

View File

@ -24,6 +24,7 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.util.ProfileManager.ProfileType;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.load.loadv2.LoadStatistic;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.NereidsPlanner;
@ -37,6 +38,7 @@ import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@ -177,9 +179,13 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner,
Optional.of(insertCtx.orElse((new HiveInsertCommandContext()))));
// set hive query options
} else if (physicalSink instanceof PhysicalIcebergTableSink) {
IcebergExternalTable icebergExternalTable = (IcebergExternalTable) targetTableIf;
insertExecutor = new IcebergInsertExecutor(ctx, icebergExternalTable, label, planner,
Optional.of(insertCtx.orElse((new BaseExternalTableInsertCommandContext()))));
} else {
// TODO: support other table types
throw new AnalysisException("insert into command only support olap table");
throw new AnalysisException("insert into command only support [olap, hive, iceberg] table");
}
insertExecutor.beginTransaction();

View File

@ -28,6 +28,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.analyzer.UnboundAlias;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
@ -369,8 +370,11 @@ public class InsertUtils {
unboundTableSink = (UnboundTableSink<? extends Plan>) plan;
} else if (plan instanceof UnboundHiveTableSink) {
unboundTableSink = (UnboundHiveTableSink<? extends Plan>) plan;
} else if (plan instanceof UnboundIcebergTableSink) {
unboundTableSink = (UnboundIcebergTableSink<? extends Plan>) plan;
} else {
throw new AnalysisException("the root of plan should be UnboundTableSink or UnboundHiveTableSink"
throw new AnalysisException("the root of plan should be"
+ " [UnboundTableSink, UnboundHiveTableSink, UnboundIcebergTableSink],"
+ " but it is " + plan.getType());
}
List<String> tableQualifier = RelationUtil.getQualifierName(ctx, unboundTableSink.getNameParts());

View File

@ -0,0 +1,150 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.PropagateFuncDeps;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
* logical iceberg table sink for insert command
*/
public class LogicalIcebergTableSink<CHILD_TYPE extends Plan> extends LogicalTableSink<CHILD_TYPE>
implements Sink, PropagateFuncDeps {
// bound data sink
private final IcebergExternalDatabase database;
private final IcebergExternalTable targetTable;
private final DMLCommandType dmlCommandType;
/**
* constructor
*/
public LogicalIcebergTableSink(IcebergExternalDatabase database,
IcebergExternalTable targetTable,
List<Column> cols,
List<NamedExpression> outputExprs,
DMLCommandType dmlCommandType,
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties,
CHILD_TYPE child) {
super(PlanType.LOGICAL_ICEBERG_TABLE_SINK, outputExprs, groupExpression, logicalProperties, cols, child);
this.database = Objects.requireNonNull(database, "database != null in LogicalHiveTableSink");
this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalHiveTableSink");
this.dmlCommandType = dmlCommandType;
}
public Plan withChildAndUpdateOutput(Plan child) {
List<NamedExpression> output = child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList());
return new LogicalIcebergTableSink<>(database, targetTable, cols, output,
dmlCommandType, Optional.empty(), Optional.empty(), child);
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "LogicalHiveTableSink only accepts one child");
return new LogicalIcebergTableSink<>(database, targetTable, cols, outputExprs,
dmlCommandType, Optional.empty(), Optional.empty(), children.get(0));
}
public LogicalIcebergTableSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
return new LogicalIcebergTableSink<>(database, targetTable, cols, outputExprs,
dmlCommandType, Optional.empty(), Optional.empty(), child());
}
public IcebergExternalDatabase getDatabase() {
return database;
}
public IcebergExternalTable getTargetTable() {
return targetTable;
}
public DMLCommandType getDmlCommandType() {
return dmlCommandType;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
LogicalIcebergTableSink<?> that = (LogicalIcebergTableSink<?>) o;
return dmlCommandType == that.dmlCommandType
&& Objects.equals(database, that.database)
&& Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), database, targetTable, cols, dmlCommandType);
}
@Override
public String toString() {
return Utils.toSqlString("LogicalHiveTableSink[" + id.asInt() + "]",
"outputExprs", outputExprs,
"database", database.getFullName(),
"targetTable", targetTable.getName(),
"cols", cols,
"dmlCommandType", dmlCommandType
);
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitLogicalIcebergTableSink(this, context);
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalIcebergTableSink<>(database, targetTable, cols, outputExprs,
dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new LogicalIcebergTableSink<>(database, targetTable, cols, outputExprs,
dmlCommandType, groupExpression, logicalProperties, children.get(0));
}
}

View File

@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.Statistics;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
 * Abstract physical external table sink: shared state (database, target table,
 * insert columns) for physical sinks that write to external catalogs
 * (hive, iceberg, ...).
 */
public abstract class PhysicalBaseExternalTableSink<CHILD_TYPE extends Plan> extends PhysicalTableSink<CHILD_TYPE>
        implements Sink {

    protected final ExternalDatabase database;
    protected final ExternalTable targetTable;
    // target columns of the insert; defensively copied in the constructor
    protected final List<Column> cols;

    /**
     * constructor
     */
    public PhysicalBaseExternalTableSink(PlanType type,
            ExternalDatabase database,
            ExternalTable targetTable,
            List<Column> cols,
            List<NamedExpression> outputExprs,
            Optional<GroupExpression> groupExpression,
            LogicalProperties logicalProperties,
            PhysicalProperties physicalProperties,
            Statistics statistics,
            CHILD_TYPE child) {
        super(type, outputExprs, groupExpression,
                logicalProperties, physicalProperties, statistics, child);
        this.database = Objects.requireNonNull(
                database, "database != null in " + this.getClass().getSimpleName());
        this.targetTable = Objects.requireNonNull(
                targetTable, "targetTable != null in " + this.getClass().getSimpleName());
        this.cols = Utils.copyRequiredList(cols);
    }

    /** Returns the external database this sink writes into. */
    public ExternalDatabase getDatabase() {
        return database;
    }

    public ExternalTable getTargetTable() {
        return targetTable;
    }

    /**
     * Returns the target columns of the insert. Restores the accessor that was
     * removed from PhysicalHiveTableSink when it was refactored onto this base.
     */
    public List<Column> getCols() {
        return cols;
    }

    @Override
    public List<? extends Expression> getExpressions() {
        // a table sink evaluates no expressions of its own
        return ImmutableList.of();
    }
}

View File

@ -25,30 +25,20 @@ import org.apache.doris.nereids.properties.DistributionSpecTableSinkHashPartitio
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.Statistics;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/** abstract physical hive sink */
public class PhysicalHiveTableSink<CHILD_TYPE extends Plan> extends PhysicalTableSink<CHILD_TYPE> implements Sink {
private final HMSExternalDatabase database;
private final HMSExternalTable targetTable;
private final List<Column> cols;
/** physical hive sink */
public class PhysicalHiveTableSink<CHILD_TYPE extends Plan> extends PhysicalBaseExternalTableSink<CHILD_TYPE> {
/**
* constructor
@ -76,28 +66,14 @@ public class PhysicalHiveTableSink<CHILD_TYPE extends Plan> extends PhysicalTabl
PhysicalProperties physicalProperties,
Statistics statistics,
CHILD_TYPE child) {
super(PlanType.PHYSICAL_HIVE_TABLE_SINK, outputExprs, groupExpression,
super(PlanType.PHYSICAL_HIVE_TABLE_SINK, database, targetTable, cols, outputExprs, groupExpression,
logicalProperties, physicalProperties, statistics, child);
this.database = Objects.requireNonNull(database, "database != null in PhysicalHiveTableSink");
this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalHiveTableSink");
this.cols = Utils.copyRequiredList(cols);
}
public HMSExternalDatabase getDatabase() {
return database;
}
public HMSExternalTable getTargetTable() {
return targetTable;
}
public List<Column> getCols() {
return cols;
}
@Override
public Plan withChildren(List<Plan> children) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs, groupExpression,
return new PhysicalHiveTableSink<>(
(HMSExternalDatabase) database, (HMSExternalTable) targetTable, cols, outputExprs, groupExpression,
getLogicalProperties(), physicalProperties, statistics, children.get(0));
}
@ -106,27 +82,25 @@ public class PhysicalHiveTableSink<CHILD_TYPE extends Plan> extends PhysicalTabl
return visitor.visitPhysicalHiveTableSink(this, context);
}
@Override
public List<? extends Expression> getExpressions() {
return ImmutableList.of();
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs,
return new PhysicalHiveTableSink<>(
(HMSExternalDatabase) database, (HMSExternalTable) targetTable, cols, outputExprs,
groupExpression, getLogicalProperties(), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs,
return new PhysicalHiveTableSink<>(
(HMSExternalDatabase) database, (HMSExternalTable) targetTable, cols, outputExprs,
groupExpression, logicalProperties.get(), children.get(0));
}
@Override
public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs,
return new PhysicalHiveTableSink<>(
(HMSExternalDatabase) database, (HMSExternalTable) targetTable, cols, outputExprs,
groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
}
@ -135,7 +109,7 @@ public class PhysicalHiveTableSink<CHILD_TYPE extends Plan> extends PhysicalTabl
*/
@Override
public PhysicalProperties getRequirePhysicalProperties() {
Set<String> hivePartitionKeys = targetTable.getPartitionColumnNames();
Set<String> hivePartitionKeys = ((HMSExternalTable) targetTable).getPartitionColumnNames();
if (!hivePartitionKeys.isEmpty()) {
List<Integer> columnIdx = new ArrayList<>();
List<Column> fullSchema = targetTable.getFullSchema();

View File

@ -0,0 +1,133 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.catalog.Column;
import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DistributionSpecTableSinkHashPartitioned;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.statistics.Statistics;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/** physical iceberg sink */
public class PhysicalIcebergTableSink<CHILD_TYPE extends Plan> extends PhysicalBaseExternalTableSink<CHILD_TYPE> {

    /**
     * Convenience constructor: defaults to GATHER physical properties and no statistics.
     */
    public PhysicalIcebergTableSink(IcebergExternalDatabase database,
            IcebergExternalTable targetTable,
            List<Column> cols,
            List<NamedExpression> outputExprs,
            Optional<GroupExpression> groupExpression,
            LogicalProperties logicalProperties,
            CHILD_TYPE child) {
        this(database, targetTable, cols, outputExprs, groupExpression, logicalProperties,
                PhysicalProperties.GATHER, null, child);
    }

    /**
     * Full constructor.
     */
    public PhysicalIcebergTableSink(IcebergExternalDatabase database,
            IcebergExternalTable targetTable,
            List<Column> cols,
            List<NamedExpression> outputExprs,
            Optional<GroupExpression> groupExpression,
            LogicalProperties logicalProperties,
            PhysicalProperties physicalProperties,
            Statistics statistics,
            CHILD_TYPE child) {
        super(PlanType.PHYSICAL_ICEBERG_TABLE_SINK, database, targetTable, cols, outputExprs, groupExpression,
                logicalProperties, physicalProperties, statistics, child);
    }

    @Override
    public Plan withChildren(List<Plan> children) {
        return new PhysicalIcebergTableSink<>(
                (IcebergExternalDatabase) database, (IcebergExternalTable) targetTable,
                cols, outputExprs, groupExpression,
                getLogicalProperties(), physicalProperties, statistics, children.get(0));
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitPhysicalIcebergTableSink(this, context);
    }

    @Override
    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
        return new PhysicalIcebergTableSink<>(
                (IcebergExternalDatabase) database, (IcebergExternalTable) targetTable, cols, outputExprs,
                groupExpression, getLogicalProperties(), child());
    }

    @Override
    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
        return new PhysicalIcebergTableSink<>(
                (IcebergExternalDatabase) database, (IcebergExternalTable) targetTable, cols, outputExprs,
                groupExpression, logicalProperties.get(), children.get(0));
    }

    @Override
    public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
        return new PhysicalIcebergTableSink<>(
                (IcebergExternalDatabase) database, (IcebergExternalTable) targetTable, cols, outputExprs,
                groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
    }

    /**
     * Required physical properties of the child: when the target table is
     * partitioned, shuffle rows by the partition columns so each instance
     * writes a bounded set of partitions; otherwise any random distribution
     * will do.
     */
    @Override
    public PhysicalProperties getRequirePhysicalProperties() {
        Set<String> partitionNames = targetTable.getPartitionNames();
        if (partitionNames.isEmpty()) {
            return PhysicalProperties.SINK_RANDOM_PARTITIONED;
        }
        // Collect, in schema order, the expr ids of the child's output slots
        // that correspond to partition columns. Assumes child output is
        // positionally aligned with the table's full schema.
        List<ExprId> shuffleExprIds = new ArrayList<>();
        List<Column> fullSchema = targetTable.getFullSchema();
        for (int idx = 0; idx < fullSchema.size(); idx++) {
            if (partitionNames.contains(fullSchema.get(idx).getName())) {
                shuffleExprIds.add(child().getOutput().get(idx).getExprId());
            }
        }
        DistributionSpecTableSinkHashPartitioned spec = new DistributionSpecTableSinkHashPartitioned();
        spec.setOutputColExprIds(shuffleExprIds);
        return new PhysicalProperties(spec);
    }
}

View File

@ -18,12 +18,14 @@
package org.apache.doris.nereids.trees.plans.visitor;
import org.apache.doris.nereids.analyzer.UnboundHiveTableSink;
import org.apache.doris.nereids.analyzer.UnboundIcebergTableSink;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
@ -31,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
@ -61,6 +64,10 @@ public interface SinkVisitor<R, C> {
return visitLogicalSink(unboundTableSink, context);
}
/** Visits an unbound iceberg table sink; by default delegates to the generic logical-sink handler. */
default R visitUnboundIcebergTableSink(UnboundIcebergTableSink<? extends Plan> unboundTableSink, C context) {
    return visitLogicalSink(unboundTableSink, context);
}
default R visitUnboundResultSink(UnboundResultSink<? extends Plan> unboundResultSink, C context) {
return visitLogicalSink(unboundResultSink, context);
}
@ -85,6 +92,10 @@ public interface SinkVisitor<R, C> {
return visitLogicalTableSink(hiveTableSink, context);
}
/** Visits a logical iceberg table sink; by default delegates to the generic logical-table-sink handler. */
default R visitLogicalIcebergTableSink(LogicalIcebergTableSink<? extends Plan> icebergTableSink, C context) {
    return visitLogicalTableSink(icebergTableSink, context);
}
default R visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResultSink, C context) {
return visitLogicalSink(logicalResultSink, context);
}
@ -114,6 +125,10 @@ public interface SinkVisitor<R, C> {
return visitPhysicalTableSink(hiveTableSink, context);
}
/** Visits a physical iceberg table sink; by default delegates to the generic physical-table-sink handler. */
default R visitPhysicalIcebergTableSink(PhysicalIcebergTableSink<? extends Plan> icebergTableSink, C context) {
    return visitPhysicalTableSink(icebergTableSink, context);
}
default R visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, C context) {
return visitPhysicalSink(physicalResultSink, context);
}

View File

@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/DataSink.java
// and modified by Doris
package org.apache.doris.planner;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import java.util.Optional;
import java.util.Set;
/**
 * Common base for data sinks writing to external tables (hive, iceberg, ...).
 * Holds the thrift sink built by {@link #bindDataSink(Optional)} and shares
 * the file-format / compression-type mapping helpers.
 */
public abstract class BaseExternalTableDataSink extends DataSink {

    // thrift representation of this sink; populated by bindDataSink()
    protected TDataSink tDataSink;

    @Override
    protected TDataSink toThrift() {
        return tDataSink;
    }

    @Override
    public PlanNodeId getExchNodeId() {
        // external table sinks have no exchange node
        return null;
    }

    @Override
    public DataPartition getOutputPartition() {
        return DataPartition.RANDOM;
    }

    /**
     * File format types supported by the current table
     */
    protected abstract Set<TFileFormatType> supportedFileFormatTypes();

    /**
     * Maps a storage format name (matched by substring, case-insensitively) to
     * its thrift file format type.
     *
     * @throws AnalysisException if the format is unrecognized or not in
     *         {@link #supportedFileFormatTypes()}
     */
    protected TFileFormatType getTFileFormatType(String format) throws AnalysisException {
        String lowered = format.toLowerCase();
        TFileFormatType resolved;
        if (lowered.contains("orc")) {
            resolved = TFileFormatType.FORMAT_ORC;
        } else if (lowered.contains("parquet")) {
            resolved = TFileFormatType.FORMAT_PARQUET;
        } else if (lowered.contains("text")) {
            resolved = TFileFormatType.FORMAT_CSV_PLAIN;
        } else {
            resolved = TFileFormatType.FORMAT_UNKNOWN;
        }
        if (!supportedFileFormatTypes().contains(resolved)) {
            throw new AnalysisException("Unsupported input format type: " + format);
        }
        return resolved;
    }

    /**
     * Maps a compression codec name (case-insensitive) to its thrift compress
     * type. "uncompressed" and any unrecognized codec both map to PLAIN.
     */
    protected TFileCompressType getTFileCompressType(String compressType) {
        if ("snappy".equalsIgnoreCase(compressType)) {
            return TFileCompressType.SNAPPYBLOCK;
        }
        if ("lz4".equalsIgnoreCase(compressType)) {
            return TFileCompressType.LZ4BLOCK;
        }
        if ("lzo".equalsIgnoreCase(compressType)) {
            return TFileCompressType.LZO;
        }
        if ("zlib".equalsIgnoreCase(compressType)) {
            return TFileCompressType.ZLIB;
        }
        if ("zstd".equalsIgnoreCase(compressType)) {
            return TFileCompressType.ZSTD;
        }
        // "uncompressed" or unknown: try plain type to decompress parquet or orc file
        return TFileCompressType.PLAIN;
    }

    /**
     * check sink params and generate thrift data sink to BE
     * @param insertCtx insert info context
     * @throws AnalysisException if source file format cannot be read
     */
    public abstract void bindDataSink(Optional<InsertCommandContext> insertCtx) throws AnalysisException;
}

View File

@ -25,6 +25,7 @@ import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.odbc.sink.OdbcTableSink;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TExplainLevel;
@ -70,6 +71,8 @@ public abstract class DataSink {
return new OdbcTableSink((OdbcTable) table);
} else if (table instanceof HMSExternalTable) {
return new HiveTableSink((HMSExternalTable) table);
} else if (table instanceof IcebergExternalTable) {
return new IcebergTableSink((IcebergExternalTable) table);
} else {
throw new AnalysisException("Unknown table type " + table.getType());
}

View File

@ -31,7 +31,6 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.THiveBucket;
@ -50,16 +49,24 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
public class HiveTableSink extends DataSink {
public class HiveTableSink extends BaseExternalTableDataSink {
private HMSExternalTable targetTable;
protected TDataSink tDataSink;
private final HMSExternalTable targetTable;
private static final HashSet<TFileFormatType> supportedTypes = new HashSet<TFileFormatType>() {{
add(TFileFormatType.FORMAT_ORC);
add(TFileFormatType.FORMAT_PARQUET);
}};
public HiveTableSink(HMSExternalTable targetTable) {
super();
this.targetTable = targetTable;
}
@Override
protected Set<TFileFormatType> supportedFileFormatTypes() {
return supportedTypes;
}
@Override
public String getExplainString(String prefix, TExplainLevel explainLevel) {
StringBuilder strBuilder = new StringBuilder();
@ -72,26 +79,7 @@ public class HiveTableSink extends DataSink {
}
@Override
protected TDataSink toThrift() {
return tDataSink;
}
@Override
public PlanNodeId getExchNodeId() {
return null;
}
@Override
public DataPartition getOutputPartition() {
return DataPartition.RANDOM;
}
/**
* check sink params and generate thrift data sink to BE
* @param insertCtx insert info context
* @throws AnalysisException if source file format cannot be read
*/
public void bindDataSink(List<Column> insertCols, Optional<InsertCommandContext> insertCtx)
public void bindDataSink(Optional<InsertCommandContext> insertCtx)
throws AnalysisException {
THiveTableSink tSink = new THiveTableSink();
tSink.setDbName(targetTable.getDbName());
@ -124,7 +112,7 @@ public class HiveTableSink extends DataSink {
bucketInfo.setBucketCount(sd.getNumBuckets());
tSink.setBucketInfo(bucketInfo);
TFileFormatType formatType = getFileFormatType(sd);
TFileFormatType formatType = getTFileFormatType(sd.getInputFormat());
tSink.setFileFormat(formatType);
setCompressType(tSink, formatType);
@ -180,23 +168,7 @@ public class HiveTableSink extends DataSink {
compressType = "uncompressed";
break;
}
if ("snappy".equalsIgnoreCase(compressType)) {
tSink.setCompressionType(TFileCompressType.SNAPPYBLOCK);
} else if ("lz4".equalsIgnoreCase(compressType)) {
tSink.setCompressionType(TFileCompressType.LZ4BLOCK);
} else if ("lzo".equalsIgnoreCase(compressType)) {
tSink.setCompressionType(TFileCompressType.LZO);
} else if ("zlib".equalsIgnoreCase(compressType)) {
tSink.setCompressionType(TFileCompressType.ZLIB);
} else if ("zstd".equalsIgnoreCase(compressType)) {
tSink.setCompressionType(TFileCompressType.ZSTD);
} else if ("uncompressed".equalsIgnoreCase(compressType)) {
tSink.setCompressionType(TFileCompressType.PLAIN);
} else {
// try to use plain type to decompress parquet or orc file
tSink.setCompressionType(TFileCompressType.PLAIN);
}
tSink.setCompressionType(getTFileCompressType(compressType));
}
private void setPartitionValues(THiveTableSink tSink) throws AnalysisException {
@ -207,7 +179,7 @@ public class HiveTableSink extends DataSink {
for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) {
THivePartition hivePartition = new THivePartition();
StorageDescriptor sd = partition.getSd();
hivePartition.setFileFormat(getFileFormatType(sd));
hivePartition.setFileFormat(getTFileFormatType(sd.getInputFormat()));
hivePartition.setValues(partition.getValues());
THiveLocationParams locationParams = new THiveLocationParams();
@ -222,20 +194,6 @@ public class HiveTableSink extends DataSink {
tSink.setPartitions(partitions);
}
private TFileFormatType getFileFormatType(StorageDescriptor sd) throws AnalysisException {
TFileFormatType fileFormatType;
if (sd.getInputFormat().toLowerCase().contains("orc")) {
fileFormatType = TFileFormatType.FORMAT_ORC;
} else if (sd.getInputFormat().toLowerCase().contains("parquet")) {
fileFormatType = TFileFormatType.FORMAT_PARQUET;
} else if (sd.getInputFormat().toLowerCase().contains("text")) {
fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
} else {
throw new AnalysisException("Unsupported input format type: " + sd.getInputFormat());
}
return fileFormatType;
}
protected TDataSinkType getDataSinkType() {
return TDataSinkType.HIVE_TABLE_SINK;
}

View File

@ -0,0 +1,146 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TIcebergTableSink;
import org.apache.doris.thrift.TSortField;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
 * Data sink that translates an INSERT into an Iceberg table into the thrift
 * TIcebergTableSink sent to BEs: schema/partition-spec JSON, sort-order hints,
 * file format/compression, hadoop config and output location.
 */
public class IcebergTableSink extends BaseExternalTableDataSink {

    private final IcebergExternalTable targetTable;

    // Only ORC and Parquet data files can be written by this sink.
    private static final HashSet<TFileFormatType> supportedTypes = new HashSet<TFileFormatType>() {{
            add(TFileFormatType.FORMAT_ORC);
            add(TFileFormatType.FORMAT_PARQUET);
        }};

    public IcebergTableSink(IcebergExternalTable targetTable) {
        super();
        this.targetTable = targetTable;
    }

    @Override
    protected Set<TFileFormatType> supportedFileFormatTypes() {
        return supportedTypes;
    }

    @Override
    public String getExplainString(String prefix, TExplainLevel explainLevel) {
        StringBuilder strBuilder = new StringBuilder();
        strBuilder.append(prefix).append("ICEBERG TABLE SINK\n");
        if (explainLevel == TExplainLevel.BRIEF) {
            return strBuilder.toString();
        }
        // TODO: explain partitions
        return strBuilder.toString();
    }

    @Override
    public void bindDataSink(Optional<InsertCommandContext> insertCtx)
            throws AnalysisException {
        TIcebergTableSink tSink = new TIcebergTableSink();
        // Load the live Iceberg table metadata for the target table.
        Table icebergTable = targetTable.getIcebergTable();
        tSink.setDbName(targetTable.getDbName());
        tSink.setTbName(targetTable.getName());

        // schema
        tSink.setSchemaJson(SchemaParser.toJson(icebergTable.schema()));

        // partition spec: ship every known spec as JSON plus the id of the
        // current (default) spec the writer should use.
        if (icebergTable.spec().isPartitioned()) {
            tSink.setPartitionSpecsJson(Maps.transformValues(icebergTable.specs(), PartitionSpecParser::toJson));
            tSink.setPartitionSpecId(icebergTable.spec().specId());
        }

        // sort order: only identity-transform sort fields over top-level
        // columns are forwarded; other transforms are silently skipped.
        if (icebergTable.sortOrder().isSorted()) {
            SortOrder sortOrder = icebergTable.sortOrder();
            Set<Integer> baseColumnFieldIds = icebergTable.schema().columns().stream()
                    .map(Types.NestedField::fieldId)
                    .collect(ImmutableSet.toImmutableSet());
            ImmutableList.Builder<TSortField> sortFields = ImmutableList.builder();
            for (SortField sortField : sortOrder.fields()) {
                if (!sortField.transform().isIdentity()) {
                    continue;
                }
                // skip sort fields whose source is not a top-level column
                if (!baseColumnFieldIds.contains(sortField.sourceId())) {
                    continue;
                }
                TSortField tSortField = new TSortField();
                tSortField.setSourceColumnId(sortField.sourceId());
                tSortField.setAscending(sortField.direction().equals(SortDirection.ASC));
                tSortField.setNullFirst(sortField.nullOrder().equals(NullOrder.NULLS_FIRST));
                sortFields.add(tSortField);
            }
            tSink.setSortFields(sortFields.build());
        }

        // file info: format and compression come from table properties
        tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable)));
        tSink.setCompressionType(getTFileCompressType(IcebergUtils.getFileCompress(icebergTable)));

        // hadoop config: table properties overridden by catalog properties
        // (catalog entries win on key collision since they are put last)
        HashMap<String, String> props = new HashMap<>(icebergTable.properties());
        Map<String, String> catalogProps = targetTable.getCatalog().getProperties();
        props.putAll(catalogProps);
        tSink.setHadoopConfig(props);

        // location: resolve the table's data directory into a BE-readable path
        LocationPath locationPath = new LocationPath(IcebergUtils.dataLocation(icebergTable), catalogProps);
        tSink.setOutputPath(locationPath.toStorageLocation().toString());
        tSink.setOriginalOutputPath(locationPath.toString());
        tSink.setFileType(locationPath.getTFileTypeForBE());

        // INSERT OVERWRITE flag comes from the insert command context when present
        if (insertCtx.isPresent()) {
            BaseExternalTableInsertCommandContext context = (BaseExternalTableInsertCommandContext) insertCtx.get();
            tSink.setOverwrite(context.isOverwrite());
        }
        tDataSink = new TDataSink(TDataSinkType.ICEBERG_TABLE_SINK);
        tDataSink.setIcebergTableSink(tSink);
    }
}

View File

@ -93,6 +93,7 @@ import org.apache.doris.thrift.TExternalScanRange;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TIcebergCommitData;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloScanRange;
import org.apache.doris.thrift.TPipelineFragmentParams;
@ -246,6 +247,9 @@ public class Coordinator implements CoordInterface {
// Collect all hivePartitionUpdates obtained from be
Consumer<List<THivePartitionUpdate>> hivePartitionUpdateFunc;
// Collect all icebergCommitData obtained from be
Consumer<List<TIcebergCommitData>> icebergCommitDataFunc;
// Input parameter
private long jobId = -1; // job which this task belongs to
private TUniqueId queryId;
@ -2481,6 +2485,10 @@ public class Coordinator implements CoordInterface {
this.hivePartitionUpdateFunc = hivePartitionUpdateFunc;
}
/**
 * Registers the callback that receives iceberg commit data attached to
 * exec-status reports from BEs.
 */
public void setIcebergCommitDataFunc(Consumer<List<TIcebergCommitData>> consumer) {
    this.icebergCommitDataFunc = consumer;
}
// update job progress from BE
public void updateFragmentExecStatus(TReportExecStatusParams params) {
if (enablePipelineXEngine) {
@ -2531,6 +2539,9 @@ public class Coordinator implements CoordInterface {
if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
}
if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) {
icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
}
Preconditions.checkArgument(params.isSetDetailedReport());
if (ctx.done) {
@ -2596,6 +2607,9 @@ public class Coordinator implements CoordInterface {
if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
}
if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) {
icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Query {} instance {} is marked done",
DebugUtil.printId(queryId), DebugUtil.printId(params.getFragmentInstanceId()));
@ -2668,6 +2682,9 @@ public class Coordinator implements CoordInterface {
if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
}
if (params.isSetIcebergCommitDatas() && icebergCommitDataFunc != null) {
icebergCommitDataFunc.accept(params.getIcebergCommitDatas());
}
instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L);
}
}

View File

@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.transaction;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
import org.apache.doris.datasource.iceberg.IcebergTransaction;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * {@link TransactionManager} implementation for Iceberg external catalogs.
 * Hands out transaction ids on {@link #begin()} and tracks live transactions
 * in a concurrent map until they are committed or rolled back.
 */
public class IcebergTransactionManager implements TransactionManager {

    // Live transactions keyed by the id handed out in begin().
    private final Map<Long, IcebergTransaction> activeTransactions = new ConcurrentHashMap<>();
    private final IcebergMetadataOps ops;

    public IcebergTransactionManager(IcebergMetadataOps ops) {
        this.ops = ops;
    }

    /**
     * Starts a new Iceberg transaction and returns the id callers must use
     * to commit or roll it back later.
     */
    @Override
    public long begin() {
        long txnId = Env.getCurrentEnv().getNextId();
        activeTransactions.put(txnId, new IcebergTransaction(ops));
        return txnId;
    }

    /** Commits the transaction; it is forgotten only after a successful commit. */
    @Override
    public void commit(long id) throws UserException {
        getTransactionWithException(id).commit();
        activeTransactions.remove(id);
    }

    /** Rolls the transaction back and removes it from tracking. */
    @Override
    public void rollback(long id) {
        getTransactionWithException(id).rollback();
        activeTransactions.remove(id);
    }

    @Override
    public Transaction getTransaction(long id) {
        return getTransactionWithException(id);
    }

    /**
     * Looks up a live transaction, failing fast when the id is unknown
     * (already finished, or never begun).
     */
    public Transaction getTransactionWithException(long id) {
        Transaction txn = activeTransactions.get(id);
        if (txn == null) {
            throw new RuntimeException("Can't find transaction for " + id);
        }
        return txn;
    }
}

View File

@ -18,6 +18,7 @@
package org.apache.doris.transaction;
import org.apache.doris.datasource.hive.HiveMetadataOps;
import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
import org.apache.doris.fs.FileSystemProvider;
import java.util.concurrent.Executor;
@ -28,4 +29,8 @@ public class TransactionManagerFactory {
FileSystemProvider fileSystemProvider, Executor fileSystemExecutor) {
return new HiveTransactionManager(ops, fileSystemProvider, fileSystemExecutor);
}
/**
 * Creates the transaction manager used by Iceberg external catalogs,
 * backed by the given metadata operations handle.
 */
public static TransactionManager createIcebergTransactionManager(IcebergMetadataOps ops) {
    return new IcebergTransactionManager(ops);
}
}