apache_hdfs_broker
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 5b1ef33491..42ff97123b 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -70,6 +70,7 @@ public class FileSystemManager {
private static final String CHDFS_SCHEME = "ofs";
private static final String OBS_SCHEME = "obs";
private static final String OSS_SCHEME = "oss";
+ private static final String COS_SCHEME = "cosn";
private static final String USER_NAME_KEY = "username";
private static final String PASSWORD_KEY = "password";
@@ -124,6 +125,13 @@ public class FileSystemManager {
private static final String FS_OSS_IMPL_DISABLE_CACHE = "fs.oss.impl.disable.cache";
private static final String FS_OSS_IMPL = "fs.oss.impl";
+ // arguments for cos
+ private static final String FS_COS_ACCESS_KEY = "fs.cosn.userinfo.secretId";
+ private static final String FS_COS_SECRET_KEY = "fs.cosn.userinfo.secretKey";
+ private static final String FS_COS_ENDPOINT = "fs.cosn.bucket.endpoint_suffix";
+ private static final String FS_COS_IMPL = "fs.cosn.impl";
+ private static final String FS_COS_IMPL_DISABLE_CACHE = "fs.cosn.impl.disable.cache";
+
private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2);
private int readBufferSize = 128 << 10; // 128k
@@ -187,6 +195,8 @@ public class FileSystemManager {
brokerFileSystem = getOBSFileSystem(path, properties);
} else if (scheme.equals(OSS_SCHEME)) {
brokerFileSystem = getOSSFileSystem(path, properties);
+ } else if (scheme.equals(COS_SCHEME)) {
+ brokerFileSystem = getCOSFileSystem(path, properties);
} else {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
"invalid path. scheme is not supported");
@@ -681,6 +691,53 @@ public class FileSystemManager {
}
}
+ /**
+ * visible for test
+ *
+ * file system handle is cached, the identity is endpoint + bucket + accessKey_secretKey
+ *
+ * @param path
+ * @param properties
+ * @return
+ * @throws URISyntaxException
+ * @throws Exception
+ */
+ public BrokerFileSystem getCOSFileSystem(String path, Map properties) {
+ WildcardURI pathUri = new WildcardURI(path);
+ String accessKey = properties.getOrDefault(FS_COS_ACCESS_KEY, "");
+ String secretKey = properties.getOrDefault(FS_COS_SECRET_KEY, "");
+ String endpoint = properties.getOrDefault(FS_COS_ENDPOINT, "");
+ String disableCache = properties.getOrDefault(FS_COS_IMPL_DISABLE_CACHE, "true");
+ // endpoint is the server host, pathUri.getUri().getHost() is the bucket
+ // we should use these two params as the host identity, because FileSystem will cache both.
+ String host = COS_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost();
+ String cosUgi = accessKey + "," + secretKey;
+ FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, cosUgi);
+ BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
+ fileSystem.getLock().lock();
+ try {
+ if (fileSystem.getDFSFileSystem() == null) {
+ logger.info("could not find file system for path " + path + " create a new one");
+ // create a new filesystem
+ Configuration conf = new Configuration();
+ conf.set(FS_COS_ACCESS_KEY, accessKey);
+ conf.set(FS_COS_SECRET_KEY, secretKey);
+ conf.set(FS_COS_ENDPOINT, endpoint);
+ conf.set(FS_COS_IMPL, "org.apache.hadoop.fs.CosFileSystem");
+ conf.set(FS_COS_IMPL_DISABLE_CACHE, disableCache);
+ FileSystem cosFileSystem = FileSystem.get(pathUri.getUri(), conf);
+ fileSystem.setFileSystem(cosFileSystem);
+ }
+ return fileSystem;
+ } catch (Exception e) {
+ logger.error("errors while connect to " + path, e);
+ throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+ } finally {
+ fileSystem.getLock().unlock();
+ }
+ }
+
+
public List listPath(String path, boolean fileNameOnly, Map properties) {
List resultFileStatus = null;
WildcardURI pathUri = new WildcardURI(path);