[feature](hive)support insert overwrite (#32610)

support insert overwrite for unpartitioned table and partitioned table.

issue: #31442
This commit is contained in:
wuwenchi
2024-03-22 21:32:20 +08:00
committed by yiguolei
parent ac8ba43d8d
commit bd2e7d0ec8
4 changed files with 165 additions and 31 deletions

View File

@ -80,6 +80,8 @@ public class HMSCommitter {
private final Queue<DirectoryCleanUpTask> directoryCleanUpTasksForAbort = new ConcurrentLinkedQueue<>();
// when aborted, we need restore directory
private final List<RenameDirectoryTask> renameDirectoryTasksForAbort = new ArrayList<>();
// when finished, we need clear some directories
private final List<String> clearDirsForFinish = new ArrayList<>();
Executor fileSystemExecutor = Executors.newFixedThreadPool(16);
public HMSCommitter(HiveMetadataOps hiveOps, RemoteFileSystem fs, Table table) {
@ -105,6 +107,8 @@ public class HMSCommitter {
t.addSuppressed(new Exception("Failed to roll back after commit failure", e));
}
throw t;
} finally {
runClearPathsForFinish();
}
}
@ -250,7 +254,38 @@ public class HMSCommitter {
}
public void prepareOverwriteTable(THivePartitionUpdate pu, HivePartitionStatistics ps) {
String targetPath = pu.getLocation().getTargetPath();
String writePath = pu.getLocation().getWritePath();
if (!targetPath.equals(writePath)) {
Path path = new Path(targetPath);
String oldTablePath = new Path(path.getParent(), "_temp_" + path.getName()).toString();
Status status = fs.renameDir(
targetPath,
oldTablePath,
() -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldTablePath, targetPath)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + targetPath + " to " + oldTablePath + status.getErrMsg());
}
clearDirsForFinish.add(oldTablePath);
status = fs.renameDir(
writePath,
targetPath,
() -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg());
}
}
updateStatisticsTasks.add(
new UpdateStatisticsTask(
table.getDbName(),
table.getTableName(),
Optional.empty(),
ps,
false
));
}
public void prepareCreateNewPartition(THivePartitionUpdate pu, HivePartitionStatistics ps) {
@ -335,7 +370,38 @@ public class HMSCommitter {
public void prepareOverwritePartition(THivePartitionUpdate pu, HivePartitionStatistics ps) {
String targetPath = pu.getLocation().getTargetPath();
String writePath = pu.getLocation().getWritePath();
if (!targetPath.equals(writePath)) {
Path path = new Path(targetPath);
String oldPartitionPath = new Path(path.getParent(), "_temp_" + path.getName()).toString();
Status status = fs.renameDir(
targetPath,
oldPartitionPath,
() -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldPartitionPath, targetPath)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + targetPath + " to " + oldPartitionPath + ":" + status.getErrMsg());
}
clearDirsForFinish.add(oldPartitionPath);
status = fs.renameDir(
writePath,
targetPath,
() -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg());
}
}
updateStatisticsTasks.add(
new UpdateStatisticsTask(
table.getDbName(),
table.getTableName(),
Optional.of(pu.getName()),
ps,
false
));
}
@ -481,7 +547,7 @@ public class HMSCommitter {
createdPartitionValues.add(partition.getPartition().getPartitionValues());
}
} catch (Throwable t) {
LOG.error("Failed to add partition", t);
LOG.warn("Failed to add partition", t);
throw t;
}
}
@ -583,7 +649,27 @@ public class HMSCommitter {
}
private void runRenameDirTasksForAbort() {
// TODO abort
Status status;
for (RenameDirectoryTask task : renameDirectoryTasksForAbort) {
status = fs.exists(task.getRenameFrom());
if (status.ok()) {
status = fs.renameDir(task.getRenameFrom(), task.getRenameTo(), () -> {});
if (!status.ok()) {
LOG.warn("Failed to abort rename dir from {} to {}:{}",
task.getRenameFrom(), task.getRenameTo(), status.getErrMsg());
}
}
}
}
private void runClearPathsForFinish() {
Status status;
for (String path : clearDirsForFinish) {
status = fs.delete(path);
if (!status.ok()) {
LOG.warn("Failed to recursively delete path {}:{}", path, status.getErrCode());
}
}
}
@ -591,10 +677,10 @@ public class HMSCommitter {
DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, deleteEmptyDir);
if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
LOG.error("Failed to delete directory {}. Some eligible items can't be deleted: {}.",
LOG.warn("Failed to delete directory {}. Some eligible items can't be deleted: {}.",
directory.toString(), deleteResult.getNotDeletedEligibleItems());
} else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
LOG.error("Failed to delete directory {} due to dir isn't empty", directory.toString());
LOG.warn("Failed to delete directory {} due to dir isn't empty", directory.toString());
}
}

View File

@ -49,6 +49,12 @@ public interface FileSystem {
Status rename(String origFilePath, String destFilePath);
default Status renameDir(String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
throw new UnsupportedOperationException("Unsupported operation rename dir on current file system.");
}
default void asyncRename(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,

View File

@ -412,6 +412,28 @@ public class DFSFileSystem extends RemoteFileSystem {
}
}
public Status renameDir(String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
Status status = exists(destFilePath);
if (status.ok()) {
throw new RuntimeException("Destination directory already exists: " + destFilePath);
}
String targetParent = new Path(destFilePath).getParent().toString();
status = exists(targetParent);
if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
status = makeDir(targetParent);
}
if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
runWhenPathNotExist.run();
return rename(origFilePath, destFilePath);
}
@Override
public void asyncRenameDir(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
@ -423,23 +445,7 @@ public class DFSFileSystem extends RemoteFileSystem {
if (cancelled.get()) {
return;
}
Status status = exists(destFilePath);
if (status.ok()) {
throw new RuntimeException("Destination directory already exists: " + destFilePath);
}
String targetParent = new Path(destFilePath).getParent().toString();
status = exists(targetParent);
if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
makeDir(targetParent);
} else if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
runWhenPathNotExist.run();
status = rename(origFilePath, destFilePath);
Status status = renameDir(origFilePath, destFilePath, runWhenPathNotExist);
if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}

View File

@ -134,7 +134,7 @@ public class HmsCommitTest {
pus.add(createRandomAppend(""));
hmsOps.commit(dbName, tbWithoutPartition, pus);
Table table = hmsClient.getTable(dbName, tbWithoutPartition);
Assert.assertEquals(3, Long.parseLong(table.getParameters().get("numRows")));
assertNumRows(3, table);
List<THivePartitionUpdate> pus2 = new ArrayList<>();
pus2.add(createRandomAppend(""));
@ -142,12 +142,19 @@ public class HmsCommitTest {
pus2.add(createRandomAppend(""));
hmsOps.commit(dbName, tbWithoutPartition, pus2);
table = hmsClient.getTable(dbName, tbWithoutPartition);
Assert.assertEquals(6, Long.parseLong(table.getParameters().get("numRows")));
assertNumRows(6, table);
}
@Test
public void testOverwritePartitionForUnPartitionedTable() {
// TODO
testAppendPartitionForUnPartitionedTable();
List<THivePartitionUpdate> pus = new ArrayList<>();
pus.add(createRandomOverwrite(""));
pus.add(createRandomOverwrite(""));
pus.add(createRandomOverwrite(""));
hmsOps.commit(dbName, tbWithoutPartition, pus);
Table table = hmsClient.getTable(dbName, tbWithoutPartition);
assertNumRows(3, table);
}
@Test
@ -162,11 +169,11 @@ public class HmsCommitTest {
hmsOps.commit(dbName, tbWithPartition, pus);
Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
Assert.assertEquals(3, Long.parseLong(pa.getParameters().get("numRows")));
assertNumRows(3, pa);
Partition pb = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("b"));
Assert.assertEquals(2, Long.parseLong(pb.getParameters().get("numRows")));
assertNumRows(2, pb);
Partition pc = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("c"));
Assert.assertEquals(1, Long.parseLong(pc.getParameters().get("numRows")));
assertNumRows(1, pc);
}
@Test
@ -183,11 +190,28 @@ public class HmsCommitTest {
hmsOps.commit(dbName, tbWithPartition, pus);
Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
Assert.assertEquals(6, Long.parseLong(pa.getParameters().get("numRows")));
assertNumRows(6, pa);
Partition pb = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("b"));
Assert.assertEquals(4, Long.parseLong(pb.getParameters().get("numRows")));
assertNumRows(4, pb);
Partition pc = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("c"));
Assert.assertEquals(2, Long.parseLong(pc.getParameters().get("numRows")));
assertNumRows(2, pc);
}
@Test
public void testOverwritePartitionForPartitionedTable() {
testAppendPartitionForPartitionedTable();
List<THivePartitionUpdate> pus = new ArrayList<>();
pus.add(createRandomOverwrite("a"));
pus.add(createRandomOverwrite("b"));
pus.add(createRandomOverwrite("c"));
hmsOps.commit(dbName, tbWithPartition, pus);
Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
assertNumRows(1, pa);
Partition pb = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("b"));
assertNumRows(1, pb);
Partition pc = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("c"));
assertNumRows(1, pc);
}
@Test
@ -201,7 +225,7 @@ public class HmsCommitTest {
hmsOps.commit(dbName, tbWithPartition, pus);
for (int i = 0; i < nums; i++) {
Partition p = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("" + i));
Assert.assertEquals(1, Long.parseLong(p.getParameters().get("numRows")));
assertNumRows(1, p);
}
try {
@ -211,6 +235,14 @@ public class HmsCommitTest {
}
}
public void assertNumRows(long expected, Partition p) {
Assert.assertEquals(expected, Long.parseLong(p.getParameters().get("numRows")));
}
public void assertNumRows(long expected, Table t) {
Assert.assertEquals(expected, Long.parseLong(t.getParameters().get("numRows")));
}
public THivePartitionUpdate genOnePartitionUpdate(String partitionValue, TUpdateMode mode) {
String uuid = UUID.randomUUID().toString();
@ -242,4 +274,8 @@ public class HmsCommitTest {
public THivePartitionUpdate createRandomAppend(String partition) {
return genOnePartitionUpdate("c3=" + partition, TUpdateMode.APPEND);
}
public THivePartitionUpdate createRandomOverwrite(String partition) {
return genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE);
}
}