[fix](iceberg) update iceberg docs and add credential properties (#16429)
Update Iceberg docs; add new S3 credential properties.
@ -37,6 +37,8 @@ When connecting to Iceberg, Doris:
## Create Catalog

### Hive Metastore Catalog

Same as creating Hive Catalogs. A simple example is provided here. See [Hive](./hive) for more information.

```sql
@ -52,6 +54,52 @@ CREATE CATALOG iceberg PROPERTIES (
);
```

### Iceberg Native Catalog

<version since="dev">

Access metadata with the Iceberg API. Hive, REST, Glue, and other services can serve as the Iceberg catalog.

</version>

- Using Iceberg Hive Catalog

```sql
CREATE CATALOG iceberg PROPERTIES (
    'type'='iceberg',
    'iceberg.catalog.type'='hms',
    'hive.metastore.uris' = 'thrift://172.21.0.1:7004',
    'hadoop.username' = 'hive',
    'dfs.nameservices'='your-nameservice',
    'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
    'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
    'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
    'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
);
```

- Using Iceberg REST Catalog

A RESTful service acts as the server side. It implements the Iceberg RESTCatalog interface to provide metadata.

```sql
CREATE CATALOG iceberg PROPERTIES (
    'type'='iceberg',
    'iceberg.catalog.type'='rest',
    'uri' = 'http://172.21.0.1:8181'
);
```

If you want to use S3 storage, the following properties need to be set.

```
"AWS_ACCESS_KEY" = "ak"
"AWS_SECRET_KEY" = "sk"
"AWS_REGION" = "region-name"
"AWS_ENDPOINT" = "http://endpoint-uri"
"AWS_CREDENTIALS_PROVIDER" = "provider-class-name" // Optional. The default credentials class is based on BasicAWSCredentials.
```
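
As an illustration of how these credential properties combine with a catalog definition, the sketch below declares a REST catalog whose data sits on S3. This is only a sketch: the catalog name `iceberg_s3` and all values are placeholders, and `AWS_CREDENTIALS_PROVIDER` is omitted so the default described above (based on BasicAWSCredentials) would apply.

```sql
CREATE CATALOG iceberg_s3 PROPERTIES (
    'type'='iceberg',
    'iceberg.catalog.type'='rest',
    'uri' = 'http://172.21.0.1:8181',
    "AWS_ACCESS_KEY" = "ak",
    "AWS_SECRET_KEY" = "sk",
    "AWS_REGION" = "region-name",
    "AWS_ENDPOINT" = "http://endpoint-uri"
);
```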

## Column Type Mapping

Same as that in Hive Catalogs. See the relevant section in [Hive](./hive).
@ -54,8 +54,12 @@ CREATE CATALOG iceberg PROPERTIES (
### Creating a Catalog Based on the Iceberg API

<version since="dev">

Access metadata with the Iceberg API. Hive, REST, Glue, and other services are supported as the Iceberg catalog.

</version>

- Hive Metastore as the metadata service

```sql
@ -86,11 +90,12 @@ CREATE CATALOG iceberg PROPERTIES (
If the data is stored on S3, the following parameters can be used in the properties:

```sql
"AWS_ACCESS_KEY" = "username"
"AWS_SECRET_KEY" = "password"
```

```
"AWS_ACCESS_KEY" = "ak"
"AWS_SECRET_KEY" = "sk"
"AWS_REGION" = "region-name"
"AWS_ENDPOINT" = "http://endpoint-uri"
"AWS_CREDENTIALS_PROVIDER" = "provider-class-name" // Optional. The default credentials class is based on BasicAWSCredentials.
```

## Column Type Mapping
@ -27,6 +27,9 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -70,6 +73,7 @@ public class S3Resource extends Resource {
    public static final String S3_REGION = "AWS_REGION";
    public static final String S3_ACCESS_KEY = "AWS_ACCESS_KEY";
    public static final String S3_SECRET_KEY = "AWS_SECRET_KEY";
    private static final String S3_CREDENTIALS_PROVIDER = "AWS_CREDENTIALS_PROVIDER";
    public static final List<String> REQUIRED_FIELDS =
            Arrays.asList(S3_ENDPOINT, S3_REGION, S3_ACCESS_KEY, S3_SECRET_KEY);
    // required by storage policy
@ -222,41 +226,39 @@ public class S3Resource extends Resource {
    public static Map<String, String> getS3HadoopProperties(Map<String, String> properties) {
        Map<String, String> s3Properties = Maps.newHashMap();
        if (properties.containsKey(S3_ACCESS_KEY)) {
            s3Properties.put("fs.s3a.access.key", properties.get(S3_ACCESS_KEY));
            s3Properties.put(Constants.ACCESS_KEY, properties.get(S3_ACCESS_KEY));
        }
        if (properties.containsKey(S3Resource.S3_SECRET_KEY)) {
            s3Properties.put("fs.s3a.secret.key", properties.get(S3_SECRET_KEY));
            s3Properties.put(Constants.SECRET_KEY, properties.get(S3_SECRET_KEY));
        }
        if (properties.containsKey(S3Resource.S3_ENDPOINT)) {
            s3Properties.put("fs.s3a.endpoint", properties.get(S3_ENDPOINT));
            s3Properties.put(Constants.ENDPOINT, properties.get(S3_ENDPOINT));
        }
        if (properties.containsKey(S3Resource.S3_REGION)) {
            s3Properties.put("fs.s3a.endpoint.region", properties.get(S3_REGION));
            s3Properties.put(Constants.AWS_REGION, properties.get(S3_REGION));
        }
        if (properties.containsKey(S3Resource.S3_MAX_CONNECTIONS)) {
            s3Properties.put("fs.s3a.connection.maximum", properties.get(S3_MAX_CONNECTIONS));
            s3Properties.put(Constants.MAXIMUM_CONNECTIONS, properties.get(S3_MAX_CONNECTIONS));
        }
        if (properties.containsKey(S3Resource.S3_REQUEST_TIMEOUT_MS)) {
            s3Properties.put("fs.s3a.connection.request.timeout", properties.get(S3_REQUEST_TIMEOUT_MS));
            s3Properties.put(Constants.REQUEST_TIMEOUT, properties.get(S3_REQUEST_TIMEOUT_MS));
        }
        if (properties.containsKey(S3Resource.S3_CONNECTION_TIMEOUT_MS)) {
            s3Properties.put("fs.s3a.connection.timeout", properties.get(S3_CONNECTION_TIMEOUT_MS));
            s3Properties.put(Constants.SOCKET_TIMEOUT, properties.get(S3_CONNECTION_TIMEOUT_MS));
        }
        s3Properties.put(Constants.MAX_ERROR_RETRIES, "2");
        s3Properties.put("fs.s3.impl.disable.cache", "true");
        s3Properties.put("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
        s3Properties.put("fs.s3a.attempts.maximum", "2");
        s3Properties.put("fs.s3.impl", S3AFileSystem.class.getName());

        if (Boolean.valueOf(properties.getOrDefault(S3Resource.USE_PATH_STYLE, "false")).booleanValue()) {
            s3Properties.put("fs.s3a.path.style.access", "true");
        } else {
            s3Properties.put("fs.s3a.path.style.access", "false");
        }
        s3Properties.put(Constants.PATH_STYLE_ACCESS, properties.getOrDefault(S3Resource.USE_PATH_STYLE, "false"));
        s3Properties.put(Constants.AWS_CREDENTIALS_PROVIDER, properties.getOrDefault(S3Resource.S3_CREDENTIALS_PROVIDER,
                TemporaryAWSCredentialsProvider.class.getName()));
        if (properties.containsKey(S3Resource.S3_TOKEN)) {
            s3Properties.put("fs.s3a.session.token", properties.get(S3_TOKEN));
            s3Properties.put(Constants.SESSION_TOKEN, properties.get(S3_TOKEN));
            s3Properties.put("fs.s3a.aws.credentials.provider",
                    "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
            s3Properties.put("fs.s3a.impl.disable.cache", "true");
            s3Properties.put("fs.s3.impl.disable.cache", "true");
            s3Properties.put("fs.s3a.impl.disable.cache", "true");
        }
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            if (entry.getKey().startsWith(S3Resource.S3_FS_PREFIX)) {