[refactor](fs)(step1) add new storage file system (#18938)

PR1: add new storage file system template and move old storage to new package
PR2: extract some methods from the old storage into the new file system.
PR3: use storages to access remote object storage, and use file systems to access files in local or remote locations. Will add some unit tests.

---------

Co-authored-by: jinzhe <jinzhe@selectdb.com>
This commit is contained in:
slothever
2023-04-24 11:41:48 +08:00
committed by GitHub
parent 1f9450e0f7
commit 22cdfc5970
26 changed files with 1957 additions and 45 deletions

View File

@ -18,12 +18,12 @@
package org.apache.doris.analysis;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.backup.BlobStorage;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.datasource.property.constants.BosProperties;
import org.apache.doris.fs.obj.BlobStorage;
import org.apache.doris.thrift.TFileType;
import com.google.common.collect.Maps;

View File

@ -17,7 +17,6 @@
package org.apache.doris.analysis;
import org.apache.doris.backup.HdfsStorage;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
@ -47,6 +46,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -667,13 +667,19 @@ public class OutFileClause {
S3Properties.requiredS3Properties(brokerProps);
} else if (storageType == StorageBackend.StorageType.HDFS) {
if (!brokerProps.containsKey(HdfsResource.HADOOP_FS_NAME)) {
brokerProps.put(HdfsResource.HADOOP_FS_NAME, HdfsStorage.getFsName(filePath));
brokerProps.put(HdfsResource.HADOOP_FS_NAME, getFsName(filePath));
}
}
brokerDesc = new BrokerDesc(brokerName, storageType, brokerProps);
}
/**
 * Derives the file system prefix (scheme + authority, e.g. "hdfs://host:9000")
 * from a full file path by stripping the path component of its URI.
 *
 * @param path a full file path such as "hdfs://host:9000/dir/file"
 * @return the path with its URI path component removed
 */
public static String getFsName(String path) {
    Path hdfsPath = new Path(path);
    String fullPath = hdfsPath.toUri().toString();
    String filePath = hdfsPath.toUri().getPath();
    // Strip only the TRAILING path component. The previous implementation used
    // String.replace, which removes every occurrence of the substring and
    // corrupts results when the authority repeats the path, e.g.
    // "hdfs://abc/abc" -> "hdfs:" instead of "hdfs://abc".
    if (!filePath.isEmpty() && fullPath.endsWith(filePath)) {
        return fullPath.substring(0, fullPath.length() - filePath.length());
    }
    return fullPath;
}
void setParquetCompressionType(String propertyValue) {
if (PARQUET_COMPRESSION_TYPE_MAP.containsKey(propertyValue)) {
this.parquetCompressionType = PARQUET_COMPRESSION_TYPE_MAP.get(propertyValue);

View File

@ -46,6 +46,7 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.fs.obj.BlobStorage;
import org.apache.doris.task.DirMoveTask;
import org.apache.doris.task.DownloadTask;
import org.apache.doris.task.SnapshotTask;

View File

@ -28,6 +28,10 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.fs.obj.BlobStorage;
import org.apache.doris.fs.obj.BrokerStorage;
import org.apache.doris.fs.obj.HdfsStorage;
import org.apache.doris.fs.obj.S3Storage;
import org.apache.doris.system.Backend;
import com.google.common.base.Joiner;

View File

@ -31,12 +31,12 @@ import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.backup.BlobStorage;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.fs.obj.BlobStorage;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExprOpcode;

View File

@ -17,7 +17,6 @@
package org.apache.doris.catalog;
import org.apache.doris.backup.S3Storage;
import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.proc.BaseProcResult;
@ -25,6 +24,7 @@ import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.obj.S3Storage;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

View File

@ -18,7 +18,6 @@
package org.apache.doris.common.util;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.backup.BlobStorage;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import org.apache.doris.catalog.Env;
@ -30,6 +29,7 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.fs.obj.BlobStorage;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TBrokerCheckPathExistRequest;
import org.apache.doris.thrift.TBrokerCheckPathExistResponse;

View File

@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import java.util.List;
/**
 * File system abstraction used by the storage layer to access files in a
 * local or remote location.
 *
 * <p>All operations report errors through the returned {@link Status} instead
 * of throwing, except {@link #listLocatedStatus(String)}, whose default
 * implementation throws {@link UserException} to signal "not supported".
 */
public interface FileSystem {

    /** Checks whether {@code remotePath} exists. */
    Status exists(String remotePath);

    /**
     * Downloads {@code remoteFilePath} to {@code localFilePath}; {@code fileSize}
     * is the expected size of the remote file in bytes.
     */
    Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize);

    /** Uploads the local file at {@code localPath} to {@code remotePath}. */
    Status upload(String localPath, String remotePath);

    /** Writes {@code content} directly to {@code remoteFile}, without a local source file. */
    Status directUpload(String content, String remoteFile);

    /** Renames (moves) {@code origFilePath} to {@code destFilePath}. */
    Status rename(String origFilePath, String destFilePath);

    /** Deletes {@code remotePath}. */
    Status delete(String remotePath);

    /**
     * Lists located file statuses under {@code remotePath}.
     * Implementations backed by a Hadoop-compatible file system may override
     * this; the default rejects the call.
     *
     * @throws UserException when the implementation does not support this operation
     */
    default RemoteIterator<LocatedFileStatus> listLocatedStatus(String remotePath) throws UserException {
        throw new UserException("Not support to listLocatedStatus.");
    }

    // List files in remotePath.
    // The names of the returned RemoteFiles contain the file name only (not the full path).
    Status list(String remotePath, List<RemoteFile> result);

    /** Same as {@link #list(String, List)} but lets the caller choose full paths or names only. */
    Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly);

    /** Creates a directory at {@code remotePath}. */
    Status makeDir(String remotePath);
}

View File

@ -0,0 +1,21 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs;
/**
 * Placeholder for the local file system implementation.
 * Added as part of the storage/file-system refactor; the actual local file
 * access logic is expected to be filled in by follow-up changes.
 */
public class LocalFileSystem {
}

View File

@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.backup;
package org.apache.doris.fs.obj;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;

View File

@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.backup;
package org.apache.doris.fs.obj;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;

View File

@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.backup;
package org.apache.doris.fs.obj;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import org.apache.doris.catalog.AuthType;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.UserException;
@ -77,43 +79,36 @@ public class HdfsStorage extends BlobStorage {
setName(StorageBackend.StorageType.HDFS.name());
}
public static String getFsName(String path) {
Path hdfsPath = new Path(path);
String fullPath = hdfsPath.toUri().toString();
String filePath = hdfsPath.toUri().getPath();
return fullPath.replace(filePath, "");
}
@Override
public FileSystem getFileSystem(String remotePath) throws UserException {
if (dfsFileSystem == null) {
String username = hdfsProperties.get(HdfsResource.HADOOP_USER_NAME);
Configuration conf = new HdfsConfiguration();
boolean isSecurityEnabled = false;
for (Map.Entry<String, String> propEntry : hdfsProperties.entrySet()) {
conf.set(propEntry.getKey(), propEntry.getValue());
if (propEntry.getKey().equals(HdfsResource.HADOOP_SECURITY_AUTHENTICATION)
&& propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) {
isSecurityEnabled = true;
}
if (dfsFileSystem != null) {
return dfsFileSystem;
}
String username = hdfsProperties.get(HdfsResource.HADOOP_USER_NAME);
Configuration conf = new HdfsConfiguration();
boolean isSecurityEnabled = false;
for (Map.Entry<String, String> propEntry : hdfsProperties.entrySet()) {
conf.set(propEntry.getKey(), propEntry.getValue());
if (propEntry.getKey().equals(HdfsResource.HADOOP_SECURITY_AUTHENTICATION)
&& propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) {
isSecurityEnabled = true;
}
try {
if (isSecurityEnabled) {
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(
hdfsProperties.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL),
hdfsProperties.get(HdfsResource.HADOOP_KERBEROS_KEYTAB));
}
if (username == null) {
dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf);
} else {
dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf, username);
}
} catch (Exception e) {
LOG.error("errors while connect to " + remotePath, e);
throw new UserException("errors while connect to " + remotePath, e);
}
try {
if (isSecurityEnabled) {
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(
hdfsProperties.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL),
hdfsProperties.get(HdfsResource.HADOOP_KERBEROS_KEYTAB));
}
if (username == null) {
dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf);
} else {
dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf, username);
}
} catch (Exception e) {
LOG.error("errors while connect to " + remotePath, e);
throw new UserException("errors while connect to " + remotePath, e);
}
return dfsFileSystem;
}

View File

@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.backup;
package org.apache.doris.fs.obj;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;

View File

@ -0,0 +1,142 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.remote;
import org.apache.doris.backup.Status;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.thrift.TBrokerCloseReaderRequest;
import org.apache.doris.thrift.TBrokerCloseWriterRequest;
import org.apache.doris.thrift.TBrokerFD;
import org.apache.doris.thrift.TBrokerOpenMode;
import org.apache.doris.thrift.TBrokerOpenReaderRequest;
import org.apache.doris.thrift.TBrokerOpenReaderResponse;
import org.apache.doris.thrift.TBrokerOpenWriterRequest;
import org.apache.doris.thrift.TBrokerOpenWriterResponse;
import org.apache.doris.thrift.TBrokerOperationStatus;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TBrokerVersion;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.util.Map;
/**
 * Helper that issues the broker thrift RPCs for opening and closing file
 * readers/writers on a remote broker process.
 *
 * <p>Each call returns a {@link Status}: {@code Status.OK} on success, a
 * COMMON_ERROR status when the broker reports a failure, and a BAD_CONNECTION
 * status (for close calls and openWriter) when the thrift transport itself
 * fails, so callers can decide whether to recycle the pooled connection.
 */
public class BrokerFileOperations {
    private static final Logger LOG = LogManager.getLogger(BrokerFileOperations.class);

    // Broker name, used for error reporting via BrokerUtil.printBroker().
    private String name;
    // Properties forwarded with every open request (e.g. credentials).
    private Map<String, String> properties;

    public BrokerFileOperations(String name, Map<String, String> properties) {
        this.name = name;
        this.properties = properties;
    }

    /**
     * Opens a reader for {@code remoteFilePath} on the broker and copies the
     * returned file descriptor into the caller-supplied {@code fd}.
     */
    public Status openReader(TPaloBrokerService.Client client, TNetworkAddress address, String remoteFilePath,
            TBrokerFD fd) {
        try {
            TBrokerOpenReaderRequest request = new TBrokerOpenReaderRequest(TBrokerVersion.VERSION_ONE,
                    remoteFilePath, 0, BrokerFileSystem.clientId(), properties);
            TBrokerOpenReaderResponse response = client.openReader(request);
            TBrokerOperationStatus opStatus = response.getOpStatus();
            if (opStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
                return new Status(Status.ErrCode.COMMON_ERROR,
                        "failed to open reader on broker " + BrokerUtil.printBroker(name, address)
                                + " for file: " + remoteFilePath + ". msg: " + opStatus.getMessage());
            }
            // Hand the broker-side descriptor back through the out parameter.
            fd.setHigh(response.getFd().getHigh());
            fd.setLow(response.getFd().getLow());
            return Status.OK;
        } catch (TException e) {
            return new Status(Status.ErrCode.COMMON_ERROR,
                    "failed to open reader on broker " + BrokerUtil.printBroker(name, address)
                            + " for file: " + remoteFilePath + ". msg: " + e.getMessage());
        }
    }

    /** Closes a previously opened reader identified by {@code fd}. */
    public Status closeReader(TPaloBrokerService.Client client, TNetworkAddress address, TBrokerFD fd) {
        try {
            TBrokerCloseReaderRequest request = new TBrokerCloseReaderRequest(TBrokerVersion.VERSION_ONE, fd);
            TBrokerOperationStatus opStatus = client.closeReader(request);
            if (opStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
                return new Status(Status.ErrCode.COMMON_ERROR,
                        "failed to close reader on broker " + BrokerUtil.printBroker(name, address)
                                + " for fd: " + fd);
            }
            LOG.info("finished to close reader. fd: {}.", fd);
            return Status.OK;
        } catch (TException e) {
            // A transport error here likely means a broken connection.
            return new Status(Status.ErrCode.BAD_CONNECTION,
                    "failed to close reader on broker " + BrokerUtil.printBroker(name, address)
                            + ", fd " + fd + ", msg: " + e.getMessage());
        }
    }

    /**
     * Opens a writer in APPEND mode for {@code remoteFile} on the broker and
     * copies the returned file descriptor into the caller-supplied {@code fd}.
     */
    public Status openWriter(TPaloBrokerService.Client client, TNetworkAddress address, String remoteFile,
            TBrokerFD fd) {
        try {
            TBrokerOpenWriterRequest request = new TBrokerOpenWriterRequest(TBrokerVersion.VERSION_ONE,
                    remoteFile, TBrokerOpenMode.APPEND, BrokerFileSystem.clientId(), properties);
            TBrokerOpenWriterResponse response = client.openWriter(request);
            TBrokerOperationStatus opStatus = response.getOpStatus();
            if (opStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
                return new Status(Status.ErrCode.COMMON_ERROR,
                        "failed to open writer on broker " + BrokerUtil.printBroker(name, address)
                                + " for file: " + remoteFile + ". msg: " + opStatus.getMessage());
            }
            fd.setHigh(response.getFd().getHigh());
            fd.setLow(response.getFd().getLow());
            LOG.info("finished to open writer. fd: {}. directly upload to remote path {}.", fd, remoteFile);
            return Status.OK;
        } catch (TException e) {
            return new Status(Status.ErrCode.BAD_CONNECTION,
                    "failed to open writer on broker " + BrokerUtil.printBroker(name, address)
                            + ", err: " + e.getMessage());
        }
    }

    /** Closes a previously opened writer identified by {@code fd}. */
    public Status closeWriter(TPaloBrokerService.Client client, TNetworkAddress address, TBrokerFD fd) {
        try {
            TBrokerCloseWriterRequest request = new TBrokerCloseWriterRequest(TBrokerVersion.VERSION_ONE, fd);
            TBrokerOperationStatus opStatus = client.closeWriter(request);
            if (opStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
                return new Status(Status.ErrCode.COMMON_ERROR,
                        "failed to close writer on broker " + BrokerUtil.printBroker(name, address)
                                + " for fd: " + fd);
            }
            LOG.info("finished to close writer. fd: {}.", fd);
            return Status.OK;
        } catch (TException e) {
            return new Status(Status.ErrCode.BAD_CONNECTION,
                    "failed to close writer on broker " + BrokerUtil.printBroker(name, address)
                            + ", fd " + fd + ", msg: " + e.getMessage());
        }
    }
}

View File

@ -0,0 +1,626 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.remote;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TBrokerCheckPathExistRequest;
import org.apache.doris.thrift.TBrokerCheckPathExistResponse;
import org.apache.doris.thrift.TBrokerDeletePathRequest;
import org.apache.doris.thrift.TBrokerFD;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TBrokerListPathRequest;
import org.apache.doris.thrift.TBrokerListResponse;
import org.apache.doris.thrift.TBrokerOperationStatus;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TBrokerPReadRequest;
import org.apache.doris.thrift.TBrokerPWriteRequest;
import org.apache.doris.thrift.TBrokerReadResponse;
import org.apache.doris.thrift.TBrokerRenamePathRequest;
import org.apache.doris.thrift.TBrokerVersion;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
public class BrokerFileSystem extends RemoteFileSystem {
private static final Logger LOG = LogManager.getLogger(BrokerFileSystem.class);
private String name;
private BrokerFileOperations operations;
public BrokerFileSystem(String name, Map<String, String> properties) {
this.name = name;
this.properties = properties;
this.operations = new BrokerFileOperations(name, properties);
}
/**
 * Identifier of this frontend instance, sent to the broker when opening
 * readers/writers: local host address joined with the edit log port.
 */
public static String clientId() {
    String localAddress = FrontendOptions.getLocalHostAddress();
    return localAddress + ":" + Config.edit_log_port;
}
/**
 * Borrows a broker client from the global client pool.
 *
 * @return a (client, address) pair, or {@code null} if no broker address is
 *         available or a client cannot be borrowed; the caller is responsible
 *         for returning or invalidating the client in the pool afterwards.
 */
public Pair<TPaloBrokerService.Client, TNetworkAddress> getBroker() {
    FsBroker fsBroker;
    try {
        String localIP = FrontendOptions.getLocalHostAddress();
        fsBroker = Env.getCurrentEnv().getBrokerMgr().getBroker(name, localIP);
    } catch (AnalysisException e) {
        LOG.warn("failed to get a broker address: " + e.getMessage());
        return null;
    }
    TNetworkAddress brokerAddress = new TNetworkAddress(fsBroker.ip, fsBroker.port);
    TPaloBrokerService.Client brokerClient;
    try {
        brokerClient = ClientPool.brokerPool.borrowObject(brokerAddress);
    } catch (Exception e) {
        LOG.warn("failed to get broker client: " + e.getMessage());
        return null;
    }
    LOG.info("get broker: {}", BrokerUtil.printBroker(name, brokerAddress));
    return Pair.of(brokerClient, brokerAddress);
}
/**
 * Checks whether {@code remotePath} exists through a broker.
 * Returns OK when the path exists, NOT_FOUND when it does not, and
 * COMMON_ERROR on broker or RPC failure.
 */
@Override
public Status exists(String remotePath) {
    // Borrow a broker client first.
    Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
    if (pair == null) {
        return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client");
    }
    TPaloBrokerService.Client client = pair.first;
    TNetworkAddress address = pair.second;
    try {
        TBrokerCheckPathExistRequest request = new TBrokerCheckPathExistRequest(TBrokerVersion.VERSION_ONE,
                remotePath, properties);
        TBrokerCheckPathExistResponse response = client.checkPathExist(request);
        TBrokerOperationStatus opStatus = response.getOpStatus();
        Status result;
        if (opStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
            result = new Status(Status.ErrCode.COMMON_ERROR,
                    "failed to check remote path exist: " + remotePath
                            + ", broker: " + BrokerUtil.printBroker(name, address)
                            + ". msg: " + opStatus.getMessage());
        } else if (!response.isIsPathExist()) {
            result = new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath);
        } else {
            result = Status.OK;
        }
        // RPC completed: the connection is healthy, hand it back to the pool.
        ClientPool.brokerPool.returnObject(address, client);
        return result;
    } catch (TException e) {
        // Transport failure: the connection may be broken, drop it from the pool.
        ClientPool.brokerPool.invalidateObject(address, client);
        return new Status(Status.ErrCode.COMMON_ERROR,
                "failed to check remote path exist: " + remotePath
                        + ", broker: " + BrokerUtil.printBroker(name, address)
                        + ". msg: " + e.getMessage());
    }
}
/**
 * Downloads {@code remoteFilePath} to {@code localFilePath} through a broker.
 *
 * <p>{@code fileSize} is the expected number of bytes to read; the read loop
 * runs until that many bytes have been consumed. Any pre-existing file (or
 * directory tree) at {@code localFilePath} is deleted first. Reads are done in
 * 1MB chunks; only thrift timeouts are retried (up to 3 attempts per chunk).
 * The broker reader is always closed and the pooled client returned or
 * invalidated in the finally block.
 *
 * @return Status.OK on success, otherwise a status describing the failure
 */
@Override
public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
    LOG.debug("download from {} to {}, file size: {}.", remoteFilePath, localFilePath, fileSize);
    long start = System.currentTimeMillis();
    // 1. get a proper broker
    Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
    if (pair == null) {
        return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client");
    }
    TPaloBrokerService.Client client = pair.first;
    TNetworkAddress address = pair.second;
    // 2. open file reader with broker
    TBrokerFD fd = new TBrokerFD();
    Status opStatus = operations.openReader(client, address, remoteFilePath, fd);
    if (!opStatus.ok()) {
        return opStatus;
    }
    LOG.info("finished to open reader. fd: {}. download {} to {}.",
            fd, remoteFilePath, localFilePath);
    Preconditions.checkNotNull(fd);
    // 3. delete local file if exist
    File localFile = new File(localFilePath);
    if (localFile.exists()) {
        try {
            // Depth-first (reverse-sorted) walk so children are deleted before parents.
            Files.walk(Paths.get(localFilePath), FileVisitOption.FOLLOW_LINKS)
                    .sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
        } catch (IOException e) {
            return new Status(Status.ErrCode.COMMON_ERROR, "failed to delete exist local file: " + localFilePath);
        }
    }
    // 4. create local file
    Status status = Status.OK;
    try {
        if (!localFile.createNewFile()) {
            return new Status(Status.ErrCode.COMMON_ERROR, "failed to create local file: " + localFilePath);
        }
    } catch (IOException e) {
        return new Status(Status.ErrCode.COMMON_ERROR, "failed to create local file: "
                + localFilePath + ", msg: " + e.getMessage());
    }
    // 5. read remote file with broker and write to local
    String lastErrMsg = null;
    try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(localFile))) {
        final long bufSize = 1024 * 1024; // 1MB
        long leftSize = fileSize;
        long readOffset = 0;
        while (leftSize > 0) {
            long readLen = Math.min(leftSize, bufSize);
            TBrokerReadResponse rep = null;
            // We only retry if we encounter a timeout thrift exception.
            int tryTimes = 0;
            while (tryTimes < 3) {
                try {
                    TBrokerPReadRequest req = new TBrokerPReadRequest(TBrokerVersion.VERSION_ONE,
                            fd, readOffset, readLen);
                    rep = client.pread(req);
                    if (rep.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
                        // pread return failure.
                        lastErrMsg = String.format("failed to read via broker %s. "
                                + "current read offset: %d, read length: %d,"
                                + " file size: %d, file: %s, err code: %d, msg: %s",
                                BrokerUtil.printBroker(name, address),
                                readOffset, readLen, fileSize,
                                remoteFilePath, rep.getOpStatus().getStatusCode().getValue(),
                                rep.getOpStatus().getMessage());
                        LOG.warn(lastErrMsg);
                        status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
                    }
                    if (rep.opStatus.statusCode != TBrokerOperationStatusCode.END_OF_FILE) {
                        LOG.debug("download. readLen: {}, read data len: {}, left size:{}. total size: {}",
                                readLen, rep.getData().length, leftSize, fileSize);
                    } else {
                        LOG.debug("read eof: " + remoteFilePath);
                    }
                    break;
                } catch (TTransportException e) {
                    if (e.getType() == TTransportException.TIMED_OUT) {
                        // we only retry when we encounter timeout exception.
                        lastErrMsg = String.format("failed to read via broker %s. "
                                + "current read offset: %d, read length: %d,"
                                + " file size: %d, file: %s, timeout.",
                                BrokerUtil.printBroker(name, address),
                                readOffset, readLen, fileSize,
                                remoteFilePath);
                        tryTimes++;
                        continue;
                    }
                    // Non-timeout transport error: record and give up on this chunk.
                    lastErrMsg = String.format("failed to read via broker %s. "
                            + "current read offset: %d, read length: %d,"
                            + " file size: %d, file: %s. msg: %s",
                            BrokerUtil.printBroker(name, address),
                            readOffset, readLen, fileSize,
                            remoteFilePath, e.getMessage());
                    LOG.warn(lastErrMsg);
                    status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
                    break;
                } catch (TException e) {
                    lastErrMsg = String.format("failed to read via broker %s. "
                            + "current read offset: %d, read length: %d,"
                            + " file size: %d, file: %s. msg: %s",
                            BrokerUtil.printBroker(name, address),
                            readOffset, readLen, fileSize,
                            remoteFilePath, e.getMessage());
                    LOG.warn(lastErrMsg);
                    status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
                    break;
                }
            } // end of retry loop
            if (status.ok() && tryTimes < 3) {
                // read succeed, write to local file
                Preconditions.checkNotNull(rep);
                // NOTICE(cmy): Sometimes the actual read length does not equal to the expected read length,
                // even if the broker's read buffer size is large enough.
                // I don't know why, but have to adapt to it.
                if (rep.getData().length != readLen) {
                    LOG.warn("the actual read length does not equal to "
                            + "the expected read length: {} vs. {}, file: {}, broker: {}",
                            rep.getData().length, readLen, remoteFilePath,
                            BrokerUtil.printBroker(name, address));
                }
                out.write(rep.getData());
                // Advance by the ACTUAL number of bytes received, not readLen.
                readOffset += rep.getData().length;
                leftSize -= rep.getData().length;
            } else {
                status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
                break;
            }
        } // end of reading remote file
    } catch (IOException e) {
        return new Status(Status.ErrCode.COMMON_ERROR, "Got exception: " + e.getMessage() + ", broker: "
                + BrokerUtil.printBroker(name, address));
    } finally {
        // close broker reader
        Status closeStatus = operations.closeReader(client, address, fd);
        if (!closeStatus.ok()) {
            LOG.warn(closeStatus.getErrMsg());
            if (status.ok()) {
                // we return close write error only if no other error has been encountered.
                status = closeStatus;
            }
            ClientPool.brokerPool.invalidateObject(address, client);
        } else {
            ClientPool.brokerPool.returnObject(address, client);
        }
    }
    LOG.info("finished to download from {} to {} with size: {}. cost {} ms",
            remoteFilePath, localFilePath, fileSize, (System.currentTimeMillis() - start));
    return status;
}
/**
 * Uploads {@code content} (UTF-8 encoded) directly to {@code remoteFile}
 * through a broker, without an intermediate local file.
 * The broker writer is always closed; the pooled client is invalidated when
 * either the write or the close hit a bad connection.
 */
@Override
public Status directUpload(String content, String remoteFile) {
    // Borrow a broker client.
    Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
    if (pair == null) {
        return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client");
    }
    TPaloBrokerService.Client client = pair.first;
    TNetworkAddress address = pair.second;
    TBrokerFD fd = new TBrokerFD();
    Status status = Status.OK;
    try {
        // Open a writer for the remote file.
        status = operations.openWriter(client, address, remoteFile, fd);
        if (!status.ok()) {
            return status;
        }
        // Write the whole content in a single pwrite at offset 0.
        try {
            ByteBuffer buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8));
            TBrokerPWriteRequest request = new TBrokerPWriteRequest(TBrokerVersion.VERSION_ONE, fd, 0, buffer);
            TBrokerOperationStatus opStatus = client.pwrite(request);
            if (opStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
                // Broker reported a write failure.
                status = new Status(Status.ErrCode.COMMON_ERROR, "write failed: " + opStatus.getMessage()
                        + ", broker: " + BrokerUtil.printBroker(name, address));
            }
        } catch (TException e) {
            status = new Status(Status.ErrCode.BAD_CONNECTION, "write exception: " + e.getMessage()
                    + ", broker: " + BrokerUtil.printBroker(name, address));
        }
    } finally {
        // Always close the writer; drop the pooled client on any bad connection.
        Status closeStatus = operations.closeWriter(client, address, fd);
        if (closeStatus.getErrCode() == Status.ErrCode.BAD_CONNECTION
                || status.getErrCode() == Status.ErrCode.BAD_CONNECTION) {
            ClientPool.brokerPool.invalidateObject(address, client);
        } else {
            ClientPool.brokerPool.returnObject(address, client);
        }
    }
    return status;
}
/**
 * Uploads a local file to {@code remotePath} through a broker.
 * The file is read in 1KB chunks and each chunk is pwrite()n to the broker at an
 * increasing offset. A chunk is retried up to 3 times, but only when the failure
 * is a thrift timeout; any other failure aborts the upload.
 *
 * @param localPath  path of the local file to upload
 * @param remotePath destination path on remote storage
 * @return Status.OK on success; COMMON_ERROR carrying the last error message otherwise
 */
@Override
public Status upload(String localPath, String remotePath) {
    long start = System.currentTimeMillis();
    // 1. get a proper broker
    Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
    if (pair == null) {
        return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client");
    }
    TPaloBrokerService.Client client = pair.first;
    TNetworkAddress address = pair.second;
    // 2. open file write with broker
    TBrokerFD fd = new TBrokerFD();
    Status status = operations.openWriter(client, address, remotePath, fd);
    if (!status.ok()) {
        return status;
    }
    // 3. read local file and write to remote with broker
    File localFile = new File(localPath);
    long fileLength = localFile.length();
    byte[] readBuf = new byte[1024];
    try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(localFile))) {
        // save the last err msg
        String lastErrMsg = null;
        // save the current write offset of remote file
        long writeOffset = 0;
        // read local file, 1KB at a time (readBuf is 1024 bytes)
        int bytesRead;
        while ((bytesRead = in.read(readBuf)) != -1) {
            ByteBuffer bb = ByteBuffer.wrap(readBuf, 0, bytesRead);
            // We only retry if we encounter a timeout thrift exception.
            int tryTimes = 0;
            while (tryTimes < 3) {
                try {
                    TBrokerPWriteRequest req
                            = new TBrokerPWriteRequest(TBrokerVersion.VERSION_ONE, fd, writeOffset, bb);
                    TBrokerOperationStatus opst = client.pwrite(req);
                    if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) {
                        // pwrite return failure.
                        lastErrMsg = String.format("failed to write via broker %s. "
                                        + "current write offset: %d, write length: %d,"
                                        + " file length: %d, file: %s, err code: %d, msg: %s",
                                BrokerUtil.printBroker(name, address),
                                writeOffset, bytesRead, fileLength,
                                remotePath, opst.getStatusCode().getValue(), opst.getMessage());
                        LOG.warn(lastErrMsg);
                        status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
                    }
                    // leave the retry loop on success AND on a non-retryable pwrite failure
                    break;
                } catch (TTransportException e) {
                    if (e.getType() == TTransportException.TIMED_OUT) {
                        // we only retry when we encounter timeout exception.
                        lastErrMsg = String.format("failed to write via broker %s. "
                                        + "current write offset: %d, write length: %d,"
                                        + " file length: %d, file: %s. timeout",
                                BrokerUtil.printBroker(name, address),
                                writeOffset, bytesRead, fileLength,
                                remotePath);
                        tryTimes++;
                        continue;
                    }
                    // non-timeout transport error: record and abort this chunk
                    lastErrMsg = String.format("failed to write via broker %s. "
                                    + "current write offset: %d, write length: %d,"
                                    + " file length: %d, file: %s. encounter TTransportException: %s",
                            BrokerUtil.printBroker(name, address),
                            writeOffset, bytesRead, fileLength,
                            remotePath, e.getMessage());
                    LOG.warn(lastErrMsg, e);
                    status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
                    break;
                } catch (TException e) {
                    lastErrMsg = String.format("failed to write via broker %s. "
                                    + "current write offset: %d, write length: %d,"
                                    + " file length: %d, file: %s. encounter TException: %s",
                            BrokerUtil.printBroker(name, address),
                            writeOffset, bytesRead, fileLength,
                            remotePath, e.getMessage());
                    LOG.warn(lastErrMsg, e);
                    status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
                    break;
                }
            }
            // tryTimes == 3 means all three attempts timed out: fail the upload
            if (status.ok() && tryTimes < 3) {
                // write succeed, update current write offset
                writeOffset += bytesRead;
            } else {
                status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
                break;
            }
        } // end of read local file loop
    } catch (FileNotFoundException e1) {
        return new Status(Status.ErrCode.COMMON_ERROR, "encounter file not found exception: " + e1.getMessage()
                + ", broker: " + BrokerUtil.printBroker(name, address));
    } catch (IOException e1) {
        return new Status(Status.ErrCode.COMMON_ERROR, "encounter io exception: " + e1.getMessage()
                + ", broker: " + BrokerUtil.printBroker(name, address));
    } finally {
        // close write
        Status closeStatus = operations.closeWriter(client, address, fd);
        if (!closeStatus.ok()) {
            LOG.warn(closeStatus.getErrMsg());
            if (status.ok()) {
                // we return close write error only if no other error has been encountered.
                status = closeStatus;
            }
            // a failed close likely means a broken connection; drop the client
            ClientPool.brokerPool.invalidateObject(address, client);
        } else {
            ClientPool.brokerPool.returnObject(address, client);
        }
    }
    if (status.ok()) {
        LOG.info("finished to upload {} to remote path {}. cost: {} ms",
                localPath, remotePath, (System.currentTimeMillis() - start));
    }
    return status;
}
/**
 * Renames {@code origFilePath} to {@code destFilePath} through a broker.
 *
 * @return Status.OK on success, COMMON_ERROR (with the broker message) otherwise
 */
@Override
public Status rename(String origFilePath, String destFilePath) {
    long startTimeMs = System.currentTimeMillis();
    // Borrow a broker client from the pool.
    Pair<TPaloBrokerService.Client, TNetworkAddress> brokerPair = getBroker();
    if (brokerPair == null) {
        return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client");
    }
    TPaloBrokerService.Client brokerClient = brokerPair.first;
    TNetworkAddress brokerAddress = brokerPair.second;
    // A client whose RPC threw is assumed broken and is invalidated instead of returned.
    boolean clientReusable = true;
    try {
        TBrokerRenamePathRequest request = new TBrokerRenamePathRequest(TBrokerVersion.VERSION_ONE,
                origFilePath, destFilePath, properties);
        TBrokerOperationStatus opStatus = brokerClient.renamePath(request);
        if (opStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
            return new Status(Status.ErrCode.COMMON_ERROR,
                    "failed to rename " + origFilePath + " to " + destFilePath + ", msg: " + opStatus.getMessage()
                            + ", broker: " + BrokerUtil.printBroker(name, brokerAddress));
        }
    } catch (TException e) {
        clientReusable = false;
        return new Status(Status.ErrCode.COMMON_ERROR,
                "failed to rename " + origFilePath + " to " + destFilePath + ", msg: " + e.getMessage()
                        + ", broker: " + BrokerUtil.printBroker(name, brokerAddress));
    } finally {
        if (clientReusable) {
            ClientPool.brokerPool.returnObject(brokerAddress, brokerClient);
        } else {
            ClientPool.brokerPool.invalidateObject(brokerAddress, brokerClient);
        }
    }
    LOG.info("finished to rename {} to {}. cost: {} ms",
            origFilePath, destFilePath, (System.currentTimeMillis() - startTimeMs));
    return Status.OK;
}
/**
 * Deletes {@code remotePath} through a broker.
 *
 * @return Status.OK on success, COMMON_ERROR (with the broker message) otherwise
 */
@Override
public Status delete(String remotePath) {
    // Borrow a broker client from the pool.
    Pair<TPaloBrokerService.Client, TNetworkAddress> brokerPair = getBroker();
    if (brokerPair == null) {
        return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client");
    }
    TPaloBrokerService.Client brokerClient = brokerPair.first;
    TNetworkAddress brokerAddress = brokerPair.second;
    // A client whose RPC threw is assumed broken and is invalidated instead of returned.
    boolean clientReusable = true;
    try {
        TBrokerDeletePathRequest request = new TBrokerDeletePathRequest(TBrokerVersion.VERSION_ONE,
                remotePath, properties);
        TBrokerOperationStatus opStatus = brokerClient.deletePath(request);
        if (opStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
            return new Status(Status.ErrCode.COMMON_ERROR,
                    "failed to delete remote path: " + remotePath + ". msg: " + opStatus.getMessage()
                            + ", broker: " + BrokerUtil.printBroker(name, brokerAddress));
        }
        LOG.info("finished to delete remote path {}.", remotePath);
    } catch (TException e) {
        clientReusable = false;
        return new Status(Status.ErrCode.COMMON_ERROR,
                "failed to delete remote path: " + remotePath + ". msg: " + e.getMessage()
                        + ", broker: " + BrokerUtil.printBroker(name, brokerAddress));
    } finally {
        if (clientReusable) {
            ClientPool.brokerPool.returnObject(brokerAddress, brokerClient);
        } else {
            ClientPool.brokerPool.invalidateObject(brokerAddress, brokerClient);
        }
    }
    return Status.OK;
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(String remotePath) throws UserException {
    // Not supported by the broker backend; callers always get null.
    // NOTE(review): returning null forces a null-check at every call site — consider
    // throwing UserException("not implemented") instead; confirm callers first.
    return null;
}
@Override
public Status list(String remotePath, List<RemoteFile> result) {
    // Delegate to the full variant; by default only bare file names are returned.
    return list(remotePath, result, true);
}
/**
 * Lists the entries matching {@code remotePath} via a broker (non-recursive)
 * and appends one RemoteFile per entry to {@code result}.
 *
 * @param remotePath   remote path (or glob) to list
 * @param result       output list the entries are appended to
 * @param fileNameOnly if true the broker returns bare file names instead of full paths
 * @return Status.OK on success, COMMON_ERROR otherwise
 */
@Override
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
    // get a proper broker
    Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
    if (pair == null) {
        return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client");
    }
    TPaloBrokerService.Client client = pair.first;
    TNetworkAddress address = pair.second;
    // list
    boolean needReturn = true;
    try {
        TBrokerListPathRequest req = new TBrokerListPathRequest(TBrokerVersion.VERSION_ONE, remotePath,
                false /* not recursive */, properties);
        req.setFileNameOnly(fileNameOnly);
        TBrokerListResponse rep = client.listPath(req);
        TBrokerOperationStatus opst = rep.getOpStatus();
        if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) {
            return new Status(Status.ErrCode.COMMON_ERROR,
                    "failed to list remote path: " + remotePath + ". msg: " + opst.getMessage()
                            + ", broker: " + BrokerUtil.printBroker(name, address));
        }
        List<TBrokerFileStatus> fileStatus = rep.getFiles();
        for (TBrokerFileStatus tFile : fileStatus) {
            // block size is not reported by the broker, so it is always 0 here
            RemoteFile file = new RemoteFile(tFile.path, !tFile.isDir, tFile.size, 0);
            result.add(file);
        }
        LOG.info("finished to list remote path {}. get files: {}", remotePath, result);
    } catch (TException e) {
        // an RPC failure means the client may be broken: invalidate it instead of returning it
        needReturn = false;
        return new Status(Status.ErrCode.COMMON_ERROR,
                "failed to list remote path: " + remotePath + ". msg: " + e.getMessage()
                        + ", broker: " + BrokerUtil.printBroker(name, address));
    } finally {
        if (needReturn) {
            ClientPool.brokerPool.returnObject(address, client);
        } else {
            ClientPool.brokerPool.invalidateObject(address, client);
        }
    }
    return Status.OK;
}
@Override
public Status makeDir(String remotePath) {
    // The broker protocol has no mkdir primitive, so this always fails.
    return new Status(Status.ErrCode.COMMON_ERROR, "mkdir is not implemented.");
}
}

View File

@ -0,0 +1,463 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.remote;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import org.apache.doris.catalog.AuthType;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.URI;
import org.apache.doris.fs.obj.HdfsStorage;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * File system implementation that talks to HDFS (or any hadoop-compatible DFS)
 * directly, without going through a broker.
 */
public class DFSFileSystem extends RemoteFileSystem {
    // Fixed: the original logged under HdfsStorage.class, which misattributes every log line.
    private static final Logger LOG = LogManager.getLogger(DFSFileSystem.class);

    // Lazily created together with dfsFileSystem by getHdfsClient(); null before first connect.
    private HDFSFileOperations operations = null;

    public DFSFileSystem(Map<String, String> properties) {
        this.properties = new HashMap<>(properties);
    }

    /**
     * Returns the cached hadoop FileSystem for {@code remotePath}, creating it (and
     * initializing {@link #operations}) on first use. Performs a kerberos keytab login
     * first when hadoop.security.authentication is set to kerberos in the properties.
     *
     * @param remotePath a path whose scheme/authority identifies the target file system
     * @throws UserException if the connection (or kerberos login) fails
     */
    private FileSystem getHdfsClient(String remotePath)
            throws UserException {
        if (dfsFileSystem != null) {
            return dfsFileSystem;
        }
        String username = properties.get(HdfsResource.HADOOP_USER_NAME);
        Configuration conf = new HdfsConfiguration();
        boolean isSecurityEnabled = false;
        for (Map.Entry<String, String> propEntry : properties.entrySet()) {
            conf.set(propEntry.getKey(), propEntry.getValue());
            if (propEntry.getKey().equals(HdfsResource.HADOOP_SECURITY_AUTHENTICATION)
                    && propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) {
                isSecurityEnabled = true;
            }
        }
        try {
            if (isSecurityEnabled) {
                UserGroupInformation.setConfiguration(conf);
                UserGroupInformation.loginUserFromKeytab(
                        properties.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL),
                        properties.get(HdfsResource.HADOOP_KERBEROS_KEYTAB));
            }
            if (username == null) {
                dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf);
            } else {
                dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf, username);
            }
        } catch (Exception e) {
            LOG.error("errors while connect to " + remotePath, e);
            throw new UserException("errors while connect to " + remotePath, e);
        }
        operations = new HDFSFileOperations(dfsFileSystem);
        return dfsFileSystem;
    }

    /**
     * Downloads a remote file to a local path, reading at most 1MB at a time.
     * Any existing file/directory at {@code localFilePath} is removed first.
     *
     * @param remoteFilePath source file on the DFS
     * @param localFilePath  destination file on the local disk
     * @param fileSize       expected size of the remote file in bytes
     * @return Status.OK on success, COMMON_ERROR otherwise
     */
    @Override
    public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
        LOG.debug("download from {} to {}, file size: {}.", remoteFilePath, localFilePath, fileSize);
        final long start = System.currentTimeMillis();
        FSDataInputStream fsDataInputStream;
        try {
            // Fixed: ensure the client (and `operations`) is initialized; the original
            // dereferenced `operations` without ever calling getHdfsClient() -> NPE.
            getHdfsClient(remoteFilePath);
            fsDataInputStream = operations.openReader(remoteFilePath, 0);
        } catch (Exception e) {
            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
        }
        LOG.info("finished to open reader. download {} to {}.", remoteFilePath, localFilePath);
        // delete local file if exist
        File localFile = new File(localFilePath);
        if (localFile.exists()) {
            try {
                // depth-first so files are removed before their parent directories
                Files.walk(Paths.get(localFilePath), FileVisitOption.FOLLOW_LINKS).sorted(Comparator.reverseOrder())
                        .map(java.nio.file.Path::toFile).forEach(File::delete);
            } catch (IOException e) {
                return new Status(Status.ErrCode.COMMON_ERROR,
                        "failed to delete exist local file: " + localFilePath + ", msg: " + e.getMessage());
            }
        }
        // create local file
        try {
            if (!localFile.createNewFile()) {
                return new Status(Status.ErrCode.COMMON_ERROR, "failed to create local file: " + localFilePath);
            }
        } catch (IOException e) {
            return new Status(Status.ErrCode.COMMON_ERROR,
                    "failed to create local file: " + localFilePath + ", msg: " + e.getMessage());
        }
        String lastErrMsg = null;
        Status status = Status.OK;
        try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(localFile))) {
            final long bufSize = 1024 * 1024; // 1MB
            long leftSize = fileSize;
            long readOffset = 0;
            while (leftSize > 0) {
                long readLen = Math.min(leftSize, bufSize);
                try {
                    ByteBuffer data = readStreamBuffer(fsDataInputStream, readOffset, readLen);
                    if (readLen != data.array().length) {
                        // short read is tolerated; progress is tracked by the actual length
                        LOG.warn(
                                "the actual read length does not equal to "
                                        + "the expected read length: {} vs. {}, file: {}",
                                data.array().length, readLen, remoteFilePath);
                    }
                    // write local file
                    out.write(data.array());
                    readOffset += data.array().length;
                    leftSize -= data.array().length;
                } catch (Exception e) {
                    lastErrMsg = String.format(
                            "failed to read. " + "current read offset: %d, read length: %d,"
                                    + " file size: %d, file: %s. msg: %s",
                            readOffset, readLen, fileSize, remoteFilePath, e.getMessage());
                    LOG.warn(lastErrMsg);
                    status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
                    break;
                }
            }
        } catch (IOException e) {
            return new Status(Status.ErrCode.COMMON_ERROR, "Got exception: " + e.getMessage());
        } finally {
            Status closeStatus = operations.closeReader(fsDataInputStream);
            if (!closeStatus.ok()) {
                LOG.warn(closeStatus.getErrMsg());
                if (status.ok()) {
                    // we return close error only if no other error has been encountered.
                    status = closeStatus;
                }
            }
        }
        LOG.info("finished to download from {} to {} with size: {}. cost {} ms", remoteFilePath, localFilePath,
                fileSize, (System.currentTimeMillis() - start));
        return status;
    }

    /**
     * read data from fsDataInputStream.
     *
     * @param fsDataInputStream input stream for read.
     * @param readOffset read offset.
     * @param length read length.
     * @return ByteBuffer wrapping exactly the bytes read
     * @throws IOException when read data error.
     */
    private static ByteBuffer readStreamBuffer(FSDataInputStream fsDataInputStream, long readOffset, long length)
            throws IOException {
        synchronized (fsDataInputStream) {
            long currentStreamOffset;
            try {
                currentStreamOffset = fsDataInputStream.getPos();
            } catch (IOException e) {
                LOG.error("errors while get file pos from output stream", e);
                throw new IOException("errors while get file pos from output stream", e);
            }
            if (currentStreamOffset != readOffset) {
                // it's ok, when reading some format like parquet, it is not a sequential read
                LOG.debug("invalid offset, current read offset is " + currentStreamOffset
                        + " is not equal to request offset " + readOffset + " seek to it");
                try {
                    fsDataInputStream.seek(readOffset);
                } catch (IOException e) {
                    throw new IOException(String.format(
                            "current read offset %d is not equal to %d, and could not seek to it, msg: %s",
                            currentStreamOffset, readOffset, e.getMessage()));
                }
            }
            // Avoid using the ByteBuffer based read for Hadoop because some
            // FSDataInputStream
            // implementations are not ByteBufferReadable,
            // See https://issues.apache.org/jira/browse/HADOOP-14603
            byte[] buf;
            if (length > HDFSFileOperations.READ_BUFFER_SIZE) {
                buf = new byte[HDFSFileOperations.READ_BUFFER_SIZE];
            } else {
                buf = new byte[(int) length];
            }
            try {
                int readLength = readBytesFully(fsDataInputStream, buf);
                if (readLength < 0) {
                    throw new IOException("end of file reached");
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "read buffer from input stream, buffer size:" + buf.length + ", read length:" + readLength);
                }
                return ByteBuffer.wrap(buf, 0, readLength);
            } catch (IOException e) {
                LOG.error("errors while read data from stream", e);
                throw new IOException("errors while read data from stream " + e.getMessage());
            }
        }
    }

    // Reads until dest is full or the stream is exhausted; returns the bytes actually read.
    private static int readBytesFully(FSDataInputStream is, byte[] dest) throws IOException {
        int readLength = 0;
        while (readLength < dest.length) {
            int availableReadLength = dest.length - readLength;
            int n = is.read(dest, readLength, availableReadLength);
            if (n <= 0) {
                break;
            }
            readLength += n;
        }
        return readLength;
    }

    /**
     * Checks whether {@code remotePath} exists on the DFS.
     *
     * @return Status.OK if it exists, NOT_FOUND if not, COMMON_ERROR on failure
     */
    @Override
    public Status exists(String remotePath) {
        try {
            URI pathUri = URI.create(remotePath);
            Path inputFilePath = new Path(pathUri.getPath());
            FileSystem fileSystem = getHdfsClient(remotePath);
            boolean isPathExist = fileSystem.exists(inputFilePath);
            if (!isPathExist) {
                return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath);
            }
            return Status.OK;
        } catch (Exception e) {
            LOG.error("errors while check path exist " + remotePath, e);
            return new Status(Status.ErrCode.COMMON_ERROR,
                    "failed to check remote path exist: " + remotePath + ". msg: " + e.getMessage());
        }
    }

    /**
     * Writes {@code content} directly into {@code remoteFile}, overwriting it.
     *
     * @return Status.OK on success, COMMON_ERROR otherwise
     */
    @Override
    public Status directUpload(String content, String remoteFile) {
        FSDataOutputStream fsDataOutputStream = null;
        try {
            // Fixed: ensure the client (and `operations`) is initialized before use (NPE in original).
            getHdfsClient(remoteFile);
            fsDataOutputStream = operations.openWriter(remoteFile);
        } catch (Exception e) {
            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
        }
        LOG.info("finished to open writer. directly upload to remote path {}.", remoteFile);
        Status status = Status.OK;
        try {
            fsDataOutputStream.writeBytes(content);
        } catch (IOException e) {
            LOG.error("errors while write data to output stream", e);
            status = new Status(Status.ErrCode.COMMON_ERROR, "write exception: " + e.getMessage());
        } finally {
            Status closeStatus = operations.closeWriter(fsDataOutputStream);
            if (!closeStatus.ok()) {
                LOG.warn(closeStatus.getErrMsg());
                if (status.ok()) {
                    status = closeStatus;
                }
            }
        }
        return status;
    }

    /**
     * Uploads a local file to {@code remotePath}, streaming it in 1KB chunks.
     *
     * @param localPath  local source file
     * @param remotePath remote destination path
     * @return Status.OK on success, COMMON_ERROR otherwise
     */
    @Override
    public Status upload(String localPath, String remotePath) {
        long start = System.currentTimeMillis();
        LOG.debug("local path {}, remote path {}", localPath, remotePath);
        FSDataOutputStream fsDataOutputStream = null;
        try {
            // Fixed: ensure the client (and `operations`) is initialized before use (NPE in original).
            getHdfsClient(remotePath);
            fsDataOutputStream = operations.openWriter(remotePath);
        } catch (Exception e) {
            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
        }
        // Fixed log message: this is a streamed upload, not a direct content upload.
        LOG.info("finished to open writer. upload to remote path {}.", remotePath);
        // read local file and write remote
        File localFile = new File(localPath);
        long fileLength = localFile.length();
        byte[] readBuf = new byte[1024];
        Status status = new Status(Status.ErrCode.OK, "");
        try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(localFile))) {
            // save the last err msg
            String lastErrMsg = null;
            // save the current write offset of remote file
            long writeOffset = 0;
            // read local file, 1KB at a time
            int bytesRead;
            while ((bytesRead = in.read(readBuf)) != -1) {
                try {
                    fsDataOutputStream.write(readBuf, 0, bytesRead);
                } catch (IOException e) {
                    LOG.error("errors while write data to output stream", e);
                    lastErrMsg = String.format(
                            "failed to write hdfs. current write offset: %d, write length: %d, "
                                    + "file length: %d, file: %s, msg: errors while write data to output stream",
                            writeOffset, bytesRead, fileLength, remotePath);
                    status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg);
                    break;
                }
                // write succeed, update current write offset
                writeOffset += bytesRead;
            } // end of read local file loop
        } catch (FileNotFoundException e1) {
            return new Status(Status.ErrCode.COMMON_ERROR, "encounter file not found exception: " + e1.getMessage());
        } catch (IOException e1) {
            return new Status(Status.ErrCode.COMMON_ERROR, "encounter io exception: " + e1.getMessage());
        } finally {
            Status closeStatus = operations.closeWriter(fsDataOutputStream);
            if (!closeStatus.ok()) {
                LOG.warn(closeStatus.getErrMsg());
                if (status.ok()) {
                    // we return close write error only if no other error has been encountered.
                    status = closeStatus;
                }
            }
        }
        if (status.ok()) {
            LOG.info("finished to upload {} to remote path {}. cost: {} ms", localPath, remotePath,
                    (System.currentTimeMillis() - start));
        }
        return status;
    }

    /**
     * Renames a file within a single DFS; cross-filesystem rename is rejected.
     *
     * @return Status.OK on success, COMMON_ERROR otherwise
     */
    @Override
    public Status rename(String srcPath, String destPath) {
        long start = System.currentTimeMillis();
        try {
            URI srcPathUri = URI.create(srcPath);
            URI destPathUri = URI.create(destPath);
            // NOTE(review): getAuthority() may be null for authority-less URIs, which would
            // NPE here — confirm callers always pass fully-qualified hdfs:// paths.
            if (!srcPathUri.getAuthority().trim().equals(destPathUri.getAuthority().trim())) {
                return new Status(Status.ErrCode.COMMON_ERROR, "only allow rename in same file system");
            }
            FileSystem fileSystem = getHdfsClient(destPath);
            Path srcfilePath = new Path(srcPathUri.getPath());
            Path destfilePath = new Path(destPathUri.getPath());
            boolean isRenameSuccess = fileSystem.rename(srcfilePath, destfilePath);
            if (!isRenameSuccess) {
                return new Status(Status.ErrCode.COMMON_ERROR, "failed to rename " + srcPath + " to " + destPath);
            }
        } catch (UserException e) {
            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
        } catch (IOException e) {
            LOG.error("errors while rename path from " + srcPath + " to " + destPath);
            return new Status(Status.ErrCode.COMMON_ERROR,
                    "failed to rename remote " + srcPath + " to " + destPath + ", msg: " + e.getMessage());
        }
        LOG.info("finished to rename {} to {}. cost: {} ms", srcPath, destPath, (System.currentTimeMillis() - start));
        return Status.OK;
    }

    /**
     * Recursively deletes {@code remotePath}.
     *
     * @return Status.OK on success, COMMON_ERROR otherwise
     */
    @Override
    public Status delete(String remotePath) {
        try {
            URI pathUri = URI.create(remotePath);
            Path inputFilePath = new Path(pathUri.getPath());
            FileSystem fileSystem = getHdfsClient(remotePath);
            fileSystem.delete(inputFilePath, true);
        } catch (UserException e) {
            return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
        } catch (IOException e) {
            LOG.error("errors while delete path " + remotePath);
            return new Status(Status.ErrCode.COMMON_ERROR,
                    "failed to delete remote path: " + remotePath + ", msg: " + e.getMessage());
        }
        LOG.info("finished to delete remote path {}.", remotePath);
        return Status.OK;
    }

    /**
     * Returns an iterator over the located file statuses under {@code remotePath}.
     *
     * @throws UserException if the client cannot be created or listing fails
     */
    @Override
    public RemoteIterator<LocatedFileStatus> listLocatedStatus(String remotePath) throws UserException {
        FileSystem fileSystem = getHdfsClient(remotePath);
        try {
            return fileSystem.listLocatedStatus(new Path(remotePath));
        } catch (IOException e) {
            throw new UserException("Failed to list located status for path: " + remotePath, e);
        }
    }

    @Override
    public Status list(String remotePath, List<RemoteFile> result) {
        // Delegate to the full variant; by default only bare file names are returned.
        return list(remotePath, result, true);
    }

    /**
     * get files in remotePath of HDFS.
     *
     * @param remotePath hdfs://namenode:port/path; may contain a glob pattern.
     * @param result files in remotePath.
     * @param fileNameOnly means get file only in remotePath if true.
     * @return Status.OK if success (also when the path matches nothing).
     */
    @Override
    public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
        try {
            URI pathUri = URI.create(remotePath);
            FileSystem fileSystem = getHdfsClient(remotePath);
            Path pathPattern = new Path(pathUri.getPath());
            FileStatus[] files = fileSystem.globStatus(pathPattern);
            if (files == null) {
                LOG.info("no files in path " + remotePath);
                return Status.OK;
            }
            for (FileStatus fileStatus : files) {
                // directories are reported with size -1
                RemoteFile remoteFile = new RemoteFile(
                        fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(),
                        !fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(),
                        fileStatus.getBlockSize());
                result.add(remoteFile);
            }
        } catch (FileNotFoundException e) {
            LOG.info("file not found: " + e.getMessage());
            return new Status(Status.ErrCode.NOT_FOUND, "file not found: " + e.getMessage());
        } catch (Exception e) {
            LOG.error("errors while get file status ", e);
            return new Status(Status.ErrCode.COMMON_ERROR, "errors while get file status " + e.getMessage());
        }
        LOG.info("finish list path {}", remotePath);
        return Status.OK;
    }

    @Override
    public Status makeDir(String remotePath) {
        // Not implemented yet for the DFS backend.
        return new Status(Status.ErrCode.COMMON_ERROR, "mkdir is not implemented.");
    }
}

View File

@ -0,0 +1,126 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.remote;
import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.URI;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
/**
 * Thin wrapper around a connected hadoop {@link FileSystem} that opens and
 * closes read/write streams, mapping close failures to {@code Status}.
 */
public class HDFSFileOperations {
    private static final Logger LOG = LogManager.getLogger(HDFSFileOperations.class);
    public static final int READ_BUFFER_SIZE = 128 << 10; // 128k
    public static final int WRITE_BUFFER_SIZE = 128 << 10; // 128k

    // The connected hadoop FileSystem all operations go through.
    private final FileSystem hdfsClient;

    public HDFSFileOperations(FileSystem hdfsClient) {
        this.hdfsClient = hdfsClient;
    }

    /**
     * open remotePath for read.
     *
     * @param remotePath hdfs://namenode:port/path.
     * @param startOffset the offset to read.
     * @return FSDataInputStream positioned at startOffset.
     * @throws UserException kept in the signature for API compatibility; not thrown here.
     * @throws IOException when open or seek fails.
     */
    public FSDataInputStream openReader(String remotePath, long startOffset) throws UserException, IOException {
        URI pathUri = URI.create(remotePath);
        Path inputFilePath = new Path(pathUri.getPath());
        try {
            FSDataInputStream fsDataInputStream = hdfsClient.open(inputFilePath, READ_BUFFER_SIZE);
            fsDataInputStream.seek(startOffset);
            return fsDataInputStream;
        } catch (IOException e) {
            LOG.error("errors while open path", e);
            // Fixed: preserve the original exception as cause instead of discarding the stack trace.
            throw new IOException(e.getMessage(), e);
        }
    }

    /**
     * close for read.
     *
     * @param fsDataInputStream the input stream.
     * @return Status.OK if success.
     */
    public Status closeReader(FSDataInputStream fsDataInputStream) {
        synchronized (fsDataInputStream) {
            try {
                fsDataInputStream.close();
            } catch (IOException e) {
                LOG.error("errors while close file input stream", e);
                return new Status(Status.ErrCode.COMMON_ERROR,
                        "errors while close file input stream, msg: " + e.getMessage());
            }
        }
        return Status.OK;
    }

    /**
     * open remotePath for write; an existing file is overwritten.
     *
     * @param remotePath hdfs://namenode:port/path.
     * @return FSDataOutputStream
     * @throws UserException kept in the signature for API compatibility; not thrown here.
     * @throws IOException when open path error.
     */
    public FSDataOutputStream openWriter(String remotePath) throws UserException, IOException {
        URI pathUri = URI.create(remotePath);
        Path inputFilePath = new Path(pathUri.getPath());
        try {
            return hdfsClient.create(inputFilePath, true, WRITE_BUFFER_SIZE);
        } catch (IOException e) {
            LOG.error("errors while open path", e);
            // Fixed: preserve the original exception as cause instead of discarding the stack trace.
            throw new IOException(e.getMessage(), e);
        }
    }

    /**
     * close for write, flushing buffered data first.
     *
     * @param fsDataOutputStream output stream.
     * @return Status.OK if success.
     */
    public Status closeWriter(FSDataOutputStream fsDataOutputStream) {
        synchronized (fsDataOutputStream) {
            try {
                fsDataOutputStream.flush();
                fsDataOutputStream.close();
                LOG.info("finished to close writer");
            } catch (IOException e) {
                LOG.error("errors while close file output stream", e);
                return new Status(Status.ErrCode.COMMON_ERROR, "failed to close writer, msg:" + e.getMessage());
            }
        }
        return Status.OK;
    }
}

View File

@ -0,0 +1,21 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.remote;
// Base class for object-storage backed file systems (e.g. S3). Intentionally empty
// in this refactor step; shared object-storage behavior will be added in follow-ups.
public abstract class ObjFileSystem extends RemoteFileSystem {
}

View File

@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.remote;
import org.apache.doris.fs.FileSystem;
import java.util.Map;
// Common state for remote file system implementations.
public abstract class RemoteFileSystem implements FileSystem {
    // Shared hadoop FileSystem handle; stays null until a subclass connects lazily.
    protected org.apache.hadoop.fs.FileSystem dfsFileSystem = null;
    // Connection/auth properties of the remote storage; the concrete map type
    // (and its key-case behavior) is chosen by each subclass's constructor.
    protected Map<String, String> properties;
}

View File

@ -0,0 +1,416 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.fs.remote;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.obj.S3Storage;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.http.HttpStatus;
import org.apache.http.client.utils.URIBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
/**
 * Remote file system backed by S3-compatible object storage.
 *
 * <p>Two distinct clients are used internally:
 * <ul>
 *   <li>an AWS SDK v2 {@link S3Client} for single-object operations
 *       (exists/upload/download/copy/delete/makeDir), created lazily in {@link #getClient};</li>
 *   <li>a Hadoop {@link FileSystem} (s3a) for glob-style listing, created lazily in
 *       {@link #getFileSystem}, because broker file-pattern globbing is too complex
 *       to reimplement on top of the raw S3 API.</li>
 * </ul>
 *
 * <p>NOTE(review): lazy initialization of both clients is not synchronized — this class
 * assumes single-threaded use per instance; confirm against callers before sharing.
 */
public class S3FileSystem extends ObjFileSystem {
    // Fixed: logger was previously created with S3Storage.class (copy-paste leftover from
    // the class this file system was extracted from), mis-attributing all log output.
    private static final Logger LOG = LogManager.getLogger(S3FileSystem.class);
    private S3Client client;
    // false: the s3 client will automatically convert endpoint to virtual-hosted style, eg:
    //          endpoint:               http://s3.us-east-2.amazonaws.com
    //          bucket/path:            my_bucket/file.txt
    //          auto convert:           http://my_bucket.s3.us-east-2.amazonaws.com/file.txt
    // true: the s3 client will NOT automatically convert endpoint to virtual-hosted style, we need to do some tricks:
    //          endpoint:               http://cos.ap-beijing.myqcloud.com
    //          bucket/path:            my_bucket/file.txt
    //          convert manually:       See S3URI()
    private boolean forceHostedStyle = false;

    /**
     * Creates an S3 file system from connection properties.
     *
     * @param properties S3 connection properties (endpoint, region, access/secret key, ...);
     *                   keys are matched case-insensitively
     * @throws IllegalArgumentException if a required S3 property is missing
     */
    public S3FileSystem(Map<String, String> properties) {
        // Case-insensitive map so callers may pass e.g. "AWS_ENDPOINT" or "s3.endpoint" variants.
        this.properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        setProperties(properties);
    }

    /**
     * Validates the required S3 properties and decides whether virtual-hosted-style
     * addressing must be forced manually (see {@link #forceHostedStyle}).
     */
    private void setProperties(Map<String, String> properties) {
        this.properties.putAll(properties);
        try {
            S3Properties.requiredS3Properties(this.properties);
        } catch (DdlException e) {
            // Surface missing-property errors as an unchecked exception since the
            // constructor signature cannot declare DdlException.
            throw new IllegalArgumentException(e);
        }
        // Virtual hosted-style is recommended in the s3 protocol.
        // The path-style has been abandoned, but for some unexplainable reasons,
        // the s3 client will determine whether the endpiont starts with `s3`
        // when generating a virtual hosted-sytle request.
        // If not, it will not be converted ( https://github.com/aws/aws-sdk-java-v2/pull/763),
        // but the endpoints of many cloud service providers for object storage do not start with s3,
        // so they cannot be converted to virtual hosted-sytle.
        // Some of them, such as aliyun's oss, only support virtual hosted-style,
        // and some of them(ceph) may only support
        // path-style, so we need to do some additional conversion.
        //
        //          use_path_style          |     !use_path_style
        //   S3     forceHostedStyle=false  |     forceHostedStyle=false
        //  !S3     forceHostedStyle=false  |     forceHostedStyle=true
        //
        // That is, for S3 endpoint, ignore the `use_path_style` property, and the s3 client will automatically use
        // virtual hosted-sytle.
        // And for other endpoint, if `use_path_style` is true, use path style. Otherwise, use virtual hosted-sytle.
        if (!this.properties.get(S3Properties.ENDPOINT).toLowerCase().startsWith("s3")) {
            forceHostedStyle = !this.properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false")
                    .equalsIgnoreCase("true");
        } else {
            forceHostedStyle = false;
        }
    }

    /**
     * Lazily creates (and caches) the Hadoop s3a FileSystem used for glob/list operations.
     *
     * @param remotePath any path on the target file system; its URI selects the FS instance
     * @throws UserException if the Hadoop FileSystem cannot be created
     */
    private FileSystem getFileSystem(String remotePath) throws UserException {
        if (dfsFileSystem == null) {
            Configuration conf = new Configuration();
            // Required for SigV4 signing in the underlying AWS client used by s3a.
            System.setProperty("com.amazonaws.services.s3.enableV4", "true");
            PropertyConverter.convertToHadoopFSProperties(properties).forEach(conf::set);
            try {
                dfsFileSystem = FileSystem.get(new URI(remotePath), conf);
            } catch (Exception e) {
                throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
            }
        }
        return dfsFileSystem;
    }

    /**
     * Lazily creates (and caches) the AWS SDK v2 S3 client.
     *
     * <p>When {@code bucket} is non-empty (virtual-hosted-style bucket extracted by
     * {@code S3URI}), the bucket is prepended to the endpoint host so that providers
     * which only accept virtual-hosted-style requests are addressed correctly.
     *
     * @param bucket the virtual-hosted bucket name, or empty for the plain endpoint
     * @throws UserException declared for interface symmetry with callers; building the
     *                       client itself does not contact the server
     */
    private S3Client getClient(String bucket) throws UserException {
        if (client == null) {
            URI tmpEndpoint = URI.create(properties.get(S3Properties.ENDPOINT));
            StaticCredentialsProvider scp;
            if (!properties.containsKey(S3Properties.SESSION_TOKEN)) {
                AwsBasicCredentials awsBasic = AwsBasicCredentials.create(
                        properties.get(S3Properties.ACCESS_KEY),
                        properties.get(S3Properties.SECRET_KEY));
                scp = StaticCredentialsProvider.create(awsBasic);
            } else {
                // Temporary credentials (STS) carry a session token in addition to the key pair.
                AwsSessionCredentials awsSession = AwsSessionCredentials.create(
                        properties.get(S3Properties.ACCESS_KEY),
                        properties.get(S3Properties.SECRET_KEY),
                        properties.get(S3Properties.SESSION_TOKEN));
                scp = StaticCredentialsProvider.create(awsSession);
            }
            EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
                    .builder()
                    .baseDelay(Duration.ofSeconds(1))
                    .maxBackoffTime(Duration.ofMinutes(1))
                    .build();
            // retry 3 time with Equal backoff
            RetryPolicy retryPolicy = RetryPolicy
                    .builder()
                    .numRetries(3)
                    .backoffStrategy(backoffStrategy)
                    .build();
            ClientOverrideConfiguration clientConf = ClientOverrideConfiguration
                    .builder()
                    // set retry policy
                    .retryPolicy(retryPolicy)
                    // using AwsS3V4Signer
                    .putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create())
                    .build();
            URI endpoint = StringUtils.isEmpty(bucket) ? tmpEndpoint :
                    URI.create(new URIBuilder(tmpEndpoint).setHost(bucket + "." + tmpEndpoint.getHost()).toString());
            client = S3Client.builder()
                    .endpointOverride(endpoint)
                    .credentialsProvider(scp)
                    .region(Region.of(properties.get(S3Properties.REGION)))
                    .overrideConfiguration(clientConf)
                    // disable chunkedEncoding because of bos not supported
                    // use virtual hosted-style access
                    .serviceConfiguration(S3Configuration.builder()
                            .chunkedEncodingEnabled(false)
                            .pathStyleAccessEnabled(false)
                            .build())
                    .build();
        }
        return client;
    }

    /**
     * Checks whether {@code remotePath} exists via a HEAD request.
     *
     * @return OK if the object exists; NOT_FOUND on HTTP 404; COMMON_ERROR otherwise
     */
    @Override
    public Status exists(String remotePath) {
        try {
            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
            getClient(uri.getVirtualBucket())
                    .headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
            return Status.OK;
        } catch (S3Exception e) {
            if (e.statusCode() == HttpStatus.SC_NOT_FOUND) {
                return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath);
            } else {
                LOG.warn("headObject failed:", e);
                return new Status(Status.ErrCode.COMMON_ERROR, "headObject failed: " + e.getMessage());
            }
        } catch (UserException ue) {
            LOG.warn("connect to s3 failed: ", ue);
            return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
        }
    }

    /**
     * Downloads {@code remoteFilePath} to {@code localFilePath} and verifies the resulting
     * local file is exactly {@code fileSize} bytes.
     *
     * <p>Any pre-existing file/tree at the local path is deleted first (the recursive walk
     * handles the case where the path is a directory).
     *
     * @return OK on success and size match; COMMON_ERROR otherwise (including size mismatch)
     */
    @Override
    public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) {
        long start = System.currentTimeMillis();
        // Write the data to a local file
        File localFile = new File(localFilePath);
        if (localFile.exists()) {
            try {
                // Depth-first (reverseOrder) so children are deleted before their parent dir.
                Files.walk(Paths.get(localFilePath), FileVisitOption.FOLLOW_LINKS)
                        .sorted(Comparator.reverseOrder())
                        .map(Path::toFile)
                        .forEach(File::delete);
            } catch (IOException e) {
                return new Status(
                        Status.ErrCode.COMMON_ERROR, "failed to delete exist local file: " + localFilePath);
            }
        }
        try {
            S3URI uri = S3URI.create(remoteFilePath, forceHostedStyle);
            GetObjectResponse response = getClient(uri.getVirtualBucket()).getObject(
                    GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), localFile.toPath());
            if (localFile.length() == fileSize) {
                LOG.info(
                        "finished to download from {} to {} with size: {}. cost {} ms",
                        remoteFilePath,
                        localFilePath,
                        fileSize,
                        (System.currentTimeMillis() - start));
                return Status.OK;
            } else {
                // Size mismatch: report the raw response so the caller can diagnose.
                return new Status(Status.ErrCode.COMMON_ERROR, response.toString());
            }
        } catch (S3Exception s3Exception) {
            return new Status(
                    Status.ErrCode.COMMON_ERROR,
                    "get file from s3 error: " + s3Exception.awsErrorDetails().errorMessage());
        } catch (UserException ue) {
            LOG.warn("connect to s3 failed: ", ue);
            return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
        } catch (Exception e) {
            return new Status(Status.ErrCode.COMMON_ERROR, e.toString());
        }
    }

    /**
     * Uploads in-memory string {@code content} as the object {@code remoteFile}.
     *
     * <p>NOTE(review): {@code content.getBytes()} uses the platform default charset —
     * confirm callers only pass ASCII-safe content, or this should be UTF-8 explicitly.
     *
     * @return OK on success; COMMON_ERROR otherwise
     */
    @Override
    public Status directUpload(String content, String remoteFile) {
        try {
            S3URI uri = S3URI.create(remoteFile, forceHostedStyle);
            PutObjectResponse response =
                    getClient(uri.getVirtualBucket())
                            .putObject(
                                    PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
                                    RequestBody.fromBytes(content.getBytes()));
            LOG.info("upload content success: " + response.eTag());
            return Status.OK;
        } catch (S3Exception e) {
            LOG.warn("write content failed:", e);
            return new Status(Status.ErrCode.COMMON_ERROR, "write content failed: " + e.getMessage());
        } catch (UserException ue) {
            LOG.warn("connect to s3 failed: ", ue);
            return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
        } catch (Exception e) {
            LOG.warn("connect to s3 failed: ", e);
            return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + e.getMessage());
        }
    }

    /**
     * Server-side copy of {@code origFilePath} to {@code destFilePath}.
     *
     * @return OK on success; COMMON_ERROR otherwise
     */
    public Status copy(String origFilePath, String destFilePath) {
        try {
            S3URI origUri = S3URI.create(origFilePath);
            S3URI descUri = S3URI.create(destFilePath, forceHostedStyle);
            getClient(descUri.getVirtualBucket())
                    .copyObject(
                            CopyObjectRequest.builder()
                                    .copySource(origUri.getBucket() + "/" + origUri.getKey())
                                    .destinationBucket(descUri.getBucket())
                                    .destinationKey(descUri.getKey())
                                    .build());
            return Status.OK;
        } catch (S3Exception e) {
            LOG.error("copy file failed: ", e);
            return new Status(Status.ErrCode.COMMON_ERROR, "copy file failed: " + e.getMessage());
        } catch (UserException ue) {
            LOG.error("copy to s3 failed: ", ue);
            return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
        }
    }

    /**
     * Uploads the local file at {@code localPath} to the object {@code remotePath}.
     *
     * @return OK on success; COMMON_ERROR otherwise
     */
    @Override
    public Status upload(String localPath, String remotePath) {
        try {
            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
            PutObjectResponse response =
                    getClient(uri.getVirtualBucket())
                            .putObject(
                                    PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
                                    RequestBody.fromFile(new File(localPath)));
            LOG.info("upload file " + localPath + " success: " + response.eTag());
            return Status.OK;
        } catch (S3Exception e) {
            LOG.error("write file failed:", e);
            return new Status(Status.ErrCode.COMMON_ERROR, "write file failed: " + e.getMessage());
        } catch (UserException ue) {
            LOG.error("connect to s3 failed: ", ue);
            return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
        }
    }

    /**
     * Renames by copy-then-delete — S3 has no atomic rename, so this is not atomic:
     * a failure after the copy leaves both objects in place.
     *
     * @return OK on success; otherwise the status of whichever step failed
     */
    @Override
    public Status rename(String origFilePath, String destFilePath) {
        Status status = copy(origFilePath, destFilePath);
        if (status.ok()) {
            return delete(origFilePath);
        } else {
            return status;
        }
    }

    /**
     * Deletes the object at {@code remotePath}. Deleting a non-existent object (404)
     * is treated as success, making this operation idempotent.
     *
     * @return OK on success or if absent; COMMON_ERROR otherwise
     */
    @Override
    public Status delete(String remotePath) {
        try {
            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
            DeleteObjectResponse response =
                    getClient(uri.getVirtualBucket())
                            .deleteObject(
                                    DeleteObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build());
            LOG.info("delete file " + remotePath + " success: " + response.toString());
            return Status.OK;
        } catch (S3Exception e) {
            LOG.warn("delete file failed: ", e);
            if (e.statusCode() == HttpStatus.SC_NOT_FOUND) {
                return Status.OK;
            }
            return new Status(Status.ErrCode.COMMON_ERROR, "delete file failed: " + e.getMessage());
        } catch (UserException ue) {
            LOG.warn("connect to s3 failed: ", ue);
            return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
        }
    }

    /**
     * Lists located file statuses under {@code remotePath} via the Hadoop s3a FileSystem.
     *
     * @throws UserException if the FileSystem cannot be obtained or the listing fails
     */
    @Override
    public RemoteIterator<LocatedFileStatus> listLocatedStatus(String remotePath) throws UserException {
        FileSystem fileSystem = getFileSystem(remotePath);
        try {
            return fileSystem.listLocatedStatus(new org.apache.hadoop.fs.Path(remotePath));
        } catch (IOException e) {
            throw new UserException("Failed to list located status for path: " + remotePath, e);
        }
    }

    /** Convenience overload of {@link #list(String, List, boolean)} that keeps file names only. */
    @Override
    public Status list(String remotePath, List<RemoteFile> result) {
        return list(remotePath, result, true);
    }

    // broker file pattern glob is too complex, so we use hadoop directly
    /**
     * Lists files matching the glob pattern {@code remotePath}, appending them to {@code result}.
     *
     * @param fileNameOnly if true, only the file name is recorded; otherwise the full path
     * @return OK (also for an empty match); NOT_FOUND if the path does not exist;
     *         COMMON_ERROR on other failures
     */
    @Override
    public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
        try {
            FileSystem s3AFileSystem = getFileSystem(remotePath);
            org.apache.hadoop.fs.Path pathPattern = new org.apache.hadoop.fs.Path(remotePath);
            FileStatus[] files = s3AFileSystem.globStatus(pathPattern);
            if (files == null) {
                // globStatus returns null when nothing matches; an empty listing is not an error.
                return Status.OK;
            }
            for (FileStatus fileStatus : files) {
                RemoteFile remoteFile = new RemoteFile(
                        fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(),
                        !fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(),
                        fileStatus.getBlockSize());
                result.add(remoteFile);
            }
        } catch (FileNotFoundException e) {
            LOG.info("file not found: " + e.getMessage());
            return new Status(Status.ErrCode.NOT_FOUND, "file not found: " + e.getMessage());
        } catch (Exception e) {
            LOG.error("errors while get file status ", e);
            return new Status(Status.ErrCode.COMMON_ERROR, "errors while get file status " + e.getMessage());
        }
        return Status.OK;
    }

    /**
     * Creates a "directory" by putting an empty object whose key ends with '/'
     * (S3 has no real directories; this is the conventional marker object).
     *
     * @return OK on success; COMMON_ERROR otherwise
     */
    @Override
    public Status makeDir(String remotePath) {
        if (!remotePath.endsWith("/")) {
            remotePath += "/";
        }
        try {
            S3URI uri = S3URI.create(remotePath, forceHostedStyle);
            PutObjectResponse response =
                    getClient(uri.getVirtualBucket())
                            .putObject(
                                    PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(),
                                    RequestBody.empty());
            LOG.info("makeDir success: " + response.eTag());
            return Status.OK;
        } catch (S3Exception e) {
            LOG.error("makeDir failed:", e);
            return new Status(Status.ErrCode.COMMON_ERROR, "makeDir failed: " + e.getMessage());
        } catch (UserException ue) {
            LOG.error("connect to s3 failed: ", ue);
            return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage());
        }
    }
}

View File

@ -20,7 +20,6 @@ package org.apache.doris.tablefunction;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.backup.BlobStorage;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
@ -28,6 +27,7 @@ import org.apache.doris.common.util.S3URI;
import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.obj.BlobStorage;
import org.apache.doris.thrift.TFileType;
import com.google.common.annotations.VisibleForTesting;