[refactor](fs)(step2) separate the storage and filesystem methods (#19012)

Co-authored-by: jinzhe <jinzhe@selectdb.com>
Author: slothever
Date: 2023-04-26 15:06:31 +08:00
Committed by: GitHub
Parent: 6356146274
Commit: 45874bbf62
29 changed files with 1288 additions and 532 deletions

View File

@ -23,7 +23,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.datasource.property.constants.BosProperties;
import org.apache.doris.fs.obj.BlobStorage;
import org.apache.doris.fs.PersistentFileSystem;
import org.apache.doris.thrift.TFileType;
import com.google.common.collect.Maps;
@ -134,7 +134,7 @@ public class BrokerDesc extends StorageDesc implements Writable {
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, name);
properties.put(BlobStorage.STORAGE_TYPE, storageType.name());
properties.put(PersistentFileSystem.STORAGE_TYPE, storageType.name());
out.writeInt(properties.size());
for (Map.Entry<String, String> entry : properties.entrySet()) {
Text.writeString(out, entry.getKey());
@ -152,7 +152,7 @@ public class BrokerDesc extends StorageDesc implements Writable {
properties.put(key, val);
}
StorageBackend.StorageType st = StorageBackend.StorageType.BROKER;
String typeStr = properties.remove(BlobStorage.STORAGE_TYPE);
String typeStr = properties.remove(PersistentFileSystem.STORAGE_TYPE);
if (typeStr != null) {
try {
st = StorageBackend.StorageType.valueOf(typeStr);

View File

@ -24,7 +24,7 @@ import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.obj.S3Storage;
import org.apache.doris.fs.remote.S3FileSystem;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@ -128,17 +128,17 @@ public class S3Resource extends Resource {
propertiesPing.put(S3Properties.Env.REGION, credential.getRegion());
propertiesPing.put(PropertyConverter.USE_PATH_STYLE, "false");
properties.putAll(propertiesPing);
S3Storage storage = new S3Storage(properties);
S3FileSystem fileSystem = new S3FileSystem(properties);
String testFile = bucket + rootPath + "/test-object-valid.txt";
String content = "doris will be better";
try {
Status status = storage.directUpload(content, testFile);
Status status = fileSystem.directUpload(content, testFile);
if (status != Status.OK) {
LOG.warn("ping update file status: {}, properties: {}", status, propertiesPing);
return false;
}
} finally {
Status delete = storage.delete(testFile);
Status delete = fileSystem.delete(testFile);
if (delete != Status.OK) {
LOG.warn("ping delete file status: {}, properties: {}", delete, propertiesPing);
return false;

View File

@ -26,6 +26,15 @@ import org.apache.hadoop.fs.RemoteIterator;
import java.util.List;
/**
* File system interface.
* HDFS-compatible file operations should go through DFSFileSystem.
* @see org.apache.doris.fs.remote.dfs.DFSFileSystem
* If the file system is backed by an object storage SDK, use ObjFileSystem.
* @see org.apache.doris.fs.remote.ObjFileSystem
* Read and write operations are placed in FileOperations.
* @see org.apache.doris.fs.operations.FileOperations
*/
public interface FileSystem {
Status exists(String remotePath);
@ -39,15 +48,18 @@ public interface FileSystem {
Status delete(String remotePath);
Status makeDir(String remotePath);
default RemoteIterator<LocatedFileStatus> listLocatedStatus(String remotePath) throws UserException {
throw new UserException("Not support to listLocatedStatus.");
}
// List files in remotePath.
// The returned file names contain the file name only (not the full path).
Status list(String remotePath, List<RemoteFile> result);
default Status list(String remotePath, List<RemoteFile> result) {
return list(remotePath, result, true);
}
Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly);
Status makeDir(String remotePath);
}
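For orientation, a minimal usage sketch of the reworked interface, assuming fs is any FileSystem implementation (the bucket path is illustrative): the new two-argument list default delegates to the three-argument overload with fileNameOnly = true.

List<RemoteFile> files = new ArrayList<>();
Status st = fs.list("s3://my_bucket/data/", files); // same as fs.list(path, files, true)
if (st.ok()) {
    files.forEach(System.out::println); // file names only, not full paths
}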

View File

@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.S3FileSystem;
import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.fs.remote.dfs.JFSFileSystem;
import org.apache.doris.fs.remote.dfs.OFSFileSystem;
import java.util.Map;
public class FileSystemFactory {
public static FileSystem get(StorageBackend.StorageType type, Map<String, String> properties) {
// used for tests
return get(type.name(), type, properties);
}
public static FileSystem get(String name, StorageBackend.StorageType type, Map<String, String> properties) {
// TODO: rename StorageBackend.StorageType
if (type == StorageBackend.StorageType.S3) {
return new S3FileSystem(properties);
} else if (type == StorageBackend.StorageType.HDFS || type == StorageBackend.StorageType.GFS) {
return new DFSFileSystem(properties);
} else if (type == StorageBackend.StorageType.OFS) {
return new OFSFileSystem(properties);
} else if (type == StorageBackend.StorageType.JFS) {
return new JFSFileSystem(properties);
} else if (type == StorageBackend.StorageType.BROKER) {
return new BrokerFileSystem(name, properties);
} else {
throw new UnsupportedOperationException(type.toString() + " backend is not implemented");
}
}
}
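A hedged sketch of how a caller obtains a concrete file system from the factory; the property key and URIs below are assumptions for illustration, not values confirmed by this diff.

Map<String, String> props = new HashMap<>();
props.put("fs.defaultFS", "hdfs://namenode:8020"); // assumed Hadoop-style key
FileSystem fs = FileSystemFactory.get(StorageBackend.StorageType.HDFS, props);
Status st = fs.exists("hdfs://namenode:8020/user/doris/data.txt"); // dispatched to DFSFileSystem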

View File

@ -17,5 +17,49 @@
package org.apache.doris.fs;
public class LocalFileSystem {
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import java.util.List;
public class LocalFileSystem implements FileSystem {
@Override
public Status exists(String remotePath) {
return null;
}
@Override
public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
return null;
}
@Override
public Status upload(String localPath, String remotePath) {
return null;
}
@Override
public Status directUpload(String content, String remoteFile) {
return null;
}
@Override
public Status rename(String origFilePath, String destFilePath) {
return null;
}
@Override
public Status delete(String remotePath) {
return null;
}
@Override
public Status makeDir(String remotePath) {
return null;
}
@Override
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
return null;
}
}

View File

@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import com.google.common.collect.Maps;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
/**
* Used for persistence; Repository persists the properties of a file system.
*/
public abstract class PersistentFileSystem implements FileSystem, Writable {
public static final String STORAGE_TYPE = "_DORIS_STORAGE_TYPE_";
protected Map<String, String> properties = Maps.newHashMap();
protected String name;
protected StorageBackend.StorageType type;
public PersistentFileSystem(String name, StorageBackend.StorageType type) {
this.name = name;
this.type = type;
}
public Map<String, String> getProperties() {
return properties;
}
public StorageBackend.StorageType getStorageType() {
return type;
}
/**
* Reads a persisted file system back from the given input.
* @param in persisted data
* @return file system
*/
public static FileSystem read(DataInput in) throws IOException {
String name = Text.readString(in);
Map<String, String> properties = Maps.newHashMap();
StorageBackend.StorageType type = StorageBackend.StorageType.BROKER;
int size = in.readInt();
for (int i = 0; i < size; i++) {
String key = Text.readString(in);
String value = Text.readString(in);
properties.put(key, value);
}
if (properties.containsKey(STORAGE_TYPE)) {
type = StorageBackend.StorageType.valueOf(properties.get(STORAGE_TYPE));
properties.remove(STORAGE_TYPE);
}
return FileSystemFactory.get(name, type, properties);
}
public void write(DataOutput out) throws IOException {
// must write type first
Text.writeString(out, name);
properties.put(STORAGE_TYPE, type.name());
out.writeInt(properties.size());
for (Map.Entry<String, String> entry : properties.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
}
}
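write() injects the STORAGE_TYPE marker into the property map before serializing, and read() strips it back out to choose the factory branch, so the two form a simple round trip. A minimal sketch, assuming the usual java.io stream wiring:

ByteArrayOutputStream buffer = new ByteArrayOutputStream();
persistentFs.write(new DataOutputStream(buffer)); // writes name, then properties incl. STORAGE_TYPE
FileSystem restored = PersistentFileSystem.read(
        new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));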

View File

@ -37,13 +37,17 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* @see org.apache.doris.fs.PersistentFileSystem
* @see org.apache.doris.fs.FileSystemFactory
*/
@Deprecated
public abstract class BlobStorage implements Writable {
public static final String STORAGE_TYPE = "_DORIS_STORAGE_TYPE_";
private Map<String, String> properties = Maps.newHashMap();
private String name;
private StorageBackend.StorageType type;
private String location;
public static String clientId() {
return FrontendOptions.getLocalHostAddress() + ":" + Config.edit_log_port;
@ -88,14 +92,6 @@ public abstract class BlobStorage implements Writable {
return BlobStorage.create(name, type, properties);
}
public String getLocation() {
return location;
}
public void setLocation(String location) {
this.location = location;
}
public String getName() {
return name;
}

View File

@ -74,6 +74,10 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
/**
* @see org.apache.doris.fs.remote.BrokerFileSystem
*/
@Deprecated
public class BrokerStorage extends BlobStorage {
private static final Logger LOG = LogManager.getLogger(BrokerStorage.class);

View File

@ -56,8 +56,9 @@ import java.util.Map;
/**
* HdfsStorage encapsulates the interfaces for accessing HDFS directly.
*
* @see org.apache.doris.fs.remote.dfs.DFSFileSystem
*/
@Deprecated
public class HdfsStorage extends BlobStorage {
private static final Logger LOG = LogManager.getLogger(HdfsStorage.class);
private final Map<String, String> hdfsProperties;

View File

@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.obj;
import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.commons.lang3.tuple.Triple;
import org.jetbrains.annotations.Nullable;
import software.amazon.awssdk.core.sync.RequestBody;
import java.io.File;
/**
* It is used for accessing remote object storage on the cloud.
* @param <C> cloud SDK Client
*/
public interface ObjStorage<C> {
C getClient(String bucket) throws UserException;
Triple<String, String, String> getStsToken() throws DdlException;
Status headObject(String remotePath);
Status getObject(String remoteFilePath, File localFile);
Status putObject(String remotePath, @Nullable RequestBody requestBody);
Status deleteObject(String remotePath);
Status copyObject(String origFilePath, String destFilePath);
RemoteObjects listObjects(String remotePath, String continuationToken) throws DdlException;
default String normalizePrefix(String prefix) {
return prefix.isEmpty() ? "" : (prefix.endsWith("/") ? prefix : String.format("%s/", prefix));
}
default String getRelativePath(String prefix, String key) throws DdlException {
String expectedPrefix = normalizePrefix(prefix);
if (!key.startsWith(expectedPrefix)) {
throw new DdlException(
"Listed an object whose key: " + key + " does not start with the object prefix: " + expectedPrefix);
}
return key.substring(expectedPrefix.length());
}
}
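The two default helpers are pure string manipulation; a few illustrative values pin down their behavior (sketch only; storage is any ObjStorage implementation):

assert storage.normalizePrefix("").isEmpty();
assert storage.normalizePrefix("data").equals("data/");
assert storage.normalizePrefix("data/").equals("data/");
assert storage.getRelativePath("data", "data/part-0.parquet").equals("part-0.parquet");
// getRelativePath("data", "other/x.parquet") throws DdlException: key outside the prefix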

View File

@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.obj;
public class RemoteObject {
private final String key;
private final String relativePath;
private final String etag;
private final long size;
public RemoteObject(String key, String relativePath, String etag, long size) {
this.key = key;
this.relativePath = relativePath;
this.etag = etag;
this.size = size;
}
public String getKey() {
return key;
}
public String getRelativePath() {
return relativePath;
}
public String getEtag() {
return etag;
}
public long getSize() {
return size;
}
@Override
public String toString() {
return "RemoteObject{" + "key='" + key + '\'' + ", relativePath='" + relativePath + '\'' + ", etag='" + etag
+ '\'' + ", size=" + size + '}';
}
}

View File

@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.obj;
import java.util.List;
public class RemoteObjects {
private final List<RemoteObject> objectList;
private final boolean isTruncated;
private final String continuationToken;
public RemoteObjects(List<RemoteObject> objectList, boolean isTruncated, String continuationToken) {
this.objectList = objectList;
this.isTruncated = isTruncated;
this.continuationToken = continuationToken;
}
public List<RemoteObject> getObjectList() {
return objectList;
}
public boolean isTruncated() {
return isTruncated;
}
public String getContinuationToken() {
return continuationToken;
}
@Override
public String toString() {
return "RemoteObjects{" + "objectList=" + objectList + ", isTruncated=" + isTruncated
+ ", continuationToken='" + continuationToken + '\'' + '}';
}
}
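RemoteObjects carries standard list-pagination state, so draining a listing is the usual continuation-token loop (a sketch; the path is illustrative):

String token = null;
boolean truncated;
do {
    RemoteObjects page = objStorage.listObjects("s3://my_bucket/data/", token);
    for (RemoteObject obj : page.getObjectList()) {
        System.out.println(obj.getKey() + " (" + obj.getSize() + " bytes)");
    }
    truncated = page.isTruncated();
    token = page.getContinuationToken();
} while (truncated);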

View File

@ -0,0 +1,306 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.obj;
import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.http.HttpStatus;
import org.apache.http.client.utils.URIBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
import java.io.File;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
public class S3ObjStorage implements ObjStorage<S3Client> {
private static final Logger LOG = LogManager.getLogger(S3ObjStorage.class);
private S3Client client;
protected Map<String, String> properties;
// false: the s3 client will automatically convert endpoint to virtual-hosted style, eg:
// endpoint: http://s3.us-east-2.amazonaws.com
// bucket/path: my_bucket/file.txt
// auto convert: http://my_bucket.s3.us-east-2.amazonaws.com/file.txt
// true: the s3 client will NOT automatically convert endpoint to virtual-hosted style, we need to do some tricks:
// endpoint: http://cos.ap-beijing.myqcloud.com
// bucket/path: my_bucket/file.txt
// convert manually: See S3URI()
private boolean forceHostedStyle = false;
public S3ObjStorage(Map<String, String> properties) {
this.properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
setProperties(properties);
}
private void setProperties(Map<String, String> properties) {
this.properties.putAll(properties);
try {
S3Properties.requiredS3Properties(this.properties);
} catch (DdlException e) {
throw new IllegalArgumentException(e);
}
// Virtual hosted-style is recommended in the s3 protocol.
// Path-style has been deprecated, but for some unexplainable reasons,
// the s3 client checks whether the endpoint starts with `s3`
// when generating a virtual hosted-style request.
// If not, it will not be converted (https://github.com/aws/aws-sdk-java-v2/pull/763),
// but the object storage endpoints of many cloud service providers do not start with s3,
// so they cannot be converted to virtual hosted-style.
// Some of them, such as Aliyun's OSS, only support virtual hosted-style,
// and some of them (e.g. Ceph) may only support
// path-style, so we need to do some additional conversion.
//
//        use_path_style          | !use_path_style
//  S3    forceHostedStyle=false  | forceHostedStyle=false
// !S3    forceHostedStyle=false  | forceHostedStyle=true
//
// That is, for an S3 endpoint, ignore the `use_path_style` property; the s3 client will automatically use
// virtual hosted-style.
// For other endpoints, if `use_path_style` is true, use path-style; otherwise, use virtual hosted-style.
if (!this.properties.get(S3Properties.ENDPOINT).toLowerCase().contains(S3Properties.S3_PREFIX)) {
forceHostedStyle = !this.properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false")
.equalsIgnoreCase("true");
} else {
forceHostedStyle = false;
}
}
@Override
public S3Client getClient(String bucket) throws UserException {
if (client == null) {
URI tmpEndpoint = URI.create(properties.get(S3Properties.ENDPOINT));
StaticCredentialsProvider scp;
if (!properties.containsKey(S3Properties.SESSION_TOKEN)) {
AwsBasicCredentials awsBasic = AwsBasicCredentials.create(
properties.get(S3Properties.ACCESS_KEY),
properties.get(S3Properties.SECRET_KEY));
scp = StaticCredentialsProvider.create(awsBasic);
} else {
AwsSessionCredentials awsSession = AwsSessionCredentials.create(
properties.get(S3Properties.ACCESS_KEY),
properties.get(S3Properties.SECRET_KEY),
properties.get(S3Properties.SESSION_TOKEN));
scp = StaticCredentialsProvider.create(awsSession);
}
EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
.builder()
.baseDelay(Duration.ofSeconds(1))
.maxBackoffTime(Duration.ofMinutes(1))
.build();
// retry 3 times with equal-jitter backoff
RetryPolicy retryPolicy = RetryPolicy
.builder()
.numRetries(3)
.backoffStrategy(backoffStrategy)
.build();
ClientOverrideConfiguration clientConf = ClientOverrideConfiguration
.builder()
// set retry policy
.retryPolicy(retryPolicy)
// using AwsS3V4Signer
.putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create())
.build();
URI endpoint = StringUtils.isEmpty(bucket) ? tmpEndpoint :
URI.create(new URIBuilder(tmpEndpoint).setHost(bucket + "." + tmpEndpoint.getHost()).toString());
client = S3Client.builder()
.endpointOverride(endpoint)
.credentialsProvider(scp)
.region(Region.of(properties.get(S3Properties.REGION)))
.overrideConfiguration(clientConf)
// disable chunkedEncoding because BOS does not support it
// use virtual hosted-style access
.serviceConfiguration(S3Configuration.builder()
.chunkedEncodingEnabled(false)
.pathStyleAccessEnabled(false)
.build())
.build();
}
return client;
}
@Override
public Triple<String, String, String> getStsToken() throws DdlException {
return null;
}
@Override
public Status headObject(String remotePath) {
try {
S3URI uri = S3URI.create(remotePath, forceHostedStyle);
HeadObjectResponse response = getClient(uri.getVirtualBucket())
.headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
LOG.info("head file " + remotePath + " success: " + response.toString());
return Status.OK;
} catch (S3Exception e) {
if (e.statusCode() == HttpStatus.SC_NOT_FOUND) {
return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath);
} else {
LOG.warn("headObject failed:", e);
return new Status(Status.ErrCode.COMMON_ERROR, "headObject failed: " + e.getMessage());
}
} catch (UserException ue) {
LOG.warn("connect to s3 failed: ", ue);
return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
}
}
@Override
public Status getObject(String remoteFilePath, File localFile) {
try {
S3URI uri = S3URI.create(remoteFilePath, forceHostedStyle);
GetObjectResponse response = getClient(uri.getVirtualBucket()).getObject(
GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), localFile.toPath());
LOG.info("get file " + remoteFilePath + " success: " + response.toString());
return Status.OK;
} catch (S3Exception s3Exception) {
return new Status(
Status.ErrCode.COMMON_ERROR,
"get file from s3 error: " + s3Exception.awsErrorDetails().errorMessage());
} catch (UserException ue) {
LOG.warn("connect to s3 failed: ", ue);
return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
} catch (Exception e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.toString());
}
}
@Override
public Status putObject(String remotePath, @Nullable RequestBody requestBody) {
try {
S3URI uri = S3URI.create(remotePath, forceHostedStyle);
PutObjectResponse response =
getClient(uri.getVirtualBucket())
.putObject(
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
requestBody);
LOG.info("put object success: " + response.eTag());
return Status.OK;
} catch (S3Exception e) {
LOG.error("put object failed:", e);
return new Status(Status.ErrCode.COMMON_ERROR, "put object failed: " + e.getMessage());
} catch (Exception ue) {
LOG.error("connect to s3 failed: ", ue);
return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
}
}
@Override
public Status deleteObject(String remotePath) {
try {
S3URI uri = S3URI.create(remotePath, forceHostedStyle);
DeleteObjectResponse response =
getClient(uri.getVirtualBucket())
.deleteObject(
DeleteObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
LOG.info("delete file " + remotePath + " success: " + response.toString());
return Status.OK;
} catch (S3Exception e) {
LOG.warn("delete file failed: ", e);
if (e.statusCode() == HttpStatus.SC_NOT_FOUND) {
return Status.OK;
}
return new Status(Status.ErrCode.COMMON_ERROR, "delete file failed: " + e.getMessage());
} catch (UserException ue) {
LOG.warn("connect to s3 failed: ", ue);
return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
}
}
public Status copyObject(String origFilePath, String destFilePath) {
try {
S3URI origUri = S3URI.create(origFilePath);
S3URI descUri = S3URI.create(destFilePath, forceHostedStyle);
getClient(descUri.getVirtualBucket())
.copyObject(
CopyObjectRequest.builder()
.copySource(origUri.getBucket() + "/" + origUri.getKey())
.destinationBucket(descUri.getBucket())
.destinationKey(descUri.getKey())
.build());
return Status.OK;
} catch (S3Exception e) {
LOG.error("copy file failed: ", e);
return new Status(Status.ErrCode.COMMON_ERROR, "copy file failed: " + e.getMessage());
} catch (UserException ue) {
LOG.error("copy to s3 failed: ", ue);
return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
}
}
@Override
public RemoteObjects listObjects(String absolutePath, String continuationToken) throws DdlException {
try {
S3URI uri = S3URI.create(absolutePath, forceHostedStyle);
String prefix = uri.getKey();
ListObjectsV2Request.Builder requestBuilder = ListObjectsV2Request.builder().bucket(uri.getBucket())
.prefix(normalizePrefix(prefix));
if (!StringUtils.isEmpty(continuationToken)) {
requestBuilder.continuationToken(continuationToken);
}
ListObjectsV2Response response = getClient(uri.getVirtualBucket()).listObjectsV2(requestBuilder.build());
List<RemoteObject> remoteObjects = new ArrayList<>();
for (S3Object c : response.contents()) {
String relativePath = getRelativePath(prefix, c.key());
remoteObjects.add(new RemoteObject(c.key(), relativePath, c.eTag(), c.size()));
}
return new RemoteObjects(remoteObjects, response.isTruncated(), response.nextContinuationToken());
} catch (Exception e) {
LOG.warn("Failed to list objects for S3", e);
throw new DdlException("Failed to list objects for S3, Error message: " + e.getMessage(), e);
}
}
}
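headObject maps an S3 404 to Status.ErrCode.NOT_FOUND, which makes existence checks cheap. A hedged sketch; the property key names below are assumptions (the class resolves them through S3Properties):

Map<String, String> props = new HashMap<>();
props.put("s3.endpoint", "http://cos.ap-beijing.myqcloud.com"); // assumed key names
props.put("s3.region", "ap-beijing");
props.put("s3.access_key", "ak");
props.put("s3.secret_key", "sk");
S3ObjStorage storage = new S3ObjStorage(props);
Status st = storage.headObject("s3://my_bucket/file.txt");
if (st.getErrCode() == Status.ErrCode.NOT_FOUND) {
    // object absent; any other non-OK status is a real error
}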

View File

@ -72,6 +72,11 @@ import java.util.Comparator;
import java.util.List;
import java.util.Map;
/**
* @see S3ObjStorage
* @see org.apache.doris.fs.remote.S3FileSystem
*/
@Deprecated
public class S3Storage extends BlobStorage {
private static final Logger LOG = LogManager.getLogger(S3Storage.class);
private FileSystem dfsFileSystem = null;

View File

@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.remote;
package org.apache.doris.fs.operations;
import org.apache.doris.backup.Status;
import org.apache.doris.common.Config;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TBrokerCloseReaderRequest;
import org.apache.doris.thrift.TBrokerCloseWriterRequest;
import org.apache.doris.thrift.TBrokerFD;
@ -30,8 +32,6 @@ import org.apache.doris.thrift.TBrokerOpenWriterResponse;
import org.apache.doris.thrift.TBrokerOperationStatus;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TBrokerVersion;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -39,72 +39,86 @@ import org.apache.thrift.TException;
import java.util.Map;
public class BrokerFileOperations {
public class BrokerFileOperations implements FileOperations {
private static final Logger LOG = LogManager.getLogger(BrokerFileOperations.class);
private String name;
private final String name;
private Map<String, String> properties;
private final Map<String, String> properties;
public BrokerFileOperations(String name, Map<String, String> properties) {
this.name = name;
this.properties = properties;
}
public static String clientId() {
return FrontendOptions.getLocalHostAddress() + ":" + Config.edit_log_port;
}
public Status openReader(TPaloBrokerService.Client client, TNetworkAddress address, String remoteFilePath,
TBrokerFD fd) {
@Override
public Status openReader(OpParams opParams) {
BrokerOpParams brokerOpParams = (BrokerOpParams) opParams;
String remoteFilePath = brokerOpParams.remotePath();
try {
TBrokerOpenReaderRequest req = new TBrokerOpenReaderRequest(TBrokerVersion.VERSION_ONE, remoteFilePath,
0, BrokerFileSystem.clientId(), properties);
TBrokerOpenReaderResponse rep = client.openReader(req);
0, clientId(), properties);
TBrokerOpenReaderResponse rep = brokerOpParams.client().openReader(req);
TBrokerOperationStatus opst = rep.getOpStatus();
if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) {
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to open reader on broker " + BrokerUtil.printBroker(name, address)
"failed to open reader on broker "
+ BrokerUtil.printBroker(name, brokerOpParams.address())
+ " for file: " + remoteFilePath + ". msg: " + opst.getMessage());
}
fd.setHigh(rep.getFd().getHigh());
fd.setLow(rep.getFd().getLow());
brokerOpParams.fd().setHigh(rep.getFd().getHigh());
brokerOpParams.fd().setLow(rep.getFd().getLow());
} catch (TException e) {
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to open reader on broker " + BrokerUtil.printBroker(name, address)
"failed to open reader on broker "
+ BrokerUtil.printBroker(name, brokerOpParams.address())
+ " for file: " + remoteFilePath + ". msg: " + e.getMessage());
}
return Status.OK;
}
public Status closeReader(TPaloBrokerService.Client client, TNetworkAddress address, TBrokerFD fd) {
public Status closeReader(OpParams opParams) {
BrokerOpParams brokerOpParams = (BrokerOpParams) opParams;
TBrokerFD fd = brokerOpParams.fd();
try {
TBrokerCloseReaderRequest req = new TBrokerCloseReaderRequest(TBrokerVersion.VERSION_ONE, fd);
TBrokerOperationStatus st = client.closeReader(req);
TBrokerOperationStatus st = brokerOpParams.client().closeReader(req);
if (st.getStatusCode() != TBrokerOperationStatusCode.OK) {
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to close reader on broker " + BrokerUtil.printBroker(name, address)
"failed to close reader on broker "
+ BrokerUtil.printBroker(name, brokerOpParams.address())
+ " for fd: " + fd);
}
LOG.info("finished to close reader. fd: {}.", fd);
} catch (TException e) {
return new Status(Status.ErrCode.BAD_CONNECTION,
"failed to close reader on broker " + BrokerUtil.printBroker(name, address)
"failed to close reader on broker "
+ BrokerUtil.printBroker(name, brokerOpParams.address())
+ ", fd " + fd + ", msg: " + e.getMessage());
}
return Status.OK;
}
public Status openWriter(TPaloBrokerService.Client client, TNetworkAddress address, String remoteFile,
TBrokerFD fd) {
public Status openWriter(OpParams desc) {
BrokerOpParams brokerOpParams = (BrokerOpParams) desc;
String remoteFile = brokerOpParams.remotePath();
TBrokerFD fd = brokerOpParams.fd();
try {
TBrokerOpenWriterRequest req = new TBrokerOpenWriterRequest(TBrokerVersion.VERSION_ONE,
remoteFile, TBrokerOpenMode.APPEND, BrokerFileSystem.clientId(), properties);
TBrokerOpenWriterResponse rep = client.openWriter(req);
remoteFile, TBrokerOpenMode.APPEND, clientId(), properties);
TBrokerOpenWriterResponse rep = brokerOpParams.client().openWriter(req);
TBrokerOperationStatus opst = rep.getOpStatus();
if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) {
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to open writer on broker " + BrokerUtil.printBroker(name, address)
"failed to open writer on broker "
+ BrokerUtil.printBroker(name, brokerOpParams.address())
+ " for file: " + remoteFile + ". msg: " + opst.getMessage());
}
@ -113,27 +127,32 @@ public class BrokerFileOperations {
LOG.info("finished to open writer. fd: {}. directly upload to remote path {}.", fd, remoteFile);
} catch (TException e) {
return new Status(Status.ErrCode.BAD_CONNECTION,
"failed to open writer on broker " + BrokerUtil.printBroker(name, address)
"failed to open writer on broker "
+ BrokerUtil.printBroker(name, brokerOpParams.address())
+ ", err: " + e.getMessage());
}
return Status.OK;
}
public Status closeWriter(TPaloBrokerService.Client client, TNetworkAddress address, TBrokerFD fd) {
public Status closeWriter(OpParams desc) {
BrokerOpParams brokerOpParams = (BrokerOpParams) desc;
TBrokerFD fd = brokerOpParams.fd();
try {
TBrokerCloseWriterRequest req = new TBrokerCloseWriterRequest(TBrokerVersion.VERSION_ONE, fd);
TBrokerOperationStatus st = client.closeWriter(req);
TBrokerOperationStatus st = brokerOpParams.client().closeWriter(req);
if (st.getStatusCode() != TBrokerOperationStatusCode.OK) {
return new Status(Status.ErrCode.COMMON_ERROR,
"failed to close writer on broker " + BrokerUtil.printBroker(name, address)
"failed to close writer on broker "
+ BrokerUtil.printBroker(name, brokerOpParams.address())
+ " for fd: " + fd);
}
LOG.info("finished to close writer. fd: {}.", fd);
} catch (TException e) {
return new Status(Status.ErrCode.BAD_CONNECTION,
"failed to close writer on broker " + BrokerUtil.printBroker(name, address)
"failed to close writer on broker "
+ BrokerUtil.printBroker(name, brokerOpParams.address())
+ ", fd " + fd + ", msg: " + e.getMessage());
}

View File

@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.operations;
import org.apache.doris.thrift.TBrokerFD;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;
public class BrokerOpParams extends OpParams {
private final TPaloBrokerService.Client client;
private final TNetworkAddress address;
private final TBrokerFD fd;
private String remoteFilePath;
protected BrokerOpParams(TPaloBrokerService.Client client, TNetworkAddress address, TBrokerFD fd) {
this(client, address, null, fd);
}
protected BrokerOpParams(TPaloBrokerService.Client client, TNetworkAddress address,
String remoteFilePath, TBrokerFD fd) {
this.client = client;
this.address = address;
this.remoteFilePath = remoteFilePath;
this.fd = fd;
}
public TPaloBrokerService.Client client() {
return client;
}
public TNetworkAddress address() {
return address;
}
public String remotePath() {
return remoteFilePath;
}
public TBrokerFD fd() {
return fd;
}
}

View File

@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.operations;
import org.apache.doris.backup.Status;
/**
* FileOperations contains the read and write operations.
* Other operations can be added as needed.
*/
public interface FileOperations {
Status openReader(OpParams desc);
Status closeReader(OpParams desc);
Status openWriter(OpParams desc);
Status closeWriter(OpParams desc);
}

View File

@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.remote;
package org.apache.doris.fs.operations;
import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
@ -30,7 +30,7 @@ import org.apache.logging.log4j.Logger;
import java.io.IOException;
public class HDFSFileOperations {
public class HDFSFileOperations implements FileOperations {
private static final Logger LOG = LogManager.getLogger(HDFSFileOperations.class);
public static final int READ_BUFFER_SIZE = 128 << 10; // 128k
public static final int WRITE_BUFFER_SIZE = 128 << 10; // 128k
@ -45,33 +45,38 @@ public class HDFSFileOperations {
/**
* open remotePath for read.
*
* @param remotePath hdfs://namenode:port/path.
* @param startOffset the offset to read.
* @return FSDataInputStream if success.
* @throws UserException when get filesystem failed.
* @throws IOException when open file error.
* @param opParams hdfsOpParams.remotePath: hdfs://namenode:port/path.
* hdfsOpParams.startOffset: the offset to read.
* @return Status.OK and sets fsDataInputStream on success
*/
public FSDataInputStream openReader(String remotePath, long startOffset) throws UserException, IOException {
URI pathUri = URI.create(remotePath);
Path inputFilePath = new Path(pathUri.getPath());
public Status openReader(OpParams opParams) {
HDFSOpParams hdfsOpParams = (HDFSOpParams) opParams;
try {
URI pathUri = URI.create(hdfsOpParams.remotePath());
Path inputFilePath = new Path(pathUri.getPath());
FSDataInputStream fsDataInputStream = hdfsClient.open(inputFilePath, READ_BUFFER_SIZE);
fsDataInputStream.seek(startOffset);
return fsDataInputStream;
fsDataInputStream.seek(hdfsOpParams.startOffset());
hdfsOpParams.withFsDataInputStream(fsDataInputStream);
return Status.OK;
} catch (IOException e) {
LOG.error("errors while open path", e);
throw new IOException(e.getMessage());
return new Status(Status.ErrCode.COMMON_ERROR, "failed to open reader, msg:" + e.getMessage());
} catch (UserException ex) {
LOG.error("errors while get filesystem", ex);
return new Status(Status.ErrCode.COMMON_ERROR, "failed to get filesystem, msg:" + ex.getMessage());
}
}
/**
* close for read.
*
* @param fsDataInputStream the input stream.
* @param opParams hdfsOpParams.fsDataInputStream: the input stream.
* @return Status.OK if success.
*/
public Status closeReader(FSDataInputStream fsDataInputStream) {
synchronized (fsDataInputStream) {
public Status closeReader(OpParams opParams) {
HDFSOpParams hdfsOpParams = (HDFSOpParams) opParams;
FSDataInputStream fsDataInputStream = hdfsOpParams.fsDataInputStream();
synchronized (this) {
try {
fsDataInputStream.close();
} catch (IOException e) {
@ -87,31 +92,35 @@ public class HDFSFileOperations {
/**
* open remotePath for write.
*
* @param remotePath hdfs://namenode:port/path.
* @return FSDataOutputStream
* @throws UserException when get filesystem failed.
* @throws IOException when open path error.
* @param opParams hdfsOpParams.remotePath: hdfs://namenode:port/path.
* @return Status.OK and sets fsDataOutputStream on success
*/
public FSDataOutputStream openWriter(String remotePath) throws UserException, IOException {
URI pathUri = URI.create(remotePath);
Path inputFilePath = new Path(pathUri.getPath());
public Status openWriter(OpParams opParams) {
HDFSOpParams hdfsOpParams = (HDFSOpParams) opParams;
try {
return hdfsClient.create(inputFilePath, true, WRITE_BUFFER_SIZE);
URI pathUri = URI.create(hdfsOpParams.remotePath());
Path inputFilePath = new Path(pathUri.getPath());
hdfsOpParams.withFsDataOutputStream(hdfsClient.create(inputFilePath, true, WRITE_BUFFER_SIZE));
return Status.OK;
} catch (IOException e) {
LOG.error("errors while open path", e);
throw new IOException(e.getMessage());
return new Status(Status.ErrCode.COMMON_ERROR, "failed to open writer, msg:" + e.getMessage());
} catch (UserException ex) {
LOG.error("errors while get filesystem", ex);
return new Status(Status.ErrCode.COMMON_ERROR, "failed to get filesystem, msg:" + ex.getMessage());
}
}
/**
* close for write.
*
* @param fsDataOutputStream output stream.
* @param opParams hdfsOpParams.fsDataOutputStream: output stream.
* @return Status.OK if success.
*/
public Status closeWriter(FSDataOutputStream fsDataOutputStream) {
synchronized (fsDataOutputStream) {
public Status closeWriter(OpParams opParams) {
HDFSOpParams hdfsOpParams = (HDFSOpParams) opParams;
FSDataOutputStream fsDataOutputStream = hdfsOpParams.fsDataOutputStream();
synchronized (this) {
try {
fsDataOutputStream.flush();
fsDataOutputStream.close();

View File

@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.operations;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
public class HDFSOpParams extends OpParams {
private String remotePath;
private long startOffset;
private FSDataOutputStream fsDataOutputStream;
private FSDataInputStream fsDataInputStream;
protected HDFSOpParams(FSDataInputStream fsDataInputStream) {
this(null, 0, fsDataInputStream, null);
}
protected HDFSOpParams(FSDataOutputStream fsDataOutputStream) {
this(null, 0, null, fsDataOutputStream);
}
protected HDFSOpParams(String remotePath, long startOffset) {
this(remotePath, startOffset, null, null);
}
protected HDFSOpParams(String remotePath, long startOffset,
FSDataInputStream fsDataInputStream, FSDataOutputStream fsDataOutputStream) {
this.remotePath = remotePath;
this.startOffset = startOffset;
this.fsDataInputStream = fsDataInputStream;
this.fsDataOutputStream = fsDataOutputStream;
}
public String remotePath() {
return remotePath;
}
public long startOffset() {
return startOffset;
}
public FSDataOutputStream fsDataOutputStream() {
return fsDataOutputStream;
}
public FSDataInputStream fsDataInputStream() {
return fsDataInputStream;
}
public void withFsDataOutputStream(FSDataOutputStream fsDataOutputStream) {
this.fsDataOutputStream = fsDataOutputStream;
}
public void withFsDataInputStream(FSDataInputStream fsDataInputStream) {
this.fsDataInputStream = fsDataInputStream;
}
}

View File

@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.operations;
import org.apache.doris.thrift.TBrokerFD;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
/**
* All arguments of a FileOperations implementation should be obtained from this class.
* @see org.apache.doris.fs.operations.FileOperations
* Avoid using this class elsewhere.
*/
public class OpParams {
public static BrokerOpParams of(TPaloBrokerService.Client client, TNetworkAddress address, TBrokerFD fd) {
return new BrokerOpParams(client, address, fd);
}
public static BrokerOpParams of(TPaloBrokerService.Client client, TNetworkAddress address,
String remoteFilePath, TBrokerFD fd) {
return new BrokerOpParams(client, address, remoteFilePath, fd);
}
public static HDFSOpParams of(String remotePath) {
return new HDFSOpParams(remotePath, 0);
}
public static HDFSOpParams of(FSDataOutputStream fsDataOutputStream) {
return new HDFSOpParams(fsDataOutputStream);
}
public static HDFSOpParams of(FSDataInputStream fsDataInputStream) {
return new HDFSOpParams(fsDataInputStream);
}
}
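Putting the pieces together: a caller builds a typed param object through these factories and hands it to the matching FileOperations implementation. An HDFS-flavored sketch (paths illustrative; hdfsOperations is an HDFSFileOperations instance):

HDFSOpParams readParams = OpParams.of("hdfs://namenode:8020/user/doris/file.txt");
Status st = hdfsOperations.openReader(readParams);
if (st.ok()) {
    FSDataInputStream in = readParams.fsDataInputStream(); // set by openReader
    // ... read from `in` ...
    hdfsOperations.closeReader(readParams);
}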

View File

@ -17,16 +17,17 @@
package org.apache.doris.fs.remote;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.fs.operations.BrokerFileOperations;
import org.apache.doris.fs.operations.OpParams;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TBrokerCheckPathExistRequest;
import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
@ -46,8 +47,6 @@ import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
@ -72,20 +71,14 @@ import java.util.Map;
public class BrokerFileSystem extends RemoteFileSystem {
private static final Logger LOG = LogManager.getLogger(BrokerFileSystem.class);
private String name;
private BrokerFileOperations operations;
private final BrokerFileOperations operations;
public BrokerFileSystem(String name, Map<String, String> properties) {
this.name = name;
super(name, StorageBackend.StorageType.BROKER);
this.properties = properties;
this.operations = new BrokerFileOperations(name, properties);
}
public static String clientId() {
return FrontendOptions.getLocalHostAddress() + ":" + Config.edit_log_port;
}
public Pair<TPaloBrokerService.Client, TNetworkAddress> getBroker() {
Pair<TPaloBrokerService.Client, TNetworkAddress> result = Pair.of(null, null);
FsBroker broker;
@ -171,7 +164,7 @@ public class BrokerFileSystem extends RemoteFileSystem {
// 2. open file reader with broker
TBrokerFD fd = new TBrokerFD();
Status opStatus = operations.openReader(client, address, remoteFilePath, fd);
Status opStatus = operations.openReader(OpParams.of(client, address, remoteFilePath, fd));
if (!opStatus.ok()) {
return opStatus;
}
@ -296,7 +289,7 @@ public class BrokerFileSystem extends RemoteFileSystem {
+ BrokerUtil.printBroker(name, address));
} finally {
// close broker reader
Status closeStatus = operations.closeReader(client, address, fd);
Status closeStatus = operations.closeReader(OpParams.of(client, address, fd));
if (!closeStatus.ok()) {
LOG.warn(closeStatus.getErrMsg());
if (status.ok()) {
@ -329,7 +322,7 @@ public class BrokerFileSystem extends RemoteFileSystem {
Status status = Status.OK;
try {
// 2. open file writer with broker
status = operations.openWriter(client, address, remoteFile, fd);
status = operations.openWriter(OpParams.of(client, address, remoteFile, fd));
if (!status.ok()) {
return status;
}
@ -349,7 +342,7 @@ public class BrokerFileSystem extends RemoteFileSystem {
+ ", broker: " + BrokerUtil.printBroker(name, address));
}
} finally {
Status closeStatus = operations.closeWriter(client, address, fd);
Status closeStatus = operations.closeWriter(OpParams.of(client, address, fd));
if (closeStatus.getErrCode() == Status.ErrCode.BAD_CONNECTION
|| status.getErrCode() == Status.ErrCode.BAD_CONNECTION) {
ClientPool.brokerPool.invalidateObject(address, client);
@ -374,7 +367,7 @@ public class BrokerFileSystem extends RemoteFileSystem {
// 2. open file write with broker
TBrokerFD fd = new TBrokerFD();
Status status = operations.openWriter(client, address, remotePath, fd);
Status status = operations.openWriter(OpParams.of(client, address, remotePath, fd));
if (!status.ok()) {
return status;
}
@ -463,7 +456,7 @@ public class BrokerFileSystem extends RemoteFileSystem {
+ ", broker: " + BrokerUtil.printBroker(name, address));
} finally {
// close write
Status closeStatus = operations.closeWriter(client, address, fd);
Status closeStatus = operations.closeWriter(OpParams.of(client, address, fd));
if (!closeStatus.ok()) {
LOG.warn(closeStatus.getErrMsg());
if (status.ok()) {
@ -562,16 +555,6 @@ public class BrokerFileSystem extends RemoteFileSystem {
return Status.OK;
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(String remotePath) throws UserException {
return null;
}
@Override
public Status list(String remotePath, List<RemoteFile> result) {
return list(remotePath, result, true);
}
// List files in remotePath
@Override
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {

View File

@ -17,5 +17,119 @@
package org.apache.doris.fs.remote;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.Status;
import org.apache.doris.fs.obj.ObjStorage;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.core.sync.RequestBody;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
public abstract class ObjFileSystem extends RemoteFileSystem {
private static final Logger LOG = LogManager.getLogger(ObjFileSystem.class);
protected final ObjStorage<?> objStorage;
public ObjFileSystem(String name, StorageBackend.StorageType type, ObjStorage<?> objStorage) {
super(name, type);
this.objStorage = objStorage;
}
@Override
public Status exists(String remotePath) {
return objStorage.headObject(remotePath);
}
@Override
public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
long start = System.currentTimeMillis();
// Write the data to a local file
File localFile = new File(localFilePath);
if (localFile.exists()) {
try {
Files.walk(Paths.get(localFilePath), FileVisitOption.FOLLOW_LINKS)
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
} catch (IOException e) {
return new Status(
Status.ErrCode.COMMON_ERROR, "failed to delete exist local file: " + localFilePath);
}
}
Status st = objStorage.getObject(remoteFilePath, localFile);
if (st != Status.OK) {
return st;
}
if (localFile.length() == fileSize) {
LOG.info(
"finished to get file from {} to {} with size: {}. cost {} ms",
remoteFilePath,
localFile.toPath(),
fileSize,
(System.currentTimeMillis() - start));
return Status.OK;
} else {
return new Status(Status.ErrCode.COMMON_ERROR, localFile.toString());
}
}
@Override
public Status directUpload(String content, String remoteFile) {
Status st = objStorage.putObject(remoteFile, RequestBody.fromBytes(content.getBytes()));
if (st != Status.OK) {
return st;
}
LOG.info("upload content success.");
return Status.OK;
}
@Override
public Status upload(String localPath, String remotePath) {
Status st = objStorage.putObject(remotePath, RequestBody.fromFile(new File(localPath)));
if (st != Status.OK) {
return st;
}
LOG.info("upload file " + localPath + " success.");
return Status.OK;
}
@Override
public Status makeDir(String remotePath) {
if (!remotePath.endsWith("/")) {
remotePath += "/";
}
Status st = objStorage.putObject(remotePath, RequestBody.empty());
if (st != Status.OK) {
return st;
}
LOG.info("makeDir success.");
return Status.OK;
}
@Override
public Status rename(String origFilePath, String destFilePath) {
Status status = objStorage.copyObject(origFilePath, destFilePath);
if (status.ok()) {
return delete(origFilePath);
} else {
return status;
}
}
public Status copy(String origFilePath, String destFilePath) {
return objStorage.copyObject(origFilePath, destFilePath);
}
@Override
public Status delete(String remotePath) {
return objStorage.deleteObject(remotePath);
}
}
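Note that rename() on an object store is copy-then-delete rather than an atomic move. A small round trip under that assumption (objFs is any ObjFileSystem subclass; paths illustrative):

Status up = objFs.directUpload("doris", "s3://my_bucket/tmp/a.txt");
if (up.ok()) {
    Status mv = objFs.rename("s3://my_bucket/tmp/a.txt", "s3://my_bucket/tmp/b.txt");
    // mv is OK only if both the server-side copy and the source delete succeeded
}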

View File

@ -17,11 +17,34 @@
package org.apache.doris.fs.remote;
import org.apache.doris.fs.FileSystem;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.common.UserException;
import org.apache.doris.fs.PersistentFileSystem;
import java.util.Map;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
public abstract class RemoteFileSystem implements FileSystem {
import java.io.IOException;
public abstract class RemoteFileSystem extends PersistentFileSystem {
protected org.apache.hadoop.fs.FileSystem dfsFileSystem = null;
protected Map<String, String> properties;
public RemoteFileSystem(String name, StorageBackend.StorageType type) {
super(name, type);
}
protected org.apache.hadoop.fs.FileSystem getFileSystem(String remotePath) throws UserException {
throw new UserException("Not support to getFileSystem.");
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(String remotePath) throws UserException {
org.apache.hadoop.fs.FileSystem fileSystem = getFileSystem(remotePath);
try {
return fileSystem.listLocatedStatus(new Path(remotePath));
} catch (IOException e) {
throw new UserException("Failed to list located status for path: " + remotePath, e);
}
}
}
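
RemoteFileSystem keeps the shared listLocatedStatus logic in the base class and routes it through a protected getFileSystem hook that concrete filesystems override. A compact sketch of that template-method shape, with hypothetical names standing in for the Doris types:

// Standalone sketch (hypothetical names, not Doris code) of the template-method shape above.
abstract class RemoteFsSketch {
    // Hook with a default "unsupported" behavior, as in getFileSystem above.
    protected String getClient(String remotePath) throws Exception {
        throw new Exception("getFileSystem is not supported");
    }

    // Shared operation, implemented once in the base class.
    public final String listLocatedStatus(String remotePath) throws Exception {
        return getClient(remotePath) + " lists " + remotePath;
    }
}

final class DfsSketch extends RemoteFsSketch {
    @Override
    protected String getClient(String remotePath) {
        return "hdfs client"; // concrete classes build and cache the real client here
    }
}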

View File

@ -17,114 +17,37 @@
package org.apache.doris.fs.remote;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.obj.S3Storage;
import org.apache.doris.fs.obj.S3ObjStorage;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.http.HttpStatus;
import org.apache.http.client.utils.URIBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
public class S3FileSystem extends ObjFileSystem {
private static final Logger LOG = LogManager.getLogger(S3Storage.class);
private S3Client client;
// false: the s3 client will automatically convert endpoint to virtual-hosted style, eg:
// endpoint: http://s3.us-east-2.amazonaws.com
// bucket/path: my_bucket/file.txt
// auto convert: http://my_bucket.s3.us-east-2.amazonaws.com/file.txt
// true: the s3 client will NOT automatically convert endpoint to virtual-hosted style, we need to do some tricks:
// endpoint: http://cos.ap-beijing.myqcloud.com
// bucket/path: my_bucket/file.txt
// convert manually: See S3URI()
private boolean forceHostedStyle = false;
private static final Logger LOG = LogManager.getLogger(S3FileSystem.class);
public S3FileSystem(Map<String, String> properties) {
super(StorageBackend.StorageType.S3.name(), StorageBackend.StorageType.S3, new S3ObjStorage(properties));
this.properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
setProperties(properties);
}
private void setProperties(Map<String, String> properties) {
this.properties.putAll(properties);
try {
S3Properties.requiredS3Properties(this.properties);
} catch (DdlException e) {
throw new IllegalArgumentException(e);
}
// Virtual hosted-style is the recommended access style in the S3 protocol.
// Path-style access is deprecated, but the s3 client only generates a
// virtual hosted-style request when the endpoint starts with `s3`;
// otherwise it is not converted (https://github.com/aws/aws-sdk-java-v2/pull/763).
// The object-storage endpoints of many cloud providers do not start with s3,
// so they cannot be converted to virtual hosted-style automatically.
// Some of them, such as Aliyun's OSS, only support virtual hosted-style,
// while others (e.g. Ceph) may only support path-style, so we need to do
// some additional conversion.
//
//          use_path_style          |     !use_path_style
//  S3      forceHostedStyle=false  |  forceHostedStyle=false
//  !S3     forceHostedStyle=false  |  forceHostedStyle=true
//
// That is, for an S3 endpoint, ignore the `use_path_style` property: the s3 client
// automatically uses virtual hosted-style.
// For any other endpoint, use path-style if `use_path_style` is true; otherwise use
// virtual hosted-style. (A standalone sketch of this decision follows this file's diff.)
if (!this.properties.get(S3Properties.ENDPOINT).toLowerCase().startsWith("s3")) {
forceHostedStyle = !this.properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false")
.equalsIgnoreCase("true");
} else {
forceHostedStyle = false;
}
}
private FileSystem getFileSystem(String remotePath) throws UserException {
@Override
protected FileSystem getFileSystem(String remotePath) throws UserException {
if (dfsFileSystem == null) {
Configuration conf = new Configuration();
System.setProperty("com.amazonaws.services.s3.enableV4", "true");
@ -138,238 +61,12 @@ public class S3FileSystem extends ObjFileSystem {
return dfsFileSystem;
}
private S3Client getClient(String bucket) throws UserException {
if (client == null) {
URI tmpEndpoint = URI.create(properties.get(S3Properties.ENDPOINT));
StaticCredentialsProvider scp;
if (!properties.containsKey(S3Properties.SESSION_TOKEN)) {
AwsBasicCredentials awsBasic = AwsBasicCredentials.create(
properties.get(S3Properties.ACCESS_KEY),
properties.get(S3Properties.SECRET_KEY));
scp = StaticCredentialsProvider.create(awsBasic);
} else {
AwsSessionCredentials awsSession = AwsSessionCredentials.create(
properties.get(S3Properties.ACCESS_KEY),
properties.get(S3Properties.SECRET_KEY),
properties.get(S3Properties.SESSION_TOKEN));
scp = StaticCredentialsProvider.create(awsSession);
}
EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
.builder()
.baseDelay(Duration.ofSeconds(1))
.maxBackoffTime(Duration.ofMinutes(1))
.build();
// retry 3 times with equal-jitter backoff
RetryPolicy retryPolicy = RetryPolicy
.builder()
.numRetries(3)
.backoffStrategy(backoffStrategy)
.build();
ClientOverrideConfiguration clientConf = ClientOverrideConfiguration
.builder()
// set retry policy
.retryPolicy(retryPolicy)
// using AwsS3V4Signer
.putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create())
.build();
URI endpoint = StringUtils.isEmpty(bucket) ? tmpEndpoint :
URI.create(new URIBuilder(tmpEndpoint).setHost(bucket + "." + tmpEndpoint.getHost()).toString());
client = S3Client.builder()
.endpointOverride(endpoint)
.credentialsProvider(scp)
.region(Region.of(properties.get(S3Properties.REGION)))
.overrideConfiguration(clientConf)
// disable chunkedEncoding because BOS does not support it
// use virtual hosted-style access
.serviceConfiguration(S3Configuration.builder()
.chunkedEncodingEnabled(false)
.pathStyleAccessEnabled(false)
.build())
.build();
}
return client;
}
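
The removed getClient above shows the standard AWS SDK v2 wiring: static credentials, a three-attempt retry policy with equal-jitter backoff, and an endpoint override for virtual-hosted-style access. A minimal sketch that reuses only the builder calls visible in the diff (endpoint, region, and credential values are placeholders):

import java.net.URI;
import java.time.Duration;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

public final class S3ClientSketch {
    public static S3Client build() {
        // Three attempts with equal-jitter backoff, as in the removed code above.
        RetryPolicy retry = RetryPolicy.builder()
                .numRetries(3)
                .backoffStrategy(EqualJitterBackoffStrategy.builder()
                        .baseDelay(Duration.ofSeconds(1))
                        .maxBackoffTime(Duration.ofMinutes(1))
                        .build())
                .build();
        return S3Client.builder()
                .endpointOverride(URI.create("http://s3.example.com"))   // placeholder endpoint
                .credentialsProvider(StaticCredentialsProvider.create(
                        AwsBasicCredentials.create("ak", "sk")))          // placeholder keys
                .region(Region.of("us-east-1"))                           // placeholder region
                .overrideConfiguration(ClientOverrideConfiguration.builder()
                        .retryPolicy(retry)
                        .build())
                .build();
    }
}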
@Override
public Status exists(String remotePath) {
try {
S3URI uri = S3URI.create(remotePath, forceHostedStyle);
getClient(uri.getVirtualBucket())
.headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
return Status.OK;
} catch (S3Exception e) {
if (e.statusCode() == HttpStatus.SC_NOT_FOUND) {
return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath);
} else {
LOG.warn("headObject failed:", e);
return new Status(Status.ErrCode.COMMON_ERROR, "headObject failed: " + e.getMessage());
}
} catch (UserException ue) {
LOG.warn("connect to s3 failed: ", ue);
return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
}
}
@Override
public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
long start = System.currentTimeMillis();
// Write the data to a local file
File localFile = new File(localFilePath);
if (localFile.exists()) {
try {
Files.walk(Paths.get(localFilePath), FileVisitOption.FOLLOW_LINKS)
.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(File::delete);
} catch (IOException e) {
return new Status(
Status.ErrCode.COMMON_ERROR, "failed to delete existing local file: " + localFilePath);
}
}
try {
S3URI uri = S3URI.create(remoteFilePath, forceHostedStyle);
GetObjectResponse response = getClient(uri.getVirtualBucket()).getObject(
GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), localFile.toPath());
if (localFile.length() == fileSize) {
LOG.info(
"finished downloading from {} to {} with size: {}, cost: {} ms",
remoteFilePath,
localFilePath,
fileSize,
(System.currentTimeMillis() - start));
return Status.OK;
} else {
return new Status(Status.ErrCode.COMMON_ERROR, response.toString());
}
} catch (S3Exception s3Exception) {
return new Status(
Status.ErrCode.COMMON_ERROR,
"get file from s3 error: " + s3Exception.awsErrorDetails().errorMessage());
} catch (UserException ue) {
LOG.warn("connect to s3 failed: ", ue);
return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
} catch (Exception e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.toString());
}
}
@Override
public Status directUpload(String content, String remoteFile) {
try {
S3URI uri = S3URI.create(remoteFile, forceHostedStyle);
PutObjectResponse response =
getClient(uri.getVirtualBucket())
.putObject(
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
RequestBody.fromBytes(content.getBytes()));
LOG.info("uploaded content successfully: " + response.eTag());
return Status.OK;
} catch (S3Exception e) {
LOG.warn("write content failed:", e);
return new Status(Status.ErrCode.COMMON_ERROR, "write content failed: " + e.getMessage());
} catch (UserException ue) {
LOG.warn("connect to s3 failed: ", ue);
return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
} catch (Exception e) {
LOG.warn("connect to s3 failed: ", e);
return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + e.getMessage());
}
}
public Status copy(String origFilePath, String destFilePath) {
try {
S3URI origUri = S3URI.create(origFilePath);
S3URI descUri = S3URI.create(destFilePath, forceHostedStyle);
getClient(descUri.getVirtualBucket())
.copyObject(
CopyObjectRequest.builder()
.copySource(origUri.getBucket() + "/" + origUri.getKey())
.destinationBucket(descUri.getBucket())
.destinationKey(descUri.getKey())
.build());
return Status.OK;
} catch (S3Exception e) {
LOG.error("copy file failed: ", e);
return new Status(Status.ErrCode.COMMON_ERROR, "copy file failed: " + e.getMessage());
} catch (UserException ue) {
LOG.error("copy to s3 failed: ", ue);
return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
}
}
@Override
public Status upload(String localPath, String remotePath) {
try {
S3URI uri = S3URI.create(remotePath, forceHostedStyle);
PutObjectResponse response =
getClient(uri.getVirtualBucket())
.putObject(
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
RequestBody.fromFile(new File(localPath)));
LOG.info("uploaded file " + localPath + " successfully: " + response.eTag());
return Status.OK;
} catch (S3Exception e) {
LOG.error("write file failed:", e);
return new Status(Status.ErrCode.COMMON_ERROR, "write file failed: " + e.getMessage());
} catch (UserException ue) {
LOG.error("connect to s3 failed: ", ue);
return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
}
}
@Override
public Status rename(String origFilePath, String destFilePath) {
Status status = copy(origFilePath, destFilePath);
if (status.ok()) {
return delete(origFilePath);
} else {
return status;
}
}
@Override
public Status delete(String remotePath) {
try {
S3URI uri = S3URI.create(remotePath, forceHostedStyle);
DeleteObjectResponse response =
getClient(uri.getVirtualBucket())
.deleteObject(
DeleteObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
LOG.info("deleted file " + remotePath + " successfully: " + response.toString());
return Status.OK;
} catch (S3Exception e) {
LOG.warn("delete file failed: ", e);
if (e.statusCode() == HttpStatus.SC_NOT_FOUND) {
return Status.OK;
}
return new Status(Status.ErrCode.COMMON_ERROR, "delete file failed: " + e.getMessage());
} catch (UserException ue) {
LOG.warn("connect to s3 failed: ", ue);
return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
}
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(String remotePath) throws UserException {
FileSystem fileSystem = getFileSystem(remotePath);
try {
return fileSystem.listLocatedStatus(new org.apache.hadoop.fs.Path(remotePath));
} catch (IOException e) {
throw new UserException("Failed to list located status for path: " + remotePath, e);
}
}
@Override
public Status list(String remotePath, List<RemoteFile> result) {
return list(remotePath, result, true);
}
// The broker's file-pattern globbing is too complex to reimplement, so we use Hadoop's globStatus directly.
@Override
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
try {
FileSystem s3AFileSystem = getFileSystem(remotePath);
org.apache.hadoop.fs.Path pathPattern = new org.apache.hadoop.fs.Path(remotePath);
Path pathPattern = new Path(remotePath);
FileStatus[] files = s3AFileSystem.globStatus(pathPattern);
if (files == null) {
return Status.OK;
@ -390,27 +87,4 @@ public class S3FileSystem extends ObjFileSystem {
}
return Status.OK;
}
@Override
public Status makeDir(String remotePath) {
if (!remotePath.endsWith("/")) {
remotePath += "/";
}
try {
S3URI uri = S3URI.create(remotePath, forceHostedStyle);
PutObjectResponse response =
getClient(uri.getVirtualBucket())
.putObject(
PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
RequestBody.empty());
LOG.info("makeDir succeeded: " + response.eTag());
return Status.OK;
} catch (S3Exception e) {
LOG.error("makeDir failed:", e);
return new Status(Status.ErrCode.COMMON_ERROR, "makeDir failed: " + e.getMessage());
} catch (UserException ue) {
LOG.error("connect to s3 failed: ", ue);
return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
}
}
}
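
The setProperties comment above encodes a small decision table for forceHostedStyle. A standalone sketch of that decision, with an illustrative method name (the real logic lives inline in setProperties):

// Standalone sketch of the endpoint-style decision table described in setProperties.
final class EndpointStyleSketch {
    // For endpoints starting with "s3" the SDK generates virtual hosted-style
    // requests by itself; otherwise we force it unless use_path_style is set.
    static boolean forceHostedStyle(String endpoint, boolean usePathStyle) {
        if (endpoint.toLowerCase().startsWith("s3")) {
            return false;
        }
        return !usePathStyle;
    }

    public static void main(String[] args) {
        System.out.println(forceHostedStyle("s3.us-east-2.amazonaws.com", false));  // false
        System.out.println(forceHostedStyle("cos.ap-beijing.myqcloud.com", false)); // true
        System.out.println(forceHostedStyle("cos.ap-beijing.myqcloud.com", true));  // false
    }
}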

View File

@ -15,24 +15,26 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.remote;
package org.apache.doris.fs.remote.dfs;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import org.apache.doris.catalog.AuthType;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.URI;
import org.apache.doris.fs.obj.HdfsStorage;
import org.apache.doris.fs.operations.HDFSFileOperations;
import org.apache.doris.fs.operations.HDFSOpParams;
import org.apache.doris.fs.operations.OpParams;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
@ -43,29 +45,32 @@ import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DFSFileSystem extends RemoteFileSystem {
private static final Logger LOG = LogManager.getLogger(HdfsStorage.class);
private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class);
private HDFSFileOperations operations = null;
public DFSFileSystem(Map<String, String> properties) {
this.properties = new HashMap<>(properties);
this(StorageBackend.StorageType.HDFS, properties);
}
private FileSystem getHdfsClient(String remotePath)
throws UserException {
public DFSFileSystem(StorageBackend.StorageType type, Map<String, String> properties) {
super(type.name(), type);
this.properties.putAll(properties);
}
@Override
protected FileSystem getFileSystem(String remotePath) throws UserException {
if (dfsFileSystem != null) {
return dfsFileSystem;
}
@ -103,12 +108,12 @@ public class DFSFileSystem extends RemoteFileSystem {
public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
LOG.debug("download from {} to {}, file size: {}.", remoteFilePath, localFilePath, fileSize);
final long start = System.currentTimeMillis();
FSDataInputStream fsDataInputStream;
try {
fsDataInputStream = operations.openReader(remoteFilePath, 0);
} catch (Exception e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
HDFSOpParams hdfsOpParams = OpParams.of(remoteFilePath);
Status st = operations.openReader(hdfsOpParams);
if (st != Status.OK) {
return st;
}
FSDataInputStream fsDataInputStream = hdfsOpParams.fsDataInputStream();
LOG.info("finished opening reader. downloading {} to {}.", remoteFilePath, localFilePath);
// delete local file if it exists
@ -132,9 +137,9 @@ public class DFSFileSystem extends RemoteFileSystem {
"failed to create local file: " + localFilePath + ", msg: " + e.getMessage());
}
String lastErrMsg = null;
String lastErrMsg;
Status status = Status.OK;
try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(localFile))) {
try (BufferedOutputStream out = new BufferedOutputStream(Files.newOutputStream(localFile.toPath()))) {
final long bufSize = 1024 * 1024; // 1MB
long leftSize = fileSize;
long readOffset = 0;
@ -165,7 +170,7 @@ public class DFSFileSystem extends RemoteFileSystem {
} catch (IOException e) {
return new Status(Status.ErrCode.COMMON_ERROR, "Got exception: " + e.getMessage());
} finally {
Status closeStatus = operations.closeReader(fsDataInputStream);
Status closeStatus = operations.closeReader(OpParams.of(fsDataInputStream));
if (!closeStatus.ok()) {
LOG.warn(closeStatus.getErrMsg());
if (status.ok()) {
@ -256,7 +261,7 @@ public class DFSFileSystem extends RemoteFileSystem {
try {
URI pathUri = URI.create(remotePath);
Path inputFilePath = new Path(pathUri.getPath());
FileSystem fileSystem = getHdfsClient(remotePath);
FileSystem fileSystem = getFileSystem(remotePath);
boolean isPathExist = fileSystem.exists(inputFilePath);
if (!isPathExist) {
return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath);
@ -271,12 +276,12 @@ public class DFSFileSystem extends RemoteFileSystem {
@Override
public Status directUpload(String content, String remoteFile) {
FSDataOutputStream fsDataOutputStream = null;
try {
fsDataOutputStream = operations.openWriter(remoteFile);
} catch (Exception e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
HDFSOpParams hdfsOpParams = OpParams.of(remoteFile);
Status wst = operations.openWriter(hdfsOpParams);
if (wst != Status.OK) {
return wst;
}
FSDataOutputStream fsDataOutputStream = hdfsOpParams.fsDataOutputStream();
LOG.info("finished opening writer. directly uploading to remote path {}.", remoteFile);
Status status = Status.OK;
@ -286,7 +291,7 @@ public class DFSFileSystem extends RemoteFileSystem {
LOG.error("error while writing data to output stream", e);
status = new Status(Status.ErrCode.COMMON_ERROR, "write exception: " + e.getMessage());
} finally {
Status closeStatus = operations.closeWriter(fsDataOutputStream);
Status closeStatus = operations.closeWriter(OpParams.of(fsDataOutputStream));
if (!closeStatus.ok()) {
LOG.warn(closeStatus.getErrMsg());
if (status.ok()) {
@ -301,13 +306,12 @@ public class DFSFileSystem extends RemoteFileSystem {
public Status upload(String localPath, String remotePath) {
long start = System.currentTimeMillis();
LOG.debug("local path {}, remote path {}", localPath, remotePath);
FSDataOutputStream fsDataOutputStream = null;
try {
fsDataOutputStream = operations.openWriter(remotePath);
} catch (Exception e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
HDFSOpParams hdfsOpParams = OpParams.of(remotePath);
Status wst = operations.openWriter(hdfsOpParams);
if (wst != Status.OK) {
return wst;
}
FSDataOutputStream fsDataOutputStream = hdfsOpParams.fsDataOutputStream();
LOG.info("finished opening writer. uploading local file to remote path {}.", remotePath);
// read local file and write remote
File localFile = new File(localPath);
@ -342,7 +346,7 @@ public class DFSFileSystem extends RemoteFileSystem {
} catch (IOException e1) {
return new Status(Status.ErrCode.COMMON_ERROR, "encountered io exception: " + e1.getMessage());
} finally {
Status closeStatus = operations.closeWriter(fsDataOutputStream);
Status closeStatus = operations.closeWriter(OpParams.of(fsDataOutputStream));
if (!closeStatus.ok()) {
LOG.warn(closeStatus.getErrMsg());
if (status.ok()) {
@ -368,7 +372,7 @@ public class DFSFileSystem extends RemoteFileSystem {
if (!srcPathUri.getAuthority().trim().equals(destPathUri.getAuthority().trim())) {
return new Status(Status.ErrCode.COMMON_ERROR, "rename is only allowed within the same file system");
}
FileSystem fileSystem = getHdfsClient(destPath);
FileSystem fileSystem = getFileSystem(destPath);
Path srcfilePath = new Path(srcPathUri.getPath());
Path destfilePath = new Path(destPathUri.getPath());
boolean isRenameSuccess = fileSystem.rename(srcfilePath, destfilePath);
@ -391,7 +395,7 @@ public class DFSFileSystem extends RemoteFileSystem {
try {
URI pathUri = URI.create(remotePath);
Path inputFilePath = new Path(pathUri.getPath());
FileSystem fileSystem = getHdfsClient(remotePath);
FileSystem fileSystem = getFileSystem(remotePath);
fileSystem.delete(inputFilePath, true);
} catch (UserException e) {
return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
@ -404,21 +408,6 @@ public class DFSFileSystem extends RemoteFileSystem {
return Status.OK;
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(String remotePath) throws UserException {
FileSystem fileSystem = getHdfsClient(remotePath);
try {
return fileSystem.listLocatedStatus(new Path(remotePath));
} catch (IOException e) {
throw new UserException("Failed to list located status for path: " + remotePath, e);
}
}
@Override
public Status list(String remotePath, List<RemoteFile> result) {
return list(remotePath, result, true);
}
/**
* Get files under the given remotePath on HDFS.
*
@ -431,7 +420,7 @@ public class DFSFileSystem extends RemoteFileSystem {
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
try {
URI pathUri = URI.create(remotePath);
FileSystem fileSystem = getHdfsClient(remotePath);
FileSystem fileSystem = getFileSystem(remotePath);
Path pathPattern = new Path(pathUri.getPath());
FileStatus[] files = fileSystem.globStatus(pathPattern);
if (files == null) {

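The download path in DFSFileSystem copies the remote file in fixed 1 MB chunks through a BufferedOutputStream, as the hunks above show. A sketch of a copy loop of that shape using Hadoop's positional read; names here are illustrative, not the Doris implementation:

// Sketch of a chunked HDFS download loop (1 MB positional reads into a buffered
// local file); illustrative only, not the Doris code.
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.hadoop.fs.FSDataInputStream;

final class DfsDownloadSketch {
    static void copy(FSDataInputStream in, String localPath, long fileSize) throws IOException {
        final int bufSize = 1024 * 1024; // 1MB, as in the diff
        byte[] buf = new byte[bufSize];
        long readOffset = 0;
        try (OutputStream out = new BufferedOutputStream(
                Files.newOutputStream(Paths.get(localPath)))) {
            while (readOffset < fileSize) {
                int toRead = (int) Math.min(bufSize, fileSize - readOffset);
                int n = in.read(readOffset, buf, 0, toRead); // positional read
                if (n < 0) {
                    throw new IOException("unexpected EOF at offset " + readOffset);
                }
                out.write(buf, 0, n);
                readOffset += n;
            }
        }
    }
}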
View File

@ -0,0 +1,28 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.remote.dfs;
import org.apache.doris.analysis.StorageBackend;
import java.util.Map;
public class JFSFileSystem extends DFSFileSystem {
public JFSFileSystem(Map<String, String> properties) {
super(StorageBackend.StorageType.JFS, properties);
}
}

View File

@ -0,0 +1,28 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.remote.dfs;
import org.apache.doris.analysis.StorageBackend;
import java.util.Map;
public class OFSFileSystem extends DFSFileSystem {
public OFSFileSystem(Map<String, String> properties) {
super(StorageBackend.StorageType.OFS, properties);
}
}
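
JFSFileSystem and OFSFileSystem are thin DFSFileSystem subclasses that only pin the storage type, which lets a factory dispatch purely on StorageType. The real FileSystemFactory is only visible through its call sites in this commit (see FileSystemFactory.get in the next file's diff), so the following dispatch is an assumption, sketched with hypothetical stand-in types:

// Hypothetical factory-dispatch sketch; not the real FileSystemFactory.
import java.util.Map;

final class FactorySketch {
    enum StorageType { S3, HDFS, OFS, JFS }

    interface Fs { String name(); } // stand-in for the RemoteFileSystem hierarchy

    static Fs get(StorageType type, Map<String, String> properties) {
        switch (type) {
            case S3:   return () -> "S3FileSystem";
            case HDFS: return () -> "DFSFileSystem";
            case OFS:  return () -> "OFSFileSystem"; // DFSFileSystem subclass
            case JFS:  return () -> "JFSFileSystem"; // DFSFileSystem subclass
            default:   throw new IllegalArgumentException("unsupported type: " + type);
        }
    }
}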

View File

@ -27,10 +27,9 @@ import org.apache.doris.common.util.S3URI;
import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.obj.BlobStorage;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.thrift.TFileType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import java.util.HashMap;
@ -89,17 +88,12 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle);
if (FeConstants.runningUnitTest) {
// Just check
BlobStorage.create(null, StorageBackend.StorageType.S3, locationProperties);
FileSystemFactory.get(StorageBackend.StorageType.S3, locationProperties);
} else {
parseFile();
}
}
@VisibleForTesting
public static Map<String, String> getParams(Map<String, String> params) throws AnalysisException {
return getValidParams(params);
}
private static Map<String, String> getValidParams(Map<String, String> params) throws AnalysisException {
if (!params.containsKey(S3_URI)) {
throw new AnalysisException("Missing required property: " + S3_URI);

View File

@ -15,10 +15,14 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.backup;
package org.apache.doris.fs.obj;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.fs.obj.S3Storage;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.remote.S3FileSystem;
import org.apache.commons.codec.digest.DigestUtils;
import org.junit.Assert;
@ -40,11 +44,11 @@ import java.util.Random;
import java.util.UUID;
@Ignore
public class S3StorageTest {
public class S3FileSystemTest {
private static String basePath;
private final String bucket = "s3://doris-test/";
private Map<String, String> properties;
private S3Storage storage;
private S3FileSystem fileSystem;
private String testFile;
private String content;
@ -62,7 +66,7 @@ public class S3StorageTest {
properties.put(PropertyConverter.USE_PATH_STYLE, "false");
properties.put("AWS_REGION", "bj");
storage = new S3Storage(properties);
fileSystem = (S3FileSystem) FileSystemFactory.get(StorageBackend.StorageType.S3, properties);
testFile = bucket + basePath + "/Ode_to_the_West_Wind";
content =
@ -80,17 +84,17 @@ public class S3StorageTest {
+ "With living hues and odors plain and hill:\n"
+ "Wild Spirit, which art moving everywhere;\n"
+ "Destroyer and preserver; hear, oh, hear!";
Assert.assertEquals(Status.OK, storage.directUpload(content, testFile));
Assert.assertEquals(Status.OK, fileSystem.directUpload(content, testFile));
}
@Test
public void downloadWithFileSize() throws IOException {
File localFile = File.createTempFile("s3unittest", ".dat");
localFile.deleteOnExit();
Status status = storage.downloadWithFileSize(testFile, localFile.getAbsolutePath(), content.getBytes().length);
Status status = fileSystem.downloadWithFileSize(testFile, localFile.getAbsolutePath(), content.getBytes().length);
Assert.assertEquals(Status.OK, status);
Assert.assertEquals(DigestUtils.md5Hex(content.getBytes()), DigestUtils.md5Hex(new FileInputStream(localFile)));
status = storage.downloadWithFileSize(bucket + basePath + "/Ode_to_the_West_Wind", localFile.getAbsolutePath(), content.getBytes().length + 1);
status = fileSystem.downloadWithFileSize(bucket + basePath + "/Ode_to_the_West_Wind", localFile.getAbsolutePath(), content.getBytes().length + 1);
Assert.assertNotEquals(Status.OK, status);
}
@ -105,11 +109,11 @@ public class S3StorageTest {
os.write(buf);
os.close();
String remote = bucket + basePath + "/" + localFile.getName();
Status status = storage.upload(localFile.getAbsolutePath(), remote);
Status status = fileSystem.upload(localFile.getAbsolutePath(), remote);
Assert.assertEquals(Status.OK, status);
File localFile2 = File.createTempFile("s3unittest", ".dat");
localFile2.deleteOnExit();
status = storage.downloadWithFileSize(remote, localFile2.getAbsolutePath(), 1024 * 1024);
status = fileSystem.downloadWithFileSize(remote, localFile2.getAbsolutePath(), 1024 * 1024);
Assert.assertEquals(Status.OK, status);
Assert.assertEquals(DigestUtils.md5Hex(new FileInputStream(localFile)),
DigestUtils.md5Hex(new FileInputStream(localFile2)));
@ -117,54 +121,54 @@ public class S3StorageTest {
@Test
public void copy() {
Assert.assertEquals(Status.OK, storage.copy(testFile, testFile + ".bak"));
Assert.assertEquals(Status.OK, storage.checkPathExist(testFile + ".bak"));
Assert.assertNotEquals(Status.OK, storage.copy(testFile + ".bakxxx", testFile + ".bak"));
Assert.assertEquals(Status.OK, fileSystem.copy(testFile, testFile + ".bak"));
Assert.assertEquals(Status.OK, fileSystem.exists(testFile + ".bak"));
Assert.assertNotEquals(Status.OK, fileSystem.copy(testFile + ".bakxxx", testFile + ".bak"));
}
@Test
public void rename() {
Assert.assertEquals(Status.OK, storage.directUpload(content, testFile + ".bak"));
storage.rename(testFile + ".bak", testFile + ".bak1");
Assert.assertEquals(Status.ErrCode.NOT_FOUND, storage.checkPathExist(testFile + ".bak").getErrCode());
Assert.assertEquals(Status.OK, storage.checkPathExist(testFile + ".bak1"));
Assert.assertEquals(Status.OK, fileSystem.directUpload(content, testFile + ".bak"));
fileSystem.rename(testFile + ".bak", testFile + ".bak1");
Assert.assertEquals(Status.ErrCode.NOT_FOUND, fileSystem.exists(testFile + ".bak").getErrCode());
Assert.assertEquals(Status.OK, fileSystem.exists(testFile + ".bak1"));
}
@Test
public void delete() {
String deleteFile = testFile + ".to_be_delete";
Assert.assertEquals(Status.OK, storage.directUpload(content, deleteFile));
Assert.assertEquals(Status.OK, storage.delete(deleteFile));
Assert.assertEquals(Status.ErrCode.NOT_FOUND, storage.checkPathExist(deleteFile).getErrCode());
Assert.assertEquals(Status.OK, storage.delete(deleteFile + "xxxx"));
Assert.assertEquals(Status.OK, fileSystem.directUpload(content, deleteFile));
Assert.assertEquals(Status.OK, fileSystem.delete(deleteFile));
Assert.assertEquals(Status.ErrCode.NOT_FOUND, fileSystem.exists(deleteFile).getErrCode());
Assert.assertEquals(Status.OK, fileSystem.delete(deleteFile + "xxxx"));
}
@Test
public void list() {
List<RemoteFile> result = new ArrayList<>();
String listPath = bucket + basePath + "_list" + "/Ode_to_the_West_Wind";
Assert.assertEquals(Status.OK, storage.directUpload(content, listPath + ".1"));
Assert.assertEquals(Status.OK, storage.directUpload(content, listPath + ".2"));
Assert.assertEquals(Status.OK, storage.directUpload(content, listPath + ".3"));
Assert.assertEquals(Status.OK, storage.list(bucket + basePath + "_list/*", result));
Assert.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".1"));
Assert.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".2"));
Assert.assertEquals(Status.OK, fileSystem.directUpload(content, listPath + ".3"));
Assert.assertEquals(Status.OK, fileSystem.list(bucket + basePath + "_list/*", result));
Assert.assertEquals(3, result.size());
}
@Test
public void makeDir() {
String path = bucket + basePath + "/test_path";
Assert.assertEquals(Status.OK, storage.makeDir(path));
Assert.assertNotEquals(Status.OK, storage.checkPathExist(path));
Assert.assertEquals(Status.OK, fileSystem.makeDir(path));
Assert.assertNotEquals(Status.OK, fileSystem.exists(path));
String path1 = bucket + basePath + "/test_path1/";
Assert.assertEquals(Status.OK, storage.makeDir(path1));
Assert.assertEquals(Status.OK, storage.checkPathExist(path1));
Assert.assertEquals(Status.OK, fileSystem.makeDir(path1));
Assert.assertEquals(Status.OK, fileSystem.exists(path1));
}
@Test
public void checkPathExist() {
Status status = storage.checkPathExist(testFile);
Status status = fileSystem.exists(testFile);
Assert.assertEquals(Status.OK, status);
status = storage.checkPathExist(testFile + ".NOT_EXIST");
status = fileSystem.exists(testFile + ".NOT_EXIST");
Assert.assertEquals(Status.ErrCode.NOT_FOUND, status.getErrCode());
}
}