From 140fb4c6e693a0df70814f7dc83bef878c125678 Mon Sep 17 00:00:00 2001 From: "Mingyu Chen (Rayner)" Date: Tue, 15 Jul 2025 20:46:36 -0700 Subject: [PATCH] branch-2.1: [fix](iceberg) Fix transaction issues (#52716) (#53238) bp #52716 Co-authored-by: wuwenchi Co-authored-by: wuwenchi.wwc --- .../iceberg/IcebergTransaction.java | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java index 797caea0de..c7d7212335 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java @@ -111,8 +111,8 @@ public class IcebergTransaction implements Transaction { } private void updateManifestAfterInsert(TUpdateMode updateMode) { - PartitionSpec spec = table.spec(); - FileFormat fileFormat = IcebergUtils.getFileFormat(table); + PartitionSpec spec = transaction.table().spec(); + FileFormat fileFormat = IcebergUtils.getFileFormat(transaction.table()); List pendingResults; if (commitDataList.isEmpty()) { @@ -125,9 +125,9 @@ public class IcebergTransaction implements Transaction { } if (updateMode == TUpdateMode.APPEND) { - commitAppendTxn(table, pendingResults); + commitAppendTxn(pendingResults); } else { - commitReplaceTxn(table, pendingResults); + commitReplaceTxn(pendingResults); } } @@ -146,16 +146,15 @@ public class IcebergTransaction implements Transaction { return commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum(); } - private synchronized Table getNativeTable(SimpleTableInfo tableInfo) { Objects.requireNonNull(tableInfo); ExternalCatalog externalCatalog = ops.getExternalCatalog(); return IcebergUtils.getRemoteTable(externalCatalog, tableInfo); } - private void commitAppendTxn(Table table, List pendingResults) { + private void commitAppendTxn(List pendingResults) { // commit append files. - AppendFiles appendFiles = table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth()); + AppendFiles appendFiles = transaction.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth()); for (WriteResult result : pendingResults) { Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files for append."); @@ -165,13 +164,15 @@ public class IcebergTransaction implements Transaction { } - private void commitReplaceTxn(Table table, List pendingResults) { + private void commitReplaceTxn(List pendingResults) { if (pendingResults.isEmpty()) { // such as : insert overwrite table `dst_tb` select * from `empty_tb` // 1. if dst_tb is a partitioned table, it will return directly. // 2. if dst_tb is an unpartitioned table, the `dst_tb` table will be emptied. - if (!table.spec().isPartitioned()) { - OverwriteFiles overwriteFiles = table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth()); + if (!transaction.table().spec().isPartitioned()) { + OverwriteFiles overwriteFiles = transaction + .newOverwrite() + .scanManifestsWith(ops.getThreadPoolWithPreAuth()); try (CloseableIterable fileScanTasks = table.newScan().planFiles()) { fileScanTasks.forEach(f -> overwriteFiles.deleteFile(f.file())); } catch (IOException e) { @@ -183,7 +184,9 @@ public class IcebergTransaction implements Transaction { } // commit replace partitions - ReplacePartitions appendPartitionOp = table.newReplacePartitions(); + ReplacePartitions appendPartitionOp = transaction + .newReplacePartitions() + .scanManifestsWith(ops.getThreadPoolWithPreAuth()); for (WriteResult result : pendingResults) { Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files.");