diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java index 1b05d7e11f..8df6f1ddc0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java @@ -23,7 +23,7 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.datasource.property.S3ClientBEProperties; import org.apache.doris.datasource.property.constants.BosProperties; -import org.apache.doris.fs.obj.BlobStorage; +import org.apache.doris.fs.PersistentFileSystem; import org.apache.doris.thrift.TFileType; import com.google.common.collect.Maps; @@ -134,7 +134,7 @@ public class BrokerDesc extends StorageDesc implements Writable { @Override public void write(DataOutput out) throws IOException { Text.writeString(out, name); - properties.put(BlobStorage.STORAGE_TYPE, storageType.name()); + properties.put(PersistentFileSystem.STORAGE_TYPE, storageType.name()); out.writeInt(properties.size()); for (Map.Entry entry : properties.entrySet()) { Text.writeString(out, entry.getKey()); @@ -152,7 +152,7 @@ public class BrokerDesc extends StorageDesc implements Writable { properties.put(key, val); } StorageBackend.StorageType st = StorageBackend.StorageType.BROKER; - String typeStr = properties.remove(BlobStorage.STORAGE_TYPE); + String typeStr = properties.remove(PersistentFileSystem.STORAGE_TYPE); if (typeStr != null) { try { st = StorageBackend.StorageType.valueOf(typeStr); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java index fc90fa053e..6427123037 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java @@ -24,7 +24,7 @@ import org.apache.doris.common.util.PrintableMap; import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.S3Properties; -import org.apache.doris.fs.obj.S3Storage; +import org.apache.doris.fs.remote.S3FileSystem; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -128,17 +128,17 @@ public class S3Resource extends Resource { propertiesPing.put(S3Properties.Env.REGION, credential.getRegion()); propertiesPing.put(PropertyConverter.USE_PATH_STYLE, "false"); properties.putAll(propertiesPing); - S3Storage storage = new S3Storage(properties); + S3FileSystem fileSystem = new S3FileSystem(properties); String testFile = bucket + rootPath + "/test-object-valid.txt"; String content = "doris will be better"; try { - Status status = storage.directUpload(content, testFile); + Status status = fileSystem.directUpload(content, testFile); if (status != Status.OK) { LOG.warn("ping update file status: {}, properties: {}", status, propertiesPing); return false; } } finally { - Status delete = storage.delete(testFile); + Status delete = fileSystem.delete(testFile); if (delete != Status.OK) { LOG.warn("ping delete file status: {}, properties: {}", delete, propertiesPing); return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java index 122e02c95a..74602c1d70 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java @@ -26,6 +26,15 @@ import org.apache.hadoop.fs.RemoteIterator; import java.util.List; +/** + * File system interface. + * All file operations should use DFSFileSystem. + * @see org.apache.doris.fs.remote.dfs.DFSFileSystem + * If the file system use the object storage's SDK, use ObjStorage + * @see org.apache.doris.fs.remote.ObjFileSystem + * Read and Write operation put in FileOperations + * @see org.apache.doris.fs.operations.FileOperations + */ public interface FileSystem { Status exists(String remotePath); @@ -39,15 +48,18 @@ public interface FileSystem { Status delete(String remotePath); + Status makeDir(String remotePath); + default RemoteIterator listLocatedStatus(String remotePath) throws UserException { throw new UserException("Not support to listLocatedStatus."); } // List files in remotePath // The remote file name will only contains file name only(Not full path) - Status list(String remotePath, List result); + default Status list(String remotePath, List result) { + return list(remotePath, result, true); + } Status list(String remotePath, List result, boolean fileNameOnly); - Status makeDir(String remotePath); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java new file mode 100644 index 0000000000..0f59de3458 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystemFactory.java @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs; + +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.fs.remote.BrokerFileSystem; +import org.apache.doris.fs.remote.S3FileSystem; +import org.apache.doris.fs.remote.dfs.DFSFileSystem; +import org.apache.doris.fs.remote.dfs.JFSFileSystem; +import org.apache.doris.fs.remote.dfs.OFSFileSystem; + +import java.util.Map; + +public class FileSystemFactory { + + public static FileSystem get(StorageBackend.StorageType type, Map properties) { + // use for test + return get(type.name(), type, properties); + } + + public static FileSystem get(String name, StorageBackend.StorageType type, Map properties) { + // TODO: rename StorageBackend.StorageType + if (type == StorageBackend.StorageType.S3) { + return new S3FileSystem(properties); + } else if (type == StorageBackend.StorageType.HDFS || type == StorageBackend.StorageType.GFS) { + return new DFSFileSystem(properties); + } else if (type == StorageBackend.StorageType.OFS) { + return new OFSFileSystem(properties); + } else if (type == StorageBackend.StorageType.JFS) { + return new JFSFileSystem(properties); + } else if (type == StorageBackend.StorageType.BROKER) { + return new BrokerFileSystem(name, properties); + } else { + throw new UnsupportedOperationException(type.toString() + "backend is not implemented"); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java index 5e6aa0b4c4..eda99d0ccb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java @@ -17,5 +17,49 @@ package org.apache.doris.fs; -public class LocalFileSystem { +import org.apache.doris.backup.RemoteFile; +import org.apache.doris.backup.Status; + +import java.util.List; + +public class LocalFileSystem implements FileSystem { + @Override + public Status exists(String remotePath) { + return null; + } + + @Override + public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) { + return null; + } + + @Override + public Status upload(String localPath, String remotePath) { + return null; + } + + @Override + public Status directUpload(String content, String remoteFile) { + return null; + } + + @Override + public Status rename(String origFilePath, String destFilePath) { + return null; + } + + @Override + public Status delete(String remotePath) { + return null; + } + + @Override + public Status makeDir(String remotePath) { + return null; + } + + @Override + public Status list(String remotePath, List result, boolean fileNameOnly) { + return null; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/PersistentFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/PersistentFileSystem.java new file mode 100644 index 0000000000..3bb047ef1b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/PersistentFileSystem.java @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs; + +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.io.Writable; + +import com.google.common.collect.Maps; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; + +/** + * Use for persistence, Repository will persist properties of file system. + */ +public abstract class PersistentFileSystem implements FileSystem, Writable { + public static final String STORAGE_TYPE = "_DORIS_STORAGE_TYPE_"; + protected Map properties = Maps.newHashMap(); + protected String name; + protected StorageBackend.StorageType type; + + public PersistentFileSystem(String name, StorageBackend.StorageType type) { + this.name = name; + this.type = type; + } + + public Map getProperties() { + return properties; + } + + public StorageBackend.StorageType getStorageType() { + return type; + } + + /** + * + * @param in persisted data + * @return file systerm + */ + public static FileSystem read(DataInput in) throws IOException { + String name = Text.readString(in); + Map properties = Maps.newHashMap(); + StorageBackend.StorageType type = StorageBackend.StorageType.BROKER; + int size = in.readInt(); + for (int i = 0; i < size; i++) { + String key = Text.readString(in); + String value = Text.readString(in); + properties.put(key, value); + } + if (properties.containsKey(STORAGE_TYPE)) { + type = StorageBackend.StorageType.valueOf(properties.get(STORAGE_TYPE)); + properties.remove(STORAGE_TYPE); + } + return FileSystemFactory.get(name, type, properties); + } + + public void write(DataOutput out) throws IOException { + // must write type first + Text.writeString(out, name); + properties.put(STORAGE_TYPE, type.name()); + out.writeInt(properties.size()); + for (Map.Entry entry : properties.entrySet()) { + Text.writeString(out, entry.getKey()); + Text.writeString(out, entry.getValue()); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/BlobStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/BlobStorage.java index 5a40d42d07..3799410ad6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/BlobStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/BlobStorage.java @@ -37,13 +37,17 @@ import java.io.IOException; import java.util.List; import java.util.Map; +/** + * @see org.apache.doris.fs.PersistentFileSystem + * @see org.apache.doris.fs.FileSystemFactory + */ +@Deprecated public abstract class BlobStorage implements Writable { public static final String STORAGE_TYPE = "_DORIS_STORAGE_TYPE_"; private Map properties = Maps.newHashMap(); private String name; private StorageBackend.StorageType type; - private String location; public static String clientId() { return FrontendOptions.getLocalHostAddress() + ":" + Config.edit_log_port; @@ -88,14 +92,6 @@ public abstract class BlobStorage implements Writable { return BlobStorage.create(name, type, properties); } - public String getLocation() { - return location; - } - - public void setLocation(String location) { - this.location = location; - } - public String getName() { return name; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/BrokerStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/BrokerStorage.java index 9fae34f62e..e182662c6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/BrokerStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/BrokerStorage.java @@ -74,6 +74,10 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +/** + * @see org.apache.doris.fs.remote.BrokerFileSystem + */ +@Deprecated public class BrokerStorage extends BlobStorage { private static final Logger LOG = LogManager.getLogger(BrokerStorage.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/HdfsStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/HdfsStorage.java index 79a65db27b..39c03dee41 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/HdfsStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/HdfsStorage.java @@ -56,8 +56,9 @@ import java.util.Map; /** * HdfsStorage encapsulate interfaces accessing HDFS directly. - * + * @see org.apache.doris.fs.remote.dfs.DFSFileSystem */ +@Deprecated public class HdfsStorage extends BlobStorage { private static final Logger LOG = LogManager.getLogger(HdfsStorage.class); private final Map hdfsProperties; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/ObjStorage.java new file mode 100644 index 0000000000..31f9c065cc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/ObjStorage.java @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.obj; + +import org.apache.doris.backup.Status; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; + +import org.apache.commons.lang3.tuple.Triple; +import org.jetbrains.annotations.Nullable; +import software.amazon.awssdk.core.sync.RequestBody; + +import java.io.File; + +/** + * It is just used for reading remote object storage on cloud. + * @param cloud SDK Client + */ +public interface ObjStorage { + C getClient(String bucket) throws UserException; + + Triple getStsToken() throws DdlException; + + Status headObject(String remotePath); + + Status getObject(String remoteFilePath, File localFile); + + Status putObject(String remotePath, @Nullable RequestBody requestBody); + + Status deleteObject(String remotePath); + + Status copyObject(String origFilePath, String destFilePath); + + RemoteObjects listObjects(String remotePath, String continuationToken) throws DdlException; + + default String normalizePrefix(String prefix) { + return prefix.isEmpty() ? "" : (prefix.endsWith("/") ? prefix : String.format("%s/", prefix)); + } + + default String getRelativePath(String prefix, String key) throws DdlException { + String expectedPrefix = normalizePrefix(prefix); + if (!key.startsWith(expectedPrefix)) { + throw new DdlException( + "List a object whose key: " + key + " does not start with object prefix: " + expectedPrefix); + } + return key.substring(expectedPrefix.length()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/RemoteObject.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/RemoteObject.java new file mode 100644 index 0000000000..6f8077d666 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/RemoteObject.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.obj; + +public class RemoteObject { + private final String key; + private final String relativePath; + private final String etag; + private final long size; + + public RemoteObject(String key, String relativePath, String etag, long size) { + this.key = key; + this.relativePath = relativePath; + this.etag = etag; + this.size = size; + } + + public String getKey() { + return key; + } + + public String getRelativePath() { + return relativePath; + } + + public String getEtag() { + return etag; + } + + public long getSize() { + return size; + } + + @Override + public String toString() { + return "RemoteObject{" + "key='" + key + '\'' + ", relativePath='" + relativePath + '\'' + ", etag='" + etag + + '\'' + ", size=" + size + '}'; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/RemoteObjects.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/RemoteObjects.java new file mode 100644 index 0000000000..ad7a7a4532 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/RemoteObjects.java @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.obj; + +import java.util.List; + +public class RemoteObjects { + private final List objectList; + + private final boolean isTruncated; + + private final String continuationToken; + + public RemoteObjects(List objectList, boolean isTruncated, String continuationToken) { + this.objectList = objectList; + this.isTruncated = isTruncated; + this.continuationToken = continuationToken; + } + + public List getObjectList() { + return objectList; + } + + public boolean isTruncated() { + return isTruncated; + } + + public String getContinuationToken() { + return continuationToken; + } + + @Override + public String toString() { + return "RemoteObjects{" + "objectList=" + objectList + ", isTruncated=" + isTruncated + + ", continuationToken='" + continuationToken + '\'' + '}'; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java new file mode 100644 index 0000000000..7c661f0588 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3ObjStorage.java @@ -0,0 +1,306 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.obj; + +import org.apache.doris.backup.Status; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.S3URI; +import org.apache.doris.datasource.property.PropertyConverter; +import org.apache.doris.datasource.property.constants.S3Properties; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Triple; +import org.apache.http.HttpStatus; +import org.apache.http.client.utils.URIBuilder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.jetbrains.annotations.Nullable; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.auth.signer.AwsS3V4Signer; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3Configuration; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.S3Object; + +import java.io.File; +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +public class S3ObjStorage implements ObjStorage { + private static final Logger LOG = LogManager.getLogger(S3ObjStorage.class); + private S3Client client; + + protected Map properties; + + // false: the s3 client will automatically convert endpoint to virtual-hosted style, eg: + // endpoint: http://s3.us-east-2.amazonaws.com + // bucket/path: my_bucket/file.txt + // auto convert: http://my_bucket.s3.us-east-2.amazonaws.com/file.txt + // true: the s3 client will NOT automatically convert endpoint to virtual-hosted style, we need to do some tricks: + // endpoint: http://cos.ap-beijing.myqcloud.com + // bucket/path: my_bucket/file.txt + // convert manually: See S3URI() + private boolean forceHostedStyle = false; + + public S3ObjStorage(Map properties) { + this.properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); + setProperties(properties); + } + + private void setProperties(Map properties) { + this.properties.putAll(properties); + try { + S3Properties.requiredS3Properties(this.properties); + } catch (DdlException e) { + throw new IllegalArgumentException(e); + } + // Virtual hosted-style is recommended in the s3 protocol. + // The path-style has been abandoned, but for some unexplainable reasons, + // the s3 client will determine whether the endpiont starts with `s3` + // when generating a virtual hosted-sytle request. + // If not, it will not be converted ( https://github.com/aws/aws-sdk-java-v2/pull/763), + // but the endpoints of many cloud service providers for object storage do not start with s3, + // so they cannot be converted to virtual hosted-sytle. + // Some of them, such as aliyun's oss, only support virtual hosted-style, + // and some of them(ceph) may only support + // path-style, so we need to do some additional conversion. + // + // use_path_style | !use_path_style + // S3 forceHostedStyle=false | forceHostedStyle=false + // !S3 forceHostedStyle=false | forceHostedStyle=true + // + // That is, for S3 endpoint, ignore the `use_path_style` property, and the s3 client will automatically use + // virtual hosted-sytle. + // And for other endpoint, if `use_path_style` is true, use path style. Otherwise, use virtual hosted-sytle. + if (!this.properties.get(S3Properties.ENDPOINT).toLowerCase().contains(S3Properties.S3_PREFIX)) { + forceHostedStyle = !this.properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false") + .equalsIgnoreCase("true"); + } else { + forceHostedStyle = false; + } + } + + @Override + public S3Client getClient(String bucket) throws UserException { + if (client == null) { + URI tmpEndpoint = URI.create(properties.get(S3Properties.ENDPOINT)); + StaticCredentialsProvider scp; + if (!properties.containsKey(S3Properties.SESSION_TOKEN)) { + AwsBasicCredentials awsBasic = AwsBasicCredentials.create( + properties.get(S3Properties.ACCESS_KEY), + properties.get(S3Properties.SECRET_KEY)); + scp = StaticCredentialsProvider.create(awsBasic); + } else { + AwsSessionCredentials awsSession = AwsSessionCredentials.create( + properties.get(S3Properties.ACCESS_KEY), + properties.get(S3Properties.SECRET_KEY), + properties.get(S3Properties.SESSION_TOKEN)); + scp = StaticCredentialsProvider.create(awsSession); + } + EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy + .builder() + .baseDelay(Duration.ofSeconds(1)) + .maxBackoffTime(Duration.ofMinutes(1)) + .build(); + // retry 3 time with Equal backoff + RetryPolicy retryPolicy = RetryPolicy + .builder() + .numRetries(3) + .backoffStrategy(backoffStrategy) + .build(); + ClientOverrideConfiguration clientConf = ClientOverrideConfiguration + .builder() + // set retry policy + .retryPolicy(retryPolicy) + // using AwsS3V4Signer + .putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create()) + .build(); + URI endpoint = StringUtils.isEmpty(bucket) ? tmpEndpoint : + URI.create(new URIBuilder(tmpEndpoint).setHost(bucket + "." + tmpEndpoint.getHost()).toString()); + client = S3Client.builder() + .endpointOverride(endpoint) + .credentialsProvider(scp) + .region(Region.of(properties.get(S3Properties.REGION))) + .overrideConfiguration(clientConf) + // disable chunkedEncoding because of bos not supported + // use virtual hosted-style access + .serviceConfiguration(S3Configuration.builder() + .chunkedEncodingEnabled(false) + .pathStyleAccessEnabled(false) + .build()) + .build(); + } + return client; + } + + @Override + public Triple getStsToken() throws DdlException { + return null; + } + + @Override + public Status headObject(String remotePath) { + try { + S3URI uri = S3URI.create(remotePath, forceHostedStyle); + HeadObjectResponse response = getClient(uri.getVirtualBucket()) + .headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build()); + LOG.info("head file " + remotePath + " success: " + response.toString()); + return Status.OK; + } catch (S3Exception e) { + if (e.statusCode() == HttpStatus.SC_NOT_FOUND) { + return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath); + } else { + LOG.warn("headObject failed:", e); + return new Status(Status.ErrCode.COMMON_ERROR, "headObject failed: " + e.getMessage()); + } + } catch (UserException ue) { + LOG.warn("connect to s3 failed: ", ue); + return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); + } + } + + @Override + public Status getObject(String remoteFilePath, File localFile) { + try { + S3URI uri = S3URI.create(remoteFilePath, forceHostedStyle); + GetObjectResponse response = getClient(uri.getVirtualBucket()).getObject( + GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), localFile.toPath()); + LOG.info("get file " + remoteFilePath + " success: " + response.toString()); + return Status.OK; + } catch (S3Exception s3Exception) { + return new Status( + Status.ErrCode.COMMON_ERROR, + "get file from s3 error: " + s3Exception.awsErrorDetails().errorMessage()); + } catch (UserException ue) { + LOG.warn("connect to s3 failed: ", ue); + return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); + } catch (Exception e) { + return new Status(Status.ErrCode.COMMON_ERROR, e.toString()); + } + } + + @Override + public Status putObject(String remotePath, @Nullable RequestBody requestBody) { + try { + S3URI uri = S3URI.create(remotePath, forceHostedStyle); + PutObjectResponse response = + getClient(uri.getVirtualBucket()) + .putObject( + PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), + requestBody); + LOG.info("put object success: " + response.eTag()); + return Status.OK; + } catch (S3Exception e) { + LOG.error("put object failed:", e); + return new Status(Status.ErrCode.COMMON_ERROR, "put object failed: " + e.getMessage()); + } catch (Exception ue) { + LOG.error("connect to s3 failed: ", ue); + return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); + } + } + + @Override + public Status deleteObject(String remotePath) { + try { + S3URI uri = S3URI.create(remotePath, forceHostedStyle); + DeleteObjectResponse response = + getClient(uri.getVirtualBucket()) + .deleteObject( + DeleteObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build()); + LOG.info("delete file " + remotePath + " success: " + response.toString()); + return Status.OK; + } catch (S3Exception e) { + LOG.warn("delete file failed: ", e); + if (e.statusCode() == HttpStatus.SC_NOT_FOUND) { + return Status.OK; + } + return new Status(Status.ErrCode.COMMON_ERROR, "delete file failed: " + e.getMessage()); + } catch (UserException ue) { + LOG.warn("connect to s3 failed: ", ue); + return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); + } + } + + public Status copyObject(String origFilePath, String destFilePath) { + try { + S3URI origUri = S3URI.create(origFilePath); + S3URI descUri = S3URI.create(destFilePath, forceHostedStyle); + getClient(descUri.getVirtualBucket()) + .copyObject( + CopyObjectRequest.builder() + .copySource(origUri.getBucket() + "/" + origUri.getKey()) + .destinationBucket(descUri.getBucket()) + .destinationKey(descUri.getKey()) + .build()); + return Status.OK; + } catch (S3Exception e) { + LOG.error("copy file failed: ", e); + return new Status(Status.ErrCode.COMMON_ERROR, "copy file failed: " + e.getMessage()); + } catch (UserException ue) { + LOG.error("copy to s3 failed: ", ue); + return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); + } + } + + @Override + public RemoteObjects listObjects(String absolutePath, String continuationToken) throws DdlException { + try { + S3URI uri = S3URI.create(absolutePath, forceHostedStyle); + String prefix = uri.getKey(); + ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder().bucket(uri.getBucket()) + .prefix(normalizePrefix(prefix)); + if (!StringUtils.isEmpty(continuationToken)) { + requestBuilder.continuationToken(continuationToken); + } + ListObjectsV2Response response = getClient(uri.getVirtualBucket()).listObjectsV2(requestBuilder.build()); + List remoteObjects = new ArrayList<>(); + for (S3Object c : response.contents()) { + String relativePath = getRelativePath(prefix, c.key()); + remoteObjects.add(new RemoteObject(c.key(), relativePath, c.eTag(), c.size())); + } + return new RemoteObjects(remoteObjects, response.isTruncated(), response.nextContinuationToken()); + } catch (Exception e) { + LOG.warn("Failed to list objects for S3", e); + throw new DdlException("Failed to list objects for S3, Error message: " + e.getMessage(), e); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3Storage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3Storage.java index 8a30ebbe59..109db17f30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3Storage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3Storage.java @@ -72,6 +72,11 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +/** + * @see S3ObjStorage + * @see org.apache.doris.fs.remote.S3FileSystem + */ +@Deprecated public class S3Storage extends BlobStorage { private static final Logger LOG = LogManager.getLogger(S3Storage.class); private FileSystem dfsFileSystem = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileOperations.java b/fe/fe-core/src/main/java/org/apache/doris/fs/operations/BrokerFileOperations.java similarity index 61% rename from fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileOperations.java rename to fe/fe-core/src/main/java/org/apache/doris/fs/operations/BrokerFileOperations.java index 8fe71713e9..6cfdcec190 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileOperations.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/operations/BrokerFileOperations.java @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.fs.remote; +package org.apache.doris.fs.operations; import org.apache.doris.backup.Status; +import org.apache.doris.common.Config; import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TBrokerCloseReaderRequest; import org.apache.doris.thrift.TBrokerCloseWriterRequest; import org.apache.doris.thrift.TBrokerFD; @@ -30,8 +32,6 @@ import org.apache.doris.thrift.TBrokerOpenWriterResponse; import org.apache.doris.thrift.TBrokerOperationStatus; import org.apache.doris.thrift.TBrokerOperationStatusCode; import org.apache.doris.thrift.TBrokerVersion; -import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TPaloBrokerService; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -39,72 +39,86 @@ import org.apache.thrift.TException; import java.util.Map; -public class BrokerFileOperations { +public class BrokerFileOperations implements FileOperations { private static final Logger LOG = LogManager.getLogger(BrokerFileOperations.class); - private String name; + private final String name; - private Map properties; + private final Map properties; public BrokerFileOperations(String name, Map properties) { this.name = name; this.properties = properties; } + public static String clientId() { + return FrontendOptions.getLocalHostAddress() + ":" + Config.edit_log_port; + } - public Status openReader(TPaloBrokerService.Client client, TNetworkAddress address, String remoteFilePath, - TBrokerFD fd) { + @Override + public Status openReader(OpParams opParams) { + BrokerOpParams brokerOpParams = (BrokerOpParams) opParams; + String remoteFilePath = brokerOpParams.remotePath(); try { TBrokerOpenReaderRequest req = new TBrokerOpenReaderRequest(TBrokerVersion.VERSION_ONE, remoteFilePath, - 0, BrokerFileSystem.clientId(), properties); - TBrokerOpenReaderResponse rep = client.openReader(req); + 0, clientId(), properties); + TBrokerOpenReaderResponse rep = brokerOpParams.client().openReader(req); TBrokerOperationStatus opst = rep.getOpStatus(); if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) { return new Status(Status.ErrCode.COMMON_ERROR, - "failed to open reader on broker " + BrokerUtil.printBroker(name, address) + "failed to open reader on broker " + + BrokerUtil.printBroker(name, brokerOpParams.address()) + " for file: " + remoteFilePath + ". msg: " + opst.getMessage()); } - fd.setHigh(rep.getFd().getHigh()); - fd.setLow(rep.getFd().getLow()); + brokerOpParams.fd().setHigh(rep.getFd().getHigh()); + brokerOpParams.fd().setLow(rep.getFd().getLow()); } catch (TException e) { return new Status(Status.ErrCode.COMMON_ERROR, - "failed to open reader on broker " + BrokerUtil.printBroker(name, address) + "failed to open reader on broker " + + BrokerUtil.printBroker(name, brokerOpParams.address()) + " for file: " + remoteFilePath + ". msg: " + e.getMessage()); } return Status.OK; } - public Status closeReader(TPaloBrokerService.Client client, TNetworkAddress address, TBrokerFD fd) { + public Status closeReader(OpParams opParams) { + BrokerOpParams brokerOpParams = (BrokerOpParams) opParams; + TBrokerFD fd = brokerOpParams.fd(); try { TBrokerCloseReaderRequest req = new TBrokerCloseReaderRequest(TBrokerVersion.VERSION_ONE, fd); - TBrokerOperationStatus st = client.closeReader(req); + TBrokerOperationStatus st = brokerOpParams.client().closeReader(req); if (st.getStatusCode() != TBrokerOperationStatusCode.OK) { return new Status(Status.ErrCode.COMMON_ERROR, - "failed to close reader on broker " + BrokerUtil.printBroker(name, address) + "failed to close reader on broker " + + BrokerUtil.printBroker(name, brokerOpParams.address()) + " for fd: " + fd); } LOG.info("finished to close reader. fd: {}.", fd); } catch (TException e) { return new Status(Status.ErrCode.BAD_CONNECTION, - "failed to close reader on broker " + BrokerUtil.printBroker(name, address) + "failed to close reader on broker " + + BrokerUtil.printBroker(name, brokerOpParams.address()) + ", fd " + fd + ", msg: " + e.getMessage()); } return Status.OK; } - public Status openWriter(TPaloBrokerService.Client client, TNetworkAddress address, String remoteFile, - TBrokerFD fd) { + public Status openWriter(OpParams desc) { + BrokerOpParams brokerOpParams = (BrokerOpParams) desc; + String remoteFile = brokerOpParams.remotePath(); + TBrokerFD fd = brokerOpParams.fd(); try { TBrokerOpenWriterRequest req = new TBrokerOpenWriterRequest(TBrokerVersion.VERSION_ONE, - remoteFile, TBrokerOpenMode.APPEND, BrokerFileSystem.clientId(), properties); - TBrokerOpenWriterResponse rep = client.openWriter(req); + remoteFile, TBrokerOpenMode.APPEND, clientId(), properties); + TBrokerOpenWriterResponse rep = brokerOpParams.client().openWriter(req); TBrokerOperationStatus opst = rep.getOpStatus(); if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) { return new Status(Status.ErrCode.COMMON_ERROR, - "failed to open writer on broker " + BrokerUtil.printBroker(name, address) + "failed to open writer on broker " + + BrokerUtil.printBroker(name, brokerOpParams.address()) + " for file: " + remoteFile + ". msg: " + opst.getMessage()); } @@ -113,27 +127,32 @@ public class BrokerFileOperations { LOG.info("finished to open writer. fd: {}. directly upload to remote path {}.", fd, remoteFile); } catch (TException e) { return new Status(Status.ErrCode.BAD_CONNECTION, - "failed to open writer on broker " + BrokerUtil.printBroker(name, address) + "failed to open writer on broker " + + BrokerUtil.printBroker(name, brokerOpParams.address()) + ", err: " + e.getMessage()); } return Status.OK; } - public Status closeWriter(TPaloBrokerService.Client client, TNetworkAddress address, TBrokerFD fd) { + public Status closeWriter(OpParams desc) { + BrokerOpParams brokerOpParams = (BrokerOpParams) desc; + TBrokerFD fd = brokerOpParams.fd(); try { TBrokerCloseWriterRequest req = new TBrokerCloseWriterRequest(TBrokerVersion.VERSION_ONE, fd); - TBrokerOperationStatus st = client.closeWriter(req); + TBrokerOperationStatus st = brokerOpParams.client().closeWriter(req); if (st.getStatusCode() != TBrokerOperationStatusCode.OK) { return new Status(Status.ErrCode.COMMON_ERROR, - "failed to close writer on broker " + BrokerUtil.printBroker(name, address) + "failed to close writer on broker " + + BrokerUtil.printBroker(name, brokerOpParams.address()) + " for fd: " + fd); } LOG.info("finished to close writer. fd: {}.", fd); } catch (TException e) { return new Status(Status.ErrCode.BAD_CONNECTION, - "failed to close writer on broker " + BrokerUtil.printBroker(name, address) + "failed to close writer on broker " + + BrokerUtil.printBroker(name, brokerOpParams.address()) + ", fd " + fd + ", msg: " + e.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/operations/BrokerOpParams.java b/fe/fe-core/src/main/java/org/apache/doris/fs/operations/BrokerOpParams.java new file mode 100644 index 0000000000..762073935e --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/operations/BrokerOpParams.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.operations; + +import org.apache.doris.thrift.TBrokerFD; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPaloBrokerService; + +public class BrokerOpParams extends OpParams { + private final TPaloBrokerService.Client client; + private final TNetworkAddress address; + private final TBrokerFD fd; + private String remoteFilePath; + + protected BrokerOpParams(TPaloBrokerService.Client client, TNetworkAddress address, TBrokerFD fd) { + this(client, address, null, fd); + } + + protected BrokerOpParams(TPaloBrokerService.Client client, TNetworkAddress address, + String remoteFilePath, TBrokerFD fd) { + this.client = client; + this.address = address; + this.remoteFilePath = remoteFilePath; + this.fd = fd; + } + + public TPaloBrokerService.Client client() { + return client; + } + + public TNetworkAddress address() { + return address; + } + + public String remotePath() { + return remoteFilePath; + } + + public TBrokerFD fd() { + return fd; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/operations/FileOperations.java b/fe/fe-core/src/main/java/org/apache/doris/fs/operations/FileOperations.java new file mode 100644 index 0000000000..9968885e1d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/operations/FileOperations.java @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.operations; + +import org.apache.doris.backup.Status; + +/** + * FileOperations contains the read and write operations + * Can extend the other method + */ +public interface FileOperations { + Status openReader(OpParams desc); + + Status closeReader(OpParams desc); + + Status openWriter(OpParams desc); + + Status closeWriter(OpParams desc); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/HDFSFileOperations.java b/fe/fe-core/src/main/java/org/apache/doris/fs/operations/HDFSFileOperations.java similarity index 55% rename from fe/fe-core/src/main/java/org/apache/doris/fs/remote/HDFSFileOperations.java rename to fe/fe-core/src/main/java/org/apache/doris/fs/operations/HDFSFileOperations.java index 9e03f8f5a0..44563317bc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/HDFSFileOperations.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/operations/HDFSFileOperations.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.fs.remote; +package org.apache.doris.fs.operations; import org.apache.doris.backup.Status; import org.apache.doris.common.UserException; @@ -30,7 +30,7 @@ import org.apache.logging.log4j.Logger; import java.io.IOException; -public class HDFSFileOperations { +public class HDFSFileOperations implements FileOperations { private static final Logger LOG = LogManager.getLogger(HDFSFileOperations.class); public static final int READ_BUFFER_SIZE = 128 << 10; // 128k public static final int WRITE_BUFFER_SIZE = 128 << 10; // 128k @@ -45,33 +45,38 @@ public class HDFSFileOperations { /** * open remotePath for read. * - * @param remotePath hdfs://namenode:port/path. - * @param startOffset the offset to read. - * @return FSDataInputStream if success. - * @throws UserException when get filesystem failed. - * @throws IOException when open file error. + * @param opParams hdfsOpParams.remotePath: hdfs://namenode:port/path. + * hdfsOpParams.startOffset: the offset to read. + * @return Status.OK and set fsDataInputStream if success */ - public FSDataInputStream openReader(String remotePath, long startOffset) throws UserException, IOException { - URI pathUri = URI.create(remotePath); - Path inputFilePath = new Path(pathUri.getPath()); + public Status openReader(OpParams opParams) { + HDFSOpParams hdfsOpParams = (HDFSOpParams) opParams; try { + URI pathUri = URI.create(hdfsOpParams.remotePath()); + Path inputFilePath = new Path(pathUri.getPath()); FSDataInputStream fsDataInputStream = hdfsClient.open(inputFilePath, READ_BUFFER_SIZE); - fsDataInputStream.seek(startOffset); - return fsDataInputStream; + fsDataInputStream.seek(hdfsOpParams.startOffset()); + hdfsOpParams.withFsDataInputStream(fsDataInputStream); + return Status.OK; } catch (IOException e) { LOG.error("errors while open path", e); - throw new IOException(e.getMessage()); + return new Status(Status.ErrCode.COMMON_ERROR, "failed to open reader, msg:" + e.getMessage()); + } catch (UserException ex) { + LOG.error("errors while get filesystem", ex); + return new Status(Status.ErrCode.COMMON_ERROR, "failed to get filesystem, msg:" + ex.getMessage()); } } /** * close for read. * - * @param fsDataInputStream the input stream. + * @param opParams hdfsOpParams.fsDataInputStream: the input stream. * @return Status.OK if success. */ - public Status closeReader(FSDataInputStream fsDataInputStream) { - synchronized (fsDataInputStream) { + public Status closeReader(OpParams opParams) { + HDFSOpParams hdfsOpParams = (HDFSOpParams) opParams; + FSDataInputStream fsDataInputStream = hdfsOpParams.fsDataInputStream(); + synchronized (this) { try { fsDataInputStream.close(); } catch (IOException e) { @@ -87,31 +92,35 @@ public class HDFSFileOperations { /** * open remotePath for write. * - * @param remotePath hdfs://namenode:port/path. - * @return FSDataOutputStream - * @throws UserException when get filesystem failed. - * @throws IOException when open path error. + * @param opParams hdfsOpParams.remotePath: hdfs://namenode:port/path. + * @return Status.OK and set FSDataOutputStream if success */ - public FSDataOutputStream openWriter(String remotePath) throws UserException, IOException { - URI pathUri = URI.create(remotePath); - Path inputFilePath = new Path(pathUri.getPath()); + public Status openWriter(OpParams opParams) { + HDFSOpParams hdfsOpParams = (HDFSOpParams) opParams; try { - return hdfsClient.create(inputFilePath, true, WRITE_BUFFER_SIZE); + URI pathUri = URI.create(hdfsOpParams.remotePath()); + Path inputFilePath = new Path(pathUri.getPath()); + hdfsOpParams.withFsDataOutputStream(hdfsClient.create(inputFilePath, true, WRITE_BUFFER_SIZE)); + return Status.OK; } catch (IOException e) { LOG.error("errors while open path", e); - throw new IOException(e.getMessage()); + return new Status(Status.ErrCode.COMMON_ERROR, "failed to open writer, msg:" + e.getMessage()); + } catch (UserException ex) { + LOG.error("errors while get filesystem", ex); + return new Status(Status.ErrCode.COMMON_ERROR, "failed to get filesystem, msg:" + ex.getMessage()); } } /** * close for write. * - * @param fsDataOutputStream output stream. + * @param opParams hdfsOpParams.fsDataOutputStream: output stream. * @return Status.OK if success. */ - - public Status closeWriter(FSDataOutputStream fsDataOutputStream) { - synchronized (fsDataOutputStream) { + public Status closeWriter(OpParams opParams) { + HDFSOpParams hdfsOpParams = (HDFSOpParams) opParams; + FSDataOutputStream fsDataOutputStream = hdfsOpParams.fsDataOutputStream(); + synchronized (this) { try { fsDataOutputStream.flush(); fsDataOutputStream.close(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/operations/HDFSOpParams.java b/fe/fe-core/src/main/java/org/apache/doris/fs/operations/HDFSOpParams.java new file mode 100644 index 0000000000..4e534aca0c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/operations/HDFSOpParams.java @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.operations; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; + +public class HDFSOpParams extends OpParams { + private String remotePath; + private long startOffset; + private FSDataOutputStream fsDataOutputStream; + private FSDataInputStream fsDataInputStream; + + protected HDFSOpParams(FSDataInputStream fsDataInputStream) { + this(null, 0, fsDataInputStream, null); + } + + protected HDFSOpParams(FSDataOutputStream fsDataOutputStream) { + this(null, 0, null, fsDataOutputStream); + } + + protected HDFSOpParams(String remotePath, long startOffset) { + this(remotePath, startOffset, null, null); + } + + protected HDFSOpParams(String remotePath, long startOffset, + FSDataInputStream fsDataInputStream, FSDataOutputStream fsDataOutputStream) { + this.remotePath = remotePath; + this.startOffset = startOffset; + this.fsDataInputStream = fsDataInputStream; + this.fsDataOutputStream = fsDataOutputStream; + } + + public String remotePath() { + return remotePath; + } + + public long startOffset() { + return startOffset; + } + + public FSDataOutputStream fsDataOutputStream() { + return fsDataOutputStream; + } + + public FSDataInputStream fsDataInputStream() { + return fsDataInputStream; + } + + public void withFsDataOutputStream(FSDataOutputStream fsDataOutputStream) { + this.fsDataOutputStream = fsDataOutputStream; + } + + public void withFsDataInputStream(FSDataInputStream fsDataInputStream) { + this.fsDataInputStream = fsDataInputStream; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/operations/OpParams.java b/fe/fe-core/src/main/java/org/apache/doris/fs/operations/OpParams.java new file mode 100644 index 0000000000..542000ca93 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/operations/OpParams.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.operations; + +import org.apache.doris.thrift.TBrokerFD; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPaloBrokerService; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; + +/** + * All arguments of the FileOperations implementation class should get from this class + * @see org.apache.doris.fs.operations.FileOperations + * Should avoid using this class elsewhere + */ +public class OpParams { + + public static BrokerOpParams of(TPaloBrokerService.Client client, TNetworkAddress address, TBrokerFD fd) { + return new BrokerOpParams(client, address, fd); + } + + public static BrokerOpParams of(TPaloBrokerService.Client client, TNetworkAddress address, + String remoteFilePath, TBrokerFD fd) { + return new BrokerOpParams(client, address, remoteFilePath, fd); + } + + public static HDFSOpParams of(String remotePath) { + return new HDFSOpParams(remotePath, 0); + } + + public static HDFSOpParams of(FSDataOutputStream fsDataOutputStream) { + return new HDFSOpParams(fsDataOutputStream); + } + + public static HDFSOpParams of(FSDataInputStream fsDataInputStream) { + return new HDFSOpParams(fsDataInputStream); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java index 435e1219be..25cb7de806 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java @@ -17,16 +17,17 @@ package org.apache.doris.fs.remote; +import org.apache.doris.analysis.StorageBackend; import org.apache.doris.backup.RemoteFile; import org.apache.doris.backup.Status; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FsBroker; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.ClientPool; -import org.apache.doris.common.Config; import org.apache.doris.common.Pair; -import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.fs.operations.BrokerFileOperations; +import org.apache.doris.fs.operations.OpParams; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TBrokerCheckPathExistRequest; import org.apache.doris.thrift.TBrokerCheckPathExistResponse; @@ -46,8 +47,6 @@ import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloBrokerService; import com.google.common.base.Preconditions; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; @@ -72,20 +71,14 @@ import java.util.Map; public class BrokerFileSystem extends RemoteFileSystem { private static final Logger LOG = LogManager.getLogger(BrokerFileSystem.class); - private String name; - - private BrokerFileOperations operations; + private final BrokerFileOperations operations; public BrokerFileSystem(String name, Map properties) { - this.name = name; + super(name, StorageBackend.StorageType.BROKER); this.properties = properties; this.operations = new BrokerFileOperations(name, properties); } - public static String clientId() { - return FrontendOptions.getLocalHostAddress() + ":" + Config.edit_log_port; - } - public Pair getBroker() { Pair result = Pair.of(null, null); FsBroker broker; @@ -171,7 +164,7 @@ public class BrokerFileSystem extends RemoteFileSystem { // 2. open file reader with broker TBrokerFD fd = new TBrokerFD(); - Status opStatus = operations.openReader(client, address, remoteFilePath, fd); + Status opStatus = operations.openReader(OpParams.of(client, address, remoteFilePath, fd)); if (!opStatus.ok()) { return opStatus; } @@ -296,7 +289,7 @@ public class BrokerFileSystem extends RemoteFileSystem { + BrokerUtil.printBroker(name, address)); } finally { // close broker reader - Status closeStatus = operations.closeReader(client, address, fd); + Status closeStatus = operations.closeReader(OpParams.of(client, address, fd)); if (!closeStatus.ok()) { LOG.warn(closeStatus.getErrMsg()); if (status.ok()) { @@ -329,7 +322,7 @@ public class BrokerFileSystem extends RemoteFileSystem { Status status = Status.OK; try { // 2. open file writer with broker - status = operations.openWriter(client, address, remoteFile, fd); + status = operations.openWriter(OpParams.of(client, address, remoteFile, fd)); if (!status.ok()) { return status; } @@ -349,7 +342,7 @@ public class BrokerFileSystem extends RemoteFileSystem { + ", broker: " + BrokerUtil.printBroker(name, address)); } } finally { - Status closeStatus = operations.closeWriter(client, address, fd); + Status closeStatus = operations.closeWriter(OpParams.of(client, address, fd)); if (closeStatus.getErrCode() == Status.ErrCode.BAD_CONNECTION || status.getErrCode() == Status.ErrCode.BAD_CONNECTION) { ClientPool.brokerPool.invalidateObject(address, client); @@ -374,7 +367,7 @@ public class BrokerFileSystem extends RemoteFileSystem { // 2. open file write with broker TBrokerFD fd = new TBrokerFD(); - Status status = operations.openWriter(client, address, remotePath, fd); + Status status = operations.openWriter(OpParams.of(client, address, remotePath, fd)); if (!status.ok()) { return status; } @@ -463,7 +456,7 @@ public class BrokerFileSystem extends RemoteFileSystem { + ", broker: " + BrokerUtil.printBroker(name, address)); } finally { // close write - Status closeStatus = operations.closeWriter(client, address, fd); + Status closeStatus = operations.closeWriter(OpParams.of(client, address, fd)); if (!closeStatus.ok()) { LOG.warn(closeStatus.getErrMsg()); if (status.ok()) { @@ -562,16 +555,6 @@ public class BrokerFileSystem extends RemoteFileSystem { return Status.OK; } - @Override - public RemoteIterator listLocatedStatus(String remotePath) throws UserException { - return null; - } - - @Override - public Status list(String remotePath, List result) { - return list(remotePath, result, true); - } - // List files in remotePath @Override public Status list(String remotePath, List result, boolean fileNameOnly) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java index d1f99db596..fdeddd6198 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java @@ -17,5 +17,119 @@ package org.apache.doris.fs.remote; +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.backup.Status; +import org.apache.doris.fs.obj.ObjStorage; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import software.amazon.awssdk.core.sync.RequestBody; + +import java.io.File; +import java.io.IOException; +import java.nio.file.FileVisitOption; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; + public abstract class ObjFileSystem extends RemoteFileSystem { + private static final Logger LOG = LogManager.getLogger(ObjFileSystem.class); + + protected final ObjStorage objStorage; + + public ObjFileSystem(String name, StorageBackend.StorageType type, ObjStorage objStorage) { + super(name, type); + this.objStorage = objStorage; + } + + @Override + public Status exists(String remotePath) { + return objStorage.headObject(remotePath); + } + + @Override + public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) { + long start = System.currentTimeMillis(); + // Write the data to a local file + File localFile = new File(localFilePath); + if (localFile.exists()) { + try { + Files.walk(Paths.get(localFilePath), FileVisitOption.FOLLOW_LINKS) + .sorted(Comparator.reverseOrder()) + .map(Path::toFile) + .forEach(File::delete); + } catch (IOException e) { + return new Status( + Status.ErrCode.COMMON_ERROR, "failed to delete exist local file: " + localFilePath); + } + } + Status st = objStorage.getObject(remoteFilePath, localFile); + if (st != Status.OK) { + return st; + } + if (localFile.length() == fileSize) { + LOG.info( + "finished to get file from {} to {} with size: {}. cost {} ms", + remoteFilePath, + localFile.toPath(), + fileSize, + (System.currentTimeMillis() - start)); + return Status.OK; + } else { + return new Status(Status.ErrCode.COMMON_ERROR, localFile.toString()); + } + } + + @Override + public Status directUpload(String content, String remoteFile) { + Status st = objStorage.putObject(remoteFile, RequestBody.fromBytes(content.getBytes())); + if (st != Status.OK) { + return st; + } + LOG.info("upload content success."); + return Status.OK; + } + + @Override + public Status upload(String localPath, String remotePath) { + Status st = objStorage.putObject(remotePath, RequestBody.fromFile(new File(localPath))); + if (st != Status.OK) { + return st; + } + LOG.info("upload file " + localPath + " success."); + return Status.OK; + } + + @Override + public Status makeDir(String remotePath) { + if (!remotePath.endsWith("/")) { + remotePath += "/"; + } + Status st = objStorage.putObject(remotePath, RequestBody.empty()); + if (st != Status.OK) { + return st; + } + LOG.info("makeDir success."); + return Status.OK; + } + + @Override + public Status rename(String origFilePath, String destFilePath) { + Status status = objStorage.copyObject(origFilePath, destFilePath); + if (status.ok()) { + return delete(origFilePath); + } else { + return status; + } + } + + public Status copy(String origFilePath, String destFilePath) { + return objStorage.copyObject(origFilePath, destFilePath); + } + + @Override + public Status delete(String remotePath) { + return objStorage.deleteObject(remotePath); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java index 9bb089fad0..1a7dbdc9e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java @@ -17,11 +17,34 @@ package org.apache.doris.fs.remote; -import org.apache.doris.fs.FileSystem; +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.common.UserException; +import org.apache.doris.fs.PersistentFileSystem; -import java.util.Map; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; -public abstract class RemoteFileSystem implements FileSystem { +import java.io.IOException; + +public abstract class RemoteFileSystem extends PersistentFileSystem { protected org.apache.hadoop.fs.FileSystem dfsFileSystem = null; - protected Map properties; + + public RemoteFileSystem(String name, StorageBackend.StorageType type) { + super(name, type); + } + + protected org.apache.hadoop.fs.FileSystem getFileSystem(String remotePath) throws UserException { + throw new UserException("Not support to getFileSystem."); + } + + @Override + public RemoteIterator listLocatedStatus(String remotePath) throws UserException { + org.apache.hadoop.fs.FileSystem fileSystem = getFileSystem(remotePath); + try { + return fileSystem.listLocatedStatus(new Path(remotePath)); + } catch (IOException e) { + throw new UserException("Failed to list located status for path: " + remotePath, e); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java index 72f6b487b8..32ed2484f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java @@ -17,114 +17,37 @@ package org.apache.doris.fs.remote; +import org.apache.doris.analysis.StorageBackend; import org.apache.doris.backup.RemoteFile; import org.apache.doris.backup.Status; -import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; -import org.apache.doris.common.util.S3URI; import org.apache.doris.datasource.property.PropertyConverter; -import org.apache.doris.datasource.property.constants.S3Properties; -import org.apache.doris.fs.obj.S3Storage; +import org.apache.doris.fs.obj.S3ObjStorage; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.http.HttpStatus; -import org.apache.http.client.utils.URIBuilder; +import org.apache.hadoop.fs.Path; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; -import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; -import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.auth.signer.AwsS3V4Signer; -import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; -import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; -import software.amazon.awssdk.core.retry.RetryPolicy; -import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy; -import software.amazon.awssdk.core.sync.RequestBody; -import software.amazon.awssdk.regions.Region; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.S3Configuration; -import software.amazon.awssdk.services.s3.model.CopyObjectRequest; -import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; -import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import software.amazon.awssdk.services.s3.model.HeadObjectRequest; -import software.amazon.awssdk.services.s3.model.PutObjectRequest; -import software.amazon.awssdk.services.s3.model.PutObjectResponse; -import software.amazon.awssdk.services.s3.model.S3Exception; -import java.io.File; import java.io.FileNotFoundException; -import java.io.IOException; import java.net.URI; -import java.nio.file.FileVisitOption; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.time.Duration; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.TreeMap; public class S3FileSystem extends ObjFileSystem { - private static final Logger LOG = LogManager.getLogger(S3Storage.class); - private S3Client client; - // false: the s3 client will automatically convert endpoint to virtual-hosted style, eg: - // endpoint: http://s3.us-east-2.amazonaws.com - // bucket/path: my_bucket/file.txt - // auto convert: http://my_bucket.s3.us-east-2.amazonaws.com/file.txt - // true: the s3 client will NOT automatically convert endpoint to virtual-hosted style, we need to do some tricks: - // endpoint: http://cos.ap-beijing.myqcloud.com - // bucket/path: my_bucket/file.txt - // convert manually: See S3URI() - private boolean forceHostedStyle = false; + private static final Logger LOG = LogManager.getLogger(S3FileSystem.class); public S3FileSystem(Map properties) { + super(StorageBackend.StorageType.S3.name(), StorageBackend.StorageType.S3, new S3ObjStorage(properties)); this.properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); - setProperties(properties); } - private void setProperties(Map properties) { - this.properties.putAll(properties); - try { - S3Properties.requiredS3Properties(this.properties); - } catch (DdlException e) { - throw new IllegalArgumentException(e); - } - // Virtual hosted-style is recommended in the s3 protocol. - // The path-style has been abandoned, but for some unexplainable reasons, - // the s3 client will determine whether the endpiont starts with `s3` - // when generating a virtual hosted-sytle request. - // If not, it will not be converted ( https://github.com/aws/aws-sdk-java-v2/pull/763), - // but the endpoints of many cloud service providers for object storage do not start with s3, - // so they cannot be converted to virtual hosted-sytle. - // Some of them, such as aliyun's oss, only support virtual hosted-style, - // and some of them(ceph) may only support - // path-style, so we need to do some additional conversion. - // - // use_path_style | !use_path_style - // S3 forceHostedStyle=false | forceHostedStyle=false - // !S3 forceHostedStyle=false | forceHostedStyle=true - // - // That is, for S3 endpoint, ignore the `use_path_style` property, and the s3 client will automatically use - // virtual hosted-sytle. - // And for other endpoint, if `use_path_style` is true, use path style. Otherwise, use virtual hosted-sytle. - if (!this.properties.get(S3Properties.ENDPOINT).toLowerCase().startsWith("s3")) { - forceHostedStyle = !this.properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false") - .equalsIgnoreCase("true"); - } else { - forceHostedStyle = false; - } - } - - private FileSystem getFileSystem(String remotePath) throws UserException { + @Override + protected FileSystem getFileSystem(String remotePath) throws UserException { if (dfsFileSystem == null) { Configuration conf = new Configuration(); System.setProperty("com.amazonaws.services.s3.enableV4", "true"); @@ -138,238 +61,12 @@ public class S3FileSystem extends ObjFileSystem { return dfsFileSystem; } - private S3Client getClient(String bucket) throws UserException { - if (client == null) { - URI tmpEndpoint = URI.create(properties.get(S3Properties.ENDPOINT)); - StaticCredentialsProvider scp; - if (!properties.containsKey(S3Properties.SESSION_TOKEN)) { - AwsBasicCredentials awsBasic = AwsBasicCredentials.create( - properties.get(S3Properties.ACCESS_KEY), - properties.get(S3Properties.SECRET_KEY)); - scp = StaticCredentialsProvider.create(awsBasic); - } else { - AwsSessionCredentials awsSession = AwsSessionCredentials.create( - properties.get(S3Properties.ACCESS_KEY), - properties.get(S3Properties.SECRET_KEY), - properties.get(S3Properties.SESSION_TOKEN)); - scp = StaticCredentialsProvider.create(awsSession); - } - EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy - .builder() - .baseDelay(Duration.ofSeconds(1)) - .maxBackoffTime(Duration.ofMinutes(1)) - .build(); - // retry 3 time with Equal backoff - RetryPolicy retryPolicy = RetryPolicy - .builder() - .numRetries(3) - .backoffStrategy(backoffStrategy) - .build(); - ClientOverrideConfiguration clientConf = ClientOverrideConfiguration - .builder() - // set retry policy - .retryPolicy(retryPolicy) - // using AwsS3V4Signer - .putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create()) - .build(); - URI endpoint = StringUtils.isEmpty(bucket) ? tmpEndpoint : - URI.create(new URIBuilder(tmpEndpoint).setHost(bucket + "." + tmpEndpoint.getHost()).toString()); - client = S3Client.builder() - .endpointOverride(endpoint) - .credentialsProvider(scp) - .region(Region.of(properties.get(S3Properties.REGION))) - .overrideConfiguration(clientConf) - // disable chunkedEncoding because of bos not supported - // use virtual hosted-style access - .serviceConfiguration(S3Configuration.builder() - .chunkedEncodingEnabled(false) - .pathStyleAccessEnabled(false) - .build()) - .build(); - } - return client; - } - - @Override - public Status exists(String remotePath) { - try { - S3URI uri = S3URI.create(remotePath, forceHostedStyle); - getClient(uri.getVirtualBucket()) - .headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build()); - return Status.OK; - } catch (S3Exception e) { - if (e.statusCode() == HttpStatus.SC_NOT_FOUND) { - return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath); - } else { - LOG.warn("headObject failed:", e); - return new Status(Status.ErrCode.COMMON_ERROR, "headObject failed: " + e.getMessage()); - } - } catch (UserException ue) { - LOG.warn("connect to s3 failed: ", ue); - return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); - } - } - - @Override - public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) { - long start = System.currentTimeMillis(); - // Write the data to a local file - File localFile = new File(localFilePath); - if (localFile.exists()) { - try { - Files.walk(Paths.get(localFilePath), FileVisitOption.FOLLOW_LINKS) - .sorted(Comparator.reverseOrder()) - .map(Path::toFile) - .forEach(File::delete); - } catch (IOException e) { - return new Status( - Status.ErrCode.COMMON_ERROR, "failed to delete exist local file: " + localFilePath); - } - } - try { - S3URI uri = S3URI.create(remoteFilePath, forceHostedStyle); - GetObjectResponse response = getClient(uri.getVirtualBucket()).getObject( - GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), localFile.toPath()); - if (localFile.length() == fileSize) { - LOG.info( - "finished to download from {} to {} with size: {}. cost {} ms", - remoteFilePath, - localFilePath, - fileSize, - (System.currentTimeMillis() - start)); - return Status.OK; - } else { - return new Status(Status.ErrCode.COMMON_ERROR, response.toString()); - } - } catch (S3Exception s3Exception) { - return new Status( - Status.ErrCode.COMMON_ERROR, - "get file from s3 error: " + s3Exception.awsErrorDetails().errorMessage()); - } catch (UserException ue) { - LOG.warn("connect to s3 failed: ", ue); - return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); - } catch (Exception e) { - return new Status(Status.ErrCode.COMMON_ERROR, e.toString()); - } - } - - @Override - public Status directUpload(String content, String remoteFile) { - try { - S3URI uri = S3URI.create(remoteFile, forceHostedStyle); - PutObjectResponse response = - getClient(uri.getVirtualBucket()) - .putObject( - PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), - RequestBody.fromBytes(content.getBytes())); - LOG.info("upload content success: " + response.eTag()); - return Status.OK; - } catch (S3Exception e) { - LOG.warn("write content failed:", e); - return new Status(Status.ErrCode.COMMON_ERROR, "write content failed: " + e.getMessage()); - } catch (UserException ue) { - LOG.warn("connect to s3 failed: ", ue); - return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); - } catch (Exception e) { - LOG.warn("connect to s3 failed: ", e); - return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + e.getMessage()); - } - } - - public Status copy(String origFilePath, String destFilePath) { - try { - S3URI origUri = S3URI.create(origFilePath); - S3URI descUri = S3URI.create(destFilePath, forceHostedStyle); - getClient(descUri.getVirtualBucket()) - .copyObject( - CopyObjectRequest.builder() - .copySource(origUri.getBucket() + "/" + origUri.getKey()) - .destinationBucket(descUri.getBucket()) - .destinationKey(descUri.getKey()) - .build()); - return Status.OK; - } catch (S3Exception e) { - LOG.error("copy file failed: ", e); - return new Status(Status.ErrCode.COMMON_ERROR, "copy file failed: " + e.getMessage()); - } catch (UserException ue) { - LOG.error("copy to s3 failed: ", ue); - return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); - } - } - - @Override - public Status upload(String localPath, String remotePath) { - try { - S3URI uri = S3URI.create(remotePath, forceHostedStyle); - PutObjectResponse response = - getClient(uri.getVirtualBucket()) - .putObject( - PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), - RequestBody.fromFile(new File(localPath))); - LOG.info("upload file " + localPath + " success: " + response.eTag()); - return Status.OK; - } catch (S3Exception e) { - LOG.error("write file failed:", e); - return new Status(Status.ErrCode.COMMON_ERROR, "write file failed: " + e.getMessage()); - } catch (UserException ue) { - LOG.error("connect to s3 failed: ", ue); - return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); - } - } - - @Override - public Status rename(String origFilePath, String destFilePath) { - Status status = copy(origFilePath, destFilePath); - if (status.ok()) { - return delete(origFilePath); - } else { - return status; - } - } - - @Override - public Status delete(String remotePath) { - try { - S3URI uri = S3URI.create(remotePath, forceHostedStyle); - DeleteObjectResponse response = - getClient(uri.getVirtualBucket()) - .deleteObject( - DeleteObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build()); - LOG.info("delete file " + remotePath + " success: " + response.toString()); - return Status.OK; - } catch (S3Exception e) { - LOG.warn("delete file failed: ", e); - if (e.statusCode() == HttpStatus.SC_NOT_FOUND) { - return Status.OK; - } - return new Status(Status.ErrCode.COMMON_ERROR, "delete file failed: " + e.getMessage()); - } catch (UserException ue) { - LOG.warn("connect to s3 failed: ", ue); - return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); - } - } - - @Override - public RemoteIterator listLocatedStatus(String remotePath) throws UserException { - FileSystem fileSystem = getFileSystem(remotePath); - try { - return fileSystem.listLocatedStatus(new org.apache.hadoop.fs.Path(remotePath)); - } catch (IOException e) { - throw new UserException("Failed to list located status for path: " + remotePath, e); - } - } - - @Override - public Status list(String remotePath, List result) { - return list(remotePath, result, true); - } - // broker file pattern glob is too complex, so we use hadoop directly @Override public Status list(String remotePath, List result, boolean fileNameOnly) { try { FileSystem s3AFileSystem = getFileSystem(remotePath); - org.apache.hadoop.fs.Path pathPattern = new org.apache.hadoop.fs.Path(remotePath); + Path pathPattern = new Path(remotePath); FileStatus[] files = s3AFileSystem.globStatus(pathPattern); if (files == null) { return Status.OK; @@ -390,27 +87,4 @@ public class S3FileSystem extends ObjFileSystem { } return Status.OK; } - - @Override - public Status makeDir(String remotePath) { - if (!remotePath.endsWith("/")) { - remotePath += "/"; - } - try { - S3URI uri = S3URI.create(remotePath, forceHostedStyle); - PutObjectResponse response = - getClient(uri.getVirtualBucket()) - .putObject( - PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), - RequestBody.empty()); - LOG.info("makeDir success: " + response.eTag()); - return Status.OK; - } catch (S3Exception e) { - LOG.error("makeDir failed:", e); - return new Status(Status.ErrCode.COMMON_ERROR, "makeDir failed: " + e.getMessage()); - } catch (UserException ue) { - LOG.error("connect to s3 failed: ", ue); - return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); - } - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java similarity index 89% rename from fe/fe-core/src/main/java/org/apache/doris/fs/remote/DFSFileSystem.java rename to fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index c4c082b3aa..c6e38ebab2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -15,24 +15,26 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.fs.remote; +package org.apache.doris.fs.remote.dfs; +import org.apache.doris.analysis.StorageBackend; import org.apache.doris.backup.RemoteFile; import org.apache.doris.backup.Status; import org.apache.doris.catalog.AuthType; import org.apache.doris.catalog.HdfsResource; import org.apache.doris.common.UserException; import org.apache.doris.common.util.URI; -import org.apache.doris.fs.obj.HdfsStorage; +import org.apache.doris.fs.operations.HDFSFileOperations; +import org.apache.doris.fs.operations.HDFSOpParams; +import org.apache.doris.fs.operations.OpParams; +import org.apache.doris.fs.remote.RemoteFileSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.logging.log4j.LogManager; @@ -43,29 +45,32 @@ import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.FileVisitOption; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Comparator; -import java.util.HashMap; import java.util.List; import java.util.Map; public class DFSFileSystem extends RemoteFileSystem { - private static final Logger LOG = LogManager.getLogger(HdfsStorage.class); + private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class); private HDFSFileOperations operations = null; public DFSFileSystem(Map properties) { - this.properties = new HashMap<>(properties); + this(StorageBackend.StorageType.HDFS, properties); } - private FileSystem getHdfsClient(String remotePath) - throws UserException { + public DFSFileSystem(StorageBackend.StorageType type, Map properties) { + super(type.name(), type); + this.properties.putAll(properties); + } + + @Override + protected FileSystem getFileSystem(String remotePath) throws UserException { if (dfsFileSystem != null) { return dfsFileSystem; } @@ -103,12 +108,12 @@ public class DFSFileSystem extends RemoteFileSystem { public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) { LOG.debug("download from {} to {}, file size: {}.", remoteFilePath, localFilePath, fileSize); final long start = System.currentTimeMillis(); - FSDataInputStream fsDataInputStream; - try { - fsDataInputStream = operations.openReader(remoteFilePath, 0); - } catch (Exception e) { - return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); + HDFSOpParams hdfsOpParams = OpParams.of(remoteFilePath); + Status st = operations.openReader(hdfsOpParams); + if (st != Status.OK) { + return st; } + FSDataInputStream fsDataInputStream = hdfsOpParams.fsDataInputStream(); LOG.info("finished to open reader. download {} to {}.", remoteFilePath, localFilePath); // delete local file if exist @@ -132,9 +137,9 @@ public class DFSFileSystem extends RemoteFileSystem { "failed to create local file: " + localFilePath + ", msg: " + e.getMessage()); } - String lastErrMsg = null; + String lastErrMsg; Status status = Status.OK; - try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(localFile))) { + try (BufferedOutputStream out = new BufferedOutputStream(Files.newOutputStream(localFile.toPath()))) { final long bufSize = 1024 * 1024; // 1MB long leftSize = fileSize; long readOffset = 0; @@ -165,7 +170,7 @@ public class DFSFileSystem extends RemoteFileSystem { } catch (IOException e) { return new Status(Status.ErrCode.COMMON_ERROR, "Got exception: " + e.getMessage()); } finally { - Status closeStatus = operations.closeReader(fsDataInputStream); + Status closeStatus = operations.closeReader(OpParams.of(fsDataInputStream)); if (!closeStatus.ok()) { LOG.warn(closeStatus.getErrMsg()); if (status.ok()) { @@ -256,7 +261,7 @@ public class DFSFileSystem extends RemoteFileSystem { try { URI pathUri = URI.create(remotePath); Path inputFilePath = new Path(pathUri.getPath()); - FileSystem fileSystem = getHdfsClient(remotePath); + FileSystem fileSystem = getFileSystem(remotePath); boolean isPathExist = fileSystem.exists(inputFilePath); if (!isPathExist) { return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath); @@ -271,12 +276,12 @@ public class DFSFileSystem extends RemoteFileSystem { @Override public Status directUpload(String content, String remoteFile) { - FSDataOutputStream fsDataOutputStream = null; - try { - fsDataOutputStream = operations.openWriter(remoteFile); - } catch (Exception e) { - return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); + HDFSOpParams hdfsOpParams = OpParams.of(remoteFile); + Status wst = operations.openWriter(hdfsOpParams); + if (wst != Status.OK) { + return wst; } + FSDataOutputStream fsDataOutputStream = hdfsOpParams.fsDataOutputStream(); LOG.info("finished to open writer. directly upload to remote path {}.", remoteFile); Status status = Status.OK; @@ -286,7 +291,7 @@ public class DFSFileSystem extends RemoteFileSystem { LOG.error("errors while write data to output stream", e); status = new Status(Status.ErrCode.COMMON_ERROR, "write exception: " + e.getMessage()); } finally { - Status closeStatus = operations.closeWriter(fsDataOutputStream); + Status closeStatus = operations.closeWriter(OpParams.of(fsDataOutputStream)); if (!closeStatus.ok()) { LOG.warn(closeStatus.getErrMsg()); if (status.ok()) { @@ -301,13 +306,12 @@ public class DFSFileSystem extends RemoteFileSystem { public Status upload(String localPath, String remotePath) { long start = System.currentTimeMillis(); LOG.debug("local path {}, remote path {}", localPath, remotePath); - FSDataOutputStream fsDataOutputStream = null; - try { - fsDataOutputStream = operations.openWriter(remotePath); - } catch (Exception e) { - return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); + HDFSOpParams hdfsOpParams = OpParams.of(remotePath); + Status wst = operations.openWriter(hdfsOpParams); + if (wst != Status.OK) { + return wst; } - + FSDataOutputStream fsDataOutputStream = hdfsOpParams.fsDataOutputStream(); LOG.info("finished to open writer. directly upload to remote path {}.", remotePath); // read local file and write remote File localFile = new File(localPath); @@ -342,7 +346,7 @@ public class DFSFileSystem extends RemoteFileSystem { } catch (IOException e1) { return new Status(Status.ErrCode.COMMON_ERROR, "encounter io exception: " + e1.getMessage()); } finally { - Status closeStatus = operations.closeWriter(fsDataOutputStream); + Status closeStatus = operations.closeWriter(OpParams.of(fsDataOutputStream)); if (!closeStatus.ok()) { LOG.warn(closeStatus.getErrMsg()); if (status.ok()) { @@ -368,7 +372,7 @@ public class DFSFileSystem extends RemoteFileSystem { if (!srcPathUri.getAuthority().trim().equals(destPathUri.getAuthority().trim())) { return new Status(Status.ErrCode.COMMON_ERROR, "only allow rename in same file system"); } - FileSystem fileSystem = getHdfsClient(destPath); + FileSystem fileSystem = getFileSystem(destPath); Path srcfilePath = new Path(srcPathUri.getPath()); Path destfilePath = new Path(destPathUri.getPath()); boolean isRenameSuccess = fileSystem.rename(srcfilePath, destfilePath); @@ -391,7 +395,7 @@ public class DFSFileSystem extends RemoteFileSystem { try { URI pathUri = URI.create(remotePath); Path inputFilePath = new Path(pathUri.getPath()); - FileSystem fileSystem = getHdfsClient(remotePath); + FileSystem fileSystem = getFileSystem(remotePath); fileSystem.delete(inputFilePath, true); } catch (UserException e) { return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); @@ -404,21 +408,6 @@ public class DFSFileSystem extends RemoteFileSystem { return Status.OK; } - @Override - public RemoteIterator listLocatedStatus(String remotePath) throws UserException { - FileSystem fileSystem = getHdfsClient(remotePath); - try { - return fileSystem.listLocatedStatus(new Path(remotePath)); - } catch (IOException e) { - throw new UserException("Failed to list located status for path: " + remotePath, e); - } - } - - @Override - public Status list(String remotePath, List result) { - return list(remotePath, result, true); - } - /** * get files in remotePath of HDFS. * @@ -431,7 +420,7 @@ public class DFSFileSystem extends RemoteFileSystem { public Status list(String remotePath, List result, boolean fileNameOnly) { try { URI pathUri = URI.create(remotePath); - FileSystem fileSystem = getHdfsClient(remotePath); + FileSystem fileSystem = getFileSystem(remotePath); Path pathPattern = new Path(pathUri.getPath()); FileStatus[] files = fileSystem.globStatus(pathPattern); if (files == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/JFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/JFSFileSystem.java new file mode 100644 index 0000000000..ffabb211d0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/JFSFileSystem.java @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.remote.dfs; + +import org.apache.doris.analysis.StorageBackend; + +import java.util.Map; + +public class JFSFileSystem extends DFSFileSystem { + public JFSFileSystem(Map properties) { + super(StorageBackend.StorageType.JFS, properties); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/OFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/OFSFileSystem.java new file mode 100644 index 0000000000..dd69a30039 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/OFSFileSystem.java @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.remote.dfs; + +import org.apache.doris.analysis.StorageBackend; + +import java.util.Map; + +public class OFSFileSystem extends DFSFileSystem { + public OFSFileSystem(Map properties) { + super(StorageBackend.StorageType.OFS, properties); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java index 4d2b1ebbfa..e1be748598 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java @@ -27,10 +27,9 @@ import org.apache.doris.common.util.S3URI; import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.S3Properties; -import org.apache.doris.fs.obj.BlobStorage; +import org.apache.doris.fs.FileSystemFactory; import org.apache.doris.thrift.TFileType; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import java.util.HashMap; @@ -89,17 +88,12 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction { locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle); if (FeConstants.runningUnitTest) { // Just check - BlobStorage.create(null, StorageBackend.StorageType.S3, locationProperties); + FileSystemFactory.get(StorageBackend.StorageType.S3, locationProperties); } else { parseFile(); } } - @VisibleForTesting - public static Map getParams(Map params) throws AnalysisException { - return getValidParams(params); - } - private static Map getValidParams(Map params) throws AnalysisException { if (!params.containsKey(S3_URI)) { throw new AnalysisException("Missing required property: " + S3_URI); diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java b/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java similarity index 65% rename from fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java rename to fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java index 208aff157a..375a5de1ab 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/fs/obj/S3FileSystemTest.java @@ -15,10 +15,14 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.backup; +package org.apache.doris.fs.obj; +import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.backup.RemoteFile; +import org.apache.doris.backup.Status; import org.apache.doris.datasource.property.PropertyConverter; -import org.apache.doris.fs.obj.S3Storage; +import org.apache.doris.fs.FileSystemFactory; +import org.apache.doris.fs.remote.S3FileSystem; import org.apache.commons.codec.digest.DigestUtils; import org.junit.Assert; @@ -40,11 +44,11 @@ import java.util.Random; import java.util.UUID; @Ignore -public class S3StorageTest { +public class S3FileSystemTest { private static String basePath; private final String bucket = "s3://doris-test/"; private Map properties; - private S3Storage storage; + private S3FileSystem fileSystem; private String testFile; private String content; @@ -62,7 +66,7 @@ public class S3StorageTest { properties.put(PropertyConverter.USE_PATH_STYLE, "false"); properties.put("AWS_REGION", "bj"); - storage = new S3Storage(properties); + fileSystem = (S3FileSystem) FileSystemFactory.get(StorageBackend.StorageType.S3, properties); testFile = bucket + basePath + "/Ode_to_the_West_Wind"; content = @@ -80,17 +84,17 @@ public class S3StorageTest { + "With living hues and odors plain and hill:\n" + "Wild Spirit, which art moving everywhere;\n" + "Destroyer and preserver; hear, oh, hear!"; - Assert.assertEquals(Status.OK, storage.directUpload(content, testFile)); + Assert.assertEquals(Status.OK, fileSystem.directUpload(content, testFile)); } @Test public void downloadWithFileSize() throws IOException { File localFile = File.createTempFile("s3unittest", ".dat"); localFile.deleteOnExit(); - Status status = storage.downloadWithFileSize(testFile, localFile.getAbsolutePath(), content.getBytes().length); + Status status = fileSystem.downloadWithFileSize(testFile, localFile.getAbsolutePath(), content.getBytes().length); Assert.assertEquals(Status.OK, status); Assert.assertEquals(DigestUtils.md5Hex(content.getBytes()), DigestUtils.md5Hex(new FileInputStream(localFile))); - status = storage.downloadWithFileSize(bucket + basePath + "/Ode_to_the_West_Wind", localFile.getAbsolutePath(), content.getBytes().length + 1); + status = fileSystem.downloadWithFileSize(bucket + basePath + "/Ode_to_the_West_Wind", localFile.getAbsolutePath(), content.getBytes().length + 1); Assert.assertNotEquals(Status.OK, status); } @@ -105,11 +109,11 @@ public class S3StorageTest { os.write(buf); os.close(); String remote = bucket + basePath + "/" + localFile.getName(); - Status status = storage.upload(localFile.getAbsolutePath(), remote); + Status status = fileSystem.upload(localFile.getAbsolutePath(), remote); Assert.assertEquals(Status.OK, status); File localFile2 = File.createTempFile("s3unittest", ".dat"); localFile2.deleteOnExit(); - status = storage.downloadWithFileSize(remote, localFile2.getAbsolutePath(), 1024 * 1024); + status = fileSystem.downloadWithFileSize(remote, localFile2.getAbsolutePath(), 1024 * 1024); Assert.assertEquals(Status.OK, status); Assert.assertEquals(DigestUtils.md5Hex(new FileInputStream(localFile)), DigestUtils.md5Hex(new FileInputStream(localFile2))); @@ -117,54 +121,54 @@ public class S3StorageTest { @Test public void copy() { - Assert.assertEquals(Status.OK, storage.copy(testFile, testFile + ".bak")); - Assert.assertEquals(Status.OK, storage.checkPathExist(testFile + ".bak")); - Assert.assertNotEquals(Status.OK, storage.copy(testFile + ".bakxxx", testFile + ".bak")); + Assert.assertEquals(Status.OK, fileSystem.copy(testFile, testFile + ".bak")); + Assert.assertEquals(Status.OK, fileSystem.exists(testFile + ".bak")); + Assert.assertNotEquals(Status.OK, fileSystem.copy(testFile + ".bakxxx", testFile + ".bak")); } @Test public void rename() { - Assert.assertEquals(Status.OK, storage.directUpload(content, testFile + ".bak")); - storage.rename(testFile + ".bak", testFile + ".bak1"); - Assert.assertEquals(Status.ErrCode.NOT_FOUND, storage.checkPathExist(testFile + ".bak").getErrCode()); - Assert.assertEquals(Status.OK, storage.checkPathExist(testFile + ".bak1")); + Assert.assertEquals(Status.OK, fileSystem.directUpload(content, testFile + ".bak")); + fileSystem.rename(testFile + ".bak", testFile + ".bak1"); + Assert.assertEquals(Status.ErrCode.NOT_FOUND, fileSystem.exists(testFile + ".bak").getErrCode()); + Assert.assertEquals(Status.OK, fileSystem.exists(testFile + ".bak1")); } @Test public void delete() { String deleteFile = testFile + ".to_be_delete"; - Assert.assertEquals(Status.OK, storage.directUpload(content, deleteFile)); - Assert.assertEquals(Status.OK, storage.delete(deleteFile)); - Assert.assertEquals(Status.ErrCode.NOT_FOUND, storage.checkPathExist(deleteFile).getErrCode()); - Assert.assertEquals(Status.OK, storage.delete(deleteFile + "xxxx")); + Assert.assertEquals(Status.OK, fileSystem.directUpload(content, deleteFile)); + Assert.assertEquals(Status.OK, fileSystem.delete(deleteFile)); + Assert.assertEquals(Status.ErrCode.NOT_FOUND, fileSystem.exists(deleteFile).getErrCode()); + Assert.assertEquals(Status.OK, fileSystem.delete(deleteFile + "xxxx")); } @Test public void list() { List result = new ArrayList<>(); String listPath = bucket + basePath + "_list" + "/Ode_to_the_West_Wind"; - Assert.assertEquals(Status.OK, storage.directUpload(content, listPath + ".1")); - Assert.assertEquals(Status.OK, storage.directUpload(content, listPath + ".2")); - Assert.assertEquals(Status.OK, storage.directUpload(content, listPath + ".3")); - Assert.assertEquals(Status.OK, storage.list(bucket + basePath + "_list/*", result)); + Assert.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".1")); + Assert.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".2")); + Assert.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".3")); + Assert.assertEquals(Status.OK, fileSystem.list(bucket + basePath + "_list/*", result)); Assert.assertEquals(3, result.size()); } @Test public void makeDir() { String path = bucket + basePath + "/test_path"; - Assert.assertEquals(Status.OK, storage.makeDir(path)); - Assert.assertNotEquals(Status.OK, storage.checkPathExist(path)); + Assert.assertEquals(Status.OK, fileSystem.makeDir(path)); + Assert.assertNotEquals(Status.OK, fileSystem.exists(path)); String path1 = bucket + basePath + "/test_path1/"; - Assert.assertEquals(Status.OK, storage.makeDir(path1)); - Assert.assertEquals(Status.OK, storage.checkPathExist(path1)); + Assert.assertEquals(Status.OK, fileSystem.makeDir(path1)); + Assert.assertEquals(Status.OK, fileSystem.exists(path1)); } @Test public void checkPathExist() { - Status status = storage.checkPathExist(testFile); + Status status = fileSystem.exists(testFile); Assert.assertEquals(Status.OK, status); - status = storage.checkPathExist(testFile + ".NOT_EXIST"); + status = fileSystem.exists(testFile + ".NOT_EXIST"); Assert.assertEquals(Status.ErrCode.NOT_FOUND, status.getErrCode()); } }