[refactor](filesystem) refactor filesystem interface (#33361)

1. Rename `list` to `globList`. The path passed to this method must contain a wildcard character, and the corresponding HDFS interface is `globStatus`, so the method is renamed to `globList`.
2. If you only need to list files under a plain path (no wildcard), use the `listFiles` operation.
3. Merge the `listLocatedFiles` function into `listFiles` (see the usage sketch after this list).
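
A minimal sketch of the call shapes after this refactor, pieced together from the interface and call sites changed below; the wrapper class, method, and variable names (`GlobListUsageSketch`, `listExamples`, `remotePath`, `location`) are placeholders for illustration, not code from this commit.

```java
import org.apache.doris.backup.Status;
import org.apache.doris.fs.FileSystem;
import org.apache.doris.fs.remote.RemoteFile;

import java.util.ArrayList;
import java.util.List;

class GlobListUsageSketch {
    // Illustrative only: names are placeholders, not taken from a specific changed file.
    static void listExamples(FileSystem fileSystem, String remotePath, String location) {
        // globList expects a wildcard in the path and is backed by the HDFS globStatus call.
        List<RemoteFile> matched = new ArrayList<>();
        Status st = fileSystem.globList(remotePath + "*", matched);
        if (!st.ok()) {
            throw new RuntimeException(st.getErrMsg());
        }

        // listFiles takes a plain path plus a recursive flag; it absorbs the old listLocatedFiles.
        List<RemoteFile> files = new ArrayList<>();
        Status status = fileSystem.listFiles(location, /* recursive */ true, files);
        if (!status.ok()) {
            throw new RuntimeException(status.getErrMsg());
        }
    }
}
```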
This commit is contained in:
wuwenchi
2024-04-18 15:57:59 +08:00
committed by yiguolei
parent 34a97d5e8b
commit 3eca9da0dd
14 changed files with 253 additions and 302 deletions

View File

@ -245,7 +245,7 @@ public class Repository implements Writable {
String repoInfoFilePath = assembleRepoInfoFilePath();
// check if the repo is already exist in remote
List<RemoteFile> remoteFiles = Lists.newArrayList();
Status st = fileSystem.list(repoInfoFilePath, remoteFiles);
Status st = fileSystem.globList(repoInfoFilePath, remoteFiles);
if (!st.ok()) {
return st;
}
@ -417,7 +417,7 @@ public class Repository implements Writable {
String listPath = Joiner.on(PATH_DELIMITER).join(location, joinPrefix(PREFIX_REPO, name), PREFIX_SNAPSHOT_DIR)
+ "*";
List<RemoteFile> result = Lists.newArrayList();
Status st = fileSystem.list(listPath, result);
Status st = fileSystem.globList(listPath, result);
if (!st.ok()) {
return st;
}
@ -595,7 +595,7 @@ public class Repository implements Writable {
public Status download(String remoteFilePath, String localFilePath) {
// 0. list to get to full name(with checksum)
List<RemoteFile> remoteFiles = Lists.newArrayList();
Status status = fileSystem.list(remoteFilePath + "*", remoteFiles);
Status status = fileSystem.globList(remoteFilePath + "*", remoteFiles);
if (!status.ok()) {
return status;
}
@ -759,7 +759,7 @@ public class Repository implements Writable {
LOG.debug("assemble infoFilePath: {}, snapshot: {}", infoFilePath, snapshotName);
}
List<RemoteFile> results = Lists.newArrayList();
Status st = fileSystem.list(infoFilePath + "*", results);
Status st = fileSystem.globList(infoFilePath + "*", results);
if (!st.ok()) {
info.add(snapshotName);
info.add(FeConstants.null_string);

View File

@ -88,7 +88,7 @@ public class BrokerUtil {
try {
RemoteFileSystem fileSystem = FileSystemFactory.get(
brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties());
Status st = fileSystem.list(path, rfiles, false);
Status st = fileSystem.globList(path, rfiles, false);
if (!st.ok()) {
throw new UserException(st.getErrMsg());
}

View File

@ -25,6 +25,7 @@ import org.apache.doris.backup.Status;
import org.apache.doris.common.Pair;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.fs.FileSystem;
import org.apache.doris.fs.FileSystemUtil;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.THivePartitionUpdate;
@ -238,6 +239,7 @@ public class HMSTransaction implements Transaction {
hmsCommitter.doCommit();
} catch (Throwable t) {
LOG.warn("Failed to commit for {}.{}, abort it.", dbName, tbName);
hmsCommitter.abort();
hmsCommitter.rollback();
throw t;
} finally {
@ -564,7 +566,7 @@ public class HMSTransaction implements Transaction {
private DeleteRecursivelyResult doRecursiveDeleteFiles(Path directory, boolean deleteEmptyDir) {
List<RemoteFile> allFiles = new ArrayList<>();
Set<String> allDirs = new HashSet<>();
Status statusFile = fs.listFiles(directory.toString(), allFiles);
Status statusFile = fs.listFiles(directory.toString(), true, allFiles);
Status statusDir = fs.listDirectories(directory.toString(), allDirs);
if (!statusFile.ok() || !statusDir.ok()) {
ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
@ -1329,7 +1331,7 @@ public class HMSTransaction implements Transaction {
doNothing();
}
public void rollback() {
public void abort() {
cancelUnStartedAsyncFileSystemTask();
undoUpdateStatisticsTasks();
undoAddPartitionsTask();
@ -1337,6 +1339,10 @@ public class HMSTransaction implements Transaction {
runDirectoryClearUpTasksForAbort();
runRenameDirTasksForAbort();
}
public void rollback() {
//delete write path
}
}
public Status wrapperRenameDirWithProfileSummary(String origFilePath,
@ -1366,22 +1372,24 @@ public class HMSTransaction implements Transaction {
}
public void wrapperAsyncRenameWithProfileSummary(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
List<String> fileNames) {
fs.asyncRename(executor, renameFileFutures, cancelled, origFilePath, destFilePath, fileNames);
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
List<String> fileNames) {
FileSystemUtil.asyncRenameFiles(
fs, executor, renameFileFutures, cancelled, origFilePath, destFilePath, fileNames);
summaryProfile.ifPresent(profile -> profile.addRenameFileCnt(fileNames.size()));
}
public void wrapperAsyncRenameDirWithProfileSummary(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
fs.asyncRenameDir(executor, renameFileFutures, cancelled, origFilePath, destFilePath, runWhenPathNotExist);
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
FileSystemUtil.asyncRenameDir(
fs, executor, renameFileFutures, cancelled, origFilePath, destFilePath, runWhenPathNotExist);
summaryProfile.ifPresent(SummaryProfile::incRenameDirCnt);
}
}

View File

@ -40,7 +40,6 @@ import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.fs.RemoteFiles;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.metric.GaugeMetric;
@ -80,7 +79,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.FileNotFoundException;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@ -355,16 +353,17 @@ public class HiveMetaStoreCache {
new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
location, bindBrokerName), jobConf, bindBrokerName));
result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
try {
// For Tez engine, it may generate subdirectoies for "union" query.
// So there may be files and directories in the table directory at the same time. eg:
// /user/hive/warehouse/region_tmp_union_all2/000000_0
// /user/hive/warehouse/region_tmp_union_all2/1
// /user/hive/warehouse/region_tmp_union_all2/2
// So we need to recursively list data location.
// https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, true);
for (RemoteFile remoteFile : locatedFiles.files()) {
// For Tez engine, it may generate subdirectoies for "union" query.
// So there may be files and directories in the table directory at the same time. eg:
// /user/hive/warehouse/region_tmp_union_all2/000000_0
// /user/hive/warehouse/region_tmp_union_all2/1
// /user/hive/warehouse/region_tmp_union_all2/2
// So we need to recursively list data location.
// https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, true, remoteFiles);
if (status.ok()) {
for (RemoteFile remoteFile : remoteFiles) {
String srcPath = remoteFile.getPath().toString();
LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties());
Path convertedPath = locationPath.toScanRangeLocation();
@ -373,15 +372,13 @@ public class HiveMetaStoreCache {
}
result.addFile(remoteFile);
}
} catch (Exception e) {
} else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) {
// User may manually remove partition under HDFS, in this case,
// Hive doesn't aware that the removed partition is missing.
// Here is to support this case without throw an exception.
if (e.getCause() instanceof FileNotFoundException) {
LOG.warn(String.format("File %s not exist.", location));
} else {
throw e;
}
LOG.warn(String.format("File %s not exist.", location));
} else {
throw new RuntimeException(status.getErrMsg());
}
// Must copy the partitionValues to avoid concurrent modification of key and value
result.setPartitionValues(Lists.newArrayList(partitionValues));
@ -807,17 +804,22 @@ public class HiveMetaStoreCache {
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location, bindBrokerName),
jobConf, bindBrokerName));
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
if (delta.isDeleteDelta()) {
List<String> deleteDeltaFileNames = locatedFiles.files().stream().map(f -> f.getName()).filter(
name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.collect(Collectors.toList());
deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames));
continue;
}
locatedFiles.files().stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
if (delta.isDeleteDelta()) {
List<String> deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter(
name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.collect(Collectors.toList());
deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames));
continue;
}
remoteFiles.stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(fileCacheValue::addFile);
} else {
throw new RuntimeException(status.getErrMsg());
}
}
// base
@ -827,10 +829,15 @@ public class HiveMetaStoreCache {
new FileSystemCache.FileSystemCacheKey(
LocationPath.getFSIdentity(location, bindBrokerName),
jobConf, bindBrokerName));
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
locatedFiles.files().stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(fileCacheValue::addFile);
List<RemoteFile> remoteFiles = new ArrayList<>();
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
remoteFiles.stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(fileCacheValue::addFile);
} else {
throw new RuntimeException(status.getErrMsg());
}
}
fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas));
fileCacheValues.add(fileCacheValue);

View File

@ -18,15 +18,11 @@
package org.apache.doris.fs;
import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.fs.remote.RemoteFile;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* File system interface.
@ -50,49 +46,44 @@ public interface FileSystem {
Status rename(String origFilePath, String destFilePath);
default Status renameDir(String origFilePath, String destFilePath) {
return renameDir(origFilePath, destFilePath, () -> {});
}
default Status renameDir(String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
throw new UnsupportedOperationException("Unsupported operation rename dir on current file system.");
}
default void asyncRename(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
List<String> fileNames) {
throw new UnsupportedOperationException("Unsupported operation async rename on current file system.");
}
default void asyncRenameDir(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
throw new UnsupportedOperationException("Unsupported operation async rename dir on current file system.");
Runnable runWhenPathNotExist) {
throw new UnsupportedOperationException("Unsupported operation rename dir on current file system.");
}
Status delete(String remotePath);
Status makeDir(String remotePath);
RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException;
Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result);
// List files in remotePath
// The remote file name will only contain file name only(Not full path)
default Status list(String remotePath, List<RemoteFile> result) {
return list(remotePath, result, true);
/**
* List files in remotePath by wildcard <br/>
* The {@link RemoteFile}'name will only contain file name (Not full path)
* @param remotePath remote path
* @param result All eligible files under the path
* @return
*/
default Status globList(String remotePath, List<RemoteFile> result) {
return globList(remotePath, result, true);
}
Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly);
default Status listFiles(String remotePath, List<RemoteFile> result) {
throw new UnsupportedOperationException("Unsupported operation list files on current file system.");
}
/**
* List files in remotePath by wildcard <br/>
* @param remotePath remote path
* @param result All eligible files under the path
* @param fileNameOnly for {@link RemoteFile}'name: whether the full path is included.<br/>
* true: only contains file name, false: contains full path<br/>
* @return
*/
Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly);
default Status listDirectories(String remotePath, Set<String> result) {
throw new UnsupportedOperationException("Unsupported operation list directores on current file system.");
throw new UnsupportedOperationException("Unsupported operation list directories on current file system.");
}
}

View File

@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs;
import org.apache.doris.backup.Status;
import org.apache.hadoop.fs.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
public class FileSystemUtil {
public static void asyncRenameFiles(FileSystem fs,
Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
List<String> fileNames) {
for (String fileName : fileNames) {
Path source = new Path(origFilePath, fileName);
Path target = new Path(destFilePath, fileName);
renameFileFutures.add(CompletableFuture.runAsync(() -> {
if (cancelled.get()) {
return;
}
Status status = fs.rename(source.toString(), target.toString());
if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
}, executor));
}
}
public static void asyncRenameDir(FileSystem fs,
Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
renameFileFutures.add(CompletableFuture.runAsync(() -> {
if (cancelled.get()) {
return;
}
Status status = fs.renameDir(origFilePath, destFilePath, runWhenPathNotExist);
if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
}, executor));
}
}

View File

@ -18,7 +18,6 @@
package org.apache.doris.fs;
import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.fs.remote.RemoteFile;
import com.google.common.collect.ImmutableSet;
@ -30,14 +29,12 @@ import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
public class LocalDfsFileSystem implements FileSystem {
@ -111,46 +108,6 @@ public class LocalDfsFileSystem implements FileSystem {
return rename(origFilePath, destFilePath);
}
@Override
public void asyncRename(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
List<String> fileNames) {
for (String fileName : fileNames) {
Path source = new Path(origFilePath, fileName);
Path target = new Path(destFilePath, fileName);
renameFileFutures.add(CompletableFuture.runAsync(() -> {
if (cancelled.get()) {
return;
}
Status status = rename(source.toString(), target.toString());
if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
}, executor));
}
}
@Override
public void asyncRenameDir(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
renameFileFutures.add(CompletableFuture.runAsync(() -> {
if (cancelled.get()) {
return;
}
Status status = renameDir(origFilePath, destFilePath, runWhenPathNotExist);
if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
}, executor));
}
@Override
public Status delete(String remotePath) {
try {
@ -172,14 +129,9 @@ public class LocalDfsFileSystem implements FileSystem {
}
@Override
public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException {
return null;
}
@Override
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
try {
FileStatus[] locatedFileStatusRemoteIterator = fs.listStatus(new Path(remotePath));
FileStatus[] locatedFileStatusRemoteIterator = fs.globStatus(new Path(remotePath));
if (locatedFileStatusRemoteIterator == null) {
return Status.OK;
}
@ -197,23 +149,20 @@ public class LocalDfsFileSystem implements FileSystem {
}
@Override
public Status listFiles(String remotePath, List<RemoteFile> result) {
RemoteIterator<LocatedFileStatus> iterator;
public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) {
try {
Path dirPath = new Path(remotePath);
iterator = fs.listFiles(dirPath, true);
while (iterator.hasNext()) {
LocatedFileStatus next = iterator.next();
String location = next.getPath().toString();
String child = location.substring(dirPath.toString().length());
while (child.startsWith("/")) {
child = child.substring(1);
}
if (!child.contains("/")) {
result.add(new RemoteFile(location, next.isFile(), next.getLen(), next.getBlockSize()));
}
Path locatedPath = new Path(remotePath);
RemoteIterator<LocatedFileStatus> locatedFiles = fs.listFiles(locatedPath, recursive);
while (locatedFiles.hasNext()) {
LocatedFileStatus fileStatus = locatedFiles.next();
RemoteFile location = new RemoteFile(
fileStatus.getPath(), fileStatus.isDirectory(), fileStatus.getLen(),
fileStatus.getBlockSize(), fileStatus.getModificationTime(), fileStatus.getBlockLocations());
result.add(location);
}
} catch (IOException e) {
} catch (FileNotFoundException e) {
return new Status(Status.ErrCode.NOT_FOUND, e.getMessage());
} catch (Exception e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
}
return Status.OK;

View File

@ -27,7 +27,6 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.fs.RemoteFiles;
import org.apache.doris.fs.operations.BrokerFileOperations;
import org.apache.doris.fs.operations.OpParams;
import org.apache.doris.service.FrontendOptions;
@ -69,7 +68,6 @@ import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@ -568,11 +566,11 @@ public class BrokerFileSystem extends RemoteFileSystem {
}
@Override
public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException {
public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) {
// get a proper broker
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
if (pair == null) {
throw new UserException("failed to get broker client");
return new Status(Status.ErrCode.BAD_CONNECTION, "failed to get broker client");
}
TPaloBrokerService.Client client = pair.first;
TNetworkAddress address = pair.second;
@ -582,14 +580,14 @@ public class BrokerFileSystem extends RemoteFileSystem {
try {
TBrokerListPathRequest req = new TBrokerListPathRequest(TBrokerVersion.VERSION_ONE, remotePath,
recursive, properties);
req.setOnlyFiles(onlyFiles);
req.setOnlyFiles(true);
TBrokerListResponse response = client.listLocatedFiles(req);
TBrokerOperationStatus operationStatus = response.getOpStatus();
if (operationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
throw new UserException("failed to listLocatedFiles, remote path: " + remotePath + ". msg: "
+ operationStatus.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to listLocatedFiles, remote path: " + remotePath + ". msg: "
+ operationStatus.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
}
List<RemoteFile> result = new ArrayList<>();
List<TBrokerFileStatus> fileStatus = response.getFiles();
for (TBrokerFileStatus tFile : fileStatus) {
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(tFile.path);
@ -598,10 +596,10 @@ public class BrokerFileSystem extends RemoteFileSystem {
result.add(file);
}
LOG.info("finished to listLocatedFiles, remote path {}. get files: {}", remotePath, result);
return new RemoteFiles(result);
return Status.OK;
} catch (TException e) {
needReturn = false;
throw new UserException("failed to listLocatedFiles, remote path: "
return new Status(Status.ErrCode.COMMON_ERROR, "failed to listLocatedFiles, remote path: "
+ remotePath + ". msg: " + e.getMessage() + ", broker: " + BrokerUtil.printBroker(name, address));
} finally {
if (needReturn) {
@ -651,7 +649,7 @@ public class BrokerFileSystem extends RemoteFileSystem {
// List files in remotePath
@Override
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
// get a proper broker
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
if (pair == null) {

View File

@ -18,17 +18,21 @@
package org.apache.doris.fs.remote;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.fs.PersistentFileSystem;
import org.apache.doris.fs.RemoteFiles;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import java.io.IOException;
import java.util.ArrayList;
import java.io.FileNotFoundException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
public abstract class RemoteFileSystem extends PersistentFileSystem {
// this field will be visited by multi-threads, better use volatile qualifier
@ -43,26 +47,62 @@ public abstract class RemoteFileSystem extends PersistentFileSystem {
}
@Override
public RemoteFiles listLocatedFiles(String remotePath, boolean onlyFiles, boolean recursive) throws UserException {
org.apache.hadoop.fs.FileSystem fileSystem = nativeFileSystem(remotePath);
public Status listFiles(String remotePath, boolean recursive, List<RemoteFile> result) {
try {
org.apache.hadoop.fs.FileSystem fileSystem = nativeFileSystem(remotePath);
Path locatedPath = new Path(remotePath);
RemoteIterator<LocatedFileStatus> locatedFiles = onlyFiles ? fileSystem.listFiles(locatedPath, recursive)
: fileSystem.listLocatedStatus(locatedPath);
return getFileLocations(locatedFiles);
} catch (IOException e) {
throw new UserException("Failed to list located status for path: " + remotePath, e);
RemoteIterator<LocatedFileStatus> locatedFiles = fileSystem.listFiles(locatedPath, recursive);
while (locatedFiles.hasNext()) {
LocatedFileStatus fileStatus = locatedFiles.next();
RemoteFile location = new RemoteFile(
fileStatus.getPath(), fileStatus.isDirectory(), fileStatus.getLen(),
fileStatus.getBlockSize(), fileStatus.getModificationTime(), fileStatus.getBlockLocations());
result.add(location);
}
} catch (FileNotFoundException e) {
return new Status(Status.ErrCode.NOT_FOUND, e.getMessage());
} catch (Exception e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
}
return Status.OK;
}
private RemoteFiles getFileLocations(RemoteIterator<LocatedFileStatus> locatedFiles) throws IOException {
List<RemoteFile> locations = new ArrayList<>();
while (locatedFiles.hasNext()) {
LocatedFileStatus fileStatus = locatedFiles.next();
RemoteFile location = new RemoteFile(fileStatus.getPath(), fileStatus.isDirectory(), fileStatus.getLen(),
fileStatus.getBlockSize(), fileStatus.getModificationTime(), fileStatus.getBlockLocations());
locations.add(location);
@Override
public Status listDirectories(String remotePath, Set<String> result) {
try {
FileSystem fileSystem = nativeFileSystem(remotePath);
FileStatus[] fileStatuses = fileSystem.listStatus(new Path(remotePath));
result.addAll(
Arrays.stream(fileStatuses)
.filter(FileStatus::isDirectory)
.map(file -> file.getPath().toString() + "/")
.collect(ImmutableSet.toImmutableSet()));
} catch (Exception e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
}
return new RemoteFiles(locations);
return Status.OK;
}
@Override
public Status renameDir(String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
Status status = exists(destFilePath);
if (status.ok()) {
return new Status(Status.ErrCode.COMMON_ERROR, "Destination directory already exists: " + destFilePath);
}
String targetParent = new Path(destFilePath).getParent().toString();
status = exists(targetParent);
if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
status = makeDir(targetParent);
}
if (!status.ok()) {
return new Status(Status.ErrCode.COMMON_ERROR, status.getErrMsg());
}
runWhenPathNotExist.run();
return rename(origFilePath, destFilePath);
}
}

View File

@ -72,7 +72,7 @@ public class S3FileSystem extends ObjFileSystem {
// broker file pattern glob is too complex, so we use hadoop directly
@Override
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
try {
FileSystem s3AFileSystem = nativeFileSystem(remotePath);
Path pathPattern = new Path(remotePath);

View File

@ -31,15 +31,12 @@ import org.apache.doris.fs.remote.RemoteFileSystem;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -54,14 +51,9 @@ import java.nio.ByteBuffer;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
public class DFSFileSystem extends RemoteFileSystem {
@ -395,70 +387,6 @@ public class DFSFileSystem extends RemoteFileSystem {
return Status.OK;
}
@Override
public void asyncRename(
Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
List<String> fileNames) {
for (String fileName : fileNames) {
Path source = new Path(origFilePath, fileName);
Path target = new Path(destFilePath, fileName);
renameFileFutures.add(CompletableFuture.runAsync(() -> {
if (cancelled.get()) {
return;
}
Status status = rename(source.toString(), target.toString());
if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
}, executor));
}
}
public Status renameDir(String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
Status status = exists(destFilePath);
if (status.ok()) {
throw new RuntimeException("Destination directory already exists: " + destFilePath);
}
String targetParent = new Path(destFilePath).getParent().toString();
status = exists(targetParent);
if (Status.ErrCode.NOT_FOUND.equals(status.getErrCode())) {
status = makeDir(targetParent);
}
if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
runWhenPathNotExist.run();
return rename(origFilePath, destFilePath);
}
@Override
public void asyncRenameDir(Executor executor,
List<CompletableFuture<?>> renameFileFutures,
AtomicBoolean cancelled,
String origFilePath,
String destFilePath,
Runnable runWhenPathNotExist) {
renameFileFutures.add(CompletableFuture.runAsync(() -> {
if (cancelled.get()) {
return;
}
Status status = renameDir(origFilePath, destFilePath, runWhenPathNotExist);
if (!status.ok()) {
throw new RuntimeException(status.getErrMsg());
}
}, executor));
}
@Override
public Status delete(String remotePath) {
try {
@ -486,7 +414,7 @@ public class DFSFileSystem extends RemoteFileSystem {
* @return Status.OK if success.
*/
@Override
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
try {
URI pathUri = URI.create(remotePath);
FileSystem fileSystem = nativeFileSystem(remotePath);
@ -528,44 +456,4 @@ public class DFSFileSystem extends RemoteFileSystem {
}
return Status.OK;
}
@Override
public Status listFiles(String remotePath, List<RemoteFile> result) {
RemoteIterator<LocatedFileStatus> iterator;
try {
FileSystem fileSystem = nativeFileSystem(remotePath);
Path dirPath = new Path(remotePath);
iterator = fileSystem.listFiles(dirPath, true);
while (iterator.hasNext()) {
LocatedFileStatus next = iterator.next();
String location = next.getPath().toString();
String child = location.substring(dirPath.toString().length());
while (child.startsWith("/")) {
child = child.substring(1);
}
if (!child.contains("/")) {
result.add(new RemoteFile(location, next.isFile(), next.getLen(), next.getBlockSize()));
}
}
} catch (Exception e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
}
return Status.OK;
}
@Override
public Status listDirectories(String remotePath, Set<String> result) {
try {
FileSystem fileSystem = nativeFileSystem(remotePath);
FileStatus[] fileStatuses = fileSystem.listStatus(new Path(remotePath));
result.addAll(
Arrays.stream(fileStatuses)
.filter(FileStatus::isDirectory)
.map(file -> file.getPath().toString() + "/")
.collect(ImmutableSet.toImmutableSet()));
} catch (Exception e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
}
return Status.OK;
}
}

View File

@ -176,7 +176,7 @@ public class BrokerStorageTest {
Assert.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".1"));
Assert.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".2"));
Assert.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".3"));
Assert.assertEquals(Status.OK, fileSystem.list(bucket + basePath + "_list/*", result));
Assert.assertEquals(Status.OK, fileSystem.globList(bucket + basePath + "_list/*", result));
Assert.assertEquals(3, result.size());
}

View File

@ -106,7 +106,7 @@ public class RepositoryTest {
public void testInit() {
new Expectations() {
{
fileSystem.list(anyString, (List<RemoteFile>) any);
fileSystem.globList(anyString, (List<RemoteFile>) any);
minTimes = 0;
result = new Delegate<Status>() {
public Status list(String remotePath, List<RemoteFile> result) {
@ -180,7 +180,7 @@ public class RepositoryTest {
public void testListSnapshots() {
new Expectations() {
{
fileSystem.list(anyString, (List<RemoteFile>) any);
fileSystem.globList(anyString, (List<RemoteFile>) any);
minTimes = 0;
result = new Delegate() {
public Status list(String remotePath, List<RemoteFile> result) {
@ -250,7 +250,7 @@ public class RepositoryTest {
new Expectations() {
{
fileSystem.list(anyString, (List<RemoteFile>) any);
fileSystem.globList(anyString, (List<RemoteFile>) any);
minTimes = 0;
result = new Delegate() {
public Status list(String remotePath, List<RemoteFile> result) {
@ -285,7 +285,7 @@ public class RepositoryTest {
public void testGetSnapshotInfo() {
new Expectations() {
{
fileSystem.list(anyString, (List<RemoteFile>) any);
fileSystem.globList(anyString, (List<RemoteFile>) any);
minTimes = 0;
result = new Delegate() {
public Status list(String remotePath, List<RemoteFile> result) {

View File

@ -108,7 +108,7 @@ public class S3FileSystemTest {
fileSystem = new S3FileSystem(mockedStorage);
new MockUp<S3FileSystem>(S3FileSystem.class) {
@Mock
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
public Status globList(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
try {
S3URI uri = S3URI.create(remotePath, false);
ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder().bucket(uri.getBucket());
@ -225,7 +225,7 @@ public class S3FileSystemTest {
Assertions.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".1"));
Assertions.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".2"));
Assertions.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".3"));
Assertions.assertEquals(Status.OK, fileSystem.list(bucket + basePath + "_list/*", result));
Assertions.assertEquals(Status.OK, fileSystem.globList(bucket + basePath + "_list/*", result));
Assertions.assertEquals(3, result.size());
}