[Feature](broker)Support GCS (#20904)

This commit is contained in:
Calvin Kirs
2023-08-07 19:37:18 +08:00
committed by GitHub
parent 4b20f62f79
commit d1a2473944
5 changed files with 89 additions and 3 deletions

View File

@ -35,6 +35,7 @@ Broker is an optional process in the Doris cluster. It is mainly used to support
- Huawei Cloud OBS (since 1.2.0)
- Amazon S3
- JuiceFS (since 2.0.0)
- GCS (since 2.0.0)
Broker provides services through an RPC service port. It is a stateless JVM process that is responsible for encapsulating some POSIX-like file operations for read and write operations on remote storage, such as open, pred, pwrite, and so on.
In addition, the Broker does not record any other information, so the connection information, file information, permission information, and so on stored remotely need to be passed to the Broker process in the RPC call through parameters in order for the Broker to read and write files correctly .
@ -247,4 +248,14 @@ Same as Apache HDFS
"juicefs.meta" = "xxx",
"juicefs.access-log" = "xxx"
)
```
#### GCS
When accessing GCS using Broker, the Project ID is required, while other parameters are optional. For all parameter configurations, please refer to the [GCS Config](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/branch-2.2.x/gcs/CONFIGURATION.md) documentation.
```
(
"fs.gs.project.id" = "your-project-id",
"fs.AbstractFileSystem.gs.impl" = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
"fs.gs.impl" = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
)
```

View File

@ -35,6 +35,7 @@ Broker 是 Doris 集群中一种可选进程,主要用于支持 Doris 读写
- 华为云 OBS (1.2.0 版本后支持)
- 亚马逊 S3
- JuiceFS (2.0.0 版本支持)
- GCS (2.0.0 版本支持)
Broker 通过提供一个 RPC 服务端口来提供服务,是一个无状态的 Java 进程,负责为远端存储的读写操作封装一些类 POSIX 的文件操作,如 open,pread,pwrite 等等。除此之外,Broker 不记录任何其他信息,所以包括远端存储的连接信息、文件信息、权限信息等等,都需要通过参数在 RPC 调用中传递给 Broker 进程,才能使得 Broker 能够正确读写文件。
@ -240,4 +241,14 @@ WITH BROKER "broker_name"
"juicefs.meta" = "xxx",
"juicefs.access-log" = "xxx"
)
```
#### GCS
在使用 Broker 访问 GCS 时,Project ID 是必须的,其他参数可选,所有参数配置请参考 [GCS Config](https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/branch-2.2.x/gcs/CONFIGURATION.md)
```
(
"fs.gs.project.id" = "你的 Project ID",
"fs.AbstractFileSystem.gs.impl" = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
"fs.gs.impl" = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
)
```

View File

@ -301,9 +301,10 @@ public class ExportStmt extends StatementBase {
&& !schema.equalsIgnoreCase("s3a")
&& !schema.equalsIgnoreCase("cosn")
&& !schema.equalsIgnoreCase("gfs")
&& !schema.equalsIgnoreCase("jfs"))) {
&& !schema.equalsIgnoreCase("jfs")
&& !schema.equalsIgnoreCase("gs"))) {
throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'afs://' , 'bos://',"
+ " 'ofs://', 'obs://', 'oss://', 's3a://', 'cosn://', 'gfs://' or 'jfs://' path.");
+ " 'ofs://', 'obs://', 'oss://', 's3a://', 'cosn://', 'gfs://', 'gs://' or 'jfs://' path.");
}
} else if (type == StorageBackend.StorageType.S3) {
if (schema == null || !schema.equalsIgnoreCase("s3")) {

View File

@ -71,6 +71,7 @@ under the License.
<project.scm.id>github</project.scm.id>
<hadoop.version>2.10.2</hadoop.version>
<netty.version>4.1.65.Final</netty.version>
<gcs.version>hadoop2-2.2.15</gcs.version>
</properties>
<profiles>
<!-- for custom internal repository -->
@ -266,6 +267,13 @@ under the License.
<artifactId>log4j-core</artifactId>
<version>${log4j2.version}</version>
</dependency>
<!-- gcs connector-->
<dependency>
<groupId>com.google.cloud.bigdataoss</groupId>
<artifactId>gcs-connector</artifactId>
<version>${gcs.version}</version>
<classifier>shaded</classifier>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
@ -393,6 +401,15 @@ under the License.
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/gcs-connector-${gcs.version}.jar</exclude>
</excludes>
</configuration>
</plugin>
<!-- copy all dependency libs to target lib dir -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>

View File

@ -77,6 +77,12 @@ public class FileSystemManager {
private static final String AFS_SCHEME = "afs";
private static final String GFS_SCHEME = "gfs";
private static final String GCS_SCHEME = "gs";
private static final String FS_PREFIX = "fs.";
private static final String GCS_PROJECT_ID_KEY = "fs.gs.project.id";
private static final String USER_NAME_KEY = "username";
private static final String PASSWORD_KEY = "password";
private static final String AUTHENTICATION_SIMPLE = "simple";
@ -224,6 +230,8 @@ public class FileSystemManager {
brokerFileSystem = getJuiceFileSystem(path, properties);
} else if (scheme.equals(GFS_SCHEME)) {
brokerFileSystem = getGooseFSFileSystem(path, properties);
} else if (scheme.equals(GCS_SCHEME)) {
brokerFileSystem = getGCSFileSystem(path, properties);
} else {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
"invalid path. scheme is not supported");
@ -475,6 +483,44 @@ public class FileSystemManager {
}
}
/**
 * Get (or lazily create) a cached GCS (Google Cloud Storage) file system.
 *
 * @param path gcs path, e.g. gs://bucket/dir/file
 * @param properties connection properties; see
 *        {@link <a href="https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/branch-2.2.x/gcs/CONFIGURATION.md">...</a>}
 *        in broker load scenario, the properties should contains fs.gs.project.id
 * @return GcsBrokerFileSystem
 * @throws BrokerException if fs.gs.project.id is missing (INVALID_ARGUMENT) or the
 *         underlying file system cannot be created (NOT_AUTHORIZED)
 */
public BrokerFileSystem getGCSFileSystem(String path, Map<String, String> properties) {
    WildcardURI pathUri = new WildcardURI(path);
    String projectId = properties.get(GCS_PROJECT_ID_KEY);
    if (Strings.isNullOrEmpty(projectId)) {
        throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
                "missed fs.gs.project.id configuration");
    }
    // cache key is the project id; a cached entry may already hold a live FileSystem
    FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(projectId, null);
    BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
    fileSystem.getLock().lock();
    try {
        if (fileSystem.getDFSFileSystem() == null) {
            logger.info("create file system for new path " + path);
            // create a new filesystem
            Configuration conf = new Configuration();
            // BUGFIX: the condition was inverted (missing '!'): it dropped every
            // real fs.* property and passed only null/empty values to conf.set,
            // which Hadoop Configuration rejects. Forward only non-empty fs.* keys.
            properties.forEach((k, v) -> {
                if (!Strings.isNullOrEmpty(v) && k.startsWith(FS_PREFIX)) {
                    conf.set(k, v);
                }
            });
            FileSystem gcsFileSystem = FileSystem.get(pathUri.getUri(), conf);
            fileSystem.setFileSystem(gcsFileSystem);
        }
        return fileSystem;
    } catch (Exception e) {
        logger.error("errors while connect to " + path, e);
        throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
    } finally {
        // always release the per-filesystem lock, even on failure
        fileSystem.getLock().unlock();
    }
}
/**
* file system handle is cached, the identity is endpoint + bucket + accessKey_secretKey