diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 1ce305a06e..761b34e8d4 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -753,6 +753,10 @@ under the License.
         <dependency>
             <groupId>org.immutables</groupId>
             <artifactId>value</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.airlift</groupId>
+            <artifactId>concurrent</artifactId>
+        </dependency>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index 783f8a0fdf..aa6ef1f14e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -135,6 +135,11 @@ public abstract class ExternalCatalog
         return conf;
     }
 
+    // only for test
+    public void setInitialized() {
+        initialized = true;
+    }
+
     /**
      * set some default properties when creating catalog
      * @return list of database names in this catalog
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
index c26de66058..b55ed7bdf0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCachedClient.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 
 /**
  * A hive metastore client pool for a specific catalog with hive configuration.
@@ -90,4 +91,19 @@ public interface HMSCachedClient {
     void dropTable(String dbName, String tableName);
 
     void createTable(TableMetadata catalogTable, boolean ignoreIfExists);
+
+    void updateTableStatistics(
+            String dbName,
+            String tableName,
+            Function<HivePartitionStatistics, HivePartitionStatistics> update);
+
+    void updatePartitionStatistics(
+            String dbName,
+            String tableName,
+            String partitionName,
+            Function<HivePartitionStatistics, HivePartitionStatistics> update);
+
+    void addPartitions(String dbName, String tableName, List<HivePartitionWithStatistics> partitions);
+
+    void dropPartition(String dbName, String tableName, List<String> partitionValues, boolean deleteData);
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java
new file mode 100644
index 0000000000..64abb985fc
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSCommitter.java
@@ -0,0 +1,668 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
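/*
 * Illustrative sketch, not part of the patch: how the statistics hooks added to
 * HMSCachedClient above are meant to be used. The update argument is a Function so the
 * client can apply it read-modify-write against whatever statistics are currently stored
 * in the HMS table/partition parameters: the commit path passes a merge function, and the
 * rollback path passes a subtract function (see UpdateStatisticsTask below). The class and
 * the db/table names here are hypothetical; the sketch assumes it lives alongside the
 * org.apache.doris.datasource.hive types.
 */
import java.util.function.Function;

class StatisticsUpdateSketch {
    static void example(HMSCachedClient client) {
        HivePartitionStatistics delta = HivePartitionStatistics.fromCommonStatistics(
                /* rowCount */ 100, /* fileCount */ 2, /* totalFileBytes */ 4096);

        // Commit: merge the delta into the statistics currently recorded in the metastore.
        Function<HivePartitionStatistics, HivePartitionStatistics> merge =
                current -> HivePartitionStatistics.merge(current, delta);
        client.updateTableStatistics("demo_db", "demo_tbl", merge);

        // Rollback: subtract the same delta again, as UpdateStatisticsTask.undo() does.
        Function<HivePartitionStatistics, HivePartitionStatistics> subtract =
                current -> HivePartitionStatistics.reduce(
                        current, delta, HivePartitionStatistics.ReduceOperator.SUBTRACT);
        client.updateTableStatistics("demo_db", "demo_tbl", subtract);
    }
}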
+// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/SemiTransactionalHiveMetastore.java +// and modified by Doris + +package org.apache.doris.datasource.hive; + +import org.apache.doris.backup.Status; +import org.apache.doris.common.Pair; +import org.apache.doris.fs.remote.RemoteFile; +import org.apache.doris.fs.remote.RemoteFileSystem; +import org.apache.doris.thrift.THivePartitionUpdate; +import org.apache.doris.thrift.TUpdateMode; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Verify; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.airlift.concurrent.MoreFutures; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.StringJoiner; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +public class HMSCommitter { + private static final Logger LOG = LogManager.getLogger(HMSCommitter.class); + private final HiveMetadataOps hiveOps; + private final RemoteFileSystem fs; + private final Table table; + + // update statistics for unPartitioned table or existed partition + private final List updateStatisticsTasks = new ArrayList<>(); + Executor updateStatisticsExecutor = Executors.newFixedThreadPool(16); + + // add new partition + private final AddPartitionsTask addPartitionsTask = new AddPartitionsTask(); + private static final int PARTITION_COMMIT_BATCH_SIZE = 20; + + // for file system rename operation + // whether to cancel the file system tasks + private final AtomicBoolean fileSystemTaskCancelled = new AtomicBoolean(false); + // file system tasks that are executed asynchronously, including rename_file, rename_dir + private final List> asyncFileSystemTaskFutures = new ArrayList<>(); + // when aborted, we need to delete all files under this path, even the current directory + private final Queue directoryCleanUpTasksForAbort = new ConcurrentLinkedQueue<>(); + // when aborted, we need restore directory + private final List renameDirectoryTasksForAbort = new ArrayList<>(); + Executor fileSystemExecutor = Executors.newFixedThreadPool(16); + + public HMSCommitter(HiveMetadataOps hiveOps, RemoteFileSystem fs, Table table) { + this.hiveOps = hiveOps; + this.fs = fs; + this.table = table; + } + + public void commit(List hivePUs) { + try { + prepare(mergePartitions(hivePUs)); + doCommit(); + } catch (Throwable t) { + LOG.warn("Failed to commit for {}.{}, abort it.", table.getDbName(), table.getTableName()); + try { + cancelUnStartedAsyncFileSystemTask(); + undoUpdateStatisticsTasks(); + undoAddPartitionsTask(); + waitForAsyncFileSystemTaskSuppressThrowable(); + runDirectoryClearUpTasksForAbort(); + runRenameDirTasksForAbort(); + } catch (Throwable e) { + 
t.addSuppressed(new Exception("Failed to roll back after commit failure", e)); + } + throw t; + } + } + + public void prepare(List hivePUs) { + + List> insertExistsPartitions = new ArrayList<>(); + + for (THivePartitionUpdate pu : hivePUs) { + TUpdateMode updateMode = pu.getUpdateMode(); + HivePartitionStatistics hivePartitionStatistics = HivePartitionStatistics.fromCommonStatistics( + pu.getRowCount(), + pu.getFileNamesSize(), + pu.getFileSize()); + if (table.getPartitionKeysSize() == 0) { + Preconditions.checkArgument(hivePUs.size() == 1, + "When updating a non-partitioned table, multiple partitions should not be written"); + switch (updateMode) { + case APPEND: + prepareAppendTable(pu, hivePartitionStatistics); + break; + case OVERWRITE: + prepareOverwriteTable(pu, hivePartitionStatistics); + break; + default: + throw new RuntimeException("Not support mode:[" + updateMode + "] in unPartitioned table"); + } + } else { + switch (updateMode) { + case NEW: + prepareCreateNewPartition(pu, hivePartitionStatistics); + break; + case APPEND: + insertExistsPartitions.add(Pair.of(pu, hivePartitionStatistics)); + break; + case OVERWRITE: + prepareOverwritePartition(pu, hivePartitionStatistics); + break; + default: + throw new RuntimeException("Not support mode:[" + updateMode + "] in unPartitioned table"); + } + } + } + + if (!insertExistsPartitions.isEmpty()) { + prepareInsertExistPartition(insertExistsPartitions); + } + } + + public List mergePartitions(List hivePUs) { + Map mm = new HashMap<>(); + for (THivePartitionUpdate pu : hivePUs) { + if (mm.containsKey(pu.getName())) { + THivePartitionUpdate old = mm.get(pu.getName()); + old.setFileSize(old.getFileSize() + pu.getFileSize()); + old.setRowCount(old.getRowCount() + pu.getRowCount()); + old.getFileNames().addAll(pu.getFileNames()); + } else { + mm.put(pu.getName(), pu); + } + } + return new ArrayList<>(mm.values()); + } + + public void doCommit() { + waitForAsyncFileSystemTasks(); + doAddPartitionsTask(); + doUpdateStatisticsTasks(); + } + + public void rollback() { + + } + + public void cancelUnStartedAsyncFileSystemTask() { + fileSystemTaskCancelled.set(true); + } + + private void undoUpdateStatisticsTasks() { + ImmutableList.Builder> undoUpdateFutures = ImmutableList.builder(); + for (UpdateStatisticsTask task : updateStatisticsTasks) { + undoUpdateFutures.add(CompletableFuture.runAsync(() -> { + try { + task.undo(hiveOps); + } catch (Throwable throwable) { + LOG.warn("Failed to rollback: {}", task.getDescription(), throwable); + } + }, updateStatisticsExecutor)); + } + + for (CompletableFuture undoUpdateFuture : undoUpdateFutures.build()) { + MoreFutures.getFutureValue(undoUpdateFuture); + } + } + + private void undoAddPartitionsTask() { + if (addPartitionsTask.isEmpty()) { + return; + } + + HivePartition firstPartition = addPartitionsTask.getPartitions().get(0).getPartition(); + String dbName = firstPartition.getDbName(); + String tableName = firstPartition.getTblName(); + List> rollbackFailedPartitions = addPartitionsTask.rollback(hiveOps); + if (!rollbackFailedPartitions.isEmpty()) { + LOG.warn("Failed to rollback: add_partition for partition values {}.{}.{}", + dbName, tableName, rollbackFailedPartitions); + } + } + + private void waitForAsyncFileSystemTaskSuppressThrowable() { + for (CompletableFuture future : asyncFileSystemTaskFutures) { + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Throwable t) { + // ignore + } + } + } + + public void 
prepareAppendTable(THivePartitionUpdate pu, HivePartitionStatistics ps) { + String targetPath = pu.getLocation().getTargetPath(); + String writePath = pu.getLocation().getWritePath(); + if (!targetPath.equals(writePath)) { + fs.asyncRename( + fileSystemExecutor, + asyncFileSystemTaskFutures, + fileSystemTaskCancelled, + writePath, + targetPath, + pu.getFileNames()); + } + directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false)); + updateStatisticsTasks.add( + new UpdateStatisticsTask( + table.getDbName(), + table.getTableName(), + Optional.empty(), + ps, + true + )); + } + + public void prepareOverwriteTable(THivePartitionUpdate pu, HivePartitionStatistics ps) { + + } + + public void prepareCreateNewPartition(THivePartitionUpdate pu, HivePartitionStatistics ps) { + + String targetPath = pu.getLocation().getTargetPath(); + String writePath = pu.getLocation().getWritePath(); + + if (!targetPath.equals(writePath)) { + fs.asyncRenameDir( + fileSystemExecutor, + asyncFileSystemTaskFutures, + fileSystemTaskCancelled, + writePath, + targetPath, + () -> directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, true))); + } + + StorageDescriptor sd = table.getSd(); + + HivePartition hivePartition = new HivePartition( + table.getDbName(), + table.getTableName(), + false, + sd.getInputFormat(), + pu.getLocation().getTargetPath(), + HiveUtil.toPartitionValues(pu.getName()), + Maps.newHashMap(), + sd.getOutputFormat(), + sd.getSerdeInfo().getSerializationLib(), + hiveOps.getClient().getSchema(table.getDbName(), table.getTableName()) + ); + HivePartitionWithStatistics partitionWithStats = + new HivePartitionWithStatistics(pu.getName(), hivePartition, ps); + addPartitionsTask.addPartition(partitionWithStats); + } + + public void prepareInsertExistPartition(List> partitions) { + for (List> partitionBatch : + Iterables.partition(partitions, 100)) { + List partitionNames = partitionBatch.stream() + .map(pair -> pair.first.getName()) + .collect(Collectors.toList()); + + Map partitionsByNamesMap = HiveUtil.convertToNamePartitionMap( + partitionNames, + hiveOps.getClient().getPartitions(table.getDbName(), table.getTableName(), partitionNames)); + + for (int i = 0; i < partitionsByNamesMap.size(); i++) { + String partitionName = partitionNames.get(i); + if (partitionsByNamesMap.get(partitionName) == null) { + // Prevent this partition from being deleted by other engines + throw new RuntimeException("Not found partition: " + partitionName); + } + + THivePartitionUpdate pu = partitionBatch.get(i).first; + HivePartitionStatistics updateStats = partitionBatch.get(i).second; + + String writePath = pu.getLocation().getWritePath(); + String targetPath = pu.getLocation().getTargetPath(); + directoryCleanUpTasksForAbort.add(new DirectoryCleanUpTask(targetPath, false)); + + if (!targetPath.equals(writePath)) { + fs.asyncRename( + fileSystemExecutor, + asyncFileSystemTaskFutures, + fileSystemTaskCancelled, + writePath, + targetPath, + pu.getFileNames()); + } + + updateStatisticsTasks.add( + new UpdateStatisticsTask( + table.getDbName(), + table.getTableName(), + Optional.of(pu.getName()), + updateStats, + true)); + } + } + } + + + public void prepareOverwritePartition(THivePartitionUpdate pu, HivePartitionStatistics ps) { + + } + + + private void waitForAsyncFileSystemTasks() { + for (CompletableFuture future : asyncFileSystemTaskFutures) { + MoreFutures.getFutureValue(future, RuntimeException.class); + } + } + + private void doAddPartitionsTask() { + if (!addPartitionsTask.isEmpty()) { 
+ addPartitionsTask.run(hiveOps); + } + } + + private void doUpdateStatisticsTasks() { + ImmutableList.Builder> updateStatsFutures = ImmutableList.builder(); + List failedTaskDescriptions = new ArrayList<>(); + List suppressedExceptions = new ArrayList<>(); + for (UpdateStatisticsTask task : updateStatisticsTasks) { + updateStatsFutures.add(CompletableFuture.runAsync(() -> { + try { + task.run(hiveOps); + } catch (Throwable t) { + synchronized (suppressedExceptions) { + addSuppressedExceptions(suppressedExceptions, t, failedTaskDescriptions, task.getDescription()); + } + } + }, updateStatisticsExecutor)); + } + + for (CompletableFuture executeUpdateFuture : updateStatsFutures.build()) { + MoreFutures.getFutureValue(executeUpdateFuture); + } + if (!suppressedExceptions.isEmpty()) { + StringBuilder message = new StringBuilder(); + message.append("Failed to execute some updating statistics tasks: "); + Joiner.on("; ").appendTo(message, failedTaskDescriptions); + RuntimeException exception = new RuntimeException(message.toString()); + suppressedExceptions.forEach(exception::addSuppressed); + throw exception; + } + } + + private static void addSuppressedExceptions( + List suppressedExceptions, + Throwable t, + List descriptions, + String description) { + descriptions.add(description); + // A limit is needed to avoid having a huge exception object. 5 was chosen arbitrarily. + if (suppressedExceptions.size() < 5) { + suppressedExceptions.add(t); + } + } + + private static class AddPartition { + + } + + private static class UpdateStatisticsTask { + private final String dbName; + private final String tableName; + private final Optional partitionName; + private final HivePartitionStatistics updatePartitionStat; + private final boolean merge; + + private boolean done; + + public UpdateStatisticsTask(String dbName, String tableName, Optional partitionName, + HivePartitionStatistics statistics, boolean merge) { + this.dbName = Objects.requireNonNull(dbName, "dbName is null"); + this.tableName = Objects.requireNonNull(tableName, "tableName is null"); + this.partitionName = Objects.requireNonNull(partitionName, "partitionName is null"); + this.updatePartitionStat = Objects.requireNonNull(statistics, "statistics is null"); + this.merge = merge; + } + + public void run(HiveMetadataOps hiveOps) { + if (partitionName.isPresent()) { + hiveOps.updatePartitionStatistics(dbName, tableName, partitionName.get(), this::updateStatistics); + } else { + hiveOps.updateTableStatistics(dbName, tableName, this::updateStatistics); + } + done = true; + } + + public void undo(HiveMetadataOps hmsOps) { + if (!done) { + return; + } + if (partitionName.isPresent()) { + hmsOps.updatePartitionStatistics(dbName, tableName, partitionName.get(), this::resetStatistics); + } else { + hmsOps.updateTableStatistics(dbName, tableName, this::resetStatistics); + } + } + + public String getDescription() { + if (partitionName.isPresent()) { + return "alter partition parameters " + tableName + " " + partitionName.get(); + } else { + return "alter table parameters " + tableName; + } + } + + private HivePartitionStatistics updateStatistics(HivePartitionStatistics currentStats) { + return merge ? 
HivePartitionStatistics.merge(currentStats, updatePartitionStat) : updatePartitionStat; + } + + private HivePartitionStatistics resetStatistics(HivePartitionStatistics currentStatistics) { + return HivePartitionStatistics + .reduce(currentStatistics, updatePartitionStat, HivePartitionStatistics.ReduceOperator.SUBTRACT); + } + } + + public static class AddPartitionsTask { + private final List partitions = new ArrayList<>(); + private final List> createdPartitionValues = new ArrayList<>(); + + public boolean isEmpty() { + return partitions.isEmpty(); + } + + public List getPartitions() { + return partitions; + } + + public void addPartition(HivePartitionWithStatistics partition) { + partitions.add(partition); + } + + public void run(HiveMetadataOps hiveOps) { + HivePartition firstPartition = partitions.get(0).getPartition(); + String dbName = firstPartition.getDbName(); + String tableName = firstPartition.getTblName(); + List> batchedPartitions = + Lists.partition(partitions, PARTITION_COMMIT_BATCH_SIZE); + for (List batch : batchedPartitions) { + try { + hiveOps.addPartitions(dbName, tableName, batch); + for (HivePartitionWithStatistics partition : batch) { + createdPartitionValues.add(partition.getPartition().getPartitionValues()); + } + } catch (Throwable t) { + LOG.error("Failed to add partition", t); + throw t; + } + } + partitions.clear(); + } + + public List> rollback(HiveMetadataOps hiveOps) { + HivePartition firstPartition = partitions.get(0).getPartition(); + String dbName = firstPartition.getDbName(); + String tableName = firstPartition.getTblName(); + List> rollbackFailedPartitions = new ArrayList<>(); + for (List createdPartitionValue : createdPartitionValues) { + try { + hiveOps.dropPartition(dbName, tableName, createdPartitionValue, false); + } catch (Throwable t) { + LOG.warn("Failed to drop partition on {}.{}.{} when rollback", + dbName, tableName, rollbackFailedPartitions); + rollbackFailedPartitions.add(createdPartitionValue); + } + } + return rollbackFailedPartitions; + } + } + + private static class DirectoryCleanUpTask { + private final Path path; + private final boolean deleteEmptyDir; + + public DirectoryCleanUpTask(String path, boolean deleteEmptyDir) { + this.path = new Path(path); + this.deleteEmptyDir = deleteEmptyDir; + } + + public Path getPath() { + return path; + } + + public boolean isDeleteEmptyDir() { + return deleteEmptyDir; + } + + @Override + public String toString() { + return new StringJoiner(", ", DirectoryCleanUpTask.class.getSimpleName() + "[", "]") + .add("path=" + path) + .add("deleteEmptyDir=" + deleteEmptyDir) + .toString(); + } + } + + public static class DeleteRecursivelyResult { + private final boolean dirNoLongerExists; + private final List notDeletedEligibleItems; + + public DeleteRecursivelyResult(boolean dirNoLongerExists, List notDeletedEligibleItems) { + this.dirNoLongerExists = dirNoLongerExists; + this.notDeletedEligibleItems = notDeletedEligibleItems; + } + + public boolean dirNotExists() { + return dirNoLongerExists; + } + + public List getNotDeletedEligibleItems() { + return notDeletedEligibleItems; + } + } + + private void runDirectoryClearUpTasksForAbort() { + for (DirectoryCleanUpTask cleanUpTask : directoryCleanUpTasksForAbort) { + recursiveDeleteItems(cleanUpTask.getPath(), cleanUpTask.isDeleteEmptyDir()); + } + } + + private static class RenameDirectoryTask { + private final String renameFrom; + private final String renameTo; + + public RenameDirectoryTask(String renameFrom, String renameTo) { + this.renameFrom = renameFrom; 
+ this.renameTo = renameTo; + } + + public String getRenameFrom() { + return renameFrom; + } + + public String getRenameTo() { + return renameTo; + } + + @Override + public String toString() { + return new StringJoiner(", ", RenameDirectoryTask.class.getSimpleName() + "[", "]") + .add("renameFrom:" + renameFrom) + .add("renameTo:" + renameTo) + .toString(); + } + } + + private void runRenameDirTasksForAbort() { + // TODO abort + } + + + private void recursiveDeleteItems(Path directory, boolean deleteEmptyDir) { + DeleteRecursivelyResult deleteResult = recursiveDeleteFiles(directory, deleteEmptyDir); + + if (!deleteResult.getNotDeletedEligibleItems().isEmpty()) { + LOG.error("Failed to delete directory {}. Some eligible items can't be deleted: {}.", + directory.toString(), deleteResult.getNotDeletedEligibleItems()); + } else if (deleteEmptyDir && !deleteResult.dirNotExists()) { + LOG.error("Failed to delete directory {} due to dir isn't empty", directory.toString()); + } + } + + public DeleteRecursivelyResult recursiveDeleteFiles(Path directory, boolean deleteEmptyDir) { + try { + if (!fs.exists(directory.getName()).ok()) { + return new DeleteRecursivelyResult(true, ImmutableList.of()); + } + } catch (Exception e) { + ImmutableList.Builder notDeletedEligibleItems = ImmutableList.builder(); + notDeletedEligibleItems.add(directory.toString() + "/*"); + return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build()); + } + + return doRecursiveDeleteFiles(directory, deleteEmptyDir); + } + + private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir) { + List remoteFiles = new ArrayList<>(); + + Status status = fs.list(directory.getName(), remoteFiles); + if (!status.ok()) { + ImmutableList.Builder notDeletedEligibleItems = ImmutableList.builder(); + notDeletedEligibleItems.add(directory + "/*"); + return new DeleteRecursivelyResult(false, notDeletedEligibleItems.build()); + } + + boolean isEmptyDir = true; + List notDeletedEligibleItems = new ArrayList<>(); + for (RemoteFile file : remoteFiles) { + if (file.isFile()) { + Path filePath = file.getPath(); + isEmptyDir = false; + // TODO Check if this file was created by this query + if (!deleteIfExists(filePath)) { + notDeletedEligibleItems.add(filePath.toString()); + } + } else if (file.isDirectory()) { + DeleteRecursivelyResult subResult = doRecursiveDeleteFiles(file.getPath(), deleteEmptyDir); + if (!subResult.dirNotExists()) { + isEmptyDir = false; + } + if (!subResult.getNotDeletedEligibleItems().isEmpty()) { + notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems()); + } + } else { + isEmptyDir = false; + notDeletedEligibleItems.add(file.getPath().toString()); + } + } + + if (isEmptyDir && deleteEmptyDir) { + Verify.verify(notDeletedEligibleItems.isEmpty()); + if (!deleteIfExists(directory)) { + return new DeleteRecursivelyResult(false, ImmutableList.of(directory + "/")); + } + // all items of the location have been deleted. 
+ return new DeleteRecursivelyResult(true, ImmutableList.of()); + } + + return new DeleteRecursivelyResult(false, notDeletedEligibleItems); + } + + public boolean deleteIfExists(Path path) { + Status status = fs.delete(path.getName()); + if (status.ok()) { + return true; + } + return !fs.exists(path.getName()).ok(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveColumnStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveColumnStatistics.java new file mode 100644 index 0000000000..96cb6b5728 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveColumnStatistics.java @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + + +public class HiveColumnStatistics { + + private long totalSizeBytes; + private long numNulls; + private long ndv; + private final double min = Double.NEGATIVE_INFINITY; + private final double max = Double.POSITIVE_INFINITY; + + // TODO add hive column statistics +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveCommonStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveCommonStatistics.java new file mode 100644 index 0000000000..3d8fb2512a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveCommonStatistics.java @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
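/*
 * Illustrative sketch, not part of the patch: HiveCommonStatistics (defined just below) is
 * what ultimately lands in the parameter map that HMS stores for a table or partition. The
 * keys are the standard StatsSetupConst names ("numRows", "numFiles", "totalSize"); see
 * updateStatisticsParameters(...) in ThriftHMSCachedClient further down. The helper class
 * here is hypothetical.
 */
import java.util.HashMap;
import java.util.Map;

class CommonStatisticsToParamsSketch {
    static Map<String, String> toParameters(HiveCommonStatistics stats) {
        Map<String, String> params = new HashMap<>();
        params.put("numRows", String.valueOf(stats.getRowCount()));         // StatsSetupConst.ROW_COUNT
        params.put("numFiles", String.valueOf(stats.getFileCount()));       // StatsSetupConst.NUM_FILES
        params.put("totalSize", String.valueOf(stats.getTotalFileBytes())); // StatsSetupConst.TOTAL_SIZE
        return params;
    }
}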
+ +package org.apache.doris.datasource.hive; + +public class HiveCommonStatistics { + public static final HiveCommonStatistics EMPTY = new HiveCommonStatistics(0L, 0L, 0L); + + private final long rowCount; + private final long fileCount; + private final long totalFileBytes; + + public HiveCommonStatistics(long rowCount, long fileCount, long totalFileBytes) { + this.fileCount = fileCount; + this.rowCount = rowCount; + this.totalFileBytes = totalFileBytes; + } + + public long getRowCount() { + return rowCount; + } + + public long getFileCount() { + return fileCount; + } + + public long getTotalFileBytes() { + return totalFileBytes; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index 6779a602cb..886d6d76fa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -30,16 +30,21 @@ import org.apache.doris.datasource.ExternalDatabase; import org.apache.doris.datasource.jdbc.client.JdbcClient; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; import org.apache.doris.datasource.operations.ExternalMetadataOps; +import org.apache.doris.fs.remote.RemoteFileSystem; +import org.apache.doris.fs.remote.dfs.DFSFileSystem; +import org.apache.doris.thrift.THivePartitionUpdate; import com.google.common.base.Preconditions; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.function.Function; public class HiveMetadataOps implements ExternalMetadataOps { private static final Logger LOG = LogManager.getLogger(HiveMetadataOps.class); @@ -48,6 +53,7 @@ public class HiveMetadataOps implements ExternalMetadataOps { private HiveConf hiveConf; private HMSExternalCatalog catalog; private HMSCachedClient client; + private final RemoteFileSystem fs; public HiveMetadataOps(HiveConf hiveConf, JdbcClientConfig jdbcClientConfig, HMSExternalCatalog catalog) { this.catalog = catalog; @@ -55,6 +61,7 @@ public class HiveMetadataOps implements ExternalMetadataOps { this.jdbcClientConfig = jdbcClientConfig; this.client = createCachedClient(hiveConf, Math.max(MIN_CLIENT_POOL_SIZE, Config.max_external_cache_loader_thread_pool_size), jdbcClientConfig); + this.fs = new DFSFileSystem(catalog.getProperties()); } public HMSCachedClient getClient() { @@ -176,4 +183,35 @@ public class HiveMetadataOps implements ExternalMetadataOps { public List listDatabaseNames() { return client.getAllDatabases(); } + + public void commit(String dbName, + String tableName, + List hivePUs) { + Table table = client.getTable(dbName, tableName); + HMSCommitter hmsCommitter = new HMSCommitter(this, fs, table); + hmsCommitter.commit(hivePUs); + } + + public void updateTableStatistics( + String dbName, + String tableName, + Function update) { + client.updateTableStatistics(dbName, tableName, update); + } + + void updatePartitionStatistics( + String dbName, + String tableName, + String partitionName, + Function update) { + client.updatePartitionStatistics(dbName, tableName, partitionName, update); + } + + public void addPartitions(String dbName, String tableName, List partitions) { + client.addPartitions(dbName, tableName, 
partitions); + } + + public void dropPartition(String dbName, String tableName, List partitionValues, boolean deleteData) { + client.dropPartition(dbName, tableName, partitionValues, deleteData); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java index 0663edb48d..a9d97b4062 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartition.java @@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column; import com.google.common.base.Preconditions; import lombok.Data; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import java.util.List; import java.util.Map; @@ -37,7 +38,11 @@ public class HivePartition { private List partitionValues; private boolean isDummyPartition; private Map parameters; + private String outputFormat; + private String serde; + private List columns; + // If you want to read the data under a partition, you can use this constructor public HivePartition(String dbName, String tblName, boolean isDummyPartition, String inputFormat, String path, List partitionValues, Map parameters) { this.dbName = dbName; @@ -52,6 +57,17 @@ public class HivePartition { this.parameters = parameters; } + // If you want to update hms with partition, then you can use this constructor, + // as updating hms requires some additional information, such as outputFormat and so on + public HivePartition(String dbName, String tblName, boolean isDummyPartition, + String inputFormat, String path, List partitionValues, Map parameters, + String outputFormat, String serde, List columns) { + this(dbName, tblName, isDummyPartition, inputFormat, path, partitionValues, parameters); + this.outputFormat = outputFormat; + this.serde = serde; + this.columns = columns; + } + // return partition name like: nation=cn/city=beijing public String getPartitionName(List partColumns) { Preconditions.checkState(partColumns.size() == partitionValues.size()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java new file mode 100644 index 0000000000..49b1450475 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionStatistics.java @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
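/*
 * Illustrative sketch, not part of the patch: expected behaviour of the merge()/reduce()
 * helpers in HivePartitionStatistics, defined just below. reduce(long, long, op) returns 0
 * as soon as either input is negative, which keeps unknown statistics (-1 read back from
 * HMS parameters) from corrupting the combined result; merge() behaves like ADD but simply
 * returns the non-empty side when either row count is <= 0. The class name is hypothetical.
 */
class PartitionStatisticsReduceSketch {
    static void example() {
        HivePartitionStatistics current = HivePartitionStatistics.fromCommonStatistics(10, 2, 1024);
        HivePartitionStatistics update = HivePartitionStatistics.fromCommonStatistics(5, 1, 512);

        HivePartitionStatistics added = HivePartitionStatistics.reduce(
                current, update, HivePartitionStatistics.ReduceOperator.ADD);
        HivePartitionStatistics merged = HivePartitionStatistics.merge(current, update);

        System.out.println(added.getCommonStatistics().getRowCount());  // 15
        System.out.println(merged.getCommonStatistics().getRowCount()); // 15 (same as ADD here)
    }
}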
+// This file is copied from +// https://github.com/trinodb/trino/blob/438/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/Statistics.java +// and modified by Doris + +package org.apache.doris.datasource.hive; + +import com.google.common.collect.ImmutableMap; + +import java.util.Map; + +public class HivePartitionStatistics { + private static final HivePartitionStatistics EMPTY = + new HivePartitionStatistics(HiveCommonStatistics.EMPTY, ImmutableMap.of()); + + private final HiveCommonStatistics commonStatistics; + private final Map columnStatisticsMap; + + public HivePartitionStatistics( + HiveCommonStatistics commonStatistics, + Map columnStatisticsMap) { + this.commonStatistics = commonStatistics; + this.columnStatisticsMap = columnStatisticsMap; + } + + public HiveCommonStatistics getCommonStatistics() { + return commonStatistics; + } + + public Map getColumnStatisticsMap() { + return columnStatisticsMap; + } + + public static HivePartitionStatistics fromCommonStatistics(long rowCount, long fileCount, long totalFileBytes) { + return new HivePartitionStatistics( + new HiveCommonStatistics(rowCount, fileCount, totalFileBytes), + ImmutableMap.of() + ); + } + + // only used to update the parameters of partition or table. + public static HivePartitionStatistics merge(HivePartitionStatistics current, HivePartitionStatistics update) { + if (current.getCommonStatistics().getRowCount() <= 0) { + return update; + } else if (update.getCommonStatistics().getRowCount() <= 0) { + return current; + } + + return new HivePartitionStatistics( + reduce(current.getCommonStatistics(), update.getCommonStatistics(), ReduceOperator.ADD), + // TODO merge columnStatisticsMap + current.getColumnStatisticsMap()); + } + + public static HivePartitionStatistics reduce( + HivePartitionStatistics first, + HivePartitionStatistics second, + ReduceOperator operator) { + HiveCommonStatistics left = first.getCommonStatistics(); + HiveCommonStatistics right = second.getCommonStatistics(); + return HivePartitionStatistics.fromCommonStatistics( + reduce(left.getRowCount(), right.getRowCount(), operator), + reduce(left.getFileCount(), right.getFileCount(), operator), + reduce(left.getTotalFileBytes(), right.getTotalFileBytes(), operator)); + } + + public static HiveCommonStatistics reduce( + HiveCommonStatistics current, + HiveCommonStatistics update, + ReduceOperator operator) { + return new HiveCommonStatistics( + reduce(current.getRowCount(), update.getRowCount(), operator), + reduce(current.getFileCount(), update.getFileCount(), operator), + reduce(current.getTotalFileBytes(), update.getTotalFileBytes(), operator)); + } + + public static long reduce(long current, long update, ReduceOperator operator) { + if (current >= 0 && update >= 0) { + switch (operator) { + case ADD: + return current + update; + case SUBTRACT: + return current - update; + case MAX: + return Math.max(current, update); + case MIN: + return Math.min(current, update); + default: + throw new IllegalArgumentException("Unexpected operator: " + operator); + } + } + + return 0; + } + + public enum ReduceOperator { + ADD, + SUBTRACT, + MIN, + MAX, + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java new file mode 100644 index 0000000000..b7c28b68ff --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HivePartitionWithStatistics.java @@ -0,0 +1,42 @@ +// Licensed to the Apache 
Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.hive; + +public class HivePartitionWithStatistics { + private String name; + private HivePartition partition; + private HivePartitionStatistics statistics; + + public HivePartitionWithStatistics(String name, HivePartition partition, HivePartitionStatistics statistics) { + this.name = name; + this.partition = partition; + this.statistics = statistics; + } + + public String getName() { + return name; + } + + public HivePartition getPartition() { + return partition; + } + + public HivePartitionStatistics getStatistics() { + return statistics; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java index 2fdd4f21f3..eb107464bf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveUtil.java @@ -22,6 +22,10 @@ import org.apache.doris.fs.remote.BrokerFileSystem; import org.apache.doris.fs.remote.RemoteFileSystem; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; import org.apache.hadoop.mapred.InputFormat; @@ -32,6 +36,9 @@ import org.apache.hadoop.util.ReflectionUtils; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; /** * Hive util for create or query hive table. @@ -44,14 +51,14 @@ public final class HiveUtil { /** * get input format class from inputFormatName. * - * @param jobConf jobConf used when getInputFormatClass + * @param jobConf jobConf used when getInputFormatClass * @param inputFormatName inputFormat class name - * @param symlinkTarget use target inputFormat class when inputFormat is SymlinkTextInputFormat + * @param symlinkTarget use target inputFormat class when inputFormat is SymlinkTextInputFormat * @return a class of inputFormat. * @throws UserException when class not found. 
*/ public static InputFormat getInputFormat(JobConf jobConf, - String inputFormatName, boolean symlinkTarget) throws UserException { + String inputFormatName, boolean symlinkTarget) throws UserException { try { Class> inputFormatClass = getInputFormatClass(jobConf, inputFormatName); if (symlinkTarget && (inputFormatClass == SymlinkTextInputFormat.class)) { @@ -99,4 +106,55 @@ public final class HiveUtil { throw new RuntimeException(e); } } + + // "c1=a/c2=b/c3=c" ---> List("a","b","c") + public static List toPartitionValues(String partitionName) { + ImmutableList.Builder resultBuilder = ImmutableList.builder(); + int start = 0; + while (true) { + while (start < partitionName.length() && partitionName.charAt(start) != '=') { + start++; + } + start++; + int end = start; + while (end < partitionName.length() && partitionName.charAt(end) != '/') { + end++; + } + if (start > partitionName.length()) { + break; + } + resultBuilder.add(FileUtils.unescapePathName(partitionName.substring(start, end))); + start = end + 1; + } + return resultBuilder.build(); + } + + // List("c1=a/c2=b/c3=c", "c1=a/c2=b/c3=d") + // | + // | + // v + // Map( + // key:"c1=a/c2=b/c3=c", value:Partition(values=List(a,b,c)) + // key:"c1=a/c2=b/c3=d", value:Partition(values=List(a,b,d)) + // ) + public static Map convertToNamePartitionMap( + List partitionNames, + List partitions) { + + Map> partitionNameToPartitionValues = + partitionNames + .stream() + .collect(Collectors.toMap(partitionName -> partitionName, HiveUtil::toPartitionValues)); + + Map, Partition> partitionValuesToPartition = + partitions.stream() + .collect(Collectors.toMap(Partition::getValues, partition -> partition)); + + ImmutableMap.Builder resultBuilder = ImmutableMap.builder(); + for (Map.Entry> entry : partitionNameToPartitionValues.entrySet()) { + Partition partition = partitionValuesToPartition.get(entry.getValue()); + resultBuilder.put(entry.getKey(), partition); + } + return resultBuilder.build(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java index e587debdb3..c18fa30189 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java @@ -53,6 +53,7 @@ import java.sql.ResultSet; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; public class PostgreSQLJdbcHMSCachedClient extends JdbcHMSCachedClient { @@ -519,6 +520,31 @@ public class PostgreSQLJdbcHMSCachedClient extends JdbcHMSCachedClient { throw new NotImplementedException("PostgreSQL createTable not implemented"); } + @Override + public void updateTableStatistics(String dbName, + String tableName, + Function update) { + throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient."); + } + + @Override + public void updatePartitionStatistics(String dbName, + String tableName, + String partitionName, + Function update) { + throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient."); + } + + @Override + public void addPartitions(String dbName, String tableName, List partitions) { + throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient."); + } + + @Override + public void dropPartition(String dbName, String tableName, List partitionValues, boolean 
deleteData) { + throw new HMSClientException("Do not support in PostgreSQLJdbcHMSCachedClient."); + } + public void dropTable(String dbName, String tblName) { throw new NotImplementedException("PostgreSQL dropTable not implemented"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java index c820790662..cb5328395c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/ThriftHMSCachedClient.java @@ -28,8 +28,12 @@ import org.apache.doris.datasource.property.constants.HMSProperties; import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient; import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidTxnWriteIdList; @@ -73,6 +77,8 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Queue; +import java.util.function.Function; +import java.util.stream.Collectors; /** * This class uses the thrift protocol to directly access the HiveMetaStore service @@ -647,4 +653,135 @@ public class ThriftHMSCachedClient implements HMSCachedClient { private T ugiDoAs(PrivilegedExceptionAction action) { return HiveMetaStoreClientHelper.ugiDoAs(hiveConf, action); } + + @Override + public void updateTableStatistics( + String dbName, + String tableName, + Function update) { + try (ThriftHMSClient client = getClient()) { + + Table originTable = getTable(dbName, tableName); + Map originParams = originTable.getParameters(); + HivePartitionStatistics updatedStats = update.apply(toHivePartitionStatistics(originParams)); + + Table newTable = originTable.deepCopy(); + Map newParams = + updateStatisticsParameters(originParams, updatedStats.getCommonStatistics()); + newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000)); + newTable.setParameters(newParams); + client.client.alter_table(dbName, tableName, newTable); + } catch (Exception e) { + throw new RuntimeException("failed to update table statistics for " + dbName + "." 
+ tableName); + } + } + + @Override + public void updatePartitionStatistics( + String dbName, + String tableName, + String partitionName, + Function update) { + try (ThriftHMSClient client = getClient()) { + List partitions = client.client.getPartitionsByNames( + dbName, tableName, ImmutableList.of(partitionName)); + if (partitions.size() != 1) { + throw new RuntimeException("Metastore returned multiple partitions for name: " + partitionName); + } + + Partition originPartition = partitions.get(0); + Map originParams = originPartition.getParameters(); + HivePartitionStatistics updatedStats = update.apply(toHivePartitionStatistics(originParams)); + + Partition modifiedPartition = originPartition.deepCopy(); + Map newParams = + updateStatisticsParameters(originParams, updatedStats.getCommonStatistics()); + newParams.put("transient_lastDdlTime", String.valueOf(System.currentTimeMillis() / 1000)); + modifiedPartition.setParameters(newParams); + client.client.alter_partition(dbName, tableName, modifiedPartition); + } catch (Exception e) { + throw new RuntimeException("failed to update table statistics for " + dbName + "." + tableName); + } + } + + @Override + public void addPartitions(String dbName, String tableName, List partitions) { + try (ThriftHMSClient client = getClient()) { + List hivePartitions = partitions.stream() + .map(ThriftHMSCachedClient::toMetastoreApiPartition) + .collect(Collectors.toList()); + client.client.add_partitions(hivePartitions); + } catch (Exception e) { + throw new RuntimeException("failed to add partitions for " + dbName + "." + tableName, e); + } + } + + @Override + public void dropPartition(String dbName, String tableName, List partitionValues, boolean deleteData) { + try (ThriftHMSClient client = getClient()) { + client.client.dropPartition(dbName, tableName, partitionValues, deleteData); + } catch (Exception e) { + throw new RuntimeException("failed to drop partition for " + dbName + "." 
+ tableName); + } + } + + private static HivePartitionStatistics toHivePartitionStatistics(Map params) { + long rowCount = Long.parseLong(params.getOrDefault(StatsSetupConst.ROW_COUNT, "-1")); + long totalSize = Long.parseLong(params.getOrDefault(StatsSetupConst.TOTAL_SIZE, "-1")); + long numFiles = Long.parseLong(params.getOrDefault(StatsSetupConst.NUM_FILES, "-1")); + return HivePartitionStatistics.fromCommonStatistics(rowCount, numFiles, totalSize); + } + + private static Map updateStatisticsParameters( + Map parameters, + HiveCommonStatistics statistics) { + HashMap result = new HashMap<>(parameters); + + result.put(StatsSetupConst.NUM_FILES, String.valueOf(statistics.getFileCount())); + result.put(StatsSetupConst.ROW_COUNT, String.valueOf(statistics.getRowCount())); + result.put(StatsSetupConst.TOTAL_SIZE, String.valueOf(statistics.getTotalFileBytes())); + + // CDH 5.16 metastore ignores stats unless STATS_GENERATED_VIA_STATS_TASK is set + // https://github.com/cloudera/hive/blob/cdh5.16.2-release/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java#L227-L231 + if (!parameters.containsKey("STATS_GENERATED_VIA_STATS_TASK")) { + result.put("STATS_GENERATED_VIA_STATS_TASK", "workaround for potential lack of HIVE-12730"); + } + + return result; + } + + public static Partition toMetastoreApiPartition(HivePartitionWithStatistics partitionWithStatistics) { + Partition partition = + toMetastoreApiPartition(partitionWithStatistics.getPartition()); + partition.setParameters(updateStatisticsParameters( + partition.getParameters(), partitionWithStatistics.getStatistics().getCommonStatistics())); + return partition; + } + + public static Partition toMetastoreApiPartition(HivePartition hivePartition) { + Partition result = new Partition(); + result.setDbName(hivePartition.getDbName()); + result.setTableName(hivePartition.getTblName()); + result.setValues(hivePartition.getPartitionValues()); + result.setSd(makeStorageDescriptorFromHivePartition(hivePartition)); + result.setParameters(hivePartition.getParameters()); + return result; + } + + private static StorageDescriptor makeStorageDescriptorFromHivePartition(HivePartition partition) { + SerDeInfo serdeInfo = new SerDeInfo(); + serdeInfo.setName(partition.getTblName()); + serdeInfo.setSerializationLib(partition.getSerde()); + + StorageDescriptor sd = new StorageDescriptor(); + sd.setLocation(Strings.emptyToNull(partition.getPath())); + sd.setCols(partition.getColumns()); + sd.setSerdeInfo(serdeInfo); + sd.setInputFormat(partition.getInputFormat()); + sd.setOutputFormat(partition.getOutputFormat()); + sd.setParameters(ImmutableMap.of()); + + return sd; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java index 10e2b2b04a..798f93a61c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java @@ -23,6 +23,9 @@ import org.apache.doris.fs.remote.RemoteFile; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; /** * File system interface. 
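/*
 * Illustrative sketch, not part of the patch: how the asyncRename(...) defaults added to
 * FileSystem in the next hunk are driven by HMSCommitter above. Renames are submitted on the
 * caller's executor, the futures are collected so the commit step can block on all of them
 * (the patch itself waits via io.airlift MoreFutures.getFutureValue), and the shared
 * AtomicBoolean lets an abort skip renames that have not started yet. The exact generic
 * parameters of the futures list and all paths/names below are assumptions.
 */
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class AsyncRenameDriverSketch {
    static void example(FileSystem fs) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<CompletableFuture<?>> futures = new ArrayList<>();
        AtomicBoolean cancelled = new AtomicBoolean(false);

        // Move staged files written by the query into the final partition directory.
        fs.asyncRename(executor, futures, cancelled,
                "hdfs://ns/tmp/query_id/c3=a",     // write (staging) path
                "hdfs://ns/warehouse/db/tbl/c3=a", // target path
                Arrays.asList("file1.orc", "file2.orc"));

        // Commit: wait for every rename; a failed rename surfaces as a RuntimeException.
        for (CompletableFuture<?> future : futures) {
            future.join();
        }
        executor.shutdown();

        // An abort would instead call cancelled.set(true) so unstarted renames become no-ops.
    }
}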
@@ -46,6 +49,24 @@ public interface FileSystem { Status rename(String origFilePath, String destFilePath); + default void asyncRename(Executor executor, + List> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + List fileNames) { + throw new UnsupportedOperationException("Unsupported operation async rename on current file system."); + } + + default void asyncRenameDir(Executor executor, + List> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { + throw new UnsupportedOperationException("Unsupported operation async rename dir on current file system."); + } + Status delete(String remotePath); Status makeDir(String remotePath); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index e27e27ddbf..e8c645f3c9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -53,6 +53,9 @@ import java.nio.file.Paths; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; public class DFSFileSystem extends RemoteFileSystem { @@ -192,7 +195,7 @@ public class DFSFileSystem extends RemoteFileSystem { try { currentStreamOffset = fsDataInputStream.getPos(); } catch (IOException e) { - LOG.error("errors while get file pos from output stream", e); + LOG.warn("errors while get file pos from output stream", e); throw new IOException("errors while get file pos from output stream", e); } if (currentStreamOffset != readOffset) { @@ -230,7 +233,7 @@ public class DFSFileSystem extends RemoteFileSystem { } return ByteBuffer.wrap(buf, 0, readLength); } catch (IOException e) { - LOG.error("errors while read data from stream", e); + LOG.warn("errors while read data from stream", e); throw new IOException("errors while read data from stream " + e.getMessage()); } } @@ -261,7 +264,7 @@ public class DFSFileSystem extends RemoteFileSystem { } return Status.OK; } catch (Exception e) { - LOG.error("errors while check path exist " + remotePath, e); + LOG.warn("errors while check path exist " + remotePath, e); return new Status(Status.ErrCode.COMMON_ERROR, "failed to check remote path exist: " + remotePath + ". msg: " + e.getMessage()); } @@ -281,7 +284,7 @@ public class DFSFileSystem extends RemoteFileSystem { try { fsDataOutputStream.writeBytes(content); } catch (IOException e) { - LOG.error("errors while write data to output stream", e); + LOG.warn("errors while write data to output stream", e); status = new Status(Status.ErrCode.COMMON_ERROR, "write exception: " + e.getMessage()); } finally { Status closeStatus = operations.closeWriter(OpParams.of(fsDataOutputStream)); @@ -324,7 +327,7 @@ public class DFSFileSystem extends RemoteFileSystem { try { fsDataOutputStream.write(readBuf, 0, bytesRead); } catch (IOException e) { - LOG.error("errors while write data to output stream", e); + LOG.warn("errors while write data to output stream", e); lastErrMsg = String.format( "failed to write hdfs. 
current write offset: %d, write length: %d, " + "file length: %d, file: %s, msg: errors while write data to output stream", @@ -377,7 +380,7 @@ public class DFSFileSystem extends RemoteFileSystem { } catch (UserException e) { return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); } catch (IOException e) { - LOG.error("errors while rename path from " + srcPath + " to " + destPath); + LOG.warn("errors while rename path from " + srcPath + " to " + destPath); return new Status(Status.ErrCode.COMMON_ERROR, "failed to rename remote " + srcPath + " to " + destPath + ", msg: " + e.getMessage()); } @@ -385,6 +388,64 @@ public class DFSFileSystem extends RemoteFileSystem { return Status.OK; } + @Override + public void asyncRename( + Executor executor, + List> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + List fileNames) { + + for (String fileName : fileNames) { + Path source = new Path(origFilePath, fileName); + Path target = new Path(destFilePath, fileName); + renameFileFutures.add(CompletableFuture.runAsync(() -> { + if (cancelled.get()) { + return; + } + Status status = rename(source.toString(), target.toString()); + if (!status.ok()) { + throw new RuntimeException(status.getErrMsg()); + } + }, executor)); + } + } + + @Override + public void asyncRenameDir(Executor executor, + List> renameFileFutures, + AtomicBoolean cancelled, + String origFilePath, + String destFilePath, + Runnable runWhenPathNotExist) { + renameFileFutures.add(CompletableFuture.runAsync(() -> { + if (cancelled.get()) { + return; + } + + Status status = exists(destFilePath); + if (status.ok()) { + throw new RuntimeException("Destination directory already exists: " + destFilePath); + } + + String targetParent = new Path(destFilePath).getParent().toString(); + status = exists(targetParent); + if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) { + makeDir(targetParent); + } else if (!status.ok()) { + throw new RuntimeException(status.getErrMsg()); + } + + runWhenPathNotExist.run(); + + status = rename(origFilePath, destFilePath); + if (!status.ok()) { + throw new RuntimeException(status.getErrMsg()); + } + }, executor)); + } + @Override public Status delete(String remotePath) { try { @@ -395,7 +456,7 @@ public class DFSFileSystem extends RemoteFileSystem { } catch (UserException e) { return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); } catch (IOException e) { - LOG.error("errors while delete path " + remotePath); + LOG.warn("errors while delete path " + remotePath); return new Status(Status.ErrCode.COMMON_ERROR, "failed to delete remote path: " + remotePath + ", msg: " + e.getMessage()); } @@ -433,7 +494,7 @@ public class DFSFileSystem extends RemoteFileSystem { LOG.info("file not found: " + e.getMessage()); return new Status(Status.ErrCode.NOT_FOUND, "file not found: " + e.getMessage()); } catch (Exception e) { - LOG.error("errors while get file status ", e); + LOG.warn("errors while get file status ", e); return new Status(Status.ErrCode.COMMON_ERROR, "errors while get file status " + e.getMessage()); } LOG.info("finish list path {}", remotePath); @@ -442,6 +503,16 @@ public class DFSFileSystem extends RemoteFileSystem { @Override public Status makeDir(String remotePath) { - return new Status(Status.ErrCode.COMMON_ERROR, "mkdir is not implemented."); + try { + FileSystem fileSystem = nativeFileSystem(remotePath); + if (!fileSystem.mkdirs(new Path(remotePath))) { + LOG.warn("failed to make dir for " + remotePath); + return new 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
new file mode 100644
index 0000000000..5f1abf12e6
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/HmsCommitTest.java
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.thrift.THiveLocationParams;
+import org.apache.doris.thrift.THivePartitionUpdate;
+import org.apache.doris.thrift.TUpdateMode;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@Ignore
+public class HmsCommitTest {
+
+    private static HMSExternalCatalog hmsCatalog;
+    private static HiveMetadataOps hmsOps;
+    private static HMSCachedClient hmsClient;
+    private static final String dbName = "test_db";
+    private static final String tbWithPartition = "test_tb_with_partition";
+    private static final String tbWithoutPartition = "test_tb_without_partition";
+    private static Path warehousePath;
+    static String dbLocation;
+    private String inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";
+    private String outputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat";
+    private String serde = "org.apache.hadoop.hive.ql.io.orc.OrcSerde";
+
+    @BeforeClass
+    public static void beforeClass() throws Throwable {
+        warehousePath = Files.createTempDirectory("test_warehouse_");
+        dbLocation = "file://" + warehousePath.toAbsolutePath() + "/";
+        createTestHiveCatalog();
+        createTestHiveDatabase();
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        hmsClient.dropTable(dbName, tbWithPartition);
+        hmsClient.dropTable(dbName, tbWithoutPartition);
+        hmsClient.dropDatabase(dbName);
+    }
+
+    public static void createTestHiveCatalog() {
+        Map<String, String> props = new HashMap<>();
+        props.put("type", "hms");
+        props.put("hive.metastore.uris", "thrift://127.0.0.1:9083");
+        props.put("hadoop.username", "hadoop");
+        hmsCatalog = new HMSExternalCatalog(1, "hive_catalog", null, props, "comment");
+        hmsCatalog.setInitialized();
+        hmsCatalog.initLocalObjectsImpl();
+        hmsOps = (HiveMetadataOps) hmsCatalog.getMetadataOps();
+        hmsClient = hmsOps.getClient();
+    }
"thrift://127.0.0.1:9083"); + props.put("hadoop.username", "hadoop"); + hmsCatalog = new HMSExternalCatalog(1, "hive_catalog", null, props, "comment"); + hmsCatalog.setInitialized(); + hmsCatalog.initLocalObjectsImpl(); + hmsOps = (HiveMetadataOps) hmsCatalog.getMetadataOps(); + hmsClient = hmsOps.getClient(); + } + + public static void createTestHiveDatabase() { + // create database + HiveDatabaseMetadata dbMetadata = new HiveDatabaseMetadata(); + dbMetadata.setDbName(dbName); + dbMetadata.setLocationUri(dbLocation); + hmsClient.createDatabase(dbMetadata); + } + + @Before + public void before() { + // create table + List columns = new ArrayList<>(); + columns.add(new Column("c1", PrimitiveType.INT, true)); + columns.add(new Column("c2", PrimitiveType.STRING, true)); + List partitionKeys = new ArrayList<>(); + partitionKeys.add(new FieldSchema("c3", "string", "comment")); + HiveTableMetadata tableMetadata = new HiveTableMetadata( + dbName, tbWithPartition, columns, partitionKeys, + new HashMap<>(), inputFormat, outputFormat, serde); + hmsClient.createTable(tableMetadata, true); + HiveTableMetadata tableMetadata2 = new HiveTableMetadata( + dbName, tbWithoutPartition, columns, new ArrayList<>(), + new HashMap<>(), inputFormat, outputFormat, serde); + hmsClient.createTable(tableMetadata2, true); + } + + @After + public void after() { + hmsClient.dropTable(dbName, tbWithoutPartition); + hmsClient.dropTable(dbName, tbWithPartition); + } + + @Test + public void testNewPartitionForUnPartitionedTable() { + List pus = new ArrayList<>(); + pus.add(createRandomNew("a")); + try { + hmsOps.commit(dbName, tbWithoutPartition, pus); + } catch (Exception e) { + Assert.assertEquals("Not support mode:[NEW] in unPartitioned table", e.getMessage()); + } + } + + @Test + public void testAppendPartitionForUnPartitionedTable() { + List pus = new ArrayList<>(); + pus.add(createRandomAppend("")); + pus.add(createRandomAppend("")); + pus.add(createRandomAppend("")); + hmsOps.commit(dbName, tbWithoutPartition, pus); + Table table = hmsClient.getTable(dbName, tbWithoutPartition); + Assert.assertEquals(3, Long.parseLong(table.getParameters().get("numRows"))); + + List pus2 = new ArrayList<>(); + pus2.add(createRandomAppend("")); + pus2.add(createRandomAppend("")); + pus2.add(createRandomAppend("")); + hmsOps.commit(dbName, tbWithoutPartition, pus2); + table = hmsClient.getTable(dbName, tbWithoutPartition); + Assert.assertEquals(6, Long.parseLong(table.getParameters().get("numRows"))); + } + + @Test + public void testOverwritePartitionForUnPartitionedTable() { + // TODO + } + + @Test + public void testNewPartitionForPartitionedTable() { + List pus = new ArrayList<>(); + pus.add(createRandomNew("a")); + pus.add(createRandomNew("a")); + pus.add(createRandomNew("a")); + pus.add(createRandomNew("b")); + pus.add(createRandomNew("b")); + pus.add(createRandomNew("c")); + hmsOps.commit(dbName, tbWithPartition, pus); + + Partition pa = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("a")); + Assert.assertEquals(3, Long.parseLong(pa.getParameters().get("numRows"))); + Partition pb = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("b")); + Assert.assertEquals(2, Long.parseLong(pb.getParameters().get("numRows"))); + Partition pc = hmsClient.getPartition(dbName, tbWithPartition, Lists.newArrayList("c")); + Assert.assertEquals(1, Long.parseLong(pc.getParameters().get("numRows"))); + } + + @Test + public void testAppendPartitionForPartitionedTable() { + testNewPartitionForPartitionedTable(); + + 
diff --git a/fe/pom.xml b/fe/pom.xml
index d96d469788..d21c05c7cc 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -349,6 +349,7 @@ under the License.
         shade-format-flatbuffers
         1.12.0
         0.8.10
+        <airlift.version>202</airlift.version>
@@ -1618,6 +1619,11 @@ under the License.
                 ${immutables.version}
                 provided
+            <dependency>
+                <groupId>io.airlift</groupId>
+                <artifactId>concurrent</artifactId>
+                <version>${airlift.version}</version>
+            </dependency>
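Finally, the new io.airlift:concurrent dependency (version 202 via the airlift.version property) provides small concurrency helpers built around CompletableFuture. As a hedged, standalone example of the kind of utility the artifact offers -- independent of how this patch uses it -- MoreFutures.getFutureValue blocks on a future and rethrows failures with the original cause unwrapped:

    // Standalone example of an io.airlift.concurrent helper; not taken from the patch.
    import io.airlift.concurrent.MoreFutures;
    import java.util.concurrent.CompletableFuture;

    public class MoreFuturesExample {
        public static void main(String[] args) {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "renamed");
            // getFutureValue waits for completion and unwraps ExecutionException
            System.out.println(MoreFutures.getFutureValue(future));
        }
    }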