[testcase](hive) add exception test for hive txn (#33278)

Issue #31442
#32726

1. Add LocalDfsFileSystem to manipulate local files.
2. Add HMSCachedClientTest to mock HMS services.
3. Add tests for rolling back a commit.
wuwenchi authored 2024-04-10 15:09:01 +08:00
committed by morningman
parent e11db3f050
commit 31a7060dbd
10 changed files with 1033 additions and 290 deletions
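
How the new pieces fit together (a minimal sketch of the test wiring, based on the test-only HiveMetadataOps constructor and HmsCommitTest below; not a prescribed public API):

    // An in-memory HMS stand-in plus a local file system let Hive transaction
    // commit/rollback be exercised without a real cluster.
    LocalDfsFileSystem fs = new LocalDfsFileSystem();
    HMSCachedClient hmsClient = new HMSCachedClientTest();
    // test-only constructor added by this commit; the catalog may be null in tests
    HiveMetadataOps hmsOps = new HiveMetadataOps(null, hmsClient, fs);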

File: HMSTransaction.java

@@ -23,8 +23,8 @@ package org.apache.doris.datasource.hive;
import org.apache.doris.backup.Status;
import org.apache.doris.common.Pair;
import org.apache.doris.fs.FileSystem;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TUpdateMode;
import org.apache.doris.transaction.Transaction;
@@ -47,11 +47,13 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -63,7 +65,7 @@ import java.util.stream.Collectors;
public class HMSTransaction implements Transaction {
private static final Logger LOG = LogManager.getLogger(HMSTransaction.class);
private final HiveMetadataOps hiveOps;
private final RemoteFileSystem fs;
private final FileSystem fs;
private String dbName;
private String tbName;
@@ -115,8 +117,8 @@ public class HMSTransaction implements Transaction {
}
public void finishInsertTable(String dbName, String tbName) {
this.tbName = tbName;
this.dbName = dbName;
this.tbName = tbName;
List<THivePartitionUpdate> mergedPUs = mergePartitions(hivePartitionUpdates);
Table table = getTable(dbName, tbName);
List<Pair<THivePartitionUpdate, HivePartitionStatistics>> insertExistsPartitions = new ArrayList<>();
@@ -226,17 +228,10 @@
}
}
hmsCommitter.waitForAsyncFileSystemTasks();
hmsCommitter.doAddPartitionsTask();
hmsCommitter.doUpdateStatisticsTasks();
hmsCommitter.doCommit();
} catch (Throwable t) {
LOG.warn("Failed to commit for {}.{}, abort it.", dbName, tbName);
hmsCommitter.cancelUnStartedAsyncFileSystemTask();
hmsCommitter.undoUpdateStatisticsTasks();
hmsCommitter.undoAddPartitionsTask();
hmsCommitter.waitForAsyncFileSystemTaskSuppressThrowable();
hmsCommitter.runDirectoryClearUpTasksForAbort();
hmsCommitter.runRenameDirTasksForAbort();
hmsCommitter.rollback();
throw t;
} finally {
hmsCommitter.runClearPathsForFinish();
@@ -354,7 +349,7 @@
}
}
private static class UpdateStatisticsTask {
public static class UpdateStatisticsTask {
private final String dbName;
private final String tableName;
private final Optional<String> partitionName;
@@ -442,7 +437,6 @@
throw t;
}
}
partitions.clear();
}
public List<List<String>> rollback(HiveMetadataOps hiveOps) {
@@ -548,7 +542,7 @@
private DeleteRecursivelyResult recursiveDeleteFiles(Path directory, boolean deleteEmptyDir) {
try {
if (!fs.exists(directory.getName()).ok()) {
if (!fs.exists(directory.toString()).ok()) {
return new DeleteRecursivelyResult(true, ImmutableList.of());
}
} catch (Exception e) {
@@ -561,57 +555,53 @@
}
private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir) {
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.list(directory.getName(), remoteFiles);
if (!status.ok()) {
List<RemoteFile> allFiles = new ArrayList<>();
Set<String> allDirs = new HashSet<>();
Status statusFile = fs.listFiles(directory.toString(), allFiles);
Status statusDir = fs.listDirectories(directory.toString(), allDirs);
if (!statusFile.ok() || !statusDir.ok()) {
ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
notDeletedEligibleItems.add(directory + "/*");
return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
}
boolean isEmptyDir = true;
List<String> notDeletedEligibleItems = new ArrayList<>();
for (RemoteFile file : remoteFiles) {
if (file.isFile()) {
Path filePath = file.getPath();
isEmptyDir = false;
// TODO Check if this file was created by this query
if (!deleteIfExists(filePath)) {
notDeletedEligibleItems.add(filePath.toString());
}
} else if (file.isDirectory()) {
DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(file.getPath(), deleteEmptyDir);
if (!subResult.dirNotExists()) {
isEmptyDir = false;
}
if (!subResult.getNotDeletedEligibleItems().isEmpty()) {
notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems());
}
} else {
isEmptyDir = false;
notDeletedEligibleItems.add(file.getPath().toString());
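// Delete the files at this level first, then recurse into each subdirectory;
// remove the directory itself only when every descendant was deleted and
// deleteEmptyDir is set.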
boolean allDescendentsDeleted = true;
ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
for (RemoteFile file : allFiles) {
String fileName = file.getName();
if (!deleteIfExists(new Path(fileName))) {
allDescendentsDeleted = false;
notDeletedEligibleItems.add(fileName);
}
}
if (isEmptyDir && deleteEmptyDir) {
Verify.verify(notDeletedEligibleItems.isEmpty());
for (String dir : allDirs) {
DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(new Path(dir), deleteEmptyDir);
if (!subResult.dirNotExists()) {
allDescendentsDeleted = false;
}
if (!subResult.getNotDeletedEligibleItems().isEmpty()) {
notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems());
}
}
if (allDescendentsDeleted && deleteEmptyDir) {
Verify.verify(notDeletedEligibleItems.build().isEmpty());
if (!deleteIfExists(directory)) {
return new DeleteRecursivelyResult(false, ImmutableList.of(directory + "/"));
}
// all items of the location have been deleted.
return new DeleteRecursivelyResult(true, ImmutableList.of());
}
return new DeleteRecursivelyResult(false, notDeletedEligibleItems);
return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
}
public boolean deleteIfExists(Path path) {
Status status = fs.delete(path.getName());
Status status = fs.delete(path.toString());
if (status.ok()) {
return true;
}
return !fs.exists(path.getName()).ok();
return !fs.exists(path.toString()).ok();
}
public static class DatabaseTableName {
@@ -1039,9 +1029,6 @@
}
private void undoAddPartitionsTask() {
if (addPartitionsTask.isEmpty()) {
return;
}
HivePartition firstPartition = addPartitionsTask.getPartitions().get(0).getPartition();
String dbName = firstPartition.getDbName();
@@ -1304,10 +1291,16 @@
}
}
public void doNothing() {
// do nothing
// no-op hook: regression tests and unit tests mock this method to throw an exception
}
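// Commit pipeline: wait for the async file-system renames, register new
// partitions in HMS, then push statistics. doNothing() runs last so tests
// can inject a failure after all real work has succeeded.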
public void doCommit() {
waitForAsyncFileSystemTasks();
doAddPartitionsTask();
doUpdateStatisticsTasks();
doNothing();
}
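The hunk is truncated at rollback() below; judging from the undo calls removed from the catch block above, its body presumably performs, in order (an inference from this diff, not code shown in it):

    cancelUnStartedAsyncFileSystemTask();
    undoUpdateStatisticsTasks();
    undoAddPartitionsTask();
    waitForAsyncFileSystemTaskSuppressThrowable();
    runDirectoryClearUpTasksForAbort();
    runRenameDirTasksForAbort();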
public void rollback() {

File: HiveMetadataOps.java

@@ -34,7 +34,7 @@ import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.FileSystem;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import com.google.common.annotations.VisibleForTesting;
@@ -58,7 +58,7 @@ public class HiveMetadataOps implements ExternalMetadataOps {
private static final Logger LOG = LogManager.getLogger(HiveMetadataOps.class);
private static final int MIN_CLIENT_POOL_SIZE = 8;
private final HMSCachedClient client;
private final RemoteFileSystem fs;
private final FileSystem fs;
private final HMSExternalCatalog catalog;
public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) {
@@ -75,11 +75,19 @@ public class HiveMetadataOps implements ExternalMetadataOps {
this.fs = new DFSFileSystem(catalog.getProperties());
}
// for test
public HiveMetadataOps(HMSExternalCatalog catalog, HMSCachedClient client, FileSystem fs) {
this.catalog = catalog;
this.client = client;
this.fs = fs;
}
public HMSCachedClient getClient() {
return client;
}
public RemoteFileSystem getFs() {
public FileSystem getFs() {
return fs;
}

File: HiveUtil.java

@@ -25,10 +25,12 @@ import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
@@ -167,7 +169,9 @@ public final class HiveUtil {
ImmutableMap.Builder<String, Partition> resultBuilder = ImmutableMap.builder();
for (Map.Entry<String, List<String>> entry : partitionNameToPartitionValues.entrySet()) {
Partition partition = partitionValuesToPartition.get(entry.getValue());
resultBuilder.put(entry.getKey(), partition);
if (partition != null) {
resultBuilder.put(entry.getKey(), partition);
}
}
return resultBuilder.build();
}
@@ -267,4 +271,63 @@
database.setDescription(hiveDb.getComment());
return database;
}
public static Map<String, String> updateStatisticsParameters(
Map<String, String> parameters,
HiveCommonStatistics statistics) {
HashMap<String, String> result = new HashMap<>(parameters);
result.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount()));
result.put(StatsSetupConst.ROW_COUNT, String.valueOf(statistics.getRowCount()));
result.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(statistics.getTotalFileBytes()));
// CDH 5.16 metastore ignores stats unless STATS_GENERATED_VIA_STATS_TASK is set
// https://github.com/cloudera/hive/blob/cdh5.16.2-release/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L227-L231
if (!parameters.containsKey("STATS_GENERATED_VIA_STATS_TASK")) {
result.put("STATS_GENERATED_VIA_STATS_TASK", "workaround for potential lack of HIVE-12730");
}
return result;
}
public static HivePartitionStatistics toHivePartitionStatistics(Map<String, String> params) {
long rowCount = Long.parseLong(params.getOrDefault(StatsSetupConst.ROW_COUNT, "-1"));
long totalSize = Long.parseLong(params.getOrDefault(StatsSetupConst.TOTAL_SIZE, "-1"));
long numFiles = Long.parseLong(params.getOrDefault(StatsSetupConst.NUM_FILES, "-1"));
return HivePartitionStatistics.fromCommonStatistics(rowCount, numFiles, totalSize);
}
public static Partition toMetastoreApiPartition(HivePartitionWithStatistics partitionWithStatistics) {
Partition partition =
toMetastoreApiPartition(partitionWithStatistics.getPartition());
partition.setParameters(updateStatisticsParameters(
partition.getParameters(), partitionWithStatistics.getStatistics().getCommonStatistics()));
return partition;
}
public static Partition toMetastoreApiPartition(HivePartition hivePartition) {
Partition result = new Partition();
result.setDbName(hivePartition.getDbName());
result.setTableName(hivePartition.getTblName());
result.setValues(hivePartition.getPartitionValues());
result.setSd(makeStorageDescriptorFromHivePartition(hivePartition));
result.setParameters(hivePartition.getParameters());
return result;
}
public static StorageDescriptor makeStorageDescriptorFromHivePartition(HivePartition partition) {
SerDeInfo serdeInfo = new SerDeInfo();
serdeInfo.setName(partition.getTblName());
serdeInfo.setSerializationLib(partition.getSerde());
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation(Strings.emptyToNull(partition.getPath()));
sd.setCols(partition.getColumns());
sd.setSerdeInfo(serdeInfo);
sd.setInputFormat(partition.getInputFormat());
sd.setOutputFormat(partition.getOutputFormat());
sd.setParameters(ImmutableMap.of());
return sd;
}
}

File: ThriftHMSCachedClient.java

@@ -27,11 +27,8 @@ import org.apache.doris.datasource.property.constants.HMSProperties;
import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
@@ -56,8 +53,6 @@ import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -67,7 +62,6 @@ import org.apache.logging.log4j.Logger;
import java.security.PrivilegedExceptionAction;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -605,11 +599,11 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
Table originTable = getTable(dbName, tableName);
Map<String, String> originParams = originTable.getParameters();
HivePartitionStatistics updatedStats = update.apply(toHivePartitionStatistics(originParams));
HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams));
Table newTable = originTable.deepCopy();
Map<String, String> newParams =
updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
newTable.setParameters(newParams);
client.client.alter_table(dbName, tableName, newTable);
@@ -633,11 +627,11 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
Partition originPartition = partitions.get(0);
Map<String, String> originParams = originPartition.getParameters();
HivePartitionStatistics updatedStats = update.apply(toHivePartitionStatistics(originParams));
HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams));
Partition modifiedPartition = originPartition.deepCopy();
Map<String, String> newParams =
updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
modifiedPartition.setParameters(newParams);
client.client.alter_partition(dbName, tableName, modifiedPartition);
@@ -650,7 +644,7 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) {
try (ThriftHMSClient client = getClient()) {
List<Partition> hivePartitions = partitions.stream()
.map(ThriftHMSCachedClient::toMetastoreApiPartition)
.map(HiveUtil::toMetastoreApiPartition)
.collect(Collectors.toList());
client.client.add_partitions(hivePartitions);
} catch (Exception e) {
@@ -666,64 +660,4 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
throw new RuntimeException("failed to drop partition for " + dbName + "." + tableName);
}
}
private static HivePartitionStatistics toHivePartitionStatistics(Map<String, String> params) {
long rowCount = Long.parseLong(params.getOrDefault(StatsSetupConst.ROW_COUNT, "-1"));
long totalSize = Long.parseLong(params.getOrDefault(StatsSetupConst.TOTAL_SIZE, "-1"));
long numFiles = Long.parseLong(params.getOrDefault(StatsSetupConst.NUM_FILES, "-1"));
return HivePartitionStatistics.fromCommonStatistics(rowCount, numFiles, totalSize);
}
private static Map<String, String> updateStatisticsParameters(
Map<String, String> parameters,
HiveCommonStatistics statistics) {
HashMap<String, String> result = new HashMap<>(parameters);
result.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount()));
result.put(StatsSetupConst.ROW_COUNT, String.valueOf(statistics.getRowCount()));
result.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(statistics.getTotalFileBytes()));
// CDH 5.16 metastore ignores stats unless STATS_GENERATED_VIA_STATS_TASK is set
// https://github.com/cloudera/hive/blob/cdh5.16.2-release/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L227-L231
if (!parameters.containsKey("STATS_GENERATED_VIA_STATS_TASK")) {
result.put("STATS_GENERATED_VIA_STATS_TASK", "workaround for potential lack of HIVE-12730");
}
return result;
}
public static Partition toMetastoreApiPartition(HivePartitionWithStatistics partitionWithStatistics) {
Partition partition =
toMetastoreApiPartition(partitionWithStatistics.getPartition());
partition.setParameters(updateStatisticsParameters(
partition.getParameters(), partitionWithStatistics.getStatistics().getCommonStatistics()));
return partition;
}
public static Partition toMetastoreApiPartition(HivePartition hivePartition) {
Partition result = new Partition();
result.setDbName(hivePartition.getDbName());
result.setTableName(hivePartition.getTblName());
result.setValues(hivePartition.getPartitionValues());
result.setSd(makeStorageDescriptorFromHivePartition(hivePartition));
result.setParameters(hivePartition.getParameters());
return result;
}
private static StorageDescriptor makeStorageDescriptorFromHivePartition(HivePartition partition) {
SerDeInfo serdeInfo = new SerDeInfo();
serdeInfo.setName(partition.getTblName());
serdeInfo.setSerializationLib(partition.getSerde());
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation(Strings.emptyToNull(partition.getPath()));
sd.setCols(partition.getColumns());
sd.setSerdeInfo(serdeInfo);
sd.setInputFormat(partition.getInputFormat());
sd.setOutputFormat(partition.getOutputFormat());
sd.setParameters(ImmutableMap.of());
return sd;
}
}

File: FileSystem.java

@@ -23,6 +23,7 @@ import org.apache.doris.fs.remote.RemoteFile;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -86,4 +87,12 @@
}
Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly);
default Status listFiles(String remotePath, List<RemoteFile> result) {
throw new UnsupportedOperationException("Unsupported operation list files on current file system.");
}
default Status listDirectories(String remotePath, Set<String> result) {
throw new UnsupportedOperationException("Unsupported operation list directores on current file system.");
}
}
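Callers are expected to probe files and directories together, as the reworked doRecursiveDeleteFiles above does (a sketch of the calling pattern; fs and path are stand-ins, and the FileSystem must override both defaults):

    List<RemoteFile> files = new ArrayList<>();
    Set<String> dirs = new HashSet<>();
    Status fileStatus = fs.listFiles(path, files);      // direct child files
    Status dirStatus = fs.listDirectories(path, dirs);  // child dirs, "/"-terminated
    if (!fileStatus.ok() || !dirStatus.ok()) {
        // treat the whole subtree as undeletable and report path + "/*"
    }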

File: LocalDfsFileSystem.java (new)

@@ -0,0 +1,245 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs;
import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.fs.remote.RemoteFile;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
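// A FileSystem backed by Hadoop's LocalFileSystem, letting tests exercise
// real rename/delete/list semantics on local disk instead of HDFS.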
public class LocalDfsFileSystem implements FileSystem {
public LocalFileSystem fs = LocalFileSystem.getLocal(new Configuration());
public LocalDfsFileSystem() throws IOException {
}
@Override
public Map<String, String> getProperties() {
return null;
}
@Override
public Status exists(String remotePath) {
boolean exists = false;
try {
exists = fs.exists(new Path(remotePath));
} catch (IOException e) {
throw new RuntimeException(e);
}
if (exists) {
return Status.OK;
} else {
return new Status(Status.ErrCode.NOT_FOUND, "");
}
}
@Override
public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
return null;
}
@Override
public Status upload(String localPath, String remotePath) {
return null;
}
@Override
public Status directUpload(String content, String remoteFile) {
return null;
}
@Override
public Status rename(String origFilePath, String destFilePath) {
try {
fs.rename(new Path(origFilePath), new Path(destFilePath));
} catch (IOException e) {
throw new RuntimeException(e);
}
return Status.OK;
}
@Override
public Status renameDir(String origFilePath, String destFilePath, Runnable runWhenPathNotExist) {
Status status = exists(destFilePath);
if (status.ok()) {
throw new RuntimeException("Destination directory already exists: " + destFilePath);
}
String targetParent = new Path(destFilePath).getParent().toString();
status = exists(targetParent);
if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
status = makeDir(targetParent);
}
if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
runWhenPathNotExist.run();
return rename(origFilePath, destFilePath);
}
@Override
public void asyncRename(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
List<String> fileNames) {
for (String fileName : fileNames) {
Path source = new Path(origFilePath, fileName);
Path target = new Path(destFilePath, fileName);
renameFileFutures.add(CompletableFuture.runAsync(() -> {
if (cancelled.get()) {
return;
}
Status status = rename(source.toString(), target.toString());
if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
}, executor));
}
}
@Override
public void asyncRenameDir(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
renameFileFutures.add(CompletableFuture.runAsync(() -> {
if (cancelled.get()) {
return;
}
Status status = renameDir(origFilePath, destFilePath, runWhenPathNotExist);
if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
}, executor));
}
@Override
public Status delete(String remotePath) {
try {
fs.delete(new Path(remotePath), true);
} catch (IOException e) {
throw new RuntimeException(e);
}
return Status.OK;
}
@Override
public Status makeDir(String remotePath) {
try {
fs.mkdirs(new Path(remotePath));
} catch (IOException e) {
throw new RuntimeException(e);
}
return Status.OK;
}
@Override
public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException {
return null;
}
@Override
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
try {
FileStatus[] fileStatuses = fs.listStatus(new Path(remotePath));
if (fileStatuses == null) {
return Status.OK;
}
for (FileStatus fileStatus : fileStatuses) {
RemoteFile remoteFile = new RemoteFile(
fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(),
!fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(),
fileStatus.getBlockSize(), fileStatus.getModificationTime());
result.add(remoteFile);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return Status.OK;
}
@Override
public Status listFiles(String remotePath, List<RemoteFile> result) {
RemoteIterator<LocatedFileStatus> iterator;
try {
Path dirPath = new Path(remotePath);
iterator = fs.listFiles(dirPath, true);
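// the listing is recursive, but the loop below keeps only files that are
// direct children of dirPath (relative path contains no '/')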
while (iterator.hasNext()) {
LocatedFileStatus next = iterator.next();
String location = next.getPath().toString();
String child = location.substring(dirPath.toString().length());
while (child.startsWith("/")) {
child = child.substring(1);
}
if (!child.contains("/")) {
result.add(new RemoteFile(location, next.isFile(), next.getLen(), next.getBlockSize()));
}
}
} catch (IOException e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
}
return Status.OK;
}
@Override
public Status listDirectories(String remotePath, Set<String> result) {
try {
FileStatus[] fileStatuses = fs.listStatus(new Path(remotePath));
result.addAll(
Arrays.stream(fileStatuses)
.filter(FileStatus::isDirectory)
.map(file -> file.getPath().toString() + "/")
.collect(ImmutableSet.toImmutableSet()));
} catch (IOException e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
}
return Status.OK;
}
public void createFile(String path) throws IOException {
Path path1 = new Path(path);
if (!exists(path1.getParent().toString()).ok()) {
makeDir(path1.getParent().toString());
}
FSDataOutputStream build = fs.createFile(path1).build();
build.close();
}
}

File: LocalFileSystem.java (deleted)

@@ -1,76 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs;
import org.apache.doris.backup.Status;
import org.apache.doris.fs.remote.RemoteFile;
import java.util.List;
import java.util.Map;
public class LocalFileSystem implements FileSystem {
@Override
public Status exists(String remotePath) {
throw new UnsupportedOperationException("Unsupported operation on local file system.");
}
@Override
public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
throw new UnsupportedOperationException("Unsupported operation on local file system.");
}
@Override
public Status upload(String localPath, String remotePath) {
throw new UnsupportedOperationException("Unsupported operation on local file system.");
}
@Override
public Status directUpload(String content, String remoteFile) {
throw new UnsupportedOperationException("Unsupported operation on local file system.");
}
@Override
public Status rename(String origFilePath, String destFilePath) {
throw new UnsupportedOperationException("Unsupported operation on local file system.");
}
@Override
public Status delete(String remotePath) {
throw new UnsupportedOperationException("Unsupported operation on local file system.");
}
@Override
public Status makeDir(String remotePath) {
throw new UnsupportedOperationException("Unsupported operation on local file system.");
}
@Override
public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) {
throw new UnsupportedOperationException("Unsupported operation on local file system.");
}
@Override
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
throw new UnsupportedOperationException("Unsupported operation on local file system.");
}
@Override
public Map<String, String> getProperties() {
throw new UnsupportedOperationException("Unsupported operation on local file system.");
}
}

File: DFSFileSystem.java

@@ -31,12 +31,15 @@ import org.apache.doris.fs.remote.RemoteFileSystem;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -51,9 +54,11 @@ import java.nio.ByteBuffer;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -523,4 +528,44 @@ public class DFSFileSystem extends RemoteFileSystem {
}
return Status.OK;
}
@Override
public Status listFiles(String remotePath, List<RemoteFile> result) {
RemoteIterator<LocatedFileStatus> iterator;
try {
FileSystem fileSystem = nativeFileSystem(remotePath);
Path dirPath = new Path(remotePath);
iterator = fileSystem.listFiles(dirPath, true);
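// recursive listing; filtered below to direct children of dirPath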
while (iterator.hasNext()) {
LocatedFileStatus next = iterator.next();
String location = next.getPath().toString();
String child = location.substring(dirPath.toString().length());
while (child.startsWith("/")) {
child = child.substring(1);
}
if (!child.contains("/")) {
result.add(new RemoteFile(location, next.isFile(), next.getLen(), next.getBlockSize()));
}
}
} catch (Exception e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
}
return Status.OK;
}
@Override
public Status listDirectories(String remotePath, Set<String> result) {
try {
FileSystem fileSystem = nativeFileSystem(remotePath);
FileStatus[] fileStatuses = fileSystem.listStatus(new Path(remotePath));
result.addAll(
Arrays.stream(fileStatuses)
.filter(FileStatus::isDirectory)
.map(file -> file.getPath().toString() + "/")
.collect(ImmutableSet.toImmutableSet()));
} catch (Exception e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
}
return Status.OK;
}
}

File: HMSCachedClientTest.java (new)

@@ -0,0 +1,328 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.analysis.TableName;
import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HMSTransaction;
import org.apache.doris.datasource.hive.HiveDatabaseMetadata;
import org.apache.doris.datasource.hive.HivePartitionStatistics;
import org.apache.doris.datasource.hive.HivePartitionWithStatistics;
import org.apache.doris.datasource.hive.HiveTableMetadata;
import org.apache.doris.datasource.hive.HiveUtil;
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;
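// An in-memory HMSCachedClient: databases, tables, and partitions live in
// plain maps and lists, which is enough for HmsCommitTest to drive the
// commit and rollback paths without a metastore.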
public class HMSCachedClientTest implements HMSCachedClient {
public Map<HMSTransaction.DatabaseTableName, List<Partition>> partitions = new ConcurrentHashMap<>();
public Map<String, List<Table>> tables = new HashMap<>();
public List<Database> dbs = new ArrayList<>();
@Override
public Database getDatabase(String dbName) {
for (Database db : this.dbs) {
if (db.getName().equals(dbName)) {
return db;
}
}
throw new RuntimeException("can't found database: " + dbName);
}
@Override
public List<String> getAllDatabases() {
return null;
}
@Override
public List<String> getAllTables(String dbName) {
return null;
}
@Override
public boolean tableExists(String dbName, String tblName) {
List<Table> tablesList = getTableList(dbName);
for (Table table : tablesList) {
if (table.getTableName().equals(tblName)) {
return true;
}
}
return false;
}
@Override
public List<String> listPartitionNames(String dbName, String tblName) {
List<Partition> partitionList = getPartitionList(dbName, tblName);
ArrayList<String> ret = new ArrayList<>();
for (Partition partition : partitionList) {
StringBuilder names = new StringBuilder();
List<String> values = partition.getValues();
for (int i = 0; i < values.size(); i++) {
names.append(values.get(i));
if (i < values.size() - 1) {
names.append("/");
}
}
ret.add(names.toString());
}
return ret;
}
@Override
public List<Partition> listPartitions(String dbName, String tblName) {
return getPartitionList(dbName, tblName);
}
@Override
public List<String> listPartitionNames(String dbName, String tblName, long maxListPartitionNum) {
return listPartitionNames(dbName, tblName);
}
@Override
public Partition getPartition(String dbName, String tblName, List<String> partitionValues) {
synchronized (this) {
List<Partition> partitionList = getPartitionList(dbName, tblName);
for (Partition partition : partitionList) {
if (partition.getValues().equals(partitionValues)) {
return partition;
}
}
throw new RuntimeException("can't found partition");
}
}
@Override
public List<Partition> getPartitions(String dbName, String tblName, List<String> partitionNames) {
synchronized (this) {
List<Partition> partitionList = getPartitionList(dbName, tblName);
ArrayList<Partition> ret = new ArrayList<>();
List<List<String>> partitionValuesList =
partitionNames
.stream()
.map(HiveUtil::toPartitionValues)
.collect(Collectors.toList());
partitionValuesList.forEach(values -> {
for (Partition partition : partitionList) {
if (partition.getValues().equals(values)) {
ret.add(partition);
break;
}
}
});
return ret;
}
}
@Override
public Table getTable(String dbName, String tblName) {
List<Table> tablesList = getTableList(dbName);
for (Table table : tablesList) {
if (table.getTableName().equals(tblName)) {
return table;
}
}
throw new RuntimeException("can't found table: " + tblName);
}
@Override
public List<FieldSchema> getSchema(String dbName, String tblName) {
return null;
}
@Override
public List<ColumnStatisticsObj> getTableColumnStatistics(String dbName, String tblName, List<String> columns) {
return null;
}
@Override
public Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(String dbName, String tblName, List<String> partNames, List<String> columns) {
return null;
}
@Override
public CurrentNotificationEventId getCurrentNotificationEventId() {
return null;
}
@Override
public NotificationEventResponse getNextNotification(long lastEventId, int maxEvents, IMetaStoreClient.NotificationFilter filter) throws MetastoreNotificationFetchException {
return null;
}
@Override
public long openTxn(String user) {
return 0;
}
@Override
public void commitTxn(long txnId) {
}
@Override
public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) {
return null;
}
@Override
public void acquireSharedLock(String queryId, long txnId, String user, TableName tblName, List<String> partitionNames, long timeoutMs) {
}
@Override
public String getCatalogLocation(String catalogName) {
return null;
}
@Override
public void createDatabase(DatabaseMetadata db) {
dbs.add(HiveUtil.toHiveDatabase((HiveDatabaseMetadata) db));
tables.put(db.getDbName(), new ArrayList<>());
}
@Override
public void dropDatabase(String dbName) {
Database db = getDatabase(dbName);
this.dbs.remove(db);
}
@Override
public void dropTable(String dbName, String tableName) {
Table table = getTable(dbName, tableName);
this.tables.get(dbName).remove(table);
this.partitions.remove(new HMSTransaction.DatabaseTableName(dbName, tableName));
}
@Override
public void createTable(TableMetadata tbl, boolean ignoreIfExists) {
String dbName = tbl.getDbName();
String tbName = tbl.getTableName();
if (tableExists(dbName, tbName)) {
throw new RuntimeException("Table '" + tbName + "' has existed in '" + dbName + "'.");
}
List<Table> tableList = getTableList(tbl.getDbName());
tableList.add(HiveUtil.toHiveTable((HiveTableMetadata) tbl));
HMSTransaction.DatabaseTableName key = new HMSTransaction.DatabaseTableName(dbName, tbName);
partitions.put(key, new ArrayList<>());
}
@Override
public void updateTableStatistics(String dbName, String tableName, Function<HivePartitionStatistics, HivePartitionStatistics> update) {
synchronized (this) {
Table originTable = getTable(dbName, tableName);
Map<String, String> originParams = originTable.getParameters();
HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams));
Table newTable = originTable.deepCopy();
Map<String, String> newParams =
HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
newTable.setParameters(newParams);
List<Table> tableList = getTableList(dbName);
tableList.remove(originTable);
tableList.add(newTable);
}
}
@Override
public void updatePartitionStatistics(String dbName, String tableName, String partitionName, Function<HivePartitionStatistics, HivePartitionStatistics> update) {
synchronized (this) {
List<Partition> partitions = getPartitions(dbName, tableName, ImmutableList.of(partitionName));
if (partitions.size() != 1) {
throw new RuntimeException("Metastore returned multiple partitions for name: " + partitionName);
}
Partition originPartition = partitions.get(0);
Map<String, String> originParams = originPartition.getParameters();
HivePartitionStatistics updatedStats = update.apply(HiveUtil.toHivePartitionStatistics(originParams));
Partition modifiedPartition = originPartition.deepCopy();
Map<String, String> newParams =
HiveUtil.updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
modifiedPartition.setParameters(newParams);
List<Partition> partitionList = getPartitionList(dbName, tableName);
partitionList.remove(originPartition);
partitionList.add(modifiedPartition);
}
}
@Override
public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) {
synchronized (this) {
List<Partition> partitionList = getPartitionList(dbName, tableName);
List<Partition> hivePartitions = partitions.stream()
.map(HiveUtil::toMetastoreApiPartition)
.collect(Collectors.toList());
partitionList.addAll(hivePartitions);
}
}
@Override
public void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData) {
synchronized (this) {
List<Partition> partitionList = getPartitionList(dbName, tableName);
for (int j = 0; j < partitionList.size(); j++) {
Partition partition = partitionList.get(j);
if (partition.getValues().equals(partitionValues)) {
partitionList.remove(partition);
return;
}
}
throw new RuntimeException("can't found the partition");
}
}
public List<Partition> getPartitionList(String dbName, String tableName) {
HMSTransaction.DatabaseTableName key = new HMSTransaction.DatabaseTableName(dbName, tableName);
List<Partition> partitionList = this.partitions.get(key);
if (partitionList == null) {
throw new RuntimeException("can't found table: " + key);
}
return partitionList;
}
public List<Table> getTableList(String dbName) {
List<Table> tablesList = this.tables.get(dbName);
if (tablesList == null) {
throw new RuntimeException("can't found database: " + dbName);
}
return tablesList;
}
}

File: HmsCommitTest.java

@@ -17,10 +17,10 @@
package org.apache.doris.datasource.hive;
import org.apache.doris.backup.Status;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.datasource.HMSCachedClientTest;
import org.apache.doris.fs.LocalDfsFileSystem;
import org.apache.doris.thrift.THiveLocationParams;
import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TUpdateMode;
@@ -28,6 +28,7 @@ import org.apache.doris.thrift.TUpdateMode;
import com.google.common.collect.Lists;
import mockit.Mock;
import mockit.MockUp;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.After;
@@ -35,59 +36,57 @@ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
@Ignore
public class HmsCommitTest {
private static HMSExternalCatalog hmsCatalog;
private static HiveMetadataOps hmsOps;
private static HMSCachedClient hmsClient;
private static final String dbName = "test_db";
private static final String tbWithPartition = "test_tb_with_partition";
private static final String tbWithoutPartition = "test_tb_without_partition";
private static Path warehousePath;
private static LocalDfsFileSystem fs;
static String dbLocation;
private String fileFormat = "orc";
static String writeLocation;
static String uri = "thrift://127.0.0.1:9083";
static boolean hasRealHmsService = false;
@BeforeClass
public static void beforeClass() throws Throwable {
warehousePath = Files.createTempDirectory("test_warehouse_");
Path warehousePath = Files.createTempDirectory("test_warehouse_");
Path writePath = Files.createTempDirectory("test_write_");
dbLocation = "file://" + warehousePath.toAbsolutePath() + "/";
writeLocation = "file://" + writePath.toAbsolutePath() + "/";
createTestHiveCatalog();
createTestHiveDatabase();
mockFs();
}
@AfterClass
public static void afterClass() {
hmsClient.dropTable(dbName, tbWithPartition);
hmsClient.dropTable(dbName, tbWithoutPartition);
hmsClient.dropDatabase(dbName);
}
public static void createTestHiveCatalog() {
Map<String, String> props = new HashMap<>();
props.put("type", "hms");
props.put("hive.metastore.uris", "thrift://127.0.0.1:9083");
props.put("hadoop.username", "hadoop");
hmsCatalog = new HMSExternalCatalog(1, "hive_catalog", null, props, "comment");
hmsCatalog.setInitialized();
hmsCatalog.initLocalObjectsImpl();
hmsOps = (HiveMetadataOps) hmsCatalog.getMetadataOps();
hmsClient = hmsOps.getClient();
public static void createTestHiveCatalog() throws IOException {
fs = new LocalDfsFileSystem();
if (hasRealHmsService) {
// If a real HMS service is available, this client can be used to run the tests against real connections
HiveConf entries = new HiveConf();
entries.set("hive.metastore.uris", uri);
hmsClient = new ThriftHMSCachedClient(entries, 2);
} else {
hmsClient = new HMSCachedClientTest();
}
hmsOps = new HiveMetadataOps(null, hmsClient, fs);
}
public static void createTestHiveDatabase() {
@@ -98,53 +97,31 @@ public class HmsCommitTest {
hmsClient.createDatabase(dbMetadata);
}
public static void mockFs() {
new MockUp<DFSFileSystem>(DFSFileSystem.class) {
@Mock
public void asyncRenameDir(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
}
@Mock
public void asyncRename(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
List<String> fileNames) {
}
@Mock
public Status renameDir(String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
return Status.OK;
}
};
}
@Before
public void before() {
// create table
// create table for tbWithPartition
List<Column> columns = new ArrayList<>();
columns.add(new Column("c1", PrimitiveType.INT, true));
columns.add(new Column("c2", PrimitiveType.STRING, true));
columns.add(new Column("c3", PrimitiveType.STRING, false));
List<String> partitionKeys = new ArrayList<>();
partitionKeys.add("c3");
String fileFormat = "orc";
HashMap<String, String> params = new HashMap<String, String>() {{
put("location_uri", dbLocation + tbWithPartition);
}};
HiveTableMetadata tableMetadata = new HiveTableMetadata(
dbName, tbWithPartition, columns, partitionKeys,
new HashMap<>(), fileFormat);
params, fileFormat);
hmsClient.createTable(tableMetadata, true);
// create table for tbWithoutPartition
HashMap<String, String> params2 = new HashMap<String, String>() {{
put("location_uri", dbLocation + tbWithPartition);
}};
HiveTableMetadata tableMetadata2 = new HiveTableMetadata(
dbName, tbWithoutPartition, columns, new ArrayList<>(),
new HashMap<>(), fileFormat);
dbName, tbWithoutPartition, columns, new ArrayList<>(),
params2, fileFormat);
hmsClient.createTable(tableMetadata2, true);
}
@@ -156,45 +133,45 @@ public class HmsCommitTest {
}
@Test
public void testNewPartitionForUnPartitionedTable() {
public void testNewPartitionForUnPartitionedTable() throws IOException {
List<THivePartitionUpdate> pus = new ArrayList<>();
pus.add(createRandomNew("a"));
pus.add(createRandomNew(null));
Assert.assertThrows(Exception.class, () -> commit(dbName, tbWithoutPartition, pus));
}
@Test
public void testAppendPartitionForUnPartitionedTable() {
public void testAppendPartitionForUnPartitionedTable() throws IOException {
List<THivePartitionUpdate> pus = new ArrayList<>();
pus.add(createRandomAppend(""));
pus.add(createRandomAppend(""));
pus.add(createRandomAppend(""));
pus.add(createRandomAppend(null));
pus.add(createRandomAppend(null));
pus.add(createRandomAppend(null));
commit(dbName, tbWithoutPartition, pus);
Table table = hmsClient.getTable(dbName, tbWithoutPartition);
assertNumRows(3, table);
List<THivePartitionUpdate> pus2 = new ArrayList<>();
pus2.add(createRandomAppend(""));
pus2.add(createRandomAppend(""));
pus2.add(createRandomAppend(""));
pus2.add(createRandomAppend(null));
pus2.add(createRandomAppend(null));
pus2.add(createRandomAppend(null));
commit(dbName, tbWithoutPartition, pus2);
table = hmsClient.getTable(dbName, tbWithoutPartition);
assertNumRows(6, table);
}
@Test
public void testOverwritePartitionForUnPartitionedTable() {
public void testOverwritePartitionForUnPartitionedTable() throws IOException {
testAppendPartitionForUnPartitionedTable();
List<THivePartitionUpdate> pus = new ArrayList<>();
pus.add(createRandomOverwrite(""));
pus.add(createRandomOverwrite(""));
pus.add(createRandomOverwrite(""));
pus.add(createRandomOverwrite(null));
pus.add(createRandomOverwrite(null));
pus.add(createRandomOverwrite(null));
commit(dbName, tbWithoutPartition, pus);
Table table = hmsClient.getTable(dbName, tbWithoutPartition);
assertNumRows(3, table);
}
@Test
public void testNewPartitionForPartitionedTable() {
public void testNewPartitionForPartitionedTable() throws IOException {
List<THivePartitionUpdate> pus = new ArrayList<>();
pus.add(createRandomNew("a"));
pus.add(createRandomNew("a"));
@@ -213,7 +190,7 @@ public class HmsCommitTest {
}
@Test
public void testAppendPartitionForPartitionedTable() {
public void testAppendPartitionForPartitionedTable() throws IOException {
testNewPartitionForPartitionedTable();
List<THivePartitionUpdate> pus = new ArrayList<>();
@@ -234,7 +211,7 @@ public class HmsCommitTest {
}
@Test
public void testOverwritePartitionForPartitionedTable() {
public void testOverwritePartitionForPartitionedTable() throws IOException {
testAppendPartitionForPartitionedTable();
List<THivePartitionUpdate> pus = new ArrayList<>();
pus.add(createRandomOverwrite("a"));
@@ -251,7 +228,7 @@ public class HmsCommitTest {
}
@Test
public void testNewManyPartitionForPartitionedTable() {
public void testNewManyPartitionForPartitionedTable() throws IOException {
List<THivePartitionUpdate> pus = new ArrayList<>();
int nums = 150;
for (int i = 0; i < nums; i++) {
@@ -265,12 +242,30 @@ public class HmsCommitTest {
}
try {
commit(dbName, tbWithPartition, pus);
commit(dbName, tbWithPartition, Collections.singletonList(createRandomNew("1")));
} catch (Exception e) {
Assert.assertTrue(e.getMessage().contains("failed to add partitions"));
}
}
@Test
public void testErrorPartitionTypeFromHmsCheck() throws IOException {
// first add three partitions: a, b, c
testNewPartitionForPartitionedTable();
// then append two partitions: a, x
// but there is no 'x' partition in the existing table, so verifying against HMS
// will throw an exception
List<THivePartitionUpdate> pus = new ArrayList<>();
pus.add(createRandomAppend("a"));
pus.add(createRandomAppend("x"));
Assert.assertThrows(
Exception.class,
() -> commit(dbName, tbWithPartition, pus)
);
}
public void assertNumRows(long expected, Partition p) {
Assert.assertEquals(expected, Long.parseLong(p.getParameters().get("numRows")));
}
@@ -279,40 +274,62 @@ public class HmsCommitTest {
Assert.assertEquals(expected, Long.parseLong(t.getParameters().get("numRows")));
}
public THivePartitionUpdate genOnePartitionUpdate(String partitionValue, TUpdateMode mode) {
public THivePartitionUpdate genOnePartitionUpdate(TUpdateMode mode) throws IOException {
return genOnePartitionUpdate("", mode);
}
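// Builds one partition update: files f1..f3 are physically created under
// writeLocation and recorded in the update; commit renames them into
// targetPath. For APPEND/OVERWRITE the target directory is pre-created,
// since only NEW partitions start without one.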
public THivePartitionUpdate genOnePartitionUpdate(String partitionValue, TUpdateMode mode) throws IOException {
String uuid = UUID.randomUUID().toString();
THiveLocationParams location = new THiveLocationParams();
String targetPath = dbLocation + uuid;
String targetPath = dbLocation + uuid + "/" + partitionValue;
location.setTargetPath(targetPath);
location.setWritePath(targetPath);
location.setWritePath(writeLocation + partitionValue);
THivePartitionUpdate pu = new THivePartitionUpdate();
pu.setName(partitionValue);
if (partitionValue != null) {
pu.setName(partitionValue);
}
pu.setUpdateMode(mode);
pu.setRowCount(1);
pu.setFileSize(1);
pu.setLocation(location);
String f1 = uuid + "f1";
String f2 = uuid + "f2";
String f3 = uuid + "f3";
pu.setFileNames(new ArrayList<String>() {
{
add(targetPath + "/f1");
add(targetPath + "/f2");
add(targetPath + "/f3");
add(f1);
add(f2);
add(f3);
}
});
if (mode != TUpdateMode.NEW) {
fs.makeDir(targetPath);
}
fs.createFile(writeLocation + partitionValue + "/" + f1);
fs.createFile(writeLocation + partitionValue + "/" + f2);
fs.createFile(writeLocation + partitionValue + "/" + f3);
return pu;
}
public THivePartitionUpdate createRandomNew(String partition) {
return genOnePartitionUpdate("c3=" + partition, TUpdateMode.NEW);
public THivePartitionUpdate createRandomNew(String partition) throws IOException {
return partition == null ? genOnePartitionUpdate(TUpdateMode.NEW) :
genOnePartitionUpdate("c3=" + partition, TUpdateMode.NEW);
}
public THivePartitionUpdate createRandomAppend(String partition) {
return genOnePartitionUpdate("c3=" + partition, TUpdateMode.APPEND);
public THivePartitionUpdate createRandomAppend(String partition) throws IOException {
return partition == null ? genOnePartitionUpdate(TUpdateMode.APPEND) :
genOnePartitionUpdate("c3=" + partition, TUpdateMode.APPEND);
}
public THivePartitionUpdate createRandomOverwrite(String partition) {
return genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE);
public THivePartitionUpdate createRandomOverwrite(String partition) throws IOException {
return partition == null ? genOnePartitionUpdate(TUpdateMode.OVERWRITE) :
genOnePartitionUpdate("c3=" + partition, TUpdateMode.OVERWRITE);
}
public void commit(String dbName,
@@ -323,4 +340,181 @@ public class HmsCommitTest {
hmsTransaction.finishInsertTable(dbName, tableName);
hmsTransaction.commit();
}
public void mockAddPartitionTaskException(Runnable runnable) {
new MockUp<HMSTransaction.AddPartitionsTask>(HMSTransaction.AddPartitionsTask.class) {
@Mock
private void run(HiveMetadataOps hiveOps) {
runnable.run();
throw new RuntimeException("failed to add partition");
}
};
}
public void mockDoOther(Runnable runnable) {
new MockUp<HMSTransaction.HmsCommitter>(HMSTransaction.HmsCommitter.class) {
@Mock
private void doNothing() {
runnable.run();
throw new RuntimeException("failed to do nothing");
}
};
}
public void mockUpdateStatisticsTaskException(Runnable runnable) {
new MockUp<HMSTransaction.UpdateStatisticsTask>(HMSTransaction.UpdateStatisticsTask.class) {
@Mock
private void run(HiveMetadataOps hiveOps) {
runnable.run();
throw new RuntimeException("failed to update partition");
}
};
}
@Test
public void testRollbackNewPartitionForPartitionedTableForFilesystem() throws IOException {
List<THivePartitionUpdate> pus = new ArrayList<>();
pus.add(createRandomNew("a"));
THiveLocationParams location = pus.get(0).getLocation();
// For new partition, there should be no target path
Assert.assertFalse(fs.exists(location.getTargetPath()).ok());
Assert.assertTrue(fs.exists(location.getWritePath()).ok());
mockAddPartitionTaskException(() -> {
// When the commit is completed, these files should be renamed successfully
String targetPath = location.getTargetPath();
Assert.assertTrue(fs.exists(targetPath).ok());
for (String file : pus.get(0).getFileNames()) {
Assert.assertTrue(fs.exists(targetPath + "/" + file).ok());
}
});
try {
commit(dbName, tbWithPartition, pus);
Assert.fail("commit should have thrown an exception");
} catch (Exception e) {
// ignore
}
// After rollback, these files will be deleted
String targetPath = location.getTargetPath();
Assert.assertFalse(fs.exists(targetPath).ok());
for (String file : pus.get(0).getFileNames()) {
Assert.assertFalse(fs.exists(targetPath + "/" + file).ok());
}
}
@Test
public void testRollbackNewPartitionForPartitionedTableWithNewPartition() throws IOException {
// first create three partitions: a,b,c
testNewPartitionForPartitionedTable();
// then add a 'new partition' update for 'x'
// and an 'append partition' update for 'a'
// during 'doCommit', the 'new partition' task runs before the 'append partition' task,
// so during 'rollback' the 'x' partition is first added and then deleted
List<THivePartitionUpdate> pus = new ArrayList<>();
pus.add(createRandomNew("x"));
pus.add(createRandomAppend("a"));
THiveLocationParams location = pus.get(0).getLocation();
// For new partition, there should be no target path
Assert.assertFalse(fs.exists(location.getTargetPath()).ok());
Assert.assertTrue(fs.exists(location.getWritePath()).ok());
mockUpdateStatisticsTaskException(() -> {
// When the commit is completed, these files should be renamed successfully
String targetPath = location.getTargetPath();
Assert.assertTrue(fs.exists(targetPath).ok());
for (String file : pus.get(0).getFileNames()) {
Assert.assertTrue(fs.exists(targetPath + "/" + file).ok());
}
// the new-partition task runs before the append-partition task,
// so the new partition can already be fetched here
Partition px = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("x"));
assertNumRows(1, px);
});
try {
commit(dbName, tbWithPartition, pus);
Assert.fail("commit should have thrown an exception");
} catch (Exception e) {
// ignore
}
// After rollback, these files will be deleted
String targetPath = location.getTargetPath();
Assert.assertFalse(fs.exists(targetPath).ok());
for (String file : pus.get(0).getFileNames()) {
Assert.assertFalse(fs.exists(targetPath + "/" + file).ok());
}
// x partition will be deleted
Assert.assertThrows(
"the 'x' partition should be deleted",
Exception.class,
() -> hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("x"))
);
}
@Test
public void testRollbackNewPartitionForPartitionedTableWithNewAppendPartition() throws IOException {
// first create three partitions: a,b,c
testNewPartitionForPartitionedTable();
// then add a 'new partition' update for 'x'
// and an 'append partition' update for 'a'
List<THivePartitionUpdate> pus = new ArrayList<>();
pus.add(createRandomNew("x"));
pus.add(createRandomAppend("a"));
THiveLocationParams location = pus.get(0).getLocation();
// For new partition, there should be no target path
Assert.assertFalse(fs.exists(location.getTargetPath()).ok());
Assert.assertTrue(fs.exists(location.getWritePath()).ok());
mockDoOther(() -> {
// When the commit is completed, these files should be renamed successfully
String targetPath = location.getTargetPath();
Assert.assertTrue(fs.exists(targetPath).ok());
for (String file : pus.get(0).getFileNames()) {
Assert.assertTrue(fs.exists(targetPath + "/" + file).ok());
}
// the new-partition task has been executed,
// so the new partition can be fetched
Partition px = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("x"));
assertNumRows(1, px);
// the append-partition task has been executed,
// so the updated partition can be fetched
Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
assertNumRows(4, pa);
});
try {
commit(dbName, tbWithPartition, pus);
Assert.fail("commit should have thrown an exception");
} catch (Exception e) {
// ignore
}
// After rollback, these files will be deleted
String targetPath = location.getTargetPath();
Assert.assertFalse(fs.exists(targetPath).ok());
for (String file : pus.get(0).getFileNames()) {
Assert.assertFalse(fs.exists(targetPath + "/" + file).ok());
}
// x partition will be deleted
Assert.assertThrows(
"the 'x' partition should be deleted",
Exception.class,
() -> hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("x"))
);
// the 'a' partition should be rolled back
Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a"));
assertNumRows(3, pa);
}
}