[feature](hive) support ExternalTransaction for writing external table (#32726)

Issue #31442

Add the `TransactionManager` and `Transaction` interfaces.

```
public interface Transaction {
    void commit() throws UserException;
    void rollback();
}
public interface TransactionManager {
    long begin();
    void commit(long id) throws UserException;
    void rollback(long id);
    Transaction getTransaction(long id);
}
```
`TransactionManager` is used to manage all external transactions. The application layer drives the whole transaction lifecycle through this `TransactionManager`, for example:
```
long txnId = transactionManager.begin();
// ... data is written under this transaction ...
transactionManager.commit(txnId);
// or, on failure:
transactionManager.rollback(txnId);
```

`Transaction` is an interface that can be implemented for each specific data source, such as the `HMSTransaction` implemented in this PR, or an Iceberg transaction that may be added in the future.
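
As an illustration of what such an implementation involves, here is a minimal sketch of a hypothetical transaction for some other data source. The class name and the comments describing its behavior are assumptions for the example and are not part of this PR:

```
import org.apache.doris.common.UserException;
import org.apache.doris.transaction.Transaction;

// Minimal illustrative sketch, not part of this PR: it only shows the shape
// of a Transaction implementation for another data source.
public class MyExternalTransaction implements Transaction {

    @Override
    public void commit() throws UserException {
        // Publish the buffered writes through the data source's own
        // commit protocol; throw UserException if publishing fails.
    }

    @Override
    public void rollback() {
        // Clean up any files or metadata written before the failure.
    }
}
```

In this PR the Hive manager is wired up in `HMSExternalCatalog` via `TransactionManagerFactory.createHiveTransactionManager(hiveOps)`; a new data source would register its own manager in the same way.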
Commit e11db3f050 (parent f0ac21e231)
Author: wuwenchi, 2024-04-04 08:10:49 +08:00
Committed by: morningman
14 changed files with 1597 additions and 822 deletions

View File

@ -50,6 +50,7 @@ import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.MasterCatalogExecutor;
import org.apache.doris.transaction.TransactionManager;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@ -111,6 +112,7 @@ public abstract class ExternalCatalog
private boolean objectCreated = false;
protected boolean invalidCacheInInit = true;
protected ExternalMetadataOps metadataOps;
protected TransactionManager transactionManager;
private ExternalSchemaCache schemaCache;
private String comment;

View File

@ -1,754 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java
// and modified by Doris
package org.apache.doris.datasource.hive;
import org.apache.doris.backup.Status;
import org.apache.doris.common.Pair;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TUpdateMode;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.airlift.concurrent.MoreFutures;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class HMSCommitter {
private static final Logger LOG = LogManager.getLogger(HMSCommitter.class);
private final HiveMetadataOps hiveOps;
private final RemoteFileSystem fs;
private final Table table;
// update statistics for unPartitioned table or existed partition
private final List<UpdateStatisticsTask> updateStatisticsTasks = new ArrayList<>();
Executor updateStatisticsExecutor = Executors.newFixedThreadPool(16);
// add new partition
private final AddPartitionsTask addPartitionsTask = new AddPartitionsTask();
private static final int PARTITION_COMMIT_BATCH_SIZE = 20;
// for file system rename operation
// whether to cancel the file system tasks
private final AtomicBoolean fileSystemTaskCancelled = new AtomicBoolean(false);
// file system tasks that are executed asynchronously, including rename_file, rename_dir
private final List<CompletableFuture<?>> asyncFileSystemTaskFutures = new ArrayList<>();
// when aborted, we need to delete all files under this path, even the current directory
private final Queue<DirectoryCleanUpTask> directoryCleanUpTasksForAbort = new ConcurrentLinkedQueue<>();
// when aborted, we need restore directory
private final List<RenameDirectoryTask> renameDirectoryTasksForAbort = new ArrayList<>();
// when finished, we need clear some directories
private final List<String> clearDirsForFinish = new ArrayList<>();
Executor fileSystemExecutor = Executors.newFixedThreadPool(16);
public HMSCommitter(HiveMetadataOps hiveOps, RemoteFileSystem fs, Table table) {
this.hiveOps = hiveOps;
this.fs = fs;
this.table = table;
}
public void commit(List<THivePartitionUpdate> hivePUs) {
try {
prepare(mergePartitions(hivePUs));
doCommit();
} catch (Throwable t) {
LOG.warn("Failed to commit for {}.{}, abort it.", table.getDbName(), table.getTableName());
try {
cancelUnStartedAsyncFileSystemTask();
undoUpdateStatisticsTasks();
undoAddPartitionsTask();
waitForAsyncFileSystemTaskSuppressThrowable();
runDirectoryClearUpTasksForAbort();
runRenameDirTasksForAbort();
} catch (Throwable e) {
t.addSuppressed(new Exception("Failed to roll back after commit failure", e));
}
throw t;
} finally {
runClearPathsForFinish();
}
}
public void prepare(List<THivePartitionUpdate> hivePUs) {
List<Pair<THivePartitionUpdate, HivePartitionStatistics>> insertExistsPartitions = new ArrayList<>();
for (THivePartitionUpdate pu : hivePUs) {
TUpdateMode updateMode = pu.getUpdateMode();
HivePartitionStatistics hivePartitionStatistics = HivePartitionStatistics.fromCommonStatistics(
pu.getRowCount(),
pu.getFileNamesSize(),
pu.getFileSize());
if (table.getPartitionKeysSize() == 0) {
Preconditions.checkArgument(hivePUs.size() == 1,
"When updating a non-partitioned table, multiple partitions should not be written");
switch (updateMode) {
case APPEND:
prepareAppendTable(pu, hivePartitionStatistics);
break;
case OVERWRITE:
prepareOverwriteTable(pu, hivePartitionStatistics);
break;
default:
throw new RuntimeException("Not support mode:[" + updateMode + "] in unPartitioned table");
}
} else {
switch (updateMode) {
case NEW:
prepareCreateNewPartition(pu, hivePartitionStatistics);
break;
case APPEND:
insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
break;
case OVERWRITE:
prepareOverwritePartition(pu, hivePartitionStatistics);
break;
default:
throw new RuntimeException("Not support mode:[" + updateMode + "] in unPartitioned table");
}
}
}
if (!insertExistsPartitions.isEmpty()) {
prepareInsertExistPartition(insertExistsPartitions);
}
}
public List<THivePartitionUpdate> mergePartitions(List<THivePartitionUpdate> hivePUs) {
Map<String, THivePartitionUpdate> mm = new HashMap<>();
for (THivePartitionUpdate pu : hivePUs) {
if (mm.containsKey(pu.getName())) {
THivePartitionUpdate old = mm.get(pu.getName());
old.setFileSize(old.getFileSize() + pu.getFileSize());
old.setRowCount(old.getRowCount() + pu.getRowCount());
old.getFileNames().addAll(pu.getFileNames());
} else {
mm.put(pu.getName(), pu);
}
}
return new ArrayList<>(mm.values());
}
public void doCommit() {
waitForAsyncFileSystemTasks();
doAddPartitionsTask();
doUpdateStatisticsTasks();
}
public void rollback() {
}
public void cancelUnStartedAsyncFileSystemTask() {
fileSystemTaskCancelled.set(true);
}
private void undoUpdateStatisticsTasks() {
ImmutableList.Builder<CompletableFuture<?>> undoUpdateFutures = ImmutableList.builder();
for (UpdateStatisticsTask task : updateStatisticsTasks) {
undoUpdateFutures.add(CompletableFuture.runAsync(() -> {
try {
task.undo(hiveOps);
} catch (Throwable throwable) {
LOG.warn("Failed to rollback: {}", task.getDescription(), throwable);
}
}, updateStatisticsExecutor));
}
for (CompletableFuture<?> undoUpdateFuture : undoUpdateFutures.build()) {
MoreFutures.getFutureValue(undoUpdateFuture);
}
}
private void undoAddPartitionsTask() {
if (addPartitionsTask.isEmpty()) {
return;
}
HivePartition firstPartition = addPartitionsTask.getPartitions().get(0).getPartition();
String dbName = firstPartition.getDbName();
String tableName = firstPartition.getTblName();
List<List<String>> rollbackFailedPartitions = addPartitionsTask.rollback(hiveOps);
if (!rollbackFailedPartitions.isEmpty()) {
LOG.warn("Failed to rollback: add_partition for partition values {}.{}.{}",
dbName, tableName, rollbackFailedPartitions);
}
}
private void waitForAsyncFileSystemTaskSuppressThrowable() {
for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Throwable t) {
// ignore
}
}
}
public void prepareAppendTable(THivePartitionUpdate pu, HivePartitionStatistics ps) {
String targetPath = pu.getLocation().getTargetPath();
String writePath = pu.getLocation().getWritePath();
if (!targetPath.equals(writePath)) {
fs.asyncRename(
fileSystemExecutor,
asyncFileSystemTaskFutures,
fileSystemTaskCancelled,
writePath,
targetPath,
pu.getFileNames());
}
directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false));
updateStatisticsTasks.add(
new UpdateStatisticsTask(
table.getDbName(),
table.getTableName(),
Optional.empty(),
ps,
true
));
}
public void prepareOverwriteTable(THivePartitionUpdate pu, HivePartitionStatistics ps) {
String targetPath = pu.getLocation().getTargetPath();
String writePath = pu.getLocation().getWritePath();
if (!targetPath.equals(writePath)) {
Path path = new Path(targetPath);
String oldTablePath = new Path(path.getParent(), "_temp_" + path.getName()).toString();
Status status = fs.renameDir(
targetPath,
oldTablePath,
() -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldTablePath, targetPath)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + targetPath + " to " + oldTablePath + status.getErrMsg());
}
clearDirsForFinish.add(oldTablePath);
status = fs.renameDir(
writePath,
targetPath,
() -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg());
}
}
updateStatisticsTasks.add(
new UpdateStatisticsTask(
table.getDbName(),
table.getTableName(),
Optional.empty(),
ps,
false
));
}
public void prepareCreateNewPartition(THivePartitionUpdate pu, HivePartitionStatistics ps) {
String targetPath = pu.getLocation().getTargetPath();
String writePath = pu.getLocation().getWritePath();
if (!targetPath.equals(writePath)) {
fs.asyncRenameDir(
fileSystemExecutor,
asyncFileSystemTaskFutures,
fileSystemTaskCancelled,
writePath,
targetPath,
() -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true)));
}
StorageDescriptor sd = table.getSd();
HivePartition hivePartition = new HivePartition(
table.getDbName(),
table.getTableName(),
false,
sd.getInputFormat(),
pu.getLocation().getTargetPath(),
HiveUtil.toPartitionValues(pu.getName()),
Maps.newHashMap(),
sd.getOutputFormat(),
sd.getSerdeInfo().getSerializationLib(),
hiveOps.getClient().getSchema(table.getDbName(), table.getTableName())
);
HivePartitionWithStatistics partitionWithStats =
new HivePartitionWithStatistics(pu.getName(), hivePartition, ps);
addPartitionsTask.addPartition(partitionWithStats);
}
public void prepareInsertExistPartition(List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitions) {
for (List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitionBatch :
Iterables.partition(partitions, 100)) {
List<String> partitionNames = partitionBatch.stream()
.map(pair -> pair.first.getName())
.collect(Collectors.toList());
Map<String, Partition> partitionsByNamesMap = HiveUtil.convertToNamePartitionMap(
partitionNames,
hiveOps.getClient().getPartitions(table.getDbName(), table.getTableName(), partitionNames));
for (int i = 0; i < partitionsByNamesMap.size(); i++) {
String partitionName = partitionNames.get(i);
if (partitionsByNamesMap.get(partitionName) == null) {
// Prevent this partition from being deleted by other engines
throw new RuntimeException("Not found partition: " + partitionName);
}
THivePartitionUpdate pu = partitionBatch.get(i).first;
HivePartitionStatistics updateStats = partitionBatch.get(i).second;
String writePath = pu.getLocation().getWritePath();
String targetPath = pu.getLocation().getTargetPath();
directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false));
if (!targetPath.equals(writePath)) {
fs.asyncRename(
fileSystemExecutor,
asyncFileSystemTaskFutures,
fileSystemTaskCancelled,
writePath,
targetPath,
pu.getFileNames());
}
updateStatisticsTasks.add(
new UpdateStatisticsTask(
table.getDbName(),
table.getTableName(),
Optional.of(pu.getName()),
updateStats,
true));
}
}
}
public void prepareOverwritePartition(THivePartitionUpdate pu, HivePartitionStatistics ps) {
String targetPath = pu.getLocation().getTargetPath();
String writePath = pu.getLocation().getWritePath();
if (!targetPath.equals(writePath)) {
Path path = new Path(targetPath);
String oldPartitionPath = new Path(path.getParent(), "_temp_" + path.getName()).toString();
Status status = fs.renameDir(
targetPath,
oldPartitionPath,
() -> renameDirectoryTasksForAbort.add(new RenameDirectoryTask(oldPartitionPath, targetPath)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + targetPath + " to " + oldPartitionPath + ":" + status.getErrMsg());
}
clearDirsForFinish.add(oldPartitionPath);
status = fs.renameDir(
writePath,
targetPath,
() -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true)));
if (!status.ok()) {
throw new RuntimeException(
"Error to rename dir from " + writePath + " to " + targetPath + ":" + status.getErrMsg());
}
}
updateStatisticsTasks.add(
new UpdateStatisticsTask(
table.getDbName(),
table.getTableName(),
Optional.of(pu.getName()),
ps,
false
));
}
private void waitForAsyncFileSystemTasks() {
for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
MoreFutures.getFutureValue(future, RuntimeException.class);
}
}
private void doAddPartitionsTask() {
if (!addPartitionsTask.isEmpty()) {
addPartitionsTask.run(hiveOps);
}
}
private void doUpdateStatisticsTasks() {
ImmutableList.Builder<CompletableFuture<?>> updateStatsFutures = ImmutableList.builder();
List<String> failedTaskDescriptions = new ArrayList<>();
List<Throwable> suppressedExceptions = new ArrayList<>();
for (UpdateStatisticsTask task : updateStatisticsTasks) {
updateStatsFutures.add(CompletableFuture.runAsync(() -> {
try {
task.run(hiveOps);
} catch (Throwable t) {
synchronized (suppressedExceptions) {
addSuppressedExceptions(suppressedExceptions, t, failedTaskDescriptions, task.getDescription());
}
}
}, updateStatisticsExecutor));
}
for (CompletableFuture<?> executeUpdateFuture : updateStatsFutures.build()) {
MoreFutures.getFutureValue(executeUpdateFuture);
}
if (!suppressedExceptions.isEmpty()) {
StringBuilder message = new StringBuilder();
message.append("Failed to execute some updating statistics tasks: ");
Joiner.on("; ").appendTo(message, failedTaskDescriptions);
RuntimeException exception = new RuntimeException(message.toString());
suppressedExceptions.forEach(exception::addSuppressed);
throw exception;
}
}
private static void addSuppressedExceptions(
List<Throwable> suppressedExceptions,
Throwable t,
List<String> descriptions,
String description) {
descriptions.add(description);
// A limit is needed to avoid having a huge exception object. 5 was chosen arbitrarily.
if (suppressedExceptions.size() < 5) {
suppressedExceptions.add(t);
}
}
private static class AddPartition {
}
private static class UpdateStatisticsTask {
private final String dbName;
private final String tableName;
private final Optional<String> partitionName;
private final HivePartitionStatistics updatePartitionStat;
private final boolean merge;
private boolean done;
public UpdateStatisticsTask(String dbName, String tableName, Optional<String> partitionName,
HivePartitionStatistics statistics, boolean merge) {
this.dbName = Objects.requireNonNull(dbName, "dbName is null");
this.tableName = Objects.requireNonNull(tableName, "tableName is null");
this.partitionName = Objects.requireNonNull(partitionName, "partitionName is null");
this.updatePartitionStat = Objects.requireNonNull(statistics, "statistics is null");
this.merge = merge;
}
public void run(HiveMetadataOps hiveOps) {
if (partitionName.isPresent()) {
hiveOps.updatePartitionStatistics(dbName, tableName, partitionName.get(), this::updateStatistics);
} else {
hiveOps.updateTableStatistics(dbName, tableName, this::updateStatistics);
}
done = true;
}
public void undo(HiveMetadataOps hmsOps) {
if (!done) {
return;
}
if (partitionName.isPresent()) {
hmsOps.updatePartitionStatistics(dbName, tableName, partitionName.get(), this::resetStatistics);
} else {
hmsOps.updateTableStatistics(dbName, tableName, this::resetStatistics);
}
}
public String getDescription() {
if (partitionName.isPresent()) {
return "alter partition parameters " + tableName + " " + partitionName.get();
} else {
return "alter table parameters " + tableName;
}
}
private HivePartitionStatistics updateStatistics(HivePartitionStatistics currentStats) {
return merge ? HivePartitionStatistics.merge(currentStats, updatePartitionStat) : updatePartitionStat;
}
private HivePartitionStatistics resetStatistics(HivePartitionStatistics currentStatistics) {
return HivePartitionStatistics
.reduce(currentStatistics, updatePartitionStat, HivePartitionStatistics.ReduceOperator.SUBTRACT);
}
}
public static class AddPartitionsTask {
private final List<HivePartitionWithStatistics> partitions = new ArrayList<>();
private final List<List<String>> createdPartitionValues = new ArrayList<>();
public boolean isEmpty() {
return partitions.isEmpty();
}
public List<HivePartitionWithStatistics> getPartitions() {
return partitions;
}
public void addPartition(HivePartitionWithStatistics partition) {
partitions.add(partition);
}
public void run(HiveMetadataOps hiveOps) {
HivePartition firstPartition = partitions.get(0).getPartition();
String dbName = firstPartition.getDbName();
String tableName = firstPartition.getTblName();
List<List<HivePartitionWithStatistics>> batchedPartitions =
Lists.partition(partitions, PARTITION_COMMIT_BATCH_SIZE);
for (List<HivePartitionWithStatistics> batch : batchedPartitions) {
try {
hiveOps.addPartitions(dbName, tableName, batch);
for (HivePartitionWithStatistics partition : batch) {
createdPartitionValues.add(partition.getPartition().getPartitionValues());
}
} catch (Throwable t) {
LOG.warn("Failed to add partition", t);
throw t;
}
}
partitions.clear();
}
public List<List<String>> rollback(HiveMetadataOps hiveOps) {
HivePartition firstPartition = partitions.get(0).getPartition();
String dbName = firstPartition.getDbName();
String tableName = firstPartition.getTblName();
List<List<String>> rollbackFailedPartitions = new ArrayList<>();
for (List<String> createdPartitionValue : createdPartitionValues) {
try {
hiveOps.dropPartition(dbName, tableName, createdPartitionValue, false);
} catch (Throwable t) {
LOG.warn("Failed to drop partition on {}.{}.{} when rollback",
dbName, tableName, rollbackFailedPartitions);
rollbackFailedPartitions.add(createdPartitionValue);
}
}
return rollbackFailedPartitions;
}
}
private static class DirectoryCleanUpTask {
private final Path path;
private final boolean deleteEmptyDir;
public DirectoryCleanUpTask(String path, boolean deleteEmptyDir) {
this.path = new Path(path);
this.deleteEmptyDir = deleteEmptyDir;
}
public Path getPath() {
return path;
}
public boolean isDeleteEmptyDir() {
return deleteEmptyDir;
}
@Override
public String toString() {
return new StringJoiner(", ", DirectoryCleanUpTask.class.getSimpleName() + "[", "]")
.add("path=" + path)
.add("deleteEmptyDir=" + deleteEmptyDir)
.toString();
}
}
public static class DeleteRecursivelyResult {
private final boolean dirNoLongerExists;
private final List<String> notDeletedEligibleItems;
public DeleteRecursivelyResult(boolean dirNoLongerExists, List<String> notDeletedEligibleItems) {
this.dirNoLongerExists = dirNoLongerExists;
this.notDeletedEligibleItems = notDeletedEligibleItems;
}
public boolean dirNotExists() {
return dirNoLongerExists;
}
public List<String> getNotDeletedEligibleItems() {
return notDeletedEligibleItems;
}
}
private void runDirectoryClearUpTasksForAbort() {
for (DirectoryCleanUpTask cleanUpTask : directoryCleanUpTasksForAbort) {
recursiveDeleteItems(cleanUpTask.getPath(), cleanUpTask.isDeleteEmptyDir());
}
}
private static class RenameDirectoryTask {
private final String renameFrom;
private final String renameTo;
public RenameDirectoryTask(String renameFrom, String renameTo) {
this.renameFrom = renameFrom;
this.renameTo = renameTo;
}
public String getRenameFrom() {
return renameFrom;
}
public String getRenameTo() {
return renameTo;
}
@Override
public String toString() {
return new StringJoiner(", ", RenameDirectoryTask.class.getSimpleName() + "[", "]")
.add("renameFrom:" + renameFrom)
.add("renameTo:" + renameTo)
.toString();
}
}
private void runRenameDirTasksForAbort() {
Status status;
for (RenameDirectoryTask task : renameDirectoryTasksForAbort) {
status = fs.exists(task.getRenameFrom());
if (status.ok()) {
status = fs.renameDir(task.getRenameFrom(), task.getRenameTo(), () -> {});
if (!status.ok()) {
LOG.warn("Failed to abort rename dir from {} to {}:{}",
task.getRenameFrom(), task.getRenameTo(), status.getErrMsg());
}
}
}
}
private void runClearPathsForFinish() {
Status status;
for (String path : clearDirsForFinish) {
status = fs.delete(path);
if (!status.ok()) {
LOG.warn("Failed to recursively delete path {}:{}", path, status.getErrCode());
}
}
}
private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir) {
DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, deleteEmptyDir);
if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
LOG.warn("Failed to delete directory {}. Some eligible items can't be deleted: {}.",
directory.toString(), deleteResult.getNotDeletedEligibleItems());
} else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
LOG.warn("Failed to delete directory {} due to dir isn't empty", directory.toString());
}
}
public DeleteRecursivelyResult recursiveDeleteFiles(Path directory, boolean deleteEmptyDir) {
try {
if (!fs.exists(directory.getName()).ok()) {
return new DeleteRecursivelyResult(true, ImmutableList.of());
}
} catch (Exception e) {
ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
notDeletedEligibleItems.add(directory.toString() + "/*");
return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
}
return doRecursiveDeleteFiles(directory, deleteEmptyDir);
}
private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir) {
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.list(directory.getName(), remoteFiles);
if (!status.ok()) {
ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
notDeletedEligibleItems.add(directory + "/*");
return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
}
boolean isEmptyDir = true;
List<String> notDeletedEligibleItems = new ArrayList<>();
for (RemoteFile file : remoteFiles) {
if (file.isFile()) {
Path filePath = file.getPath();
isEmptyDir = false;
// TODO Check if this file was created by this query
if (!deleteIfExists(filePath)) {
notDeletedEligibleItems.add(filePath.toString());
}
} else if (file.isDirectory()) {
DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(file.getPath(), deleteEmptyDir);
if (!subResult.dirNotExists()) {
isEmptyDir = false;
}
if (!subResult.getNotDeletedEligibleItems().isEmpty()) {
notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems());
}
} else {
isEmptyDir = false;
notDeletedEligibleItems.add(file.getPath().toString());
}
}
if (isEmptyDir && deleteEmptyDir) {
Verify.verify(notDeletedEligibleItems.isEmpty());
if (!deleteIfExists(directory)) {
return new DeleteRecursivelyResult(false, ImmutableList.of(directory + "/"));
}
// all items of the location have been deleted.
return new DeleteRecursivelyResult(true, ImmutableList.of());
}
return new DeleteRecursivelyResult(false, notDeletedEligibleItems);
}
public boolean deleteIfExists(Path path) {
Status status = fs.delete(path.getName());
if (status.ok()) {
return true;
}
return !fs.exists(path.getName()).ok();
}
}

View File

@ -34,6 +34,7 @@ import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.operations.ExternalMetadataOperations;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.transaction.TransactionManagerFactory;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@ -145,7 +146,10 @@ public class HMSExternalCatalog extends ExternalCatalog {
AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
AuthenticationConfig.HADOOP_KERBEROS_KEYTAB));
}
metadataOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
HiveMetadataOps hiveOps = ExternalMetadataOperations.newHiveMetadataOps(hiveConf, jdbcClientConfig, this);
transactionManager = TransactionManagerFactory.createHiveTransactionManager(hiveOps);
transactionManager.setEditLog(Env.getCurrentEnv().getEditLog());
metadataOps = hiveOps;
}
@Override

File diff suppressed because it is too large.

View File

@ -36,13 +36,11 @@ import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.thrift.THivePartitionUpdate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -73,6 +71,7 @@ public class HiveMetadataOps implements ExternalMetadataOps {
public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client) {
this.catalog = catalog;
this.client = client;
// TODO Currently only supports DFSFileSystem, more types will be supported in the future
this.fs = new DFSFileSystem(catalog.getProperties());
}
@ -80,6 +79,10 @@ public class HiveMetadataOps implements ExternalMetadataOps {
return client;
}
public RemoteFileSystem getFs() {
return fs;
}
public static HMSCachedClient createCachedClient(HiveConf hiveConf, int thriftClientPoolSize,
JdbcClientConfig jdbcClientConfig) {
if (hiveConf != null) {
@ -253,23 +256,6 @@ public class HiveMetadataOps implements ExternalMetadataOps {
return client.getAllDatabases();
}
public void commit(String dbName,
String tableName,
List<THivePartitionUpdate> hivePUs) {
Table table = client.getTable(dbName, tableName);
HMSCommitter hmsCommitter = new HMSCommitter(this, fs, table);
hmsCommitter.commit(hivePUs);
try {
Env.getCurrentEnv().getCatalogMgr().refreshExternalTable(
dbName,
tableName,
catalog.getName(),
true);
} catch (DdlException e) {
LOG.warn("Failed to refresh table {}.{} : {}", dbName, tableName, e.getMessage());
}
}
public void updateTableStatistics(
String dbName,
String tableName,

View File

@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.Map;
public class HivePartitionStatistics {
private static final HivePartitionStatistics EMPTY =
public static final HivePartitionStatistics EMPTY =
new HivePartitionStatistics(HiveCommonStatistics.EMPTY, ImmutableMap.of());
private final HiveCommonStatistics commonStatistics;

View File

@ -18,9 +18,9 @@
package org.apache.doris.datasource.hive;
public class HivePartitionWithStatistics {
private String name;
private HivePartition partition;
private HivePartitionStatistics statistics;
private final String name;
private final HivePartition partition;
private final HivePartitionStatistics statistics;
public HivePartitionWithStatistics(String name, HivePartition partition, HivePartitionStatistics statistics) {
this.name = name;

View File

@ -17,13 +17,12 @@
package org.apache.doris.nereids.trees.plans.commands.insert;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HiveMetadataOps;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
import org.apache.doris.datasource.hive.HMSTransaction;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.Plan;
@ -35,14 +34,13 @@ import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.transaction.TransactionManager;
import org.apache.doris.transaction.TransactionStatus;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Optional;
/**
@ -53,6 +51,8 @@ public class HiveInsertExecutor extends AbstractInsertExecutor {
private static final long INVALID_TXN_ID = -1L;
private long txnId = INVALID_TXN_ID;
private TransactionStatus txnStatus = TransactionStatus.ABORTED;
private final TransactionManager transactionManager;
private final String catalogName;
/**
* constructor
@ -61,6 +61,8 @@ public class HiveInsertExecutor extends AbstractInsertExecutor {
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx) {
super(ctx, table, labelName, planner, insertCtx);
catalogName = table.getCatalog().getName();
transactionManager = table.getCatalog().getTransactionManager();
}
public long getTxnId() {
@ -69,7 +71,9 @@ public class HiveInsertExecutor extends AbstractInsertExecutor {
@Override
public void beginTransaction() {
// TODO: use hive txn rather than internal txn
txnId = transactionManager.begin();
HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
coordinator.setHivePartitionUpdateFunc(transaction::updateHivePartitionUpdates);
}
@Override
@ -93,13 +97,18 @@ public class HiveInsertExecutor extends AbstractInsertExecutor {
if (ctx.getState().getStateType() == QueryState.MysqlStateType.ERR) {
LOG.warn("errors when abort txn. {}", ctx.getQueryIdentifier());
} else {
// TODO use transaction
List<THivePartitionUpdate> ups = coordinator.getHivePartitionUpdates();
loadedRows = ups.stream().mapToLong(THivePartitionUpdate::getRowCount).sum();
ExternalCatalog catalog = ((HMSExternalTable) table).getCatalog();
ExternalMetadataOps metadataOps = catalog.getMetadataOps();
((HiveMetadataOps) metadataOps).commit(((HMSExternalTable) table).getDbName(), table.getName(), ups);
HMSTransaction transaction = (HMSTransaction) transactionManager.getTransaction(txnId);
loadedRows = transaction.getUpdateCnt();
String dbName = ((HMSExternalTable) table).getDbName();
String tbName = table.getName();
transaction.finishInsertTable(dbName, tbName);
transactionManager.commit(txnId);
txnStatus = TransactionStatus.COMMITTED;
Env.getCurrentEnv().getCatalogMgr().refreshExternalTable(
dbName,
tbName,
catalogName,
true);
}
}
@ -117,6 +126,7 @@ public class HiveInsertExecutor extends AbstractInsertExecutor {
}
}
ctx.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, t.getMessage());
transactionManager.rollback(txnId);
}
@Override

View File

@ -152,6 +152,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;
public class Coordinator implements CoordInterface {
@ -235,8 +236,8 @@ public class Coordinator implements CoordInterface {
private final List<TTabletCommitInfo> commitInfos = Lists.newArrayList();
private final List<TErrorTabletInfo> errorTabletInfos = Lists.newArrayList();
// TODO moved to ExternalTransactionManager
private final List<THivePartitionUpdate> hivePartitionUpdates = Lists.newArrayList();
// Collect all hivePartitionUpdates obtained from be
Consumer<List<THivePartitionUpdate>> hivePartitionUpdateFunc;
// Input parameter
private long jobId = -1; // job which this task belongs to
@ -503,10 +504,6 @@ public class Coordinator implements CoordInterface {
return errorTabletInfos;
}
public List<THivePartitionUpdate> getHivePartitionUpdates() {
return hivePartitionUpdates;
}
public Map<String, Integer> getBeToInstancesNum() {
Map<String, Integer> result = Maps.newTreeMap();
if (enablePipelineEngine) {
@ -2456,13 +2453,8 @@ public class Coordinator implements CoordInterface {
// TODO: more ranges?
}
private void updateHivePartitionUpdates(List<THivePartitionUpdate> hivePartitionUpdates) {
lock.lock();
try {
this.hivePartitionUpdates.addAll(hivePartitionUpdates);
} finally {
lock.unlock();
}
public void setHivePartitionUpdateFunc(Consumer<List<THivePartitionUpdate>> hivePartitionUpdateFunc) {
this.hivePartitionUpdateFunc = hivePartitionUpdateFunc;
}
// update job progress from BE
@ -2512,8 +2504,8 @@ public class Coordinator implements CoordInterface {
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
if (params.isSetHivePartitionUpdates()) {
updateHivePartitionUpdates(params.getHivePartitionUpdates());
if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
}
Preconditions.checkArgument(params.isSetDetailedReport());
@ -2577,8 +2569,8 @@ public class Coordinator implements CoordInterface {
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
if (params.isSetHivePartitionUpdates()) {
updateHivePartitionUpdates(params.getHivePartitionUpdates());
if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Query {} instance {} is marked done",
@ -2649,8 +2641,8 @@ public class Coordinator implements CoordInterface {
if (params.isSetErrorTabletInfos()) {
updateErrorTabletInfos(params.getErrorTabletInfos());
}
if (params.isSetHivePartitionUpdates()) {
updateHivePartitionUpdates(params.getHivePartitionUpdates());
if (params.isSetHivePartitionUpdates() && hivePartitionUpdateFunc != null) {
hivePartitionUpdateFunc.accept(params.getHivePartitionUpdates());
}
instancesDoneLatch.markedCountDown(params.getFragmentInstanceId(), -1L);
}

View File

@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.transaction;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HMSTransaction;
import org.apache.doris.datasource.hive.HiveMetadataOps;
import org.apache.doris.persist.EditLog;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class HiveTransactionManager implements TransactionManager {
private final Map<Long, HMSTransaction> transactions = new ConcurrentHashMap<>();
private final TransactionIdGenerator idGenerator = new TransactionIdGenerator();
private final HiveMetadataOps ops;
public HiveTransactionManager(HiveMetadataOps ops) {
this.ops = ops;
}
public Long getNextTransactionId() {
return idGenerator.getNextTransactionId();
}
@Override
public void setEditLog(EditLog editLog) {
this.idGenerator.setEditLog(editLog);
}
@Override
public long begin() {
long id = idGenerator.getNextTransactionId();
HMSTransaction hiveTransaction = new HMSTransaction(ops);
transactions.put(id, hiveTransaction);
return id;
}
@Override
public void commit(long id) throws UserException {
getTransactionWithException(id).commit();
transactions.remove(id);
}
@Override
public void rollback(long id) {
getTransactionWithException(id).rollback();
transactions.remove(id);
}
@Override
public HMSTransaction getTransaction(long id) {
return getTransactionWithException(id);
}
public HMSTransaction getTransactionWithException(long id) {
HMSTransaction hiveTransaction = transactions.get(id);
if (hiveTransaction == null) {
throw new RuntimeException("Can't find transaction for " + id);
}
return hiveTransaction;
}
}

View File

@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.transaction;
import org.apache.doris.common.UserException;
public interface Transaction {
void commit() throws UserException;
void rollback();
}

View File

@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.transaction;
import org.apache.doris.common.UserException;
import org.apache.doris.persist.EditLog;
public interface TransactionManager {
void setEditLog(EditLog editLog);
long begin();
void commit(long id) throws UserException;
void rollback(long id);
Transaction getTransaction(long id);
}

View File

@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.transaction;
import org.apache.doris.datasource.hive.HiveMetadataOps;
public class TransactionManagerFactory {
public static TransactionManager createHiveTransactionManager(HiveMetadataOps ops) {
return new HiveTransactionManager(ops);
}
}

View File

@ -17,13 +17,17 @@
package org.apache.doris.datasource.hive;
import org.apache.doris.backup.Status;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.thrift.THiveLocationParams;
import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TUpdateMode;
import com.google.common.collect.Lists;
import mockit.Mock;
import mockit.MockUp;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.After;
@ -41,6 +45,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@Ignore
public class HmsCommitTest {
@ -61,6 +68,7 @@ public class HmsCommitTest {
dbLocation = "file://" + warehousePath.toAbsolutePath() + "/";
createTestHiveCatalog();
createTestHiveDatabase();
mockFs();
}
@AfterClass
@ -90,22 +98,55 @@ public class HmsCommitTest {
hmsClient.createDatabase(dbMetadata);
}
public static void mockFs() {
new MockUp<DFSFileSystem>(DFSFileSystem.class) {
@Mock
public void asyncRenameDir(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
}
@Mock
public void asyncRename(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
List<String> fileNames) {
}
@Mock
public Status renameDir(String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
return Status.OK;
}
};
}
@Before
public void before() {
// create table
List<Column> columns = new ArrayList<>();
columns.add(new Column("c1", PrimitiveType.INT, true));
columns.add(new Column("c2", PrimitiveType.STRING, true));
columns.add(new Column("c3", PrimitiveType.STRING, false));
List<String> partitionKeys = new ArrayList<>();
partitionKeys.add("c3");
HiveTableMetadata tableMetadata = new HiveTableMetadata(
dbName, tbWithPartition, columns, partitionKeys,
new HashMap<>(), fileFormat);
hmsClient.createTable(tableMetadata, true);
HiveTableMetadata tableMetadata2 = new HiveTableMetadata(
dbName, tbWithoutPartition, columns, new ArrayList<>(),
new HashMap<>(), fileFormat);
hmsClient.createTable(tableMetadata2, true);
}
@After
@ -118,11 +159,7 @@ public class HmsCommitTest {
public void testNewPartitionForUnPartitionedTable() {
List<THivePartitionUpdate> pus = new ArrayList<>();
pus.add(createRandomNew("a"));
try {
hmsOps.commit(dbName, tbWithoutPartition, pus);
} catch (Exception e) {
Assert.assertEquals("Not support mode:[NEW] in unPartitioned table", e.getMessage());
}
Assert.assertThrows(Exception.class, () -> commit(dbName, tbWithoutPartition, pus));
}
@Test
@ -131,7 +168,7 @@ public class HmsCommitTest {
pus.add(createRandomAppend(""));
pus.add(createRandomAppend(""));
pus.add(createRandomAppend(""));
hmsOps.commit(dbName, tbWithoutPartition, pus);
commit(dbName, tbWithoutPartition, pus);
Table table = hmsClient.getTable(dbName, tbWithoutPartition);
assertNumRows(3, table);
@ -139,7 +176,7 @@ public class HmsCommitTest {
pus2.add(createRandomAppend(""));
pus2.add(createRandomAppend(""));
pus2.add(createRandomAppend(""));
hmsOps.commit(dbName, tbWithoutPartition, pus2);
commit(dbName, tbWithoutPartition, pus2);
table = hmsClient.getTable(dbName, tbWithoutPartition);
assertNumRows(6, table);
}
@ -151,7 +188,7 @@ public class HmsCommitTest {
pus.add(createRandomOverwrite(""));
pus.add(createRandomOverwrite(""));
pus.add(createRandomOverwrite(""));
hmsOps.commit(dbName, tbWithoutPartition, pus);
commit(dbName, tbWithoutPartition, pus);
Table table = hmsClient.getTable(dbName, tbWithoutPartition);
assertNumRows(3, table);
}
@ -165,7 +202,7 @@ public class HmsCommitTest {
pus.add(createRandomNew("b"));
pus.add(createRandomNew("b"));
pus.add(createRandomNew("c"));
hmsOps.commit(dbName, tbWithPartition, pus);
commit(dbName, tbWithPartition, pus);
Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
assertNumRows(3, pa);
@ -186,7 +223,7 @@ public class HmsCommitTest {
pus.add(createRandomAppend("b"));
pus.add(createRandomAppend("b"));
pus.add(createRandomAppend("c"));
hmsOps.commit(dbName, tbWithPartition, pus);
commit(dbName, tbWithPartition, pus);
Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
assertNumRows(6, pa);
@ -203,7 +240,7 @@ public class HmsCommitTest {
pus.add(createRandomOverwrite("a"));
pus.add(createRandomOverwrite("b"));
pus.add(createRandomOverwrite("c"));
hmsOps.commit(dbName, tbWithPartition, pus);
commit(dbName, tbWithPartition, pus);
Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
assertNumRows(1, pa);
@ -221,14 +258,14 @@ public class HmsCommitTest {
pus.add(createRandomNew("" + i));
}
hmsOps.commit(dbName, tbWithPartition, pus);
commit(dbName, tbWithPartition, pus);
for (int i = 0; i < nums; i++) {
Partition p = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("" + i));
assertNumRows(1, p);
}
try {
hmsOps.commit(dbName, tbWithPartition, pus);
commit(dbName, tbWithPartition, pus);
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("failed to add partitions"));
}
@ -277,4 +314,13 @@ public class HmsCommitTest {
public THivePartitionUpdate createRandomOverwrite(String partition) {
return genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE);
}
public void commit(String dbName,
String tableName,
List<THivePartitionUpdate> hivePUs) {
HMSTransaction hmsTransaction = new HMSTransaction(hmsOps);
hmsTransaction.setHivePartitionUpdates(hivePUs);
hmsTransaction.finishInsertTable(dbName, tableName);
hmsTransaction.commit();
}
}