diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java index 077729c81e..62fb2c9977 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java @@ -236,14 +236,16 @@ public class ExportStmt extends StatementBase { URI uri = URI.create(path); String schema = uri.getScheme(); if (type == StorageBackend.StorageType.BROKER) { - if (schema == null || (!schema.equalsIgnoreCase("hdfs") + if (schema == null || (!schema.equalsIgnoreCase("bos") + && !schema.equalsIgnoreCase("afs") + && !schema.equalsIgnoreCase("hdfs") && !schema.equalsIgnoreCase("ofs") && !schema.equalsIgnoreCase("obs") && !schema.equalsIgnoreCase("oss") && !schema.equalsIgnoreCase("s3a") && !schema.equalsIgnoreCase("cosn"))) { - throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'ofs://', 'obs://'," - + "'oss://', 's3a://' or 'cosn://' path."); + throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'afs://' , 'bos://'," + + " 'ofs://', 'obs://', 'oss://', 's3a://' or 'cosn://' path."); } } else if (type == StorageBackend.StorageType.S3) { if (schema == null || !schema.equalsIgnoreCase("s3")) { diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java index 9446638a52..2885ba1a58 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java @@ -21,10 +21,19 @@ public class FileSystemIdentity { private final String hostName; private final String ugiInfo; + + private final String extraInfo; public FileSystemIdentity(String hostName, String ugiInfo) { this.hostName = hostName; this.ugiInfo = ugiInfo; + this.extraInfo = null; + } + + public FileSystemIdentity(String hostName, String ugiInfo, String extraInfo) { + this.hostName = hostName; + this.ugiInfo = ugiInfo; + this.extraInfo = extraInfo; } @Override @@ -34,6 +43,7 @@ public class FileSystemIdentity { result = prime * result + ((hostName == null) ? 0 : hostName.hashCode()); result = prime * result + ((ugiInfo == null) ? 0 : ugiInfo.hashCode()); + result = prime * result + ((extraInfo == null) ? 0 : extraInfo.hashCode()); return result; } @@ -63,6 +73,13 @@ public class FileSystemIdentity { } else if (!ugiInfo.equals(other.ugiInfo)) { return false; } + if (extraInfo == null) { + if (other.extraInfo != null) { + return false; + } + } else if (!extraInfo.equals(other.extraInfo)) { + return false; + } return true; } } diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java index 42ff97123b..27b548af8c 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java @@ -42,6 +42,7 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.net.InetAddress; +import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.nio.ByteBuffer; @@ -71,6 +72,8 @@ public class FileSystemManager { private static final String OBS_SCHEME = "obs"; private static final String OSS_SCHEME = "oss"; private static final String COS_SCHEME = "cosn"; + private static final String BOS_SCHEME = "bos"; + private static final String AFS_SCHEME = "afs"; private static final String USER_NAME_KEY = "username"; private static final String PASSWORD_KEY = "password"; @@ -132,6 +135,23 @@ public class FileSystemManager { private static final String FS_COS_IMPL = "fs.cosn.impl"; private static final String FS_COS_IMPL_DISABLE_CACHE = "fs.cosn.impl.disable.cache"; + // arguments for bos + private static final String FS_BOS_ACCESS_KEY = "fs.bos.access.key"; + private static final String FS_BOS_SECRET_KEY = "fs.bos.secret.access.key"; + private static final String FS_BOS_ENDPOINT = "fs.bos.endpoint"; + private static final String FS_BOS_IMPL = "fs.bos.impl"; + private static final String FS_BOS_MULTIPART_UPLOADS_BLOCK_SIZE = "fs.bos.multipart.uploads.block.size"; + + + // arguments for afs + private static final String HADOOP_JOB_GROUP_NAME = "hadoop.job.group.name"; + private static final String HADOOP_JOB_UGI = "hadoop.job.ugi"; + private static final String FS_DEFAULT_NAME = "fs.default.name"; + private static final String FS_AFS_IMPL = "fs.afs.impl"; + private static final String DFS_AGENT_PORT = "dfs.agent.port"; + private static final String DFS_CLIENT_AUTH_METHOD = "dfs.client.auth.method"; + private static final String DFS_RPC_TIMEOUT = "dfs.rpc.timeout"; + private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2); private int readBufferSize = 128 << 10; // 128k @@ -197,6 +217,8 @@ public class FileSystemManager { brokerFileSystem = getOSSFileSystem(path, properties); } else if (scheme.equals(COS_SCHEME)) { brokerFileSystem = getCOSFileSystem(path, properties); + } else if (scheme.equals(BOS_SCHEME)) { + brokerFileSystem = getBOSFileSystem(path, properties); } else { throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH, "invalid path. scheme is not supported"); @@ -548,8 +570,8 @@ public class FileSystemManager { String endpoint = properties.getOrDefault(FS_OSS_ENDPOINT, ""); String disableCache = properties.getOrDefault(FS_OSS_IMPL_DISABLE_CACHE, "true"); String host = OSS_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost(); - String obsUgi = accessKey + "," + secretKey; - FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, obsUgi); + String ossUgi = accessKey + "," + secretKey; + FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, ossUgi); cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity)); BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties); fileSystem.getLock().lock(); @@ -608,11 +630,11 @@ public class FileSystemManager { } else if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) { kerberosContent = properties.get(KERBEROS_KEYTAB_CONTENT); } else { - throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT, + throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT, "keytab is required for kerberos authentication"); } if (!properties.containsKey(KERBEROS_PRINCIPAL)) { - throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT, + throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT, "principal is required for kerberos authentication"); } else { kerberosContent = kerberosContent + properties.get(KERBEROS_PRINCIPAL); @@ -651,8 +673,8 @@ public class FileSystemManager { // pass kerberos keytab content use base64 encoding // so decode it and write it to tmp path under /tmp // because ugi api only accept a local path as argument - String keytab_content = properties.get(KERBEROS_KEYTAB_CONTENT); - byte[] base64decodedBytes = Base64.getDecoder().decode(keytab_content); + String keytabContent = properties.get(KERBEROS_KEYTAB_CONTENT); + byte[] base64decodedBytes = Base64.getDecoder().decode(keytabContent); long currentTime = System.currentTimeMillis(); Random random = new Random(currentTime); int randNumber = random.nextInt(10000); @@ -737,6 +759,91 @@ public class FileSystemManager { } } + /** + * visible for test + *
+ * file system handle is cached, the identity is endpoint + bucket + accessKey_secretKey
+ *
+ * @param path
+ * @param properties
+ * @return
+ * @throws URISyntaxException
+ * @throws Exception
+ */
+ public BrokerFileSystem getBOSFileSystem(String path, Map