diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java index dcfa6f8515..1b05d7e11f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java @@ -18,12 +18,12 @@ package org.apache.doris.analysis; import org.apache.doris.analysis.StorageBackend.StorageType; -import org.apache.doris.backup.BlobStorage; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.datasource.property.S3ClientBEProperties; import org.apache.doris.datasource.property.constants.BosProperties; +import org.apache.doris.fs.obj.BlobStorage; import org.apache.doris.thrift.TFileType; import com.google.common.collect.Maps; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java index 57478868c9..18aaf722d8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java @@ -17,7 +17,6 @@ package org.apache.doris.analysis; -import org.apache.doris.backup.HdfsStorage; import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; @@ -47,6 +46,7 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.hadoop.fs.Path; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -667,13 +667,19 @@ public class OutFileClause { S3Properties.requiredS3Properties(brokerProps); } else if (storageType == StorageBackend.StorageType.HDFS) { if (!brokerProps.containsKey(HdfsResource.HADOOP_FS_NAME)) { - brokerProps.put(HdfsResource.HADOOP_FS_NAME, HdfsStorage.getFsName(filePath)); + brokerProps.put(HdfsResource.HADOOP_FS_NAME, getFsName(filePath)); } } - brokerDesc = new BrokerDesc(brokerName, storageType, brokerProps); } + public static String getFsName(String path) { + Path hdfsPath = new Path(path); + String fullPath = hdfsPath.toUri().toString(); + String filePath = hdfsPath.toUri().getPath(); + return fullPath.replace(filePath, ""); + } + void setParquetCompressionType(String propertyValue) { if (PARQUET_COMPRESSION_TYPE_MAP.containsKey(propertyValue)) { this.parquetCompressionType = PARQUET_COMPRESSION_TYPE_MAP.get(propertyValue); diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java index 72eb59392b..d1731155cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java @@ -46,6 +46,7 @@ import org.apache.doris.common.ErrorReport; import org.apache.doris.common.Pair; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.MasterDaemon; +import org.apache.doris.fs.obj.BlobStorage; import org.apache.doris.task.DirMoveTask; import org.apache.doris.task.DownloadTask; import org.apache.doris.task.SnapshotTask; diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java index f46bd7b94d..13a2cff3df 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/Repository.java @@ -28,6 +28,10 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.fs.obj.BlobStorage; +import org.apache.doris.fs.obj.BrokerStorage; +import org.apache.doris.fs.obj.HdfsStorage; +import org.apache.doris.fs.obj.S3Storage; import org.apache.doris.system.Backend; import com.google.common.base.Joiner; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java index e3cb598f24..138904c51a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java @@ -31,12 +31,12 @@ import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.StringLiteral; -import org.apache.doris.backup.BlobStorage; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.property.constants.HMSProperties; +import org.apache.doris.fs.obj.BlobStorage; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TExprOpcode; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java index 76cba51a22..fc90fa053e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java @@ -17,7 +17,6 @@ package org.apache.doris.catalog; -import org.apache.doris.backup.S3Storage; import org.apache.doris.backup.Status; import org.apache.doris.common.DdlException; import org.apache.doris.common.proc.BaseProcResult; @@ -25,6 +24,7 @@ import org.apache.doris.common.util.PrintableMap; import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.S3Properties; +import org.apache.doris.fs.obj.S3Storage; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index 5c0c7e5299..f335701d4a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -18,7 +18,6 @@ package org.apache.doris.common.util; import org.apache.doris.analysis.BrokerDesc; -import org.apache.doris.backup.BlobStorage; import org.apache.doris.backup.RemoteFile; import org.apache.doris.backup.Status; import org.apache.doris.catalog.Env; @@ -30,6 +29,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.datasource.hive.HiveMetaStoreCache; +import org.apache.doris.fs.obj.BlobStorage; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TBrokerCheckPathExistRequest; import org.apache.doris.thrift.TBrokerCheckPathExistResponse; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java new file mode 100644 index 0000000000..122e02c95a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/FileSystem.java @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs; + +import org.apache.doris.backup.RemoteFile; +import org.apache.doris.backup.Status; +import org.apache.doris.common.UserException; + +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.RemoteIterator; + +import java.util.List; + +public interface FileSystem { + Status exists(String remotePath); + + Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize); + + Status upload(String localPath, String remotePath); + + Status directUpload(String content, String remoteFile); + + Status rename(String origFilePath, String destFilePath); + + Status delete(String remotePath); + + default RemoteIterator listLocatedStatus(String remotePath) throws UserException { + throw new UserException("Not support to listLocatedStatus."); + } + + // List files in remotePath + // The remote file name will only contains file name only(Not full path) + Status list(String remotePath, List result); + + Status list(String remotePath, List result, boolean fileNameOnly); + + Status makeDir(String remotePath); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java new file mode 100644 index 0000000000..5e6aa0b4c4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/LocalFileSystem.java @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs; + +public class LocalFileSystem { +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/BlobStorage.java similarity index 98% rename from fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java rename to fe/fe-core/src/main/java/org/apache/doris/fs/obj/BlobStorage.java index 3bf7f50818..5a40d42d07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/BlobStorage.java @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.backup; +package org.apache.doris.fs.obj; import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.backup.RemoteFile; +import org.apache.doris.backup.Status; import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.common.io.Text; diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BrokerStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/BrokerStorage.java similarity index 99% rename from fe/fe-core/src/main/java/org/apache/doris/backup/BrokerStorage.java rename to fe/fe-core/src/main/java/org/apache/doris/fs/obj/BrokerStorage.java index 0066388da4..9fae34f62e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/BrokerStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/BrokerStorage.java @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.backup; +package org.apache.doris.fs.obj; import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.backup.RemoteFile; +import org.apache.doris.backup.Status; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.FsBroker; import org.apache.doris.common.AnalysisException; diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/HdfsStorage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/HdfsStorage.java similarity index 92% rename from fe/fe-core/src/main/java/org/apache/doris/backup/HdfsStorage.java rename to fe/fe-core/src/main/java/org/apache/doris/fs/obj/HdfsStorage.java index a7225f6e60..79a65db27b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/HdfsStorage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/HdfsStorage.java @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.backup; +package org.apache.doris.fs.obj; import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.backup.RemoteFile; +import org.apache.doris.backup.Status; import org.apache.doris.catalog.AuthType; import org.apache.doris.catalog.HdfsResource; import org.apache.doris.common.UserException; @@ -77,43 +79,36 @@ public class HdfsStorage extends BlobStorage { setName(StorageBackend.StorageType.HDFS.name()); } - public static String getFsName(String path) { - Path hdfsPath = new Path(path); - String fullPath = hdfsPath.toUri().toString(); - String filePath = hdfsPath.toUri().getPath(); - return fullPath.replace(filePath, ""); - } - @Override public FileSystem getFileSystem(String remotePath) throws UserException { - if (dfsFileSystem == null) { - String username = hdfsProperties.get(HdfsResource.HADOOP_USER_NAME); - Configuration conf = new HdfsConfiguration(); - boolean isSecurityEnabled = false; - for (Map.Entry propEntry : hdfsProperties.entrySet()) { - conf.set(propEntry.getKey(), propEntry.getValue()); - if (propEntry.getKey().equals(HdfsResource.HADOOP_SECURITY_AUTHENTICATION) - && propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) { - isSecurityEnabled = true; - } + if (dfsFileSystem != null) { + return dfsFileSystem; + } + String username = hdfsProperties.get(HdfsResource.HADOOP_USER_NAME); + Configuration conf = new HdfsConfiguration(); + boolean isSecurityEnabled = false; + for (Map.Entry propEntry : hdfsProperties.entrySet()) { + conf.set(propEntry.getKey(), propEntry.getValue()); + if (propEntry.getKey().equals(HdfsResource.HADOOP_SECURITY_AUTHENTICATION) + && propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) { + isSecurityEnabled = true; } - - try { - if (isSecurityEnabled) { - UserGroupInformation.setConfiguration(conf); - UserGroupInformation.loginUserFromKeytab( - hdfsProperties.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL), - hdfsProperties.get(HdfsResource.HADOOP_KERBEROS_KEYTAB)); - } - if (username == null) { - dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf); - } else { - dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf, username); - } - } catch (Exception e) { - LOG.error("errors while connect to " + remotePath, e); - throw new UserException("errors while connect to " + remotePath, e); + } + try { + if (isSecurityEnabled) { + UserGroupInformation.setConfiguration(conf); + UserGroupInformation.loginUserFromKeytab( + hdfsProperties.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL), + hdfsProperties.get(HdfsResource.HADOOP_KERBEROS_KEYTAB)); } + if (username == null) { + dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf); + } else { + dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf, username); + } + } catch (Exception e) { + LOG.error("errors while connect to " + remotePath, e); + throw new UserException("errors while connect to " + remotePath, e); } return dfsFileSystem; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3Storage.java similarity index 99% rename from fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java rename to fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3Storage.java index 39b62458da..8a30ebbe59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/obj/S3Storage.java @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.backup; +package org.apache.doris.fs.obj; import org.apache.doris.analysis.StorageBackend; +import org.apache.doris.backup.RemoteFile; +import org.apache.doris.backup.Status; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.S3URI; diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileOperations.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileOperations.java new file mode 100644 index 0000000000..8fe71713e9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileOperations.java @@ -0,0 +1,142 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.remote; + +import org.apache.doris.backup.Status; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.thrift.TBrokerCloseReaderRequest; +import org.apache.doris.thrift.TBrokerCloseWriterRequest; +import org.apache.doris.thrift.TBrokerFD; +import org.apache.doris.thrift.TBrokerOpenMode; +import org.apache.doris.thrift.TBrokerOpenReaderRequest; +import org.apache.doris.thrift.TBrokerOpenReaderResponse; +import org.apache.doris.thrift.TBrokerOpenWriterRequest; +import org.apache.doris.thrift.TBrokerOpenWriterResponse; +import org.apache.doris.thrift.TBrokerOperationStatus; +import org.apache.doris.thrift.TBrokerOperationStatusCode; +import org.apache.doris.thrift.TBrokerVersion; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPaloBrokerService; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; + +import java.util.Map; + +public class BrokerFileOperations { + + private static final Logger LOG = LogManager.getLogger(BrokerFileOperations.class); + + private String name; + + private Map properties; + + public BrokerFileOperations(String name, Map properties) { + this.name = name; + this.properties = properties; + } + + + public Status openReader(TPaloBrokerService.Client client, TNetworkAddress address, String remoteFilePath, + TBrokerFD fd) { + try { + TBrokerOpenReaderRequest req = new TBrokerOpenReaderRequest(TBrokerVersion.VERSION_ONE, remoteFilePath, + 0, BrokerFileSystem.clientId(), properties); + TBrokerOpenReaderResponse rep = client.openReader(req); + TBrokerOperationStatus opst = rep.getOpStatus(); + if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) { + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to open reader on broker " + BrokerUtil.printBroker(name, address) + + " for file: " + remoteFilePath + ". msg: " + opst.getMessage()); + } + fd.setHigh(rep.getFd().getHigh()); + fd.setLow(rep.getFd().getLow()); + } catch (TException e) { + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to open reader on broker " + BrokerUtil.printBroker(name, address) + + " for file: " + remoteFilePath + ". msg: " + e.getMessage()); + } + return Status.OK; + } + + public Status closeReader(TPaloBrokerService.Client client, TNetworkAddress address, TBrokerFD fd) { + try { + TBrokerCloseReaderRequest req = new TBrokerCloseReaderRequest(TBrokerVersion.VERSION_ONE, fd); + TBrokerOperationStatus st = client.closeReader(req); + if (st.getStatusCode() != TBrokerOperationStatusCode.OK) { + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to close reader on broker " + BrokerUtil.printBroker(name, address) + + " for fd: " + fd); + } + + LOG.info("finished to close reader. fd: {}.", fd); + } catch (TException e) { + return new Status(Status.ErrCode.BAD_CONNECTION, + "failed to close reader on broker " + BrokerUtil.printBroker(name, address) + + ", fd " + fd + ", msg: " + e.getMessage()); + } + + return Status.OK; + } + + public Status openWriter(TPaloBrokerService.Client client, TNetworkAddress address, String remoteFile, + TBrokerFD fd) { + try { + TBrokerOpenWriterRequest req = new TBrokerOpenWriterRequest(TBrokerVersion.VERSION_ONE, + remoteFile, TBrokerOpenMode.APPEND, BrokerFileSystem.clientId(), properties); + TBrokerOpenWriterResponse rep = client.openWriter(req); + TBrokerOperationStatus opst = rep.getOpStatus(); + if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) { + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to open writer on broker " + BrokerUtil.printBroker(name, address) + + " for file: " + remoteFile + ". msg: " + opst.getMessage()); + } + + fd.setHigh(rep.getFd().getHigh()); + fd.setLow(rep.getFd().getLow()); + LOG.info("finished to open writer. fd: {}. directly upload to remote path {}.", fd, remoteFile); + } catch (TException e) { + return new Status(Status.ErrCode.BAD_CONNECTION, + "failed to open writer on broker " + BrokerUtil.printBroker(name, address) + + ", err: " + e.getMessage()); + } + + return Status.OK; + } + + public Status closeWriter(TPaloBrokerService.Client client, TNetworkAddress address, TBrokerFD fd) { + try { + TBrokerCloseWriterRequest req = new TBrokerCloseWriterRequest(TBrokerVersion.VERSION_ONE, fd); + TBrokerOperationStatus st = client.closeWriter(req); + if (st.getStatusCode() != TBrokerOperationStatusCode.OK) { + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to close writer on broker " + BrokerUtil.printBroker(name, address) + + " for fd: " + fd); + } + + LOG.info("finished to close writer. fd: {}.", fd); + } catch (TException e) { + return new Status(Status.ErrCode.BAD_CONNECTION, + "failed to close writer on broker " + BrokerUtil.printBroker(name, address) + + ", fd " + fd + ", msg: " + e.getMessage()); + } + + return Status.OK; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java new file mode 100644 index 0000000000..435e1219be --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/BrokerFileSystem.java @@ -0,0 +1,626 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.remote; + +import org.apache.doris.backup.RemoteFile; +import org.apache.doris.backup.Status; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.FsBroker; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ClientPool; +import org.apache.doris.common.Config; +import org.apache.doris.common.Pair; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.service.FrontendOptions; +import org.apache.doris.thrift.TBrokerCheckPathExistRequest; +import org.apache.doris.thrift.TBrokerCheckPathExistResponse; +import org.apache.doris.thrift.TBrokerDeletePathRequest; +import org.apache.doris.thrift.TBrokerFD; +import org.apache.doris.thrift.TBrokerFileStatus; +import org.apache.doris.thrift.TBrokerListPathRequest; +import org.apache.doris.thrift.TBrokerListResponse; +import org.apache.doris.thrift.TBrokerOperationStatus; +import org.apache.doris.thrift.TBrokerOperationStatusCode; +import org.apache.doris.thrift.TBrokerPReadRequest; +import org.apache.doris.thrift.TBrokerPWriteRequest; +import org.apache.doris.thrift.TBrokerReadResponse; +import org.apache.doris.thrift.TBrokerRenamePathRequest; +import org.apache.doris.thrift.TBrokerVersion; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPaloBrokerService; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.transport.TTransportException; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileVisitOption; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +public class BrokerFileSystem extends RemoteFileSystem { + private static final Logger LOG = LogManager.getLogger(BrokerFileSystem.class); + private String name; + + private BrokerFileOperations operations; + + public BrokerFileSystem(String name, Map properties) { + this.name = name; + this.properties = properties; + this.operations = new BrokerFileOperations(name, properties); + } + + public static String clientId() { + return FrontendOptions.getLocalHostAddress() + ":" + Config.edit_log_port; + } + + public Pair getBroker() { + Pair result = Pair.of(null, null); + FsBroker broker; + try { + String localIP = FrontendOptions.getLocalHostAddress(); + broker = Env.getCurrentEnv().getBrokerMgr().getBroker(name, localIP); + } catch (AnalysisException e) { + LOG.warn("failed to get a broker address: " + e.getMessage()); + return null; + } + TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port); + TPaloBrokerService.Client client; + try { + client = ClientPool.brokerPool.borrowObject(address); + } catch (Exception e) { + LOG.warn("failed to get broker client: " + e.getMessage()); + return null; + } + + result.first = client; + result.second = address; + LOG.info("get broker: {}", BrokerUtil.printBroker(name, address)); + return result; + } + + @Override + public Status exists(String remotePath) { + // 1. get a proper broker + Pair pair = getBroker(); + if (pair == null) { + return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client"); + } + TPaloBrokerService.Client client = pair.first; + TNetworkAddress address = pair.second; + + // check path + boolean needReturn = true; + try { + TBrokerCheckPathExistRequest req = new TBrokerCheckPathExistRequest(TBrokerVersion.VERSION_ONE, + remotePath, properties); + TBrokerCheckPathExistResponse rep = client.checkPathExist(req); + TBrokerOperationStatus opst = rep.getOpStatus(); + if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) { + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to check remote path exist: " + remotePath + + ", broker: " + BrokerUtil.printBroker(name, address) + + ". msg: " + opst.getMessage()); + } + + if (!rep.isIsPathExist()) { + return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath); + } + + return Status.OK; + } catch (TException e) { + needReturn = false; + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to check remote path exist: " + remotePath + + ", broker: " + BrokerUtil.printBroker(name, address) + + ". msg: " + e.getMessage()); + } finally { + if (needReturn) { + ClientPool.brokerPool.returnObject(address, client); + } else { + ClientPool.brokerPool.invalidateObject(address, client); + } + } + } + + @Override + public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) { + LOG.debug("download from {} to {}, file size: {}.", remoteFilePath, localFilePath, fileSize); + + long start = System.currentTimeMillis(); + + // 1. get a proper broker + Pair pair = getBroker(); + if (pair == null) { + return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client"); + } + TPaloBrokerService.Client client = pair.first; + TNetworkAddress address = pair.second; + + // 2. open file reader with broker + TBrokerFD fd = new TBrokerFD(); + Status opStatus = operations.openReader(client, address, remoteFilePath, fd); + if (!opStatus.ok()) { + return opStatus; + } + LOG.info("finished to open reader. fd: {}. download {} to {}.", + fd, remoteFilePath, localFilePath); + Preconditions.checkNotNull(fd); + // 3. delete local file if exist + File localFile = new File(localFilePath); + if (localFile.exists()) { + try { + Files.walk(Paths.get(localFilePath), FileVisitOption.FOLLOW_LINKS) + .sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete); + } catch (IOException e) { + return new Status(Status.ErrCode.COMMON_ERROR, "failed to delete exist local file: " + localFilePath); + } + } + + // 4. create local file + Status status = Status.OK; + try { + if (!localFile.createNewFile()) { + return new Status(Status.ErrCode.COMMON_ERROR, "failed to create local file: " + localFilePath); + } + } catch (IOException e) { + return new Status(Status.ErrCode.COMMON_ERROR, "failed to create local file: " + + localFilePath + ", msg: " + e.getMessage()); + } + + // 5. read remote file with broker and write to local + String lastErrMsg = null; + try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(localFile))) { + final long bufSize = 1024 * 1024; // 1MB + long leftSize = fileSize; + long readOffset = 0; + while (leftSize > 0) { + long readLen = Math.min(leftSize, bufSize); + TBrokerReadResponse rep = null; + // We only retry if we encounter a timeout thrift exception. + int tryTimes = 0; + while (tryTimes < 3) { + try { + TBrokerPReadRequest req = new TBrokerPReadRequest(TBrokerVersion.VERSION_ONE, + fd, readOffset, readLen); + rep = client.pread(req); + if (rep.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) { + // pread return failure. + lastErrMsg = String.format("failed to read via broker %s. " + + "current read offset: %d, read length: %d," + + " file size: %d, file: %s, err code: %d, msg: %s", + BrokerUtil.printBroker(name, address), + readOffset, readLen, fileSize, + remoteFilePath, rep.getOpStatus().getStatusCode().getValue(), + rep.getOpStatus().getMessage()); + LOG.warn(lastErrMsg); + status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); + } + if (rep.opStatus.statusCode != TBrokerOperationStatusCode.END_OF_FILE) { + LOG.debug("download. readLen: {}, read data len: {}, left size:{}. total size: {}", + readLen, rep.getData().length, leftSize, fileSize); + } else { + LOG.debug("read eof: " + remoteFilePath); + } + break; + } catch (TTransportException e) { + if (e.getType() == TTransportException.TIMED_OUT) { + // we only retry when we encounter timeout exception. + lastErrMsg = String.format("failed to read via broker %s. " + + "current read offset: %d, read length: %d," + + " file size: %d, file: %s, timeout.", + BrokerUtil.printBroker(name, address), + readOffset, readLen, fileSize, + remoteFilePath); + tryTimes++; + continue; + } + + lastErrMsg = String.format("failed to read via broker %s. " + + "current read offset: %d, read length: %d," + + " file size: %d, file: %s. msg: %s", + BrokerUtil.printBroker(name, address), + readOffset, readLen, fileSize, + remoteFilePath, e.getMessage()); + LOG.warn(lastErrMsg); + status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); + break; + } catch (TException e) { + lastErrMsg = String.format("failed to read via broker %s. " + + "current read offset: %d, read length: %d," + + " file size: %d, file: %s. msg: %s", + BrokerUtil.printBroker(name, address), + readOffset, readLen, fileSize, + remoteFilePath, e.getMessage()); + LOG.warn(lastErrMsg); + status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); + break; + } + } // end of retry loop + + if (status.ok() && tryTimes < 3) { + // read succeed, write to local file + Preconditions.checkNotNull(rep); + // NOTICE(cmy): Sometimes the actual read length does not equal to the expected read length, + // even if the broker's read buffer size is large enough. + // I don't know why, but have to adapt to it. + if (rep.getData().length != readLen) { + LOG.warn("the actual read length does not equal to " + + "the expected read length: {} vs. {}, file: {}, broker: {}", + rep.getData().length, readLen, remoteFilePath, + BrokerUtil.printBroker(name, address)); + } + + out.write(rep.getData()); + readOffset += rep.getData().length; + leftSize -= rep.getData().length; + } else { + status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); + break; + } + } // end of reading remote file + } catch (IOException e) { + return new Status(Status.ErrCode.COMMON_ERROR, "Got exception: " + e.getMessage() + ", broker: " + + BrokerUtil.printBroker(name, address)); + } finally { + // close broker reader + Status closeStatus = operations.closeReader(client, address, fd); + if (!closeStatus.ok()) { + LOG.warn(closeStatus.getErrMsg()); + if (status.ok()) { + // we return close write error only if no other error has been encountered. + status = closeStatus; + } + ClientPool.brokerPool.invalidateObject(address, client); + } else { + ClientPool.brokerPool.returnObject(address, client); + } + } + + LOG.info("finished to download from {} to {} with size: {}. cost {} ms", + remoteFilePath, localFilePath, fileSize, (System.currentTimeMillis() - start)); + return status; + } + + // directly upload the content to remote file + @Override + public Status directUpload(String content, String remoteFile) { + // 1. get a proper broker + Pair pair = getBroker(); + if (pair == null) { + return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client"); + } + TPaloBrokerService.Client client = pair.first; + TNetworkAddress address = pair.second; + + TBrokerFD fd = new TBrokerFD(); + Status status = Status.OK; + try { + // 2. open file writer with broker + status = operations.openWriter(client, address, remoteFile, fd); + if (!status.ok()) { + return status; + } + + // 3. write content + try { + ByteBuffer bb = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)); + TBrokerPWriteRequest req = new TBrokerPWriteRequest(TBrokerVersion.VERSION_ONE, fd, 0, bb); + TBrokerOperationStatus opst = client.pwrite(req); + if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) { + // pwrite return failure. + status = new Status(Status.ErrCode.COMMON_ERROR, "write failed: " + opst.getMessage() + + ", broker: " + BrokerUtil.printBroker(name, address)); + } + } catch (TException e) { + status = new Status(Status.ErrCode.BAD_CONNECTION, "write exception: " + e.getMessage() + + ", broker: " + BrokerUtil.printBroker(name, address)); + } + } finally { + Status closeStatus = operations.closeWriter(client, address, fd); + if (closeStatus.getErrCode() == Status.ErrCode.BAD_CONNECTION + || status.getErrCode() == Status.ErrCode.BAD_CONNECTION) { + ClientPool.brokerPool.invalidateObject(address, client); + } else { + ClientPool.brokerPool.returnObject(address, client); + } + } + + return status; + } + + @Override + public Status upload(String localPath, String remotePath) { + long start = System.currentTimeMillis(); + // 1. get a proper broker + Pair pair = getBroker(); + if (pair == null) { + return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client"); + } + TPaloBrokerService.Client client = pair.first; + TNetworkAddress address = pair.second; + + // 2. open file write with broker + TBrokerFD fd = new TBrokerFD(); + Status status = operations.openWriter(client, address, remotePath, fd); + if (!status.ok()) { + return status; + } + + // 3. read local file and write to remote with broker + File localFile = new File(localPath); + long fileLength = localFile.length(); + byte[] readBuf = new byte[1024]; + try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(localFile))) { + // save the last err msg + String lastErrMsg = null; + // save the current write offset of remote file + long writeOffset = 0; + // read local file, 1MB at a time + int bytesRead; + while ((bytesRead = in.read(readBuf)) != -1) { + ByteBuffer bb = ByteBuffer.wrap(readBuf, 0, bytesRead); + + // We only retry if we encounter a timeout thrift exception. + int tryTimes = 0; + while (tryTimes < 3) { + try { + TBrokerPWriteRequest req + = new TBrokerPWriteRequest(TBrokerVersion.VERSION_ONE, fd, writeOffset, bb); + TBrokerOperationStatus opst = client.pwrite(req); + if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) { + // pwrite return failure. + lastErrMsg = String.format("failed to write via broker %s. " + + "current write offset: %d, write length: %d," + + " file length: %d, file: %s, err code: %d, msg: %s", + BrokerUtil.printBroker(name, address), + writeOffset, bytesRead, fileLength, + remotePath, opst.getStatusCode().getValue(), opst.getMessage()); + LOG.warn(lastErrMsg); + status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); + } + break; + } catch (TTransportException e) { + if (e.getType() == TTransportException.TIMED_OUT) { + // we only retry when we encounter timeout exception. + lastErrMsg = String.format("failed to write via broker %s. " + + "current write offset: %d, write length: %d," + + " file length: %d, file: %s. timeout", + BrokerUtil.printBroker(name, address), + writeOffset, bytesRead, fileLength, + remotePath); + tryTimes++; + continue; + } + + lastErrMsg = String.format("failed to write via broker %s. " + + "current write offset: %d, write length: %d," + + " file length: %d, file: %s. encounter TTransportException: %s", + BrokerUtil.printBroker(name, address), + writeOffset, bytesRead, fileLength, + remotePath, e.getMessage()); + LOG.warn(lastErrMsg, e); + status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); + break; + } catch (TException e) { + lastErrMsg = String.format("failed to write via broker %s. " + + "current write offset: %d, write length: %d," + + " file length: %d, file: %s. encounter TException: %s", + BrokerUtil.printBroker(name, address), + writeOffset, bytesRead, fileLength, + remotePath, e.getMessage()); + LOG.warn(lastErrMsg, e); + status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); + break; + } + } + + if (status.ok() && tryTimes < 3) { + // write succeed, update current write offset + writeOffset += bytesRead; + } else { + status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); + break; + } + } // end of read local file loop + } catch (FileNotFoundException e1) { + return new Status(Status.ErrCode.COMMON_ERROR, "encounter file not found exception: " + e1.getMessage() + + ", broker: " + BrokerUtil.printBroker(name, address)); + } catch (IOException e1) { + return new Status(Status.ErrCode.COMMON_ERROR, "encounter io exception: " + e1.getMessage() + + ", broker: " + BrokerUtil.printBroker(name, address)); + } finally { + // close write + Status closeStatus = operations.closeWriter(client, address, fd); + if (!closeStatus.ok()) { + LOG.warn(closeStatus.getErrMsg()); + if (status.ok()) { + // we return close write error only if no other error has been encountered. + status = closeStatus; + } + ClientPool.brokerPool.invalidateObject(address, client); + } else { + ClientPool.brokerPool.returnObject(address, client); + } + } + + if (status.ok()) { + LOG.info("finished to upload {} to remote path {}. cost: {} ms", + localPath, remotePath, (System.currentTimeMillis() - start)); + } + return status; + } + + @Override + public Status rename(String origFilePath, String destFilePath) { + long start = System.currentTimeMillis(); + // 1. get a proper broker + Pair pair = getBroker(); + if (pair == null) { + return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client"); + } + TPaloBrokerService.Client client = pair.first; + TNetworkAddress address = pair.second; + + // 2. rename + boolean needReturn = true; + try { + TBrokerRenamePathRequest req = new TBrokerRenamePathRequest(TBrokerVersion.VERSION_ONE, + origFilePath, destFilePath, properties); + TBrokerOperationStatus ost = client.renamePath(req); + if (ost.getStatusCode() != TBrokerOperationStatusCode.OK) { + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to rename " + origFilePath + " to " + destFilePath + ", msg: " + ost.getMessage() + + ", broker: " + BrokerUtil.printBroker(name, address)); + } + } catch (TException e) { + needReturn = false; + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to rename " + origFilePath + " to " + destFilePath + ", msg: " + e.getMessage() + + ", broker: " + BrokerUtil.printBroker(name, address)); + } finally { + if (needReturn) { + ClientPool.brokerPool.returnObject(address, client); + } else { + ClientPool.brokerPool.invalidateObject(address, client); + } + } + + LOG.info("finished to rename {} to {}. cost: {} ms", + origFilePath, destFilePath, (System.currentTimeMillis() - start)); + return Status.OK; + } + + @Override + public Status delete(String remotePath) { + // get a proper broker + Pair pair = getBroker(); + if (pair == null) { + return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client"); + } + TPaloBrokerService.Client client = pair.first; + TNetworkAddress address = pair.second; + + // delete + boolean needReturn = true; + try { + TBrokerDeletePathRequest req = new TBrokerDeletePathRequest(TBrokerVersion.VERSION_ONE, + remotePath, properties); + TBrokerOperationStatus opst = client.deletePath(req); + if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) { + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to delete remote path: " + remotePath + ". msg: " + opst.getMessage() + + ", broker: " + BrokerUtil.printBroker(name, address)); + } + + LOG.info("finished to delete remote path {}.", remotePath); + } catch (TException e) { + needReturn = false; + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to delete remote path: " + remotePath + ". msg: " + e.getMessage() + + ", broker: " + BrokerUtil.printBroker(name, address)); + } finally { + if (needReturn) { + ClientPool.brokerPool.returnObject(address, client); + } else { + ClientPool.brokerPool.invalidateObject(address, client); + } + } + + return Status.OK; + } + + @Override + public RemoteIterator listLocatedStatus(String remotePath) throws UserException { + return null; + } + + @Override + public Status list(String remotePath, List result) { + return list(remotePath, result, true); + } + + // List files in remotePath + @Override + public Status list(String remotePath, List result, boolean fileNameOnly) { + // get a proper broker + Pair pair = getBroker(); + if (pair == null) { + return new Status(Status.ErrCode.COMMON_ERROR, "failed to get broker client"); + } + TPaloBrokerService.Client client = pair.first; + TNetworkAddress address = pair.second; + + // list + boolean needReturn = true; + try { + TBrokerListPathRequest req = new TBrokerListPathRequest(TBrokerVersion.VERSION_ONE, remotePath, + false /* not recursive */, properties); + req.setFileNameOnly(fileNameOnly); + TBrokerListResponse rep = client.listPath(req); + TBrokerOperationStatus opst = rep.getOpStatus(); + if (opst.getStatusCode() != TBrokerOperationStatusCode.OK) { + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to list remote path: " + remotePath + ". msg: " + opst.getMessage() + + ", broker: " + BrokerUtil.printBroker(name, address)); + } + + List fileStatus = rep.getFiles(); + for (TBrokerFileStatus tFile : fileStatus) { + RemoteFile file = new RemoteFile(tFile.path, !tFile.isDir, tFile.size, 0); + result.add(file); + } + LOG.info("finished to list remote path {}. get files: {}", remotePath, result); + } catch (TException e) { + needReturn = false; + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to list remote path: " + remotePath + ". msg: " + e.getMessage() + + ", broker: " + BrokerUtil.printBroker(name, address)); + } finally { + if (needReturn) { + ClientPool.brokerPool.returnObject(address, client); + } else { + ClientPool.brokerPool.invalidateObject(address, client); + } + } + + return Status.OK; + } + + @Override + public Status makeDir(String remotePath) { + return new Status(Status.ErrCode.COMMON_ERROR, "mkdir is not implemented."); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/DFSFileSystem.java new file mode 100644 index 0000000000..c4c082b3aa --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/DFSFileSystem.java @@ -0,0 +1,463 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.remote; + +import org.apache.doris.backup.RemoteFile; +import org.apache.doris.backup.Status; +import org.apache.doris.catalog.AuthType; +import org.apache.doris.catalog.HdfsResource; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.URI; +import org.apache.doris.fs.obj.HdfsStorage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.FileVisitOption; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DFSFileSystem extends RemoteFileSystem { + + private static final Logger LOG = LogManager.getLogger(HdfsStorage.class); + + private HDFSFileOperations operations = null; + + public DFSFileSystem(Map properties) { + this.properties = new HashMap<>(properties); + } + + private FileSystem getHdfsClient(String remotePath) + throws UserException { + if (dfsFileSystem != null) { + return dfsFileSystem; + } + String username = properties.get(HdfsResource.HADOOP_USER_NAME); + Configuration conf = new HdfsConfiguration(); + boolean isSecurityEnabled = false; + for (Map.Entry propEntry : properties.entrySet()) { + conf.set(propEntry.getKey(), propEntry.getValue()); + if (propEntry.getKey().equals(HdfsResource.HADOOP_SECURITY_AUTHENTICATION) + && propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) { + isSecurityEnabled = true; + } + } + try { + if (isSecurityEnabled) { + UserGroupInformation.setConfiguration(conf); + UserGroupInformation.loginUserFromKeytab( + properties.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL), + properties.get(HdfsResource.HADOOP_KERBEROS_KEYTAB)); + } + if (username == null) { + dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf); + } else { + dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf, username); + } + } catch (Exception e) { + LOG.error("errors while connect to " + remotePath, e); + throw new UserException("errors while connect to " + remotePath, e); + } + operations = new HDFSFileOperations(dfsFileSystem); + return dfsFileSystem; + } + + @Override + public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) { + LOG.debug("download from {} to {}, file size: {}.", remoteFilePath, localFilePath, fileSize); + final long start = System.currentTimeMillis(); + FSDataInputStream fsDataInputStream; + try { + fsDataInputStream = operations.openReader(remoteFilePath, 0); + } catch (Exception e) { + return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); + } + LOG.info("finished to open reader. download {} to {}.", remoteFilePath, localFilePath); + + // delete local file if exist + File localFile = new File(localFilePath); + if (localFile.exists()) { + try { + Files.walk(Paths.get(localFilePath), FileVisitOption.FOLLOW_LINKS).sorted(Comparator.reverseOrder()) + .map(java.nio.file.Path::toFile).forEach(File::delete); + } catch (IOException e) { + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to delete exist local file: " + localFilePath + ", msg: " + e.getMessage()); + } + } + // create local file + try { + if (!localFile.createNewFile()) { + return new Status(Status.ErrCode.COMMON_ERROR, "failed to create local file: " + localFilePath); + } + } catch (IOException e) { + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to create local file: " + localFilePath + ", msg: " + e.getMessage()); + } + + String lastErrMsg = null; + Status status = Status.OK; + try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(localFile))) { + final long bufSize = 1024 * 1024; // 1MB + long leftSize = fileSize; + long readOffset = 0; + while (leftSize > 0) { + long readLen = Math.min(leftSize, bufSize); + try { + ByteBuffer data = readStreamBuffer(fsDataInputStream, readOffset, readLen); + if (readLen != data.array().length) { + LOG.warn( + "the actual read length does not equal to " + + "the expected read length: {} vs. {}, file: {}", + data.array().length, readLen, remoteFilePath); + } + // write local file + out.write(data.array()); + readOffset += data.array().length; + leftSize -= data.array().length; + } catch (Exception e) { + lastErrMsg = String.format( + "failed to read. " + "current read offset: %d, read length: %d," + + " file size: %d, file: %s. msg: %s", + readOffset, readLen, fileSize, remoteFilePath, e.getMessage()); + LOG.warn(lastErrMsg); + status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); + break; + } + } + } catch (IOException e) { + return new Status(Status.ErrCode.COMMON_ERROR, "Got exception: " + e.getMessage()); + } finally { + Status closeStatus = operations.closeReader(fsDataInputStream); + if (!closeStatus.ok()) { + LOG.warn(closeStatus.getErrMsg()); + if (status.ok()) { + // we return close write error only if no other error has been encountered. + status = closeStatus; + } + } + } + + LOG.info("finished to download from {} to {} with size: {}. cost {} ms", remoteFilePath, localFilePath, + fileSize, (System.currentTimeMillis() - start)); + return status; + } + + /** + * read data from fsDataInputStream. + * + * @param fsDataInputStream input stream for read. + * @param readOffset read offset. + * @param length read length. + * @return ByteBuffer + * @throws IOException when read data error. + */ + private static ByteBuffer readStreamBuffer(FSDataInputStream fsDataInputStream, long readOffset, long length) + throws IOException { + synchronized (fsDataInputStream) { + long currentStreamOffset; + try { + currentStreamOffset = fsDataInputStream.getPos(); + } catch (IOException e) { + LOG.error("errors while get file pos from output stream", e); + throw new IOException("errors while get file pos from output stream", e); + } + if (currentStreamOffset != readOffset) { + // it's ok, when reading some format like parquet, it is not a sequential read + LOG.debug("invalid offset, current read offset is " + currentStreamOffset + + " is not equal to request offset " + readOffset + " seek to it"); + try { + fsDataInputStream.seek(readOffset); + } catch (IOException e) { + throw new IOException(String.format( + "current read offset %d is not equal to %d, and could not seek to it, msg: %s", + currentStreamOffset, readOffset, e.getMessage())); + } + } + // Avoid using the ByteBuffer based read for Hadoop because some + // FSDataInputStream + // implementations are not ByteBufferReadable, + // See https://issues.apache.org/jira/browse/HADOOP-14603 + byte[] buf; + if (length > HDFSFileOperations.READ_BUFFER_SIZE) { + buf = new byte[HDFSFileOperations.READ_BUFFER_SIZE]; + } else { + buf = new byte[(int) length]; + } + try { + int readLength = readBytesFully(fsDataInputStream, buf); + if (readLength < 0) { + throw new IOException("end of file reached"); + } + if (LOG.isDebugEnabled()) { + LOG.debug( + "read buffer from input stream, buffer size:" + buf.length + ", read length:" + readLength); + } + return ByteBuffer.wrap(buf, 0, readLength); + } catch (IOException e) { + LOG.error("errors while read data from stream", e); + throw new IOException("errors while read data from stream " + e.getMessage()); + } + } + } + + private static int readBytesFully(FSDataInputStream is, byte[] dest) throws IOException { + int readLength = 0; + while (readLength < dest.length) { + int availableReadLength = dest.length - readLength; + int n = is.read(dest, readLength, availableReadLength); + if (n <= 0) { + break; + } + readLength += n; + } + return readLength; + } + + @Override + public Status exists(String remotePath) { + try { + URI pathUri = URI.create(remotePath); + Path inputFilePath = new Path(pathUri.getPath()); + FileSystem fileSystem = getHdfsClient(remotePath); + boolean isPathExist = fileSystem.exists(inputFilePath); + if (!isPathExist) { + return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath); + } + return Status.OK; + } catch (Exception e) { + LOG.error("errors while check path exist " + remotePath, e); + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to check remote path exist: " + remotePath + ". msg: " + e.getMessage()); + } + } + + @Override + public Status directUpload(String content, String remoteFile) { + FSDataOutputStream fsDataOutputStream = null; + try { + fsDataOutputStream = operations.openWriter(remoteFile); + } catch (Exception e) { + return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); + } + LOG.info("finished to open writer. directly upload to remote path {}.", remoteFile); + + Status status = Status.OK; + try { + fsDataOutputStream.writeBytes(content); + } catch (IOException e) { + LOG.error("errors while write data to output stream", e); + status = new Status(Status.ErrCode.COMMON_ERROR, "write exception: " + e.getMessage()); + } finally { + Status closeStatus = operations.closeWriter(fsDataOutputStream); + if (!closeStatus.ok()) { + LOG.warn(closeStatus.getErrMsg()); + if (status.ok()) { + status = closeStatus; + } + } + } + return status; + } + + @Override + public Status upload(String localPath, String remotePath) { + long start = System.currentTimeMillis(); + LOG.debug("local path {}, remote path {}", localPath, remotePath); + FSDataOutputStream fsDataOutputStream = null; + try { + fsDataOutputStream = operations.openWriter(remotePath); + } catch (Exception e) { + return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); + } + + LOG.info("finished to open writer. directly upload to remote path {}.", remotePath); + // read local file and write remote + File localFile = new File(localPath); + long fileLength = localFile.length(); + byte[] readBuf = new byte[1024]; + Status status = new Status(Status.ErrCode.OK, ""); + try (BufferedInputStream in = new BufferedInputStream(new FileInputStream(localFile))) { + // save the last err msg + String lastErrMsg = null; + // save the current write offset of remote file + long writeOffset = 0; + // read local file, 1MB at a time + int bytesRead; + while ((bytesRead = in.read(readBuf)) != -1) { + try { + fsDataOutputStream.write(readBuf, 0, bytesRead); + } catch (IOException e) { + LOG.error("errors while write data to output stream", e); + lastErrMsg = String.format( + "failed to write hdfs. current write offset: %d, write length: %d, " + + "file length: %d, file: %s, msg: errors while write data to output stream", + writeOffset, bytesRead, fileLength, remotePath); + status = new Status(Status.ErrCode.COMMON_ERROR, lastErrMsg); + break; + } + + // write succeed, update current write offset + writeOffset += bytesRead; + } // end of read local file loop + } catch (FileNotFoundException e1) { + return new Status(Status.ErrCode.COMMON_ERROR, "encounter file not found exception: " + e1.getMessage()); + } catch (IOException e1) { + return new Status(Status.ErrCode.COMMON_ERROR, "encounter io exception: " + e1.getMessage()); + } finally { + Status closeStatus = operations.closeWriter(fsDataOutputStream); + if (!closeStatus.ok()) { + LOG.warn(closeStatus.getErrMsg()); + if (status.ok()) { + // we return close write error only if no other error has been encountered. + status = closeStatus; + } + } + } + + if (status.ok()) { + LOG.info("finished to upload {} to remote path {}. cost: {} ms", localPath, remotePath, + (System.currentTimeMillis() - start)); + } + return status; + } + + @Override + public Status rename(String srcPath, String destPath) { + long start = System.currentTimeMillis(); + try { + URI srcPathUri = URI.create(srcPath); + URI destPathUri = URI.create(destPath); + if (!srcPathUri.getAuthority().trim().equals(destPathUri.getAuthority().trim())) { + return new Status(Status.ErrCode.COMMON_ERROR, "only allow rename in same file system"); + } + FileSystem fileSystem = getHdfsClient(destPath); + Path srcfilePath = new Path(srcPathUri.getPath()); + Path destfilePath = new Path(destPathUri.getPath()); + boolean isRenameSuccess = fileSystem.rename(srcfilePath, destfilePath); + if (!isRenameSuccess) { + return new Status(Status.ErrCode.COMMON_ERROR, "failed to rename " + srcPath + " to " + destPath); + } + } catch (UserException e) { + return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); + } catch (IOException e) { + LOG.error("errors while rename path from " + srcPath + " to " + destPath); + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to rename remote " + srcPath + " to " + destPath + ", msg: " + e.getMessage()); + } + LOG.info("finished to rename {} to {}. cost: {} ms", srcPath, destPath, (System.currentTimeMillis() - start)); + return Status.OK; + } + + @Override + public Status delete(String remotePath) { + try { + URI pathUri = URI.create(remotePath); + Path inputFilePath = new Path(pathUri.getPath()); + FileSystem fileSystem = getHdfsClient(remotePath); + fileSystem.delete(inputFilePath, true); + } catch (UserException e) { + return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); + } catch (IOException e) { + LOG.error("errors while delete path " + remotePath); + return new Status(Status.ErrCode.COMMON_ERROR, + "failed to delete remote path: " + remotePath + ", msg: " + e.getMessage()); + } + LOG.info("finished to delete remote path {}.", remotePath); + return Status.OK; + } + + @Override + public RemoteIterator listLocatedStatus(String remotePath) throws UserException { + FileSystem fileSystem = getHdfsClient(remotePath); + try { + return fileSystem.listLocatedStatus(new Path(remotePath)); + } catch (IOException e) { + throw new UserException("Failed to list located status for path: " + remotePath, e); + } + } + + @Override + public Status list(String remotePath, List result) { + return list(remotePath, result, true); + } + + /** + * get files in remotePath of HDFS. + * + * @param remotePath hdfs://namenode:port/path. + * @param result files in remotePath. + * @param fileNameOnly means get file only in remotePath if true. + * @return Status.OK if success. + */ + @Override + public Status list(String remotePath, List result, boolean fileNameOnly) { + try { + URI pathUri = URI.create(remotePath); + FileSystem fileSystem = getHdfsClient(remotePath); + Path pathPattern = new Path(pathUri.getPath()); + FileStatus[] files = fileSystem.globStatus(pathPattern); + if (files == null) { + LOG.info("no files in path " + remotePath); + return Status.OK; + } + for (FileStatus fileStatus : files) { + RemoteFile remoteFile = new RemoteFile( + fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(), + !fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(), + fileStatus.getBlockSize()); + result.add(remoteFile); + } + } catch (FileNotFoundException e) { + LOG.info("file not found: " + e.getMessage()); + return new Status(Status.ErrCode.NOT_FOUND, "file not found: " + e.getMessage()); + } catch (Exception e) { + LOG.error("errors while get file status ", e); + return new Status(Status.ErrCode.COMMON_ERROR, "errors while get file status " + e.getMessage()); + } + LOG.info("finish list path {}", remotePath); + return Status.OK; + } + + @Override + public Status makeDir(String remotePath) { + return new Status(Status.ErrCode.COMMON_ERROR, "mkdir is not implemented."); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/HDFSFileOperations.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/HDFSFileOperations.java new file mode 100644 index 0000000000..9e03f8f5a0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/HDFSFileOperations.java @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.remote; + +import org.apache.doris.backup.Status; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.URI; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; + +public class HDFSFileOperations { + private static final Logger LOG = LogManager.getLogger(HDFSFileOperations.class); + public static final int READ_BUFFER_SIZE = 128 << 10; // 128k + public static final int WRITE_BUFFER_SIZE = 128 << 10; // 128k + + private final FileSystem hdfsClient; + + public HDFSFileOperations(FileSystem hdfsClient) { + this.hdfsClient = hdfsClient; + } + + + /** + * open remotePath for read. + * + * @param remotePath hdfs://namenode:port/path. + * @param startOffset the offset to read. + * @return FSDataInputStream if success. + * @throws UserException when get filesystem failed. + * @throws IOException when open file error. + */ + public FSDataInputStream openReader(String remotePath, long startOffset) throws UserException, IOException { + URI pathUri = URI.create(remotePath); + Path inputFilePath = new Path(pathUri.getPath()); + try { + FSDataInputStream fsDataInputStream = hdfsClient.open(inputFilePath, READ_BUFFER_SIZE); + fsDataInputStream.seek(startOffset); + return fsDataInputStream; + } catch (IOException e) { + LOG.error("errors while open path", e); + throw new IOException(e.getMessage()); + } + } + + /** + * close for read. + * + * @param fsDataInputStream the input stream. + * @return Status.OK if success. + */ + public Status closeReader(FSDataInputStream fsDataInputStream) { + synchronized (fsDataInputStream) { + try { + fsDataInputStream.close(); + } catch (IOException e) { + LOG.error("errors while close file input stream", e); + return new Status(Status.ErrCode.COMMON_ERROR, + "errors while close file input stream, msg: " + e.getMessage()); + } + } + return Status.OK; + } + + + /** + * open remotePath for write. + * + * @param remotePath hdfs://namenode:port/path. + * @return FSDataOutputStream + * @throws UserException when get filesystem failed. + * @throws IOException when open path error. + */ + public FSDataOutputStream openWriter(String remotePath) throws UserException, IOException { + URI pathUri = URI.create(remotePath); + Path inputFilePath = new Path(pathUri.getPath()); + try { + return hdfsClient.create(inputFilePath, true, WRITE_BUFFER_SIZE); + } catch (IOException e) { + LOG.error("errors while open path", e); + throw new IOException(e.getMessage()); + } + } + + /** + * close for write. + * + * @param fsDataOutputStream output stream. + * @return Status.OK if success. + */ + + public Status closeWriter(FSDataOutputStream fsDataOutputStream) { + synchronized (fsDataOutputStream) { + try { + fsDataOutputStream.flush(); + fsDataOutputStream.close(); + LOG.info("finished to close writer"); + } catch (IOException e) { + LOG.error("errors while close file output stream", e); + return new Status(Status.ErrCode.COMMON_ERROR, "failed to close writer, msg:" + e.getMessage()); + } + } + return Status.OK; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java new file mode 100644 index 0000000000..d1f99db596 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/ObjFileSystem.java @@ -0,0 +1,21 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.remote; + +public abstract class ObjFileSystem extends RemoteFileSystem { +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java new file mode 100644 index 0000000000..9bb089fad0 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/RemoteFileSystem.java @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.remote; + +import org.apache.doris.fs.FileSystem; + +import java.util.Map; + +public abstract class RemoteFileSystem implements FileSystem { + protected org.apache.hadoop.fs.FileSystem dfsFileSystem = null; + protected Map properties; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java new file mode 100644 index 0000000000..72f6b487b8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/S3FileSystem.java @@ -0,0 +1,416 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.fs.remote; + +import org.apache.doris.backup.RemoteFile; +import org.apache.doris.backup.Status; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.S3URI; +import org.apache.doris.datasource.property.PropertyConverter; +import org.apache.doris.datasource.property.constants.S3Properties; +import org.apache.doris.fs.obj.S3Storage; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.http.HttpStatus; +import org.apache.http.client.utils.URIBuilder; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.auth.signer.AwsS3V4Signer; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3Configuration; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Exception; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.nio.file.FileVisitOption; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +public class S3FileSystem extends ObjFileSystem { + + private static final Logger LOG = LogManager.getLogger(S3Storage.class); + private S3Client client; + // false: the s3 client will automatically convert endpoint to virtual-hosted style, eg: + // endpoint: http://s3.us-east-2.amazonaws.com + // bucket/path: my_bucket/file.txt + // auto convert: http://my_bucket.s3.us-east-2.amazonaws.com/file.txt + // true: the s3 client will NOT automatically convert endpoint to virtual-hosted style, we need to do some tricks: + // endpoint: http://cos.ap-beijing.myqcloud.com + // bucket/path: my_bucket/file.txt + // convert manually: See S3URI() + private boolean forceHostedStyle = false; + + public S3FileSystem(Map properties) { + this.properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); + setProperties(properties); + } + + private void setProperties(Map properties) { + this.properties.putAll(properties); + try { + S3Properties.requiredS3Properties(this.properties); + } catch (DdlException e) { + throw new IllegalArgumentException(e); + } + // Virtual hosted-style is recommended in the s3 protocol. + // The path-style has been abandoned, but for some unexplainable reasons, + // the s3 client will determine whether the endpiont starts with `s3` + // when generating a virtual hosted-sytle request. + // If not, it will not be converted ( https://github.com/aws/aws-sdk-java-v2/pull/763), + // but the endpoints of many cloud service providers for object storage do not start with s3, + // so they cannot be converted to virtual hosted-sytle. + // Some of them, such as aliyun's oss, only support virtual hosted-style, + // and some of them(ceph) may only support + // path-style, so we need to do some additional conversion. + // + // use_path_style | !use_path_style + // S3 forceHostedStyle=false | forceHostedStyle=false + // !S3 forceHostedStyle=false | forceHostedStyle=true + // + // That is, for S3 endpoint, ignore the `use_path_style` property, and the s3 client will automatically use + // virtual hosted-sytle. + // And for other endpoint, if `use_path_style` is true, use path style. Otherwise, use virtual hosted-sytle. + if (!this.properties.get(S3Properties.ENDPOINT).toLowerCase().startsWith("s3")) { + forceHostedStyle = !this.properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false") + .equalsIgnoreCase("true"); + } else { + forceHostedStyle = false; + } + } + + private FileSystem getFileSystem(String remotePath) throws UserException { + if (dfsFileSystem == null) { + Configuration conf = new Configuration(); + System.setProperty("com.amazonaws.services.s3.enableV4", "true"); + PropertyConverter.convertToHadoopFSProperties(properties).forEach(conf::set); + try { + dfsFileSystem = FileSystem.get(new URI(remotePath), conf); + } catch (Exception e) { + throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e); + } + } + return dfsFileSystem; + } + + private S3Client getClient(String bucket) throws UserException { + if (client == null) { + URI tmpEndpoint = URI.create(properties.get(S3Properties.ENDPOINT)); + StaticCredentialsProvider scp; + if (!properties.containsKey(S3Properties.SESSION_TOKEN)) { + AwsBasicCredentials awsBasic = AwsBasicCredentials.create( + properties.get(S3Properties.ACCESS_KEY), + properties.get(S3Properties.SECRET_KEY)); + scp = StaticCredentialsProvider.create(awsBasic); + } else { + AwsSessionCredentials awsSession = AwsSessionCredentials.create( + properties.get(S3Properties.ACCESS_KEY), + properties.get(S3Properties.SECRET_KEY), + properties.get(S3Properties.SESSION_TOKEN)); + scp = StaticCredentialsProvider.create(awsSession); + } + EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy + .builder() + .baseDelay(Duration.ofSeconds(1)) + .maxBackoffTime(Duration.ofMinutes(1)) + .build(); + // retry 3 time with Equal backoff + RetryPolicy retryPolicy = RetryPolicy + .builder() + .numRetries(3) + .backoffStrategy(backoffStrategy) + .build(); + ClientOverrideConfiguration clientConf = ClientOverrideConfiguration + .builder() + // set retry policy + .retryPolicy(retryPolicy) + // using AwsS3V4Signer + .putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create()) + .build(); + URI endpoint = StringUtils.isEmpty(bucket) ? tmpEndpoint : + URI.create(new URIBuilder(tmpEndpoint).setHost(bucket + "." + tmpEndpoint.getHost()).toString()); + client = S3Client.builder() + .endpointOverride(endpoint) + .credentialsProvider(scp) + .region(Region.of(properties.get(S3Properties.REGION))) + .overrideConfiguration(clientConf) + // disable chunkedEncoding because of bos not supported + // use virtual hosted-style access + .serviceConfiguration(S3Configuration.builder() + .chunkedEncodingEnabled(false) + .pathStyleAccessEnabled(false) + .build()) + .build(); + } + return client; + } + + @Override + public Status exists(String remotePath) { + try { + S3URI uri = S3URI.create(remotePath, forceHostedStyle); + getClient(uri.getVirtualBucket()) + .headObject(HeadObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build()); + return Status.OK; + } catch (S3Exception e) { + if (e.statusCode() == HttpStatus.SC_NOT_FOUND) { + return new Status(Status.ErrCode.NOT_FOUND, "remote path does not exist: " + remotePath); + } else { + LOG.warn("headObject failed:", e); + return new Status(Status.ErrCode.COMMON_ERROR, "headObject failed: " + e.getMessage()); + } + } catch (UserException ue) { + LOG.warn("connect to s3 failed: ", ue); + return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); + } + } + + @Override + public Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize) { + long start = System.currentTimeMillis(); + // Write the data to a local file + File localFile = new File(localFilePath); + if (localFile.exists()) { + try { + Files.walk(Paths.get(localFilePath), FileVisitOption.FOLLOW_LINKS) + .sorted(Comparator.reverseOrder()) + .map(Path::toFile) + .forEach(File::delete); + } catch (IOException e) { + return new Status( + Status.ErrCode.COMMON_ERROR, "failed to delete exist local file: " + localFilePath); + } + } + try { + S3URI uri = S3URI.create(remoteFilePath, forceHostedStyle); + GetObjectResponse response = getClient(uri.getVirtualBucket()).getObject( + GetObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), localFile.toPath()); + if (localFile.length() == fileSize) { + LOG.info( + "finished to download from {} to {} with size: {}. cost {} ms", + remoteFilePath, + localFilePath, + fileSize, + (System.currentTimeMillis() - start)); + return Status.OK; + } else { + return new Status(Status.ErrCode.COMMON_ERROR, response.toString()); + } + } catch (S3Exception s3Exception) { + return new Status( + Status.ErrCode.COMMON_ERROR, + "get file from s3 error: " + s3Exception.awsErrorDetails().errorMessage()); + } catch (UserException ue) { + LOG.warn("connect to s3 failed: ", ue); + return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); + } catch (Exception e) { + return new Status(Status.ErrCode.COMMON_ERROR, e.toString()); + } + } + + @Override + public Status directUpload(String content, String remoteFile) { + try { + S3URI uri = S3URI.create(remoteFile, forceHostedStyle); + PutObjectResponse response = + getClient(uri.getVirtualBucket()) + .putObject( + PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), + RequestBody.fromBytes(content.getBytes())); + LOG.info("upload content success: " + response.eTag()); + return Status.OK; + } catch (S3Exception e) { + LOG.warn("write content failed:", e); + return new Status(Status.ErrCode.COMMON_ERROR, "write content failed: " + e.getMessage()); + } catch (UserException ue) { + LOG.warn("connect to s3 failed: ", ue); + return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); + } catch (Exception e) { + LOG.warn("connect to s3 failed: ", e); + return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + e.getMessage()); + } + } + + public Status copy(String origFilePath, String destFilePath) { + try { + S3URI origUri = S3URI.create(origFilePath); + S3URI descUri = S3URI.create(destFilePath, forceHostedStyle); + getClient(descUri.getVirtualBucket()) + .copyObject( + CopyObjectRequest.builder() + .copySource(origUri.getBucket() + "/" + origUri.getKey()) + .destinationBucket(descUri.getBucket()) + .destinationKey(descUri.getKey()) + .build()); + return Status.OK; + } catch (S3Exception e) { + LOG.error("copy file failed: ", e); + return new Status(Status.ErrCode.COMMON_ERROR, "copy file failed: " + e.getMessage()); + } catch (UserException ue) { + LOG.error("copy to s3 failed: ", ue); + return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); + } + } + + @Override + public Status upload(String localPath, String remotePath) { + try { + S3URI uri = S3URI.create(remotePath, forceHostedStyle); + PutObjectResponse response = + getClient(uri.getVirtualBucket()) + .putObject( + PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), + RequestBody.fromFile(new File(localPath))); + LOG.info("upload file " + localPath + " success: " + response.eTag()); + return Status.OK; + } catch (S3Exception e) { + LOG.error("write file failed:", e); + return new Status(Status.ErrCode.COMMON_ERROR, "write file failed: " + e.getMessage()); + } catch (UserException ue) { + LOG.error("connect to s3 failed: ", ue); + return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); + } + } + + @Override + public Status rename(String origFilePath, String destFilePath) { + Status status = copy(origFilePath, destFilePath); + if (status.ok()) { + return delete(origFilePath); + } else { + return status; + } + } + + @Override + public Status delete(String remotePath) { + try { + S3URI uri = S3URI.create(remotePath, forceHostedStyle); + DeleteObjectResponse response = + getClient(uri.getVirtualBucket()) + .deleteObject( + DeleteObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build()); + LOG.info("delete file " + remotePath + " success: " + response.toString()); + return Status.OK; + } catch (S3Exception e) { + LOG.warn("delete file failed: ", e); + if (e.statusCode() == HttpStatus.SC_NOT_FOUND) { + return Status.OK; + } + return new Status(Status.ErrCode.COMMON_ERROR, "delete file failed: " + e.getMessage()); + } catch (UserException ue) { + LOG.warn("connect to s3 failed: ", ue); + return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); + } + } + + @Override + public RemoteIterator listLocatedStatus(String remotePath) throws UserException { + FileSystem fileSystem = getFileSystem(remotePath); + try { + return fileSystem.listLocatedStatus(new org.apache.hadoop.fs.Path(remotePath)); + } catch (IOException e) { + throw new UserException("Failed to list located status for path: " + remotePath, e); + } + } + + @Override + public Status list(String remotePath, List result) { + return list(remotePath, result, true); + } + + // broker file pattern glob is too complex, so we use hadoop directly + @Override + public Status list(String remotePath, List result, boolean fileNameOnly) { + try { + FileSystem s3AFileSystem = getFileSystem(remotePath); + org.apache.hadoop.fs.Path pathPattern = new org.apache.hadoop.fs.Path(remotePath); + FileStatus[] files = s3AFileSystem.globStatus(pathPattern); + if (files == null) { + return Status.OK; + } + for (FileStatus fileStatus : files) { + RemoteFile remoteFile = new RemoteFile( + fileNameOnly ? fileStatus.getPath().getName() : fileStatus.getPath().toString(), + !fileStatus.isDirectory(), fileStatus.isDirectory() ? -1 : fileStatus.getLen(), + fileStatus.getBlockSize()); + result.add(remoteFile); + } + } catch (FileNotFoundException e) { + LOG.info("file not found: " + e.getMessage()); + return new Status(Status.ErrCode.NOT_FOUND, "file not found: " + e.getMessage()); + } catch (Exception e) { + LOG.error("errors while get file status ", e); + return new Status(Status.ErrCode.COMMON_ERROR, "errors while get file status " + e.getMessage()); + } + return Status.OK; + } + + @Override + public Status makeDir(String remotePath) { + if (!remotePath.endsWith("/")) { + remotePath += "/"; + } + try { + S3URI uri = S3URI.create(remotePath, forceHostedStyle); + PutObjectResponse response = + getClient(uri.getVirtualBucket()) + .putObject( + PutObjectRequest.builder().bucket(uri.getBucket()).key(uri.getKey()).build(), + RequestBody.empty()); + LOG.info("makeDir success: " + response.eTag()); + return Status.OK; + } catch (S3Exception e) { + LOG.error("makeDir failed:", e); + return new Status(Status.ErrCode.COMMON_ERROR, "makeDir failed: " + e.getMessage()); + } catch (UserException ue) { + LOG.error("connect to s3 failed: ", ue); + return new Status(Status.ErrCode.COMMON_ERROR, "connect to s3 failed: " + ue.getMessage()); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java index 193566dee8..4d2b1ebbfa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/S3TableValuedFunction.java @@ -20,7 +20,6 @@ package org.apache.doris.tablefunction; import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.StorageBackend.StorageType; -import org.apache.doris.backup.BlobStorage; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; @@ -28,6 +27,7 @@ import org.apache.doris.common.util.S3URI; import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint; import org.apache.doris.datasource.property.PropertyConverter; import org.apache.doris.datasource.property.constants.S3Properties; +import org.apache.doris.fs.obj.BlobStorage; import org.apache.doris.thrift.TFileType; import com.google.common.annotations.VisibleForTesting; diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java index d4c445793d..d3970060c8 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java @@ -31,6 +31,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.common.util.UnitTestUtil; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.fs.obj.BlobStorage; import org.apache.doris.persist.EditLog; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTask; diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BrokerStorageTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BrokerStorageTest.java index 8526bf59cc..419e3dae2e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/BrokerStorageTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BrokerStorageTest.java @@ -21,6 +21,7 @@ import org.apache.doris.common.ClientPool; import org.apache.doris.common.GenericPool; import org.apache.doris.common.Pair; import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.fs.obj.BrokerStorage; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloBrokerService; diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RepositoryTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RepositoryTest.java index 522ef2df47..b23faa1524 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RepositoryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RepositoryTest.java @@ -22,6 +22,7 @@ import org.apache.doris.analysis.StorageBackend; import org.apache.doris.catalog.BrokerMgr; import org.apache.doris.catalog.FsBroker; import org.apache.doris.common.AnalysisException; +import org.apache.doris.fs.obj.BlobStorage; import org.apache.doris.service.FrontendOptions; import com.google.common.collect.Lists; diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java index 0667076da0..32cc9fab5e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java @@ -37,6 +37,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.MarkedCountDownLatch; import org.apache.doris.common.jmockit.Deencapsulation; import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.fs.obj.BlobStorage; import org.apache.doris.persist.EditLog; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TStorageMedium; diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java index 6537ae4090..208aff157a 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/backup/S3StorageTest.java @@ -18,6 +18,7 @@ package org.apache.doris.backup; import org.apache.doris.datasource.property.PropertyConverter; +import org.apache.doris.fs.obj.S3Storage; import org.apache.commons.codec.digest.DigestUtils; import org.junit.Assert;