bp #52716 Co-authored-by: wuwenchi <wuwenchi@selectdb.com> Co-authored-by: wuwenchi.wwc <wuwenchi.wwc@oceanbase.com>
This commit is contained in:
committed by
GitHub
parent
e2fb2566dd
commit
140fb4c6e6
@ -111,8 +111,8 @@ public class IcebergTransaction implements Transaction {
|
||||
}
|
||||
|
||||
private void updateManifestAfterInsert(TUpdateMode updateMode) {
|
||||
PartitionSpec spec = table.spec();
|
||||
FileFormat fileFormat = IcebergUtils.getFileFormat(table);
|
||||
PartitionSpec spec = transaction.table().spec();
|
||||
FileFormat fileFormat = IcebergUtils.getFileFormat(transaction.table());
|
||||
|
||||
List<WriteResult> pendingResults;
|
||||
if (commitDataList.isEmpty()) {
|
||||
@ -125,9 +125,9 @@ public class IcebergTransaction implements Transaction {
|
||||
}
|
||||
|
||||
if (updateMode == TUpdateMode.APPEND) {
|
||||
commitAppendTxn(table, pendingResults);
|
||||
commitAppendTxn(pendingResults);
|
||||
} else {
|
||||
commitReplaceTxn(table, pendingResults);
|
||||
commitReplaceTxn(pendingResults);
|
||||
}
|
||||
}
|
||||
|
||||
@ -146,16 +146,15 @@ public class IcebergTransaction implements Transaction {
|
||||
return commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum();
|
||||
}
|
||||
|
||||
|
||||
private synchronized Table getNativeTable(SimpleTableInfo tableInfo) {
|
||||
Objects.requireNonNull(tableInfo);
|
||||
ExternalCatalog externalCatalog = ops.getExternalCatalog();
|
||||
return IcebergUtils.getRemoteTable(externalCatalog, tableInfo);
|
||||
}
|
||||
|
||||
private void commitAppendTxn(Table table, List<WriteResult> pendingResults) {
|
||||
private void commitAppendTxn(List<WriteResult> pendingResults) {
|
||||
// commit append files.
|
||||
AppendFiles appendFiles = table.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
|
||||
AppendFiles appendFiles = transaction.newAppend().scanManifestsWith(ops.getThreadPoolWithPreAuth());
|
||||
for (WriteResult result : pendingResults) {
|
||||
Preconditions.checkState(result.referencedDataFiles().length == 0,
|
||||
"Should have no referenced data files for append.");
|
||||
@ -165,13 +164,15 @@ public class IcebergTransaction implements Transaction {
|
||||
}
|
||||
|
||||
|
||||
private void commitReplaceTxn(Table table, List<WriteResult> pendingResults) {
|
||||
private void commitReplaceTxn(List<WriteResult> pendingResults) {
|
||||
if (pendingResults.isEmpty()) {
|
||||
// such as : insert overwrite table `dst_tb` select * from `empty_tb`
|
||||
// 1. if dst_tb is a partitioned table, it will return directly.
|
||||
// 2. if dst_tb is an unpartitioned table, the `dst_tb` table will be emptied.
|
||||
if (!table.spec().isPartitioned()) {
|
||||
OverwriteFiles overwriteFiles = table.newOverwrite().scanManifestsWith(ops.getThreadPoolWithPreAuth());
|
||||
if (!transaction.table().spec().isPartitioned()) {
|
||||
OverwriteFiles overwriteFiles = transaction
|
||||
.newOverwrite()
|
||||
.scanManifestsWith(ops.getThreadPoolWithPreAuth());
|
||||
try (CloseableIterable<FileScanTask> fileScanTasks = table.newScan().planFiles()) {
|
||||
fileScanTasks.forEach(f -> overwriteFiles.deleteFile(f.file()));
|
||||
} catch (IOException e) {
|
||||
@ -183,7 +184,9 @@ public class IcebergTransaction implements Transaction {
|
||||
}
|
||||
|
||||
// commit replace partitions
|
||||
ReplacePartitions appendPartitionOp = table.newReplacePartitions();
|
||||
ReplacePartitions appendPartitionOp = transaction
|
||||
.newReplacePartitions()
|
||||
.scanManifestsWith(ops.getThreadPoolWithPreAuth());
|
||||
for (WriteResult result : pendingResults) {
|
||||
Preconditions.checkState(result.referencedDataFiles().length == 0,
|
||||
"Should have no referenced data files.");
|
||||
|
||||
Reference in New Issue
Block a user