[bugfix](hive/iceberg)align with Hive insert overwrite table functionality (#39840) (#40724)

bp #39840
wuwenchi
2024-09-12 19:20:15 +08:00
committed by GitHub
parent 23b21fcebf
commit 4b7b43b5ca
9 changed files with 366 additions and 13 deletions
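
The fix targets INSERT OVERWRITE statements whose source query returns no rows (e.g. insert overwrite table `dst_tb` select * from `empty_tb`). To align with Hive, an unpartitioned target table must be emptied in that case, while a partitioned target is left untouched because no new partitions are written; previously such a statement could leave the old data in place. The following block is an illustrative model of those semantics only, not Doris code: TargetTable and its methods are hypothetical.

// Illustrative model only: encodes the Hive INSERT OVERWRITE semantics this commit aligns with.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class OverwriteSemanticsSketch {
    static class TargetTable {
        final boolean partitioned;
        final List<String> rows = new ArrayList<>();
        TargetTable(boolean partitioned) { this.partitioned = partitioned; }
    }

    // "INSERT OVERWRITE target SELECT ..." where the SELECT produced no rows.
    static void overwriteWithEmptySource(TargetTable target) {
        if (target.partitioned) {
            return;            // partitioned target: no partitions written, nothing replaced
        }
        target.rows.clear();   // unpartitioned target: the whole table is emptied
    }

    public static void main(String[] args) {
        TargetTable unpartitioned = new TargetTable(false);
        Collections.addAll(unpartitioned.rows, "old1", "old2");
        overwriteWithEmptySource(unpartitioned);
        System.out.println(unpartitioned.rows);   // []  -> table truncated

        TargetTable partitioned = new TargetTable(true);
        Collections.addAll(partitioned.rows, "p1=a/old");
        overwriteWithEmptySource(partitioned);
        System.out.println(partitioned.rows);     // [p1=a/old]  -> untouched
    }
}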

File: HMSTransaction.java

@ -35,6 +35,8 @@ import org.apache.doris.fs.remote.S3FileSystem;
import org.apache.doris.fs.remote.SwitchingFileSystem;
import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.THiveLocationParams;
import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TS3MPUPendingUpload;
import org.apache.doris.thrift.TUpdateMode;
@ -63,6 +65,7 @@ import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -86,6 +89,8 @@ public class HMSTransaction implements Transaction {
private final FileSystem fs;
private Optional<SummaryProfile> summaryProfile = Optional.empty();
private String queryId;
private boolean isOverwrite = false;
TFileType fileType;
private final Map<SimpleTableInfo, Action<TableAndMore>> tableActions = new HashMap<>();
private final Map<SimpleTableInfo, Map<List<String>, Action<PartitionAndMore>>>
@ -96,6 +101,7 @@ public class HMSTransaction implements Transaction {
private HmsCommitter hmsCommitter;
private List<THivePartitionUpdate> hivePartitionUpdates = Lists.newArrayList();
private String declaredIntentionsToWrite;
private boolean isMockedPartitionUpdate = false;
private static class UncompletedMpuPendingUpload {
@ -173,9 +179,38 @@ public class HMSTransaction implements Transaction {
public void beginInsertTable(HiveInsertCommandContext ctx) {
declaredIntentionsToWrite = ctx.getWritePath();
queryId = ctx.getQueryId();
isOverwrite = ctx.isOverwrite();
fileType = ctx.getFileType();
}
public void finishInsertTable(SimpleTableInfo tableInfo) {
Table table = getTable(tableInfo);
if (hivePartitionUpdates.isEmpty() && isOverwrite && table.getPartitionKeysSize() == 0) {
// use an empty hivePartitionUpdate to clear the target table
isMockedPartitionUpdate = true;
THivePartitionUpdate emptyUpdate = new THivePartitionUpdate() {{
setUpdateMode(TUpdateMode.OVERWRITE);
setFileSize(0);
setRowCount(0);
setFileNames(Collections.emptyList());
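// pick where the empty overwrite points: on S3, the table's own location plus a
// placeholder multipart upload; otherwise the declared write path, created up front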
if (fileType == TFileType.FILE_S3) {
setS3MpuPendingUploads(Lists.newArrayList(new TS3MPUPendingUpload()));
setLocation(new THiveLocationParams() {{
setWritePath(table.getSd().getLocation());
}
});
} else {
fs.makeDir(declaredIntentionsToWrite);
setLocation(new THiveLocationParams() {{
setWritePath(declaredIntentionsToWrite);
}
});
}
}
};
hivePartitionUpdates = Lists.newArrayList(emptyUpdate);
}
List<THivePartitionUpdate> mergedPUs = mergePartitions(hivePartitionUpdates);
for (THivePartitionUpdate pu : mergedPUs) {
if (pu.getS3MpuPendingUploads() != null) {
@ -185,7 +220,6 @@ public class HMSTransaction implements Transaction {
}
}
}
Table table = getTable(tableInfo);
List<Pair<THivePartitionUpdate, HivePartitionStatistics>> insertExistsPartitions = new ArrayList<>();
for (THivePartitionUpdate pu : mergedPUs) {
TUpdateMode updateMode = pu.getUpdateMode();
@ -1534,6 +1568,12 @@ public class HMSTransaction implements Transaction {
private void s3Commit(Executor fileSystemExecutor, List<CompletableFuture<?>> asyncFileSystemTaskFutures,
AtomicBoolean fileSystemTaskCancelled, THivePartitionUpdate hivePartitionUpdate, String path) {
List<TS3MPUPendingUpload> s3MpuPendingUploads = hivePartitionUpdate.getS3MpuPendingUploads();
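// a mocked empty update (see finishInsertTable) carries no real pending uploads, so there is nothing to complete on S3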
if (isMockedPartitionUpdate) {
return;
}
S3FileSystem s3FileSystem = (S3FileSystem) ((SwitchingFileSystem) fs).fileSystem(path);
S3Client s3Client;
try {
@ -1542,7 +1582,7 @@ public class HMSTransaction implements Transaction {
throw new RuntimeException(e);
}
for (TS3MPUPendingUpload s3MPUPendingUpload : hivePartitionUpdate.getS3MpuPendingUploads()) {
for (TS3MPUPendingUpload s3MPUPendingUpload : s3MpuPendingUploads) {
asyncFileSystemTaskFutures.add(CompletableFuture.runAsync(() -> {
if (fileSystemTaskCancelled.get()) {
return;

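The empty THivePartitionUpdate above is built with double-brace initialization: an anonymous subclass of the Thrift struct whose instance-initializer block runs the setter calls; the same pattern builds the nested THiveLocationParams. A minimal, self-contained illustration of the idiom, with PartitionUpdate as a hypothetical stand-in for the generated THivePartitionUpdate:

import java.util.Collections;
import java.util.List;

public class DoubleBraceInitSketch {
    static class PartitionUpdate {                 // hypothetical stand-in for THivePartitionUpdate
        String updateMode;
        long fileSize;
        long rowCount;
        List<String> fileNames;
        void setUpdateMode(String mode) { this.updateMode = mode; }
        void setFileSize(long size) { this.fileSize = size; }
        void setRowCount(long count) { this.rowCount = count; }
        void setFileNames(List<String> names) { this.fileNames = names; }
    }

    public static void main(String[] args) {
        // new PartitionUpdate() {{ ... }} creates an anonymous subclass whose
        // instance-initializer block runs the setter calls at construction time.
        PartitionUpdate emptyUpdate = new PartitionUpdate() {{
            setUpdateMode("OVERWRITE");
            setFileSize(0);
            setRowCount(0);
            setFileNames(Collections.emptyList());
        }};
        System.out.println(emptyUpdate.updateMode + " " + emptyUpdate.fileNames);
    }
}
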
File: IcebergTransaction.java

@ -33,14 +33,19 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.WriteResult;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@ -91,10 +96,15 @@ public class IcebergTransaction implements Transaction {
PartitionSpec spec = table.spec();
FileFormat fileFormat = IcebergUtils.getFileFormat(table);
//convert commitDataList to writeResult
WriteResult writeResult = IcebergWriterHelper
.convertToWriterResult(fileFormat, spec, commitDataList);
List<WriteResult> pendingResults = Lists.newArrayList(writeResult);
List<WriteResult> pendingResults;
if (commitDataList.isEmpty()) {
pendingResults = Collections.emptyList();
} else {
//convert commitDataList to writeResult
WriteResult writeResult = IcebergWriterHelper
.convertToWriterResult(fileFormat, spec, commitDataList);
pendingResults = Lists.newArrayList(writeResult);
}
if (updateMode == TUpdateMode.APPEND) {
commitAppendTxn(table, pendingResults);
@ -138,6 +148,22 @@ public class IcebergTransaction implements Transaction {
private void commitReplaceTxn(Table table, List<WriteResult> pendingResults) {
if (pendingResults.isEmpty()) {
// e.g. insert overwrite table `dst_tb` select * from `empty_tb`
// 1. if dst_tb is a partitioned table, return directly: no partition is written, so nothing is replaced.
// 2. if dst_tb is an unpartitioned table, `dst_tb` is emptied.
if (!table.spec().isPartitioned()) {
OverwriteFiles overwriteFiles = table.newOverwrite();
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
fileScanTasks.forEach(f -> overwriteFiles.deleteFile(f.file()));
} catch (IOException e) {
throw new RuntimeException(e);
}
overwriteFiles.commit();
}
return;
}
// commit replace partitions
ReplacePartitions appendPartitionOp = table.newReplacePartitions();
for (WriteResult result : pendingResults) {

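The last hunk above is cut off right after table.newReplacePartitions(). For context, a hedged sketch of how the non-empty overwrite path plausibly proceeds: every data file reported by the pending WriteResults is registered on the ReplacePartitions operation, which then swaps the written partitions in a single commit. This is a sketch under that assumption, not the literal continuation of IcebergTransaction; DataFile is the Iceberg type returned by WriteResult#dataFiles().

import org.apache.iceberg.DataFile;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.WriteResult;
import java.util.Arrays;
import java.util.List;

class ReplacePartitionsSketch {
    // Register every data file produced by the writers and replace the affected
    // partitions atomically in one Iceberg snapshot.
    static void commitReplace(Table table, List<WriteResult> pendingResults) {
        ReplacePartitions replacePartitions = table.newReplacePartitions();
        for (WriteResult result : pendingResults) {
            Arrays.stream(result.dataFiles()).forEach(replacePartitions::addFile);
        }
        replacePartitions.commit();
    }
}
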
File: HiveInsertCommandContext.java

@ -17,12 +17,15 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.thrift.TFileType;
/**
* For Hive Table
*/
public class HiveInsertCommandContext extends BaseExternalTableInsertCommandContext {
private String writePath;
private String queryId;
private TFileType fileType;
public String getWritePath() {
return writePath;
@ -39,4 +42,12 @@ public class HiveInsertCommandContext extends BaseExternalTableInsertCommandContext
public void setQueryId(String queryId) {
this.queryId = queryId;
}
public TFileType getFileType() {
return fileType;
}
public void setFileType(TFileType fileType) {
this.fileType = fileType;
}
}

File: HiveTableSink.java

@ -129,6 +129,7 @@ public class HiveTableSink extends BaseExternalTableDataSink {
HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
tSink.setOverwrite(context.isOverwrite());
context.setWritePath(storageLocation);
context.setFileType(fileType);
}
} else {
String writeTempPath = createTempPath(location);
@ -139,6 +140,7 @@ public class HiveTableSink extends BaseExternalTableDataSink {
HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
tSink.setOverwrite(context.isOverwrite());
context.setWritePath(writeTempPath);
context.setFileType(fileType);
}
}
locationParams.setFileType(fileType);
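
Taken together, HiveTableSink and HiveInsertCommandContext now carry the resolved file type from planning to commit. A hedged sketch of that hand-off, not verbatim Doris code: it assumes the implicit no-argument constructor of HiveInsertCommandContext and uses only setters and getters that appear in the diffs above; the path literal is hypothetical.

import org.apache.doris.nereids.trees.plans.commands.insert.HiveInsertCommandContext;
import org.apache.doris.thrift.TFileType;

class FileTypePropagationSketch {
    static HiveInsertCommandContext planSide() {
        // In HiveTableSink (diff above): record the write location and, new in this
        // commit, the resolved file type of the target storage.
        HiveInsertCommandContext ctx = new HiveInsertCommandContext();   // assumed default constructor
        ctx.setWritePath("s3://bucket/warehouse/db/tbl/.staging_query_123");  // hypothetical path
        ctx.setFileType(TFileType.FILE_S3);
        return ctx;
    }

    static void commitSide(HiveInsertCommandContext ctx) {
        // HMSTransaction.beginInsertTable(ctx) (diff above) reads the same fields so
        // that finishInsertTable() can pick the right shape for a mocked empty
        // OVERWRITE update: the table location plus an S3 MPU stub for FILE_S3, or a
        // freshly created directory on other file systems.
        System.out.println(ctx.getFileType() + " -> " + ctx.getWritePath());
    }
}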