[feature](hive) add 'HMSCommitter' to support inserting data into Hive tables (#32283) (#32362)

bp #32283
Co-authored-by: wuwenchi <wuwenchihdu@hotmail.com>
Mingyu Chen
2024-03-18 10:59:32 +08:00
committed by GitHub
parent 2add3bc13a
commit a444e84be6
17 changed files with 1558 additions and 12 deletions

View File

@ -135,6 +135,11 @@ public abstract class ExternalCatalog
return conf;
}
// only for test
public void setInitialized() {
initialized = true;
}
/**
* set some default properties when creating catalog
* @return list of database names in this catalog

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
/**
* A hive metastore client pool for a specific catalog with hive configuration.
@ -90,4 +91,19 @@ public interface HMSCachedClient {
void dropTable(String dbName, String tableName);
void createTable(TableMetadata catalogTable, boolean ignoreIfExists);
void updateTableStatistics(
String dbName,
String tableName,
Function<HivePartitionStatistics, HivePartitionStatistics> update);
void updatePartitionStatistics(
String dbName,
String tableName,
String partitionName,
Function<HivePartitionStatistics, HivePartitionStatistics> update);
void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions);
void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData);
}
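
The two statistics methods take the new values as a transformation over whatever the metastore currently holds, so the read-modify-write cycle stays inside a single client call. A minimal caller-side sketch, assuming an HMSCachedClient instance named client and using the merge helper from HivePartitionStatistics below; the database/table names and values are illustrative:

// Fold a freshly written delta into the current table-level statistics.
HivePartitionStatistics delta = HivePartitionStatistics.fromCommonStatistics(
        1000L /* rowCount */, 4L /* fileCount */, 1048576L /* totalFileBytes */);
client.updateTableStatistics("test_db", "test_tbl",
        current -> HivePartitionStatistics.merge(current, delta));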

View File

@ -0,0 +1,668 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java
// and modified by Doris
package org.apache.doris.datasource.hive;
import org.apache.doris.backup.Status;
import org.apache.doris.common.Pair;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.thrift.THivePartitionUpdate;
import org.apache.doris.thrift.TUpdateMode;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.airlift.concurrent.MoreFutures;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.StringJoiner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class HMSCommitter {
private static final Logger LOG = LogManager.getLogger(HMSCommitter.class);
private final HiveMetadataOps hiveOps;
private final RemoteFileSystem fs;
private final Table table;
// update statistics for an unpartitioned table or an existing partition
private final List<UpdateStatisticsTask> updateStatisticsTasks = new ArrayList<>();
private final Executor updateStatisticsExecutor = Executors.newFixedThreadPool(16);
// add new partition
private final AddPartitionsTask addPartitionsTask = new AddPartitionsTask();
private static final int PARTITION_COMMIT_BATCH_SIZE = 20;
// for file system rename operation
// whether to cancel the file system tasks
private final AtomicBoolean fileSystemTaskCancelled = new AtomicBoolean(false);
// file system tasks that are executed asynchronously, including rename_file, rename_dir
private final List<CompletableFuture<?>> asyncFileSystemTaskFutures = new ArrayList<>();
// when aborted, we need to delete all files under this path, even the current directory
private final Queue<DirectoryCleanUpTask> directoryCleanUpTasksForAbort = new ConcurrentLinkedQueue<>();
// when aborted, we need restore directory
private final List<RenameDirectoryTask> renameDirectoryTasksForAbort = new ArrayList<>();
private final Executor fileSystemExecutor = Executors.newFixedThreadPool(16);
public HMSCommitter(HiveMetadataOps hiveOps, RemoteFileSystem fs, Table table) {
this.hiveOps = hiveOps;
this.fs = fs;
this.table = table;
}
public void commit(List<THivePartitionUpdate> hivePUs) {
try {
prepare(mergePartitions(hivePUs));
doCommit();
} catch (Throwable t) {
LOG.warn("Failed to commit for {}.{}, abort it.", table.getDbName(), table.getTableName());
try {
cancelUnStartedAsyncFileSystemTask();
undoUpdateStatisticsTasks();
undoAddPartitionsTask();
waitForAsyncFileSystemTaskSuppressThrowable();
runDirectoryCleanUpTasksForAbort();
runRenameDirTasksForAbort();
} catch (Throwable e) {
t.addSuppressed(new Exception("Failed to roll back after commit failure", e));
}
throw t;
}
}
public void prepare(List<THivePartitionUpdate> hivePUs) {
List<Pair<THivePartitionUpdate, HivePartitionStatistics>> insertExistsPartitions = new ArrayList<>();
for (THivePartitionUpdate pu : hivePUs) {
TUpdateMode updateMode = pu.getUpdateMode();
HivePartitionStatistics hivePartitionStatistics = HivePartitionStatistics.fromCommonStatistics(
pu.getRowCount(),
pu.getFileNamesSize(),
pu.getFileSize());
if (table.getPartitionKeysSize() == 0) {
Preconditions.checkArgument(hivePUs.size() == 1,
"When updating a non-partitioned table, multiple partitions should not be written");
switch (updateMode) {
case APPEND:
prepareAppendTable(pu, hivePartitionStatistics);
break;
case OVERWRITE:
prepareOverwriteTable(pu, hivePartitionStatistics);
break;
default:
throw new RuntimeException("Not support mode:[" + updateMode + "] in unPartitioned table");
}
} else {
switch (updateMode) {
case NEW:
prepareCreateNewPartition(pu, hivePartitionStatistics);
break;
case APPEND:
insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics));
break;
case OVERWRITE:
prepareOverwritePartition(pu, hivePartitionStatistics);
break;
default:
throw new RuntimeException("Not support mode:[" + updateMode + "] in unPartitioned table");
}
}
}
if (!insertExistsPartitions.isEmpty()) {
prepareInsertExistPartition(insertExistsPartitions);
}
}
public List<THivePartitionUpdate> mergePartitions(List<THivePartitionUpdate> hivePUs) {
Map<String, THivePartitionUpdate> mm = new HashMap<>();
for (THivePartitionUpdate pu : hivePUs) {
if (mm.containsKey(pu.getName())) {
THivePartitionUpdate old = mm.get(pu.getName());
old.setFileSize(old.getFileSize() + pu.getFileSize());
old.setRowCount(old.getRowCount() + pu.getRowCount());
old.getFileNames().addAll(pu.getFileNames());
} else {
mm.put(pu.getName(), pu);
}
}
return new ArrayList<>(mm.values());
}
public void doCommit() {
waitForAsyncFileSystemTasks();
doAddPartitionsTask();
doUpdateStatisticsTasks();
}
public void rollback() {
}
public void cancelUnStartedAsyncFileSystemTask() {
fileSystemTaskCancelled.set(true);
}
private void undoUpdateStatisticsTasks() {
ImmutableList.Builder<CompletableFuture<?>> undoUpdateFutures = ImmutableList.builder();
for (UpdateStatisticsTask task : updateStatisticsTasks) {
undoUpdateFutures.add(CompletableFuture.runAsync(() -> {
try {
task.undo(hiveOps);
} catch (Throwable throwable) {
LOG.warn("Failed to rollback: {}", task.getDescription(), throwable);
}
}, updateStatisticsExecutor));
}
for (CompletableFuture<?> undoUpdateFuture : undoUpdateFutures.build()) {
MoreFutures.getFutureValue(undoUpdateFuture);
}
}
private void undoAddPartitionsTask() {
if (addPartitionsTask.isEmpty()) {
return;
}
HivePartition firstPartition = addPartitionsTask.getPartitions().get(0).getPartition();
String dbName = firstPartition.getDbName();
String tableName = firstPartition.getTblName();
List<List<String>> rollbackFailedPartitions = addPartitionsTask.rollback(hiveOps);
if (!rollbackFailedPartitions.isEmpty()) {
LOG.warn("Failed to rollback: add_partition for partition values {}.{}.{}",
dbName, tableName, rollbackFailedPartitions);
}
}
private void waitForAsyncFileSystemTaskSuppressThrowable() {
for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
try {
future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Throwable t) {
// ignore
}
}
}
public void prepareAppendTable(THivePartitionUpdate pu, HivePartitionStatistics ps) {
String targetPath = pu.getLocation().getTargetPath();
String writePath = pu.getLocation().getWritePath();
if (!targetPath.equals(writePath)) {
fs.asyncRename(
fileSystemExecutor,
asyncFileSystemTaskFutures,
fileSystemTaskCancelled,
writePath,
targetPath,
pu.getFileNames());
}
directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false));
updateStatisticsTasks.add(
new UpdateStatisticsTask(
table.getDbName(),
table.getTableName(),
Optional.empty(),
ps,
true
));
}
public void prepareOverwriteTable(THivePartitionUpdate pu, HivePartitionStatistics ps) {
}
public void prepareCreateNewPartition(THivePartitionUpdate pu, HivePartitionStatistics ps) {
String targetPath = pu.getLocation().getTargetPath();
String writePath = pu.getLocation().getWritePath();
if (!targetPath.equals(writePath)) {
fs.asyncRenameDir(
fileSystemExecutor,
asyncFileSystemTaskFutures,
fileSystemTaskCancelled,
writePath,
targetPath,
() -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true)));
}
StorageDescriptor sd = table.getSd();
HivePartition hivePartition = new HivePartition(
table.getDbName(),
table.getTableName(),
false,
sd.getInputFormat(),
pu.getLocation().getTargetPath(),
HiveUtil.toPartitionValues(pu.getName()),
Maps.newHashMap(),
sd.getOutputFormat(),
sd.getSerdeInfo().getSerializationLib(),
hiveOps.getClient().getSchema(table.getDbName(), table.getTableName())
);
HivePartitionWithStatistics partitionWithStats =
new HivePartitionWithStatistics(pu.getName(), hivePartition, ps);
addPartitionsTask.addPartition(partitionWithStats);
}
public void prepareInsertExistPartition(List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitions) {
for (List<Pair<THivePartitionUpdate, HivePartitionStatistics>> partitionBatch :
Iterables.partition(partitions, 100)) {
List<String> partitionNames = partitionBatch.stream()
.map(pair -> pair.first.getName())
.collect(Collectors.toList());
Map<String, Partition> partitionsByNamesMap = HiveUtil.convertToNamePartitionMap(
partitionNames,
hiveOps.getClient().getPartitions(table.getDbName(), table.getTableName(), partitionNames));
for (int i = 0; i < partitionNames.size(); i++) {
String partitionName = partitionNames.get(i);
if (partitionsByNamesMap.get(partitionName) == null) {
// the partition may have been dropped concurrently by another engine; fail instead of writing into it
throw new RuntimeException("Partition not found: " + partitionName);
}
THivePartitionUpdate pu = partitionBatch.get(i).first;
HivePartitionStatistics updateStats = partitionBatch.get(i).second;
String writePath = pu.getLocation().getWritePath();
String targetPath = pu.getLocation().getTargetPath();
directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false));
if (!targetPath.equals(writePath)) {
fs.asyncRename(
fileSystemExecutor,
asyncFileSystemTaskFutures,
fileSystemTaskCancelled,
writePath,
targetPath,
pu.getFileNames());
}
updateStatisticsTasks.add(
new UpdateStatisticsTask(
table.getDbName(),
table.getTableName(),
Optional.of(pu.getName()),
updateStats,
true));
}
}
}
public void prepareOverwritePartition(THivePartitionUpdate pu, HivePartitionStatistics ps) {
}
private void waitForAsyncFileSystemTasks() {
for (CompletableFuture<?> future : asyncFileSystemTaskFutures) {
MoreFutures.getFutureValue(future, RuntimeException.class);
}
}
private void doAddPartitionsTask() {
if (!addPartitionsTask.isEmpty()) {
addPartitionsTask.run(hiveOps);
}
}
private void doUpdateStatisticsTasks() {
ImmutableList.Builder<CompletableFuture<?>> updateStatsFutures = ImmutableList.builder();
List<String> failedTaskDescriptions = new ArrayList<>();
List<Throwable> suppressedExceptions = new ArrayList<>();
for (UpdateStatisticsTask task : updateStatisticsTasks) {
updateStatsFutures.add(CompletableFuture.runAsync(() -> {
try {
task.run(hiveOps);
} catch (Throwable t) {
synchronized (suppressedExceptions) {
addSuppressedExceptions(suppressedExceptions, t, failedTaskDescriptions, task.getDescription());
}
}
}, updateStatisticsExecutor));
}
for (CompletableFuture<?> executeUpdateFuture : updateStatsFutures.build()) {
MoreFutures.getFutureValue(executeUpdateFuture);
}
if (!suppressedExceptions.isEmpty()) {
StringBuilder message = new StringBuilder();
message.append("Failed to execute some updating statistics tasks: ");
Joiner.on("; ").appendTo(message, failedTaskDescriptions);
RuntimeException exception = new RuntimeException(message.toString());
suppressedExceptions.forEach(exception::addSuppressed);
throw exception;
}
}
private static void addSuppressedExceptions(
List<Throwable> suppressedExceptions,
Throwable t,
List<String> descriptions,
String description) {
descriptions.add(description);
// A limit is needed to avoid having a huge exception object. 5 was chosen arbitrarily.
if (suppressedExceptions.size() < 5) {
suppressedExceptions.add(t);
}
}
private static class AddPartition {
}
private static class UpdateStatisticsTask {
private final String dbName;
private final String tableName;
private final Optional<String> partitionName;
private final HivePartitionStatistics updatePartitionStat;
private final boolean merge;
private boolean done;
public UpdateStatisticsTask(String dbName, String tableName, Optional<String> partitionName,
HivePartitionStatistics statistics, boolean merge) {
this.dbName = Objects.requireNonNull(dbName, "dbName is null");
this.tableName = Objects.requireNonNull(tableName, "tableName is null");
this.partitionName = Objects.requireNonNull(partitionName, "partitionName is null");
this.updatePartitionStat = Objects.requireNonNull(statistics, "statistics is null");
this.merge = merge;
}
public void run(HiveMetadataOps hiveOps) {
if (partitionName.isPresent()) {
hiveOps.updatePartitionStatistics(dbName, tableName, partitionName.get(), this::updateStatistics);
} else {
hiveOps.updateTableStatistics(dbName, tableName, this::updateStatistics);
}
done = true;
}
public void undo(HiveMetadataOps hmsOps) {
if (!done) {
return;
}
if (partitionName.isPresent()) {
hmsOps.updatePartitionStatistics(dbName, tableName, partitionName.get(), this::resetStatistics);
} else {
hmsOps.updateTableStatistics(dbName, tableName, this::resetStatistics);
}
}
public String getDescription() {
if (partitionName.isPresent()) {
return "alter partition parameters " + tableName + " " + partitionName.get();
} else {
return "alter table parameters " + tableName;
}
}
private HivePartitionStatistics updateStatistics(HivePartitionStatistics currentStats) {
return merge ? HivePartitionStatistics.merge(currentStats, updatePartitionStat) : updatePartitionStat;
}
private HivePartitionStatistics resetStatistics(HivePartitionStatistics currentStatistics) {
return HivePartitionStatistics
.reduce(currentStatistics, updatePartitionStat, HivePartitionStatistics.ReduceOperator.SUBTRACT);
}
}
public static class AddPartitionsTask {
private final List<HivePartitionWithStatistics> partitions = new ArrayList<>();
private final List<List<String>> createdPartitionValues = new ArrayList<>();
public boolean isEmpty() {
return partitions.isEmpty();
}
public List<HivePartitionWithStatistics> getPartitions() {
return partitions;
}
public void addPartition(HivePartitionWithStatistics partition) {
partitions.add(partition);
}
public void run(HiveMetadataOps hiveOps) {
HivePartition firstPartition = partitions.get(0).getPartition();
String dbName = firstPartition.getDbName();
String tableName = firstPartition.getTblName();
List<List<HivePartitionWithStatistics>> batchedPartitions =
Lists.partition(partitions, PARTITION_COMMIT_BATCH_SIZE);
for (List<HivePartitionWithStatistics> batch : batchedPartitions) {
try {
hiveOps.addPartitions(dbName, tableName, batch);
for (HivePartitionWithStatistics partition : batch) {
createdPartitionValues.add(partition.getPartition().getPartitionValues());
}
} catch (Throwable t) {
LOG.error("Failed to add partition", t);
throw t;
}
}
partitions.clear();
}
public List<List<String>> rollback(HiveMetadataOps hiveOps) {
HivePartition firstPartition = partitions.get(0).getPartition();
String dbName = firstPartition.getDbName();
String tableName = firstPartition.getTblName();
List<List<String>> rollbackFailedPartitions = new ArrayList<>();
for (List<String> createdPartitionValue : createdPartitionValues) {
try {
hiveOps.dropPartition(dbName, tableName, createdPartitionValue, false);
} catch (Throwable t) {
LOG.warn("Failed to drop partition on {}.{}.{} when rollback",
dbName, tableName, rollbackFailedPartitions);
rollbackFailedPartitions.add(createdPartitionValue);
}
}
return rollbackFailedPartitions;
}
}
private static class DirectoryCleanUpTask {
private final Path path;
private final boolean deleteEmptyDir;
public DirectoryCleanUpTask(String path, boolean deleteEmptyDir) {
this.path = new Path(path);
this.deleteEmptyDir = deleteEmptyDir;
}
public Path getPath() {
return path;
}
public boolean isDeleteEmptyDir() {
return deleteEmptyDir;
}
@Override
public String toString() {
return new StringJoiner(", ", DirectoryCleanUpTask.class.getSimpleName() + "[", "]")
.add("path=" + path)
.add("deleteEmptyDir=" + deleteEmptyDir)
.toString();
}
}
public static class DeleteRecursivelyResult {
private final boolean dirNoLongerExists;
private final List<String> notDeletedEligibleItems;
public DeleteRecursivelyResult(boolean dirNoLongerExists, List<String> notDeletedEligibleItems) {
this.dirNoLongerExists = dirNoLongerExists;
this.notDeletedEligibleItems = notDeletedEligibleItems;
}
public boolean dirNotExists() {
return dirNoLongerExists;
}
public List<String> getNotDeletedEligibleItems() {
return notDeletedEligibleItems;
}
}
private void runDirectoryCleanUpTasksForAbort() {
for (DirectoryCleanUpTask cleanUpTask : directoryCleanUpTasksForAbort) {
recursiveDeleteItems(cleanUpTask.getPath(), cleanUpTask.isDeleteEmptyDir());
}
}
private static class RenameDirectoryTask {
private final String renameFrom;
private final String renameTo;
public RenameDirectoryTask(String renameFrom, String renameTo) {
this.renameFrom = renameFrom;
this.renameTo = renameTo;
}
public String getRenameFrom() {
return renameFrom;
}
public String getRenameTo() {
return renameTo;
}
@Override
public String toString() {
return new StringJoiner(", ", RenameDirectoryTask.class.getSimpleName() + "[", "]")
.add("renameFrom:" + renameFrom)
.add("renameTo:" + renameTo)
.toString();
}
}
private void runRenameDirTasksForAbort() {
// TODO abort
}
private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir) {
DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, deleteEmptyDir);
if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) {
LOG.error("Failed to delete directory {}. Some eligible items can't be deleted: {}.",
directory.toString(), deleteResult.getNotDeletedEligibleItems());
} else if (deleteEmptyDir && !deleteResult.dirNotExists()) {
LOG.error("Failed to delete directory {} due to dir isn't empty", directory.toString());
}
}
public DeleteRecursivelyResult recursiveDeleteFiles(Path directory, boolean deleteEmptyDir) {
try {
if (!fs.exists(directory.toString()).ok()) {
return new DeleteRecursivelyResult(true, ImmutableList.of());
}
} catch (Exception e) {
ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
notDeletedEligibleItems.add(directory.toString() + "/*");
return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
}
return doRecursiveDeleteFiles(directory, deleteEmptyDir);
}
private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir) {
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.list(directory.toString(), remoteFiles);
if (!status.ok()) {
ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
notDeletedEligibleItems.add(directory + "/*");
return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build());
}
boolean isEmptyDir = true;
List<String> notDeletedEligibleItems = new ArrayList<>();
for (RemoteFile file : remoteFiles) {
if (file.isFile()) {
Path filePath = file.getPath();
isEmptyDir = false;
// TODO Check if this file was created by this query
if (!deleteIfExists(filePath)) {
notDeletedEligibleItems.add(filePath.toString());
}
} else if (file.isDirectory()) {
DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(file.getPath(), deleteEmptyDir);
if (!subResult.dirNotExists()) {
isEmptyDir = false;
}
if (!subResult.getNotDeletedEligibleItems().isEmpty()) {
notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems());
}
} else {
isEmptyDir = false;
notDeletedEligibleItems.add(file.getPath().toString());
}
}
if (isEmptyDir && deleteEmptyDir) {
Verify.verify(notDeletedEligibleItems.isEmpty());
if (!deleteIfExists(directory)) {
return new DeleteRecursivelyResult(false, ImmutableList.of(directory + "/"));
}
// all items of the location have been deleted.
return new DeleteRecursivelyResult(true, ImmutableList.of());
}
return new DeleteRecursivelyResult(false, notDeletedEligibleItems);
}
public boolean deleteIfExists(Path path) {
Status status = fs.delete(path.toString());
if (status.ok()) {
return true;
}
return !fs.exists(path.toString()).ok();
}
}
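
For reference, a short sketch of how mergePartitions coalesces two updates targeting the same partition name: file sizes and row counts are summed and file lists concatenated. The setters are assumed to be the thrift-generated ones for THivePartitionUpdate; committer is an HMSCommitter instance and all values are illustrative:

THivePartitionUpdate first = new THivePartitionUpdate();
first.setName("nation=cn");
first.setRowCount(10);
first.setFileSize(100);
first.setFileNames(Lists.newArrayList("f1.parquet"));

THivePartitionUpdate second = new THivePartitionUpdate();
second.setName("nation=cn");
second.setRowCount(5);
second.setFileSize(50);
second.setFileNames(Lists.newArrayList("f2.parquet"));

// Same partition name, so the two collapse into one update:
// rowCount=15, fileSize=150, fileNames=[f1.parquet, f2.parquet].
List<THivePartitionUpdate> merged = committer.mergePartitions(Lists.newArrayList(first, second));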

View File

@ -0,0 +1,30 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hive;
public class HiveColumnStatistics {
private long totalSizeBytes;
private long numNulls;
private long ndv;
private final double min = Double.NEGATIVE_INFINITY;
private final double max = Double.POSITIVE_INFINITY;
// TODO add hive column statistics
}

View File

@ -0,0 +1,44 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hive;
public class HiveCommonStatistics {
public static final HiveCommonStatistics EMPTY = new HiveCommonStatistics(0L, 0L, 0L);
private final long rowCount;
private final long fileCount;
private final long totalFileBytes;
public HiveCommonStatistics(long rowCount, long fileCount, long totalFileBytes) {
this.fileCount = fileCount;
this.rowCount = rowCount;
this.totalFileBytes = totalFileBytes;
}
public long getRowCount() {
return rowCount;
}
public long getFileCount() {
return fileCount;
}
public long getTotalFileBytes() {
return totalFileBytes;
}
}

View File

@ -30,16 +30,21 @@ import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.thrift.THivePartitionUpdate;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
public class HiveMetadataOps implements ExternalMetadataOps {
private static final Logger LOG = LogManager.getLogger(HiveMetadataOps.class);
@ -48,6 +53,7 @@ public class HiveMetadataOps implements ExternalMetadataOps {
private HiveConf hiveConf;
private HMSExternalCatalog catalog;
private HMSCachedClient client;
private final RemoteFileSystem fs;
public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) {
this.catalog = catalog;
@ -55,6 +61,7 @@ public class HiveMetadataOps implements ExternalMetadataOps {
this.jdbcClientConfig = jdbcClientConfig;
this.client = createCachedClient(hiveConf,
Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size), jdbcClientConfig);
this.fs = new DFSFileSystem(catalog.getProperties());
}
public HMSCachedClient getClient() {
@ -176,4 +183,35 @@ public class HiveMetadataOps implements ExternalMetadataOps {
public List<String> listDatabaseNames() {
return client.getAllDatabases();
}
public void commit(String dbName,
String tableName,
List<THivePartitionUpdate> hivePUs) {
Table table = client.getTable(dbName, tableName);
HMSCommitter hmsCommitter = new HMSCommitter(this, fs, table);
hmsCommitter.commit(hivePUs);
}
public void updateTableStatistics(
String dbName,
String tableName,
Function<HivePartitionStatistics, HivePartitionStatistics> update) {
client.updateTableStatistics(dbName, tableName, update);
}
void updatePartitionStatistics(
String dbName,
String tableName,
String partitionName,
Function<HivePartitionStatistics, HivePartitionStatistics> update) {
client.updatePartitionStatistics(dbName, tableName, partitionName, update);
}
public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) {
client.addPartitions(dbName, tableName, partitions);
}
public void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData) {
client.dropPartition(dbName, tableName, partitionValues, deleteData);
}
}
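
Taken together, a write path hands the partition updates reported by the backends to commit, which drives the HMSCommitter protocol above; on failure the rollback steps run first and the original exception is rethrown. A hedged usage sketch (collectPartitionUpdates is a hypothetical stand-in for the real plumbing):

// ops is the HiveMetadataOps of the target HMS catalog; updates come from the write tasks.
List<THivePartitionUpdate> updates = collectPartitionUpdates(); // hypothetical collector
ops.commit("test_db", "test_tbl", updates);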

View File

@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
import com.google.common.base.Preconditions;
import lombok.Data;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import java.util.List;
import java.util.Map;
@ -37,7 +38,11 @@ public class HivePartition {
private List<String> partitionValues;
private boolean isDummyPartition;
private Map<String, String> parameters;
private String outputFormat;
private String serde;
private List<FieldSchema> columns;
// If you want to read the data under a partition, you can use this constructor
public HivePartition(String dbName, String tblName, boolean isDummyPartition,
String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters) {
this.dbName = dbName;
@ -52,6 +57,17 @@ public class HivePartition {
this.parameters = parameters;
}
// If you want to update hms with partition, then you can use this constructor,
// as updating hms requires some additional information, such as outputFormat and so on
public HivePartition(String dbName, String tblName, boolean isDummyPartition,
String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters,
String outputFormat, String serde, List<FieldSchema> columns) {
this(dbName, tblName, isDummyPartition, inputFormat, path, partitionValues, parameters);
this.outputFormat = outputFormat;
this.serde = serde;
this.columns = columns;
}
// return partition name like: nation=cn/city=beijing
public String getPartitionName(List<Column> partColumns) {
Preconditions.checkState(partColumns.size() == partitionValues.size());

View File

@ -0,0 +1,117 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/Statistics.java
// and modified by Doris
package org.apache.doris.datasource.hive;
import com.google.common.collect.ImmutableMap;
import java.util.Map;
public class HivePartitionStatistics {
private static final HivePartitionStatistics EMPTY =
new HivePartitionStatistics(HiveCommonStatistics.EMPTY, ImmutableMap.of());
private final HiveCommonStatistics commonStatistics;
private final Map<String, HiveColumnStatistics> columnStatisticsMap;
public HivePartitionStatistics(
HiveCommonStatistics commonStatistics,
Map<String, HiveColumnStatistics> columnStatisticsMap) {
this.commonStatistics = commonStatistics;
this.columnStatisticsMap = columnStatisticsMap;
}
public HiveCommonStatistics getCommonStatistics() {
return commonStatistics;
}
public Map<String, HiveColumnStatistics> getColumnStatisticsMap() {
return columnStatisticsMap;
}
public static HivePartitionStatistics fromCommonStatistics(long rowCount, long fileCount, long totalFileBytes) {
return new HivePartitionStatistics(
new HiveCommonStatistics(rowCount, fileCount, totalFileBytes),
ImmutableMap.of()
);
}
// only used to update the parameters of partition or table.
public static HivePartitionStatistics merge(HivePartitionStatistics current, HivePartitionStatistics update) {
if (current.getCommonStatistics().getRowCount() <= 0) {
return update;
} else if (update.getCommonStatistics().getRowCount() <= 0) {
return current;
}
return new HivePartitionStatistics(
reduce(current.getCommonStatistics(), update.getCommonStatistics(), ReduceOperator.ADD),
// TODO merge columnStatisticsMap
current.getColumnStatisticsMap());
}
public static HivePartitionStatistics reduce(
HivePartitionStatistics first,
HivePartitionStatistics second,
ReduceOperator operator) {
HiveCommonStatistics left = first.getCommonStatistics();
HiveCommonStatistics right = second.getCommonStatistics();
return HivePartitionStatistics.fromCommonStatistics(
reduce(left.getRowCount(), right.getRowCount(), operator),
reduce(left.getFileCount(), right.getFileCount(), operator),
reduce(left.getTotalFileBytes(), right.getTotalFileBytes(), operator));
}
public static HiveCommonStatistics reduce(
HiveCommonStatistics current,
HiveCommonStatistics update,
ReduceOperator operator) {
return new HiveCommonStatistics(
reduce(current.getRowCount(), update.getRowCount(), operator),
reduce(current.getFileCount(), update.getFileCount(), operator),
reduce(current.getTotalFileBytes(), update.getTotalFileBytes(), operator));
}
public static long reduce(long current, long update, ReduceOperator operator) {
if (current >= 0 && update >= 0) {
switch (operator) {
case ADD:
return current + update;
case SUBTRACT:
return current - update;
case MAX:
return Math.max(current, update);
case MIN:
return Math.min(current, update);
default:
throw new IllegalArgumentException("Unexpected operator: " + operator);
}
}
return 0;
}
public enum ReduceOperator {
ADD,
SUBTRACT,
MIN,
MAX,
}
}
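
A worked example of the commit/abort arithmetic: UpdateStatisticsTask.updateStatistics merges the delta with ADD on commit, and resetStatistics subtracts the same delta on undo, so a failed transaction restores the original counters:

HivePartitionStatistics current = HivePartitionStatistics.fromCommonStatistics(10, 2, 1000);
HivePartitionStatistics delta = HivePartitionStatistics.fromCommonStatistics(5, 1, 500);

// Commit: 10+5 rows, 2+1 files, 1000+500 bytes.
HivePartitionStatistics added =
        HivePartitionStatistics.reduce(current, delta, HivePartitionStatistics.ReduceOperator.ADD);

// Abort: subtracting the delta restores rowCount=10, fileCount=2, totalFileBytes=1000.
HivePartitionStatistics restored =
        HivePartitionStatistics.reduce(added, delta, HivePartitionStatistics.ReduceOperator.SUBTRACT);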

View File

@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.hive;
public class HivePartitionWithStatistics {
private String name;
private HivePartition partition;
private HivePartitionStatistics statistics;
public HivePartitionWithStatistics(String name, HivePartition partition, HivePartitionStatistics statistics) {
this.name = name;
this.partition = partition;
this.statistics = statistics;
}
public String getName() {
return name;
}
public HivePartition getPartition() {
return partition;
}
public HivePartitionStatistics getStatistics() {
return statistics;
}
}

View File

@ -22,6 +22,10 @@ import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
import org.apache.hadoop.mapred.InputFormat;
@ -32,6 +36,9 @@ import org.apache.hadoop.util.ReflectionUtils;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Hive util for create or query hive table.
@ -44,14 +51,14 @@ public final class HiveUtil {
/**
* get input format class from inputFormatName.
*
* @param jobConf jobConf used when getInputFormatClass
* @param inputFormatName inputFormat class name
* @param symlinkTarget use target inputFormat class when inputFormat is SymlinkTextInputFormat
* @return a class of inputFormat.
* @throws UserException when class not found.
*/
public static InputFormat<?, ?> getInputFormat(JobConf jobConf,
String inputFormatName, boolean symlinkTarget) throws UserException {
try {
Class<? extends InputFormat<?, ?>> inputFormatClass = getInputFormatClass(jobConf, inputFormatName);
if (symlinkTarget && (inputFormatClass == SymlinkTextInputFormat.class)) {
@ -99,4 +106,55 @@ public final class HiveUtil {
throw new RuntimeException(e);
}
}
// "c1=a/c2=b/c3=c" ---> List("a","b","c")
public static List<String> toPartitionValues(String partitionName) {
ImmutableList.Builder<String> resultBuilder = ImmutableList.builder();
int start = 0;
while (true) {
while (start < partitionName.length() && partitionName.charAt(start) != '=') {
start++;
}
start++;
int end = start;
while (end < partitionName.length() && partitionName.charAt(end) != '/') {
end++;
}
if (start > partitionName.length()) {
break;
}
resultBuilder.add(FileUtils.unescapePathName(partitionName.substring(start, end)));
start = end + 1;
}
return resultBuilder.build();
}
// List("c1=a/c2=b/c3=c", "c1=a/c2=b/c3=d")
// |
// |
// v
// Map(
// key:"c1=a/c2=b/c3=c", value:Partition(values=List(a,b,c))
// key:"c1=a/c2=b/c3=d", value:Partition(values=List(a,b,d))
// )
public static Map<String, Partition> convertToNamePartitionMap(
List<String> partitionNames,
List<Partition> partitions) {
Map<String, List<String>> partitionNameToPartitionValues =
partitionNames
.stream()
.collect(Collectors.toMap(partitionName -> partitionName, HiveUtil::toPartitionValues));
Map<List<String>, Partition> partitionValuesToPartition =
partitions.stream()
.collect(Collectors.toMap(Partition::getValues, partition -> partition));
ImmutableMap.Builder<String, Partition> resultBuilder = ImmutableMap.builder();
for (Map.Entry<String, List<String>> entry : partitionNameToPartitionValues.entrySet()) {
Partition partition = partitionValuesToPartition.get(entry.getValue());
// ImmutableMap.Builder rejects null values, so skip names the metastore did not return;
// callers detect the miss via a null lookup on the returned map.
if (partition != null) {
resultBuilder.put(entry.getKey(), partition);
}
}
return resultBuilder.build();
}
}
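
Example behavior of the two helpers above; the second case relies on Hive's FileUtils %-escaping of special characters in partition values:

// "nation=cn/city=beijing" -> [cn, beijing]
List<String> values = HiveUtil.toPartitionValues("nation=cn/city=beijing");

// Escaped characters are decoded on the way out: "k1=a%2Fb" -> [a/b]
List<String> unescaped = HiveUtil.toPartitionValues("k1=a%2Fb");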

View File

@ -53,6 +53,7 @@ import java.sql.ResultSet;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
public class PostgreSQLJdbcHMSCachedClient extends JdbcHMSCachedClient {
@ -519,6 +520,31 @@ public class PostgreSQLJdbcHMSCachedClient extends JdbcHMSCachedClient {
throw new NotImplementedException("PostgreSQL createTable not implemented");
}
@Override
public void updateTableStatistics(String dbName,
String tableName,
Function<HivePartitionStatistics, HivePartitionStatistics> update) {
throw new HMSClientException("Not supported in PostgreSQLJdbcHMSCachedClient.");
}
@Override
public void updatePartitionStatistics(String dbName,
String tableName,
String partitionName,
Function<HivePartitionStatistics, HivePartitionStatistics> update) {
throw new HMSClientException("Not supported in PostgreSQLJdbcHMSCachedClient.");
}
@Override
public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) {
throw new HMSClientException("Not supported in PostgreSQLJdbcHMSCachedClient.");
}
@Override
public void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData) {
throw new HMSClientException("Not supported in PostgreSQLJdbcHMSCachedClient.");
}
public void dropTable(String dbName, String tblName) {
throw new NotImplementedException("PostgreSQL dropTable not implemented");
}

View File

@ -28,8 +28,12 @@ import org.apache.doris.datasource.property.constants.HMSProperties;
import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
@ -73,6 +77,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* This class uses the thrift protocol to directly access the HiveMetaStore service
@ -647,4 +653,135 @@ public class ThriftHMSCachedClient implements HMSCachedClient {
private <T> T ugiDoAs(PrivilegedExceptionAction<T> action) {
return HiveMetaStoreClientHelper.ugiDoAs(hiveConf, action);
}
@Override
public void updateTableStatistics(
String dbName,
String tableName,
Function<HivePartitionStatistics, HivePartitionStatistics> update) {
try (ThriftHMSClient client = getClient()) {
Table originTable = getTable(dbName, tableName);
Map<String, String> originParams = originTable.getParameters();
HivePartitionStatistics updatedStats = update.apply(toHivePartitionStatistics(originParams));
Table newTable = originTable.deepCopy();
Map<String, String> newParams =
updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
newTable.setParameters(newParams);
client.client.alter_table(dbName, tableName, newTable);
} catch (Exception e) {
throw new RuntimeException("failed to update table statistics for " + dbName + "." + tableName);
}
}
@Override
public void updatePartitionStatistics(
String dbName,
String tableName,
String partitionName,
Function<HivePartitionStatistics, HivePartitionStatistics> update) {
try (ThriftHMSClient client = getClient()) {
List<Partition> partitions = client.client.getPartitionsByNames(
dbName, tableName, ImmutableList.of(partitionName));
if (partitions.size() != 1) {
throw new RuntimeException("Metastore returned multiple partitions for name: " + partitionName);
}
Partition originPartition = partitions.get(0);
Map<String, String> originParams = originPartition.getParameters();
HivePartitionStatistics updatedStats = update.apply(toHivePartitionStatistics(originParams));
Partition modifiedPartition = originPartition.deepCopy();
Map<String, String> newParams =
updateStatisticsParameters(originParams, updatedStats.getCommonStatistics());
newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000));
modifiedPartition.setParameters(newParams);
client.client.alter_partition(dbName, tableName, modifiedPartition);
} catch (Exception e) {
throw new RuntimeException("failed to update table statistics for " + dbName + "." + tableName);
}
}
@Override
public void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions) {
try (ThriftHMSClient client = getClient()) {
List<Partition> hivePartitions = partitions.stream()
.map(ThriftHMSCachedClient::toMetastoreApiPartition)
.collect(Collectors.toList());
client.client.add_partitions(hivePartitions);
} catch (Exception e) {
throw new RuntimeException("failed to add partitions for " + dbName + "." + tableName, e);
}
}
@Override
public void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData) {
try (ThriftHMSClient client = getClient()) {
client.client.dropPartition(dbName, tableName, partitionValues, deleteData);
} catch (Exception e) {
throw new RuntimeException("failed to drop partition for " + dbName + "." + tableName);
}
}
private static HivePartitionStatistics toHivePartitionStatistics(Map<String, String> params) {
long rowCount = Long.parseLong(params.getOrDefault(StatsSetupConst.ROW_COUNT, "-1"));
long totalSize = Long.parseLong(params.getOrDefault(StatsSetupConst.TOTAL_SIZE, "-1"));
long numFiles = Long.parseLong(params.getOrDefault(StatsSetupConst.NUM_FILES, "-1"));
return HivePartitionStatistics.fromCommonStatistics(rowCount, numFiles, totalSize);
}
private static Map<String, String> updateStatisticsParameters(
Map<String, String> parameters,
HiveCommonStatistics statistics) {
HashMap<String, String> result = new HashMap<>(parameters);
result.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount()));
result.put(StatsSetupConst.ROW_COUNT, String.valueOf(statistics.getRowCount()));
result.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(statistics.getTotalFileBytes()));
// CDH 5.16 metastore ignores stats unless STATS_GENERATED_VIA_STATS_TASK is set
// https://github.com/cloudera/hive/blob/cdh5.16.2-release/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L227-L231
if (!parameters.containsKey("STATS_GENERATED_VIA_STATS_TASK")) {
result.put("STATS_GENERATED_VIA_STATS_TASK", "workaround for potential lack of HIVE-12730");
}
return result;
}
public static Partition toMetastoreApiPartition(HivePartitionWithStatistics partitionWithStatistics) {
Partition partition =
toMetastoreApiPartition(partitionWithStatistics.getPartition());
partition.setParameters(updateStatisticsParameters(
partition.getParameters(), partitionWithStatistics.getStatistics().getCommonStatistics()));
return partition;
}
public static Partition toMetastoreApiPartition(HivePartition hivePartition) {
Partition result = new Partition();
result.setDbName(hivePartition.getDbName());
result.setTableName(hivePartition.getTblName());
result.setValues(hivePartition.getPartitionValues());
result.setSd(makeStorageDescriptorFromHivePartition(hivePartition));
result.setParameters(hivePartition.getParameters());
return result;
}
private static StorageDescriptor makeStorageDescriptorFromHivePartition(HivePartition partition) {
SerDeInfo serdeInfo = new SerDeInfo();
serdeInfo.setName(partition.getTblName());
serdeInfo.setSerializationLib(partition.getSerde());
StorageDescriptor sd = new StorageDescriptor();
sd.setLocation(Strings.emptyToNull(partition.getPath()));
sd.setCols(partition.getColumns());
sd.setSerdeInfo(serdeInfo);
sd.setInputFormat(partition.getInputFormat());
sd.setOutputFormat(partition.getOutputFormat());
sd.setParameters(ImmutableMap.of());
return sd;
}
}
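
For reference, the parameter rewrite above maps the common statistics onto the standard metastore keys (StatsSetupConst.NUM_FILES/ROW_COUNT/TOTAL_SIZE are "numFiles"/"numRows"/"totalSize"). A sketch of what it produces, as if called from inside the class (the method is private); input values are illustrative:

Map<String, String> updated = updateStatisticsParameters(
        ImmutableMap.of("numRows", "10"), new HiveCommonStatistics(15, 3, 1500));
// updated:
//   numFiles  = "3"
//   numRows   = "15"
//   totalSize = "1500"
//   STATS_GENERATED_VIA_STATS_TASK = "workaround for potential lack of HIVE-12730"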

View File

@ -23,6 +23,9 @@ import org.apache.doris.fs.remote.RemoteFile;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* File system interface.
@ -46,6 +49,24 @@ public interface FileSystem {
Status rename(String origFilePath, String destFilePath);
default void asyncRename(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
List<String> fileNames) {
throw new UnsupportedOperationException("Unsupported operation async rename on current file system.");
}
default void asyncRenameDir(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
throw new UnsupportedOperationException("Unsupported operation async rename dir on current file system.");
}
Status delete(String remotePath);
Status makeDir(String remotePath);
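
The two defaults throw so that file systems without an async path fail loudly; HMSCommitter supplies the executor, the shared future list, and the cancellation flag. A minimal caller sketch against an implementation that overrides asyncRename (fs and the paths are illustrative):

Executor executor = Executors.newFixedThreadPool(4);
List<CompletableFuture<?>> futures = new ArrayList<>();
AtomicBoolean cancelled = new AtomicBoolean(false);

fs.asyncRename(executor, futures, cancelled,
        "hdfs://ns/warehouse/.staging/query_1", "hdfs://ns/warehouse/tbl",
        Lists.newArrayList("part-0.parquet"));

// Commit path: block on every scheduled rename.
futures.forEach(CompletableFuture::join);
// Abort path: unstarted tasks observe the flag and become no-ops.
cancelled.set(true);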

View File

@ -53,6 +53,9 @@ import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
public class DFSFileSystem extends RemoteFileSystem {
@ -192,7 +195,7 @@ public class DFSFileSystem extends RemoteFileSystem {
try {
currentStreamOffset = fsDataInputStream.getPos();
} catch (IOException e) {
LOG.error("errors while get file pos from output stream", e);
LOG.warn("errors while get file pos from output stream", e);
throw new IOException("errors while get file pos from output stream", e);
}
if (currentStreamOffset != readOffset) {
@ -230,7 +233,7 @@ public class DFSFileSystem extends RemoteFileSystem {
}
return ByteBuffer.wrap(buf, 0, readLength);
} catch (IOException e) {
LOG.error("errors while read data from stream", e);
LOG.warn("errors while read data from stream", e);
throw new IOException("errors while read data from stream " + e.getMessage());
}
}
@ -261,7 +264,7 @@ public class DFSFileSystem extends RemoteFileSystem {
}
return Status.OK;
} catch (Exception e) {
LOG.error("errors while check path exist " + remotePath, e);
LOG.warn("errors while check path exist " + remotePath, e);
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to check remote path exist: " + remotePath + ". msg: " + e.getMessage());
}
@ -281,7 +284,7 @@ public class DFSFileSystem extends RemoteFileSystem {
try {
fsDataOutputStream.writeBytes(content);
} catch (IOException e) {
LOG.error("errors while write data to output stream", e);
LOG.warn("errors while write data to output stream", e);
status = new Status(Status.ErrCode.COMMON_ERROR, "write exception: " + e.getMessage());
} finally {
Status closeStatus = operations.closeWriter(OpParams.of(fsDataOutputStream));
@ -324,7 +327,7 @@ public class DFSFileSystem extends RemoteFileSystem {
try {
fsDataOutputStream.write(readBuf, 0, bytesRead);
} catch (IOException e) {
LOG.error("errors while write data to output stream", e);
LOG.warn("errors while write data to output stream", e);
lastErrMsg = String.format(
"failed to write hdfs. current write offset: %d, write length: %d, "
+ "file length: %d, file: %s, msg: errors while write data to output stream",
@ -377,7 +380,7 @@ public class DFSFileSystem extends RemoteFileSystem {
} catch (UserException e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
} catch (IOException e) {
LOG.error("errors while rename path from " + srcPath + " to " + destPath);
LOG.warn("errors while rename path from " + srcPath + " to " + destPath);
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to rename remote " + srcPath + " to " + destPath + ", msg: " + e.getMessage());
}
@ -385,6 +388,64 @@ public class DFSFileSystem extends RemoteFileSystem {
return Status.OK;
}
@Override
public void asyncRename(
Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
List<String> fileNames) {
for (String fileName : fileNames) {
Path source = new Path(origFilePath, fileName);
Path target = new Path(destFilePath, fileName);
renameFileFutures.add(CompletableFuture.runAsync(() -> {
if (cancelled.get()) {
return;
}
Status status = rename(source.toString(), target.toString());
if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
}, executor));
}
}
@Override
public void asyncRenameDir(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
renameFileFutures.add(CompletableFuture.runAsync(() -> {
if (cancelled.get()) {
return;
}
Status status = exists(destFilePath);
if (status.ok()) {
throw new RuntimeException("Destination directory already exists: " + destFilePath);
}
String targetParent = new Path(destFilePath).getParent().toString();
status = exists(targetParent);
if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
makeDir(targetParent);
} else if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
runWhenPathNotExist.run();
status = rename(origFilePath, destFilePath);
if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
}, executor));
}
@Override
public Status delete(String remotePath) {
try {
@ -395,7 +456,7 @@ public class DFSFileSystem extends RemoteFileSystem {
} catch (UserException e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
} catch (IOException e) {
LOG.error("errors while delete path " + remotePath);
LOG.warn("errors while delete path " + remotePath);
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to delete remote path: " + remotePath + ", msg: " + e.getMessage());
}
@ -433,7 +494,7 @@ public class DFSFileSystem extends RemoteFileSystem {
LOG.info("file not found: " + e.getMessage());
return new Status(Status.ErrCode.NOT_FOUND, "file not found: " + e.getMessage());
} catch (Exception e) {
LOG.error("errors while get file status ", e);
LOG.warn("errors while get file status ", e);
return new Status(Status.ErrCode.COMMON_ERROR, "errors while get file status " + e.getMessage());
}
LOG.info("finish list path {}", remotePath);
@ -442,6 +503,16 @@ public class DFSFileSystem extends RemoteFileSystem {
@Override
public Status makeDir(String remotePath) {
return new Status(Status.ErrCode.COMMON_ERROR, "mkdir is not implemented.");
try {
FileSystem fileSystem = nativeFileSystem(remotePath);
if (!fileSystem.mkdirs(new Path(remotePath))) {
LOG.warn("failed to make dir for " + remotePath);
return new Status(Status.ErrCode.COMMON_ERROR, "failed to make dir for " + remotePath);
}
} catch (Exception e) {
LOG.warn("failed to make dir for " + remotePath);
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
}
return Status.OK;
}
}
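
End to end, this directory rename is what prepareCreateNewPartition relies on: it refuses an existing destination, creates a missing parent, runs the cleanup-registration callback, then renames. A hedged usage sketch (the catalog properties and paths are illustrative):

DFSFileSystem dfs = new DFSFileSystem(catalog.getProperties()); // assumes an HDFS-backed catalog
List<CompletableFuture<?>> futures = new ArrayList<>();
AtomicBoolean cancelled = new AtomicBoolean(false);

dfs.asyncRenameDir(Executors.newFixedThreadPool(1), futures, cancelled,
        "hdfs://ns/warehouse/.staging/query_1/nation=cn",
        "hdfs://ns/warehouse/tbl/nation=cn",
        () -> System.out.println("destination is clear; register abort cleanup here"));

// Propagate any rename failure as a RuntimeException, mirroring waitForAsyncFileSystemTasks.
MoreFutures.getFutureValue(futures.get(0), RuntimeException.class);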