diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
index 290fd39711..3044b214ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSTransaction.java
@@ -35,6 +35,8 @@ import org.apache.doris.fs.remote.S3FileSystem;
 import org.apache.doris.fs.remote.SwitchingFileSystem;
 import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.THiveLocationParams;
 import org.apache.doris.thrift.THivePartitionUpdate;
 import org.apache.doris.thrift.TS3MPUPendingUpload;
 import org.apache.doris.thrift.TUpdateMode;
@@ -63,6 +65,7 @@ import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
 import software.amazon.awssdk.services.s3.model.CompletedPart;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -86,6 +89,8 @@ public class HMSTransaction implements Transaction {
     private final FileSystem fs;
     private Optional<SummaryProfile> summaryProfile = Optional.empty();
     private String queryId;
+    private boolean isOverwrite = false;
+    TFileType fileType;
 
     private final Map<SimpleTableInfo, Action<TableAndMore>> tableActions = new HashMap<>();
     private final Map<SimpleTableInfo, Map<List<String>, Action<PartitionAndMore>>>
@@ -96,6 +101,7 @@ public class HMSTransaction implements Transaction {
     private HmsCommitter hmsCommitter;
     private List<THivePartitionUpdate> hivePartitionUpdates = Lists.newArrayList();
     private String declaredIntentionsToWrite;
+    private boolean isMockedPartitionUpdate = false;
 
     private static class UncompletedMpuPendingUpload {
@@ -173,9 +179,38 @@ public class HMSTransaction implements Transaction {
     public void beginInsertTable(HiveInsertCommandContext ctx) {
         declaredIntentionsToWrite = ctx.getWritePath();
         queryId = ctx.getQueryId();
+        isOverwrite = ctx.isOverwrite();
+        fileType = ctx.getFileType();
     }
 
     public void finishInsertTable(SimpleTableInfo tableInfo) {
+        Table table = getTable(tableInfo);
+        if (hivePartitionUpdates.isEmpty() && isOverwrite && table.getPartitionKeysSize() == 0) {
+            // use an empty hivePartitionUpdate to clean source table
+            isMockedPartitionUpdate = true;
+            THivePartitionUpdate emptyUpdate = new THivePartitionUpdate() {{
+                    setUpdateMode(TUpdateMode.OVERWRITE);
+                    setFileSize(0);
+                    setRowCount(0);
+                    setFileNames(Collections.emptyList());
+
+                    if (fileType == TFileType.FILE_S3) {
+                        setS3MpuPendingUploads(Lists.newArrayList(new TS3MPUPendingUpload()));
+                        setLocation(new THiveLocationParams() {{
+                                setWritePath(table.getSd().getLocation());
+                            }
+                        });
+                    } else {
+                        fs.makeDir(declaredIntentionsToWrite);
+                        setLocation(new THiveLocationParams() {{
+                                setWritePath(declaredIntentionsToWrite);
+                            }
+                        });
+                    }
+                }
+            };
+            hivePartitionUpdates = Lists.newArrayList(emptyUpdate);
+        }
         List<THivePartitionUpdate> mergedPUs = mergePartitions(hivePartitionUpdates);
         for (THivePartitionUpdate pu : mergedPUs) {
             if (pu.getS3MpuPendingUploads() != null) {
@@ -185,7 +220,6 @@ public class HMSTransaction implements Transaction {
                 }
             }
         }
-        Table table = getTable(tableInfo);
         List<Pair<THivePartitionUpdate, HivePartitionStatistics>> insertExistsPartitions = new ArrayList<>();
         for (THivePartitionUpdate pu : mergedPUs) {
             TUpdateMode updateMode = pu.getUpdateMode();
@@ -1534,6 +1568,12 @@ public class HMSTransaction implements Transaction {
 
     private void s3Commit(Executor fileSystemExecutor, List<CompletableFuture<?>> asyncFileSystemTaskFutures,
             AtomicBoolean fileSystemTaskCancelled, THivePartitionUpdate hivePartitionUpdate, String path) {
+
+        List<TS3MPUPendingUpload> s3MpuPendingUploads = hivePartitionUpdate.getS3MpuPendingUploads();
+        if (isMockedPartitionUpdate) {
+            return;
+        }
+
         S3FileSystem s3FileSystem = (S3FileSystem) ((SwitchingFileSystem) fs).fileSystem(path);
         S3Client s3Client;
         try {
@@ -1542,7 +1582,7 @@ public class HMSTransaction implements Transaction {
             throw new RuntimeException(e);
         }
 
-        for (TS3MPUPendingUpload s3MPUPendingUpload : hivePartitionUpdate.getS3MpuPendingUploads()) {
+        for (TS3MPUPendingUpload s3MPUPendingUpload : s3MpuPendingUploads) {
             asyncFileSystemTaskFutures.add(CompletableFuture.runAsync(() -> {
                 if (fileSystemTaskCancelled.get()) {
                     return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index edc613ad05..c198b58b2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -33,14 +33,19 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.OverwriteFiles;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -91,10 +96,15 @@ public class IcebergTransaction implements Transaction {
         PartitionSpec spec = table.spec();
         FileFormat fileFormat = IcebergUtils.getFileFormat(table);
 
-        //convert commitDataList to writeResult
-        WriteResult writeResult = IcebergWriterHelper
-                .convertToWriterResult(fileFormat, spec, commitDataList);
-        List<WriteResult> pendingResults = Lists.newArrayList(writeResult);
+        List<WriteResult> pendingResults;
+        if (commitDataList.isEmpty()) {
+            pendingResults = Collections.emptyList();
+        } else {
+            //convert commitDataList to writeResult
+            WriteResult writeResult = IcebergWriterHelper
+                    .convertToWriterResult(fileFormat, spec, commitDataList);
+            pendingResults = Lists.newArrayList(writeResult);
+        }
 
         if (updateMode == TUpdateMode.APPEND) {
             commitAppendTxn(table, pendingResults);
@@ -138,6 +148,22 @@ public class IcebergTransaction implements Transaction {
 
     private void commitReplaceTxn(Table table, List<WriteResult> pendingResults) {
+        if (pendingResults.isEmpty()) {
+            // such as: insert overwrite table `dst_tb` select * from `empty_tb`
+            // 1. if dst_tb is a partitioned table, it will return directly.
+            // 2. if dst_tb is an unpartitioned table, the `dst_tb` table will be emptied.
+            if (!table.spec().isPartitioned()) {
+                OverwriteFiles overwriteFiles = table.newOverwrite();
+                try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
+                    fileScanTasks.forEach(f -> overwriteFiles.deleteFile(f.file()));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                overwriteFiles.commit();
+            }
+            return;
+        }
+
         // commit replace partitions
         ReplacePartitions appendPartitionOp = table.newReplacePartitions();
         for (WriteResult result : pendingResults) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java
index 1e68a5cd22..ce7f1c9128 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertCommandContext.java
@@ -17,12 +17,15 @@
 
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
+import org.apache.doris.thrift.TFileType;
+
 /**
  * For Hive Table
  */
 public class HiveInsertCommandContext extends BaseExternalTableInsertCommandContext {
     private String writePath;
     private String queryId;
+    private TFileType fileType;
 
     public String getWritePath() {
         return writePath;
@@ -39,4 +42,12 @@ public class HiveInsertCommandContext extends BaseExternalTableInsertCommandContext {
     public void setQueryId(String queryId) {
         this.queryId = queryId;
     }
+
+    public TFileType getFileType() {
+        return fileType;
+    }
+
+    public void setFileType(TFileType fileType) {
+        this.fileType = fileType;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
index bbd278afef..185832fd5e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java
@@ -129,6 +129,7 @@ public class HiveTableSink extends BaseExternalTableDataSink {
                 HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
                 tSink.setOverwrite(context.isOverwrite());
                 context.setWritePath(storageLocation);
+                context.setFileType(fileType);
             }
         } else {
             String writeTempPath = createTempPath(location);
@@ -139,6 +140,7 @@ public class HiveTableSink extends BaseExternalTableDataSink {
                 HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
                 tSink.setOverwrite(context.isOverwrite());
                 context.setWritePath(writeTempPath);
+                context.setFileType(fileType);
             }
         }
         locationParams.setFileType(fileType);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
index 4375dc5c02..432cc47f30 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.iceberg;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.nereids.trees.plans.commands.insert.IcebergInsertCommandContext;
 import org.apache.doris.thrift.TFileContent;
 import org.apache.doris.thrift.TIcebergCommitData;
 
@@ -199,7 +200,7 @@ public class IcebergTransactionTest {
         txn.finishInsert(tableInfo, Optional.empty());
         txn.commit();
 
-        checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6");
+        checkSnapshotAddProperties(table.currentSnapshot().summary(), "6", "2", "6");
         checkPushDownByPartitionForTs(table, "ts1");
         checkPushDownByPartitionForTs(table, "ts2");
         checkPushDownByPartitionForTs(table, "ts3");
@@ -287,7 +288,7 @@ public class IcebergTransactionTest {
         ctd1.setFileSize(2);
 
         TIcebergCommitData ctd2 = new TIcebergCommitData();
-        ctd2.setFilePath("f1.parquet");
+        ctd2.setFilePath("f2.parquet");
         ctd2.setFileContent(TFileContent.DATA);
         ctd2.setRowCount(4);
         ctd2.setFileSize(4);
@@ -310,22 +311,31 @@ public class IcebergTransactionTest {
         txn.finishInsert(tableInfo, Optional.empty());
         txn.commit();
 
-        checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6");
+        checkSnapshotAddProperties(table.currentSnapshot().summary(), "6", "2", "6");
     }
 
     private IcebergTransaction getTxn() {
         return new IcebergTransaction(ops);
     }
 
-    private void checkSnapshotProperties(Map<String, String> props,
-                                         String addRecords,
-                                         String addFileCnt,
-                                         String addFileSize) {
+    private void checkSnapshotAddProperties(Map<String, String> props,
+                                            String addRecords,
+                                            String addFileCnt,
+                                            String addFileSize) {
         Assert.assertEquals(addRecords, props.get("added-records"));
         Assert.assertEquals(addFileCnt, props.get("added-data-files"));
         Assert.assertEquals(addFileSize, props.get("added-files-size"));
     }
 
+    private void checkSnapshotTotalProperties(Map<String, String> props,
+                                              String totalRecords,
+                                              String totalFileCnt,
+                                              String totalFileSize) {
+        Assert.assertEquals(totalRecords, props.get("total-records"));
+        Assert.assertEquals(totalFileCnt, props.get("total-data-files"));
+        Assert.assertEquals(totalFileSize, props.get("total-files-size"));
+    }
+
     private String numToYear(Integer num) {
         Transform<Integer, Integer> year = Transforms.year();
         return year.toHumanString(Types.IntegerType.get(), num);
@@ -368,4 +378,75 @@ public class IcebergTransactionTest {
         Assert.assertEquals("2024-12-11", numToDay(dt));
     }
 
+    @Test
+    public void testUnPartitionedTableOverwriteWithData() throws UserException {
+
+        testUnPartitionedTable();
+
+        ArrayList<TIcebergCommitData> ctdList = new ArrayList<>();
+        TIcebergCommitData ctd1 = new TIcebergCommitData();
+        ctd1.setFilePath("f3.parquet");
+        ctd1.setFileContent(TFileContent.DATA);
+        ctd1.setRowCount(6);
+        ctd1.setFileSize(6);
+
+        TIcebergCommitData ctd2 = new TIcebergCommitData();
+        ctd2.setFilePath("f4.parquet");
+        ctd2.setFileContent(TFileContent.DATA);
+        ctd2.setRowCount(8);
+        ctd2.setFileSize(8);
+
+        TIcebergCommitData ctd3 = new TIcebergCommitData();
+        ctd3.setFilePath("f5.parquet");
+        ctd3.setFileContent(TFileContent.DATA);
+        ctd3.setRowCount(10);
+        ctd3.setFileSize(10);
+
+        ctdList.add(ctd1);
+        ctdList.add(ctd2);
+        ctdList.add(ctd3);
+
+        Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition));
+        new MockUp<IcebergUtils>() {
+            @Mock
+            public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
+                return table;
+            }
+        };
+
+        IcebergTransaction txn = getTxn();
+        txn.updateIcebergCommitData(ctdList);
+        SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbWithoutPartition);
+        txn.beginInsert(tableInfo);
+        IcebergInsertCommandContext ctx = new IcebergInsertCommandContext();
+        ctx.setOverwrite(true);
+        txn.finishInsert(tableInfo, Optional.of(ctx));
+        txn.commit();
+
+        checkSnapshotTotalProperties(table.currentSnapshot().summary(), "24", "3", "24");
+    }
+
+    @Test
+    public void testUnpartitionedTableOverwriteWithoutData() throws UserException {
+
+        testUnPartitionedTableOverwriteWithData();
+
+        Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition));
+        new MockUp<IcebergUtils>() {
+            @Mock
+            public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
+                return table;
+            }
+        };
+
+        IcebergTransaction txn = getTxn();
+        SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbWithoutPartition);
+        txn.beginInsert(tableInfo);
+        IcebergInsertCommandContext ctx = new IcebergInsertCommandContext();
+        ctx.setOverwrite(true);
+        txn.finishInsert(tableInfo, Optional.of(ctx));
+        txn.commit();
+
+        checkSnapshotTotalProperties(table.currentSnapshot().summary(), "0", "0", "0");
+    }
 }
diff --git a/regression-test/data/external_table_p0/hive/write/test_hive_insert_overwrite_with_empty_table.out b/regression-test/data/external_table_p0/hive/write/test_hive_insert_overwrite_with_empty_table.out
new file mode 100644
index 0000000000..042af1d473
--- /dev/null
+++ b/regression-test/data/external_table_p0/hive/write/test_hive_insert_overwrite_with_empty_table.out
@@ -0,0 +1,18 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q0 --
+1	1
+1	2
+1	3
+
+-- !q1 --
+1	1
+1	2
+1	3
+
+-- !q2 --
+1	1
+1	2
+1	3
+
+-- !q3 --
+
diff --git a/regression-test/data/external_table_p0/iceberg/write/test_iceberg_insert_overwrite_with_empty_table.out b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_insert_overwrite_with_empty_table.out
new file mode 100644
index 0000000000..042af1d473
--- /dev/null
+++ b/regression-test/data/external_table_p0/iceberg/write/test_iceberg_insert_overwrite_with_empty_table.out
@@ -0,0 +1,18 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q0 --
+1	1
+1	2
+1	3
+
+-- !q1 --
+1	1
+1	2
+1	3
+
+-- !q2 --
+1	1
+1	2
+1	3
+
+-- !q3 --
+
diff --git a/regression-test/suites/external_table_p0/hive/write/test_hive_insert_overwrite_with_empty_table.groovy b/regression-test/suites/external_table_p0/hive/write/test_hive_insert_overwrite_with_empty_table.groovy
new file mode 100644
index 0000000000..eea2e7a486
--- /dev/null
+++ b/regression-test/suites/external_table_p0/hive/write/test_hive_insert_overwrite_with_empty_table.groovy
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_hive_insert_overwrite_with_empty_table", "p0,external,hive,external_docker,external_docker_hive") {
+
+    for (String hivePrefix : ["hive2"]) {
+
+        String enabled = context.config.otherConfigs.get("enableHiveTest")
+        if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+            logger.info("disable Hive test.")
+            return;
+        }
+
+        setHivePrefix(hivePrefix)
+        try {
+
+            String hms_port = context.config.otherConfigs.get(hivePrefix + "HmsPort")
+            String hdfs_port = context.config.otherConfigs.get(hivePrefix + "HdfsPort")
+            String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+            String catalog = "test_hive_insert_overwrite_with_empty_table"
+            String db1 = catalog + "_db"
+            String tb1 = db1 + "_tb1"
+            String tb2 = db1 + "_tb2"
+            String tb3 = db1 + "_tb3"
+
+            sql """drop catalog if exists ${catalog}"""
+            sql """create catalog if not exists ${catalog} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
+            );"""
+
+            sql """ switch ${catalog} """
+
+            sql """ drop table if exists ${db1}.${tb1} """
+            sql """ drop table if exists ${db1}.${tb2} """
+            sql """ drop database if exists ${db1} """
+
+            sql """ create database ${db1} """
+            sql """ create table ${db1}.${tb1} (id int, val int) partition by list (val)() """
+            sql """ create table ${db1}.${tb2} (id int, val int) """
+            sql """ create table ${db1}.${tb3} (id int, val int) """
+
+            sql """ use ${db1} """
+            sql """ insert into ${tb1} values (1,1), (1,2), (1,3) """
+            sql """ insert into ${tb2} values (1,1), (1,2), (1,3) """
+
+            order_qt_q0 """ select * from ${tb1} """
+            order_qt_q1 """ select * from ${tb2} """
+
+            sql """ insert overwrite table ${tb1} select * from ${tb3} """
+            sql """ insert overwrite table ${tb2} select * from ${tb3} """
+
+            order_qt_q2 """ select * from ${tb1} """ // should have 3 records
+            order_qt_q3 """ select * from ${tb2} """ // should have no records
+
+            sql """ drop table ${db1}.${tb1} """
+            sql """ drop table ${db1}.${tb2} """
+            sql """ drop table ${db1}.${tb3} """
+            sql """ drop database ${db1} """
+            sql """ drop catalog ${catalog} """
+
+        } finally {
+        }
+    }
+}
+
diff --git a/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_insert_overwrite_with_empty_table.groovy b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_insert_overwrite_with_empty_table.groovy
new file mode 100644
index 0000000000..84c0f287ab
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/write/test_iceberg_insert_overwrite_with_empty_table.groovy
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_insert_overwrite_with_empty_table", "p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "test_iceberg_insert_overwrite_with_empty_table"
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+        "s3.region" = "us-east-1"
+    );"""
+
+    sql """ switch ${catalog_name} """
+
+    String db1 = catalog_name + "_db1"
+    String tb1 = db1 + "_tb1"
+    String tb2 = db1 + "_tb2"
+    String tb3 = db1 + "_tb3"
+
+    sql """ drop table if exists ${db1}.${tb1} """
+    sql """ drop table if exists ${db1}.${tb2} """
+    sql """ drop database if exists ${db1} """
+
+    sql """ create database ${db1} """
+    sql """ create table ${db1}.${tb1} (id int, val int) partition by list (val)() """
+    sql """ create table ${db1}.${tb2} (id int, val int) """
+    sql """ create table ${db1}.${tb3} (id int, val int) """
+
+    sql """ use ${db1} """
+    sql """ insert into ${tb1} values (1,1), (1,2), (1,3) """
+    sql """ insert into ${tb2} values (1,1), (1,2), (1,3) """
+
+    order_qt_q0 """ select * from ${tb1} """
+    order_qt_q1 """ select * from ${tb2} """
+
+    sql """ insert overwrite table ${tb1} select * from ${tb3} """
+    sql """ insert overwrite table ${tb2} select * from ${tb3} """
+
+    order_qt_q2 """ select * from ${tb1} """ // should have 3 records
+    order_qt_q3 """ select * from ${tb2} """ // should have no records
+
+    sql """ drop table ${db1}.${tb1} """
+    sql """ drop table ${db1}.${tb2} """
+    sql """ drop table ${db1}.${tb3} """
+    sql """ drop database ${db1} """
+    sql """ drop catalog ${catalog_name} """
+
+}
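
The behavior this patch implements, reduced to plain SQL for quick reference (catalog, database, and table names below are illustrative, not taken from the patch): an INSERT OVERWRITE that reads from an empty source now empties an unpartitioned Hive or Iceberg target, while a partitioned target keeps its existing data, which is exactly what the new regression suites assert via q2 and q3.

    -- `src_empty` has no rows
    INSERT OVERWRITE TABLE dst_unpartitioned SELECT * FROM src_empty;  -- target table ends up empty
    INSERT OVERWRITE TABLE dst_partitioned   SELECT * FROM src_empty;  -- existing partitions and data are left untouched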