From 2883f6704263565b5d57227bbb91c29c8287dcbf Mon Sep 17 00:00:00 2001
From: slothever <18522955+wsjz@users.noreply.github.com>
Date: Wed, 8 Feb 2023 13:53:01 +0800
Subject: [PATCH] [fix](iceberg) update iceberg docs and add credential properties (#16429)

Update iceberg docs
Add new S3 credential properties
---
 .../docs/lakehouse/multi-catalog/iceberg.md  | 48 +++++++++++++++++++
 .../docs/lakehouse/multi-catalog/iceberg.md  | 11 +++--
 .../org/apache/doris/catalog/S3Resource.java | 34 ++++++-------
 3 files changed, 74 insertions(+), 19 deletions(-)

diff --git a/docs/en/docs/lakehouse/multi-catalog/iceberg.md b/docs/en/docs/lakehouse/multi-catalog/iceberg.md
index 67ce750066..143133b598 100644
--- a/docs/en/docs/lakehouse/multi-catalog/iceberg.md
+++ b/docs/en/docs/lakehouse/multi-catalog/iceberg.md
@@ -37,6 +37,8 @@ When connecting to Iceberg, Doris:
 
 ## Create Catalog
 
+### Hive Metastore Catalog
+
 Same as creating Hive Catalogs. A simple example is provided here. See [Hive](./hive) for more information.
 
 ```sql
@@ -52,6 +54,52 @@ CREATE CATALOG iceberg PROPERTIES (
 );
 ```
 
+### Iceberg Native Catalog
+
+
+
+Access metadata with the Iceberg API. Hive, REST, Glue, and other services can serve as the Iceberg catalog.
+
+
+
+- Using Iceberg Hive Catalog
+
+```sql
+CREATE CATALOG iceberg PROPERTIES (
+    'type'='iceberg',
+    'iceberg.catalog.type'='hms',
+    'hive.metastore.uris' = 'thrift://172.21.0.1:7004',
+    'hadoop.username' = 'hive',
+    'dfs.nameservices'='your-nameservice',
+    'dfs.ha.namenodes.your-nameservice'='nn1,nn2',
+    'dfs.namenode.rpc-address.your-nameservice.nn1'='172.21.0.2:4007',
+    'dfs.namenode.rpc-address.your-nameservice.nn2'='172.21.0.3:4007',
+    'dfs.client.failover.proxy.provider.your-nameservice'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
+);
+```
+
+- Using Iceberg REST Catalog
+
+A RESTful service acts as the server side and implements the Iceberg RESTCatalog interface to provide metadata.
+
+```sql
+CREATE CATALOG iceberg PROPERTIES (
+    'type'='iceberg',
+    'iceberg.catalog.type'='rest',
+    'uri' = 'http://172.21.0.1:8181'
+);
+```
+
+If the data is stored on S3, the following properties also need to be set.
+
+```
+"AWS_ACCESS_KEY" = "ak"
+"AWS_SECRET_KEY" = "sk"
+"AWS_REGION" = "region-name"
+"AWS_ENDPOINT" = "http://endpoint-uri"
+"AWS_CREDENTIALS_PROVIDER" = "provider-class-name" // Optional. The default credentials class is based on BasicAWSCredentials.
+```
+
 
 ## Column Type Mapping
 
 Same as that in Hive Catalogs. See the relevant section in [Hive](./hive).
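A note on the `AWS_CREDENTIALS_PROVIDER` property documented above: its value is expected to be a fully qualified class name. The sketch below is only an illustration, not part of this patch, of what such a provider could look like if built on the AWS SDK v1 `AWSCredentialsProvider` interface (which the `BasicAWSCredentials` default suggests); the package, class name, and hard-coded keys are placeholders.

```java
// Illustrative only: a hypothetical provider that could be referenced via
// "AWS_CREDENTIALS_PROVIDER" = "com.example.MyStaticCredentialsProvider",
// assuming the class is available on the classpath. Not part of Doris or this patch.
package com.example;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;

public class MyStaticCredentialsProvider implements AWSCredentialsProvider {
    @Override
    public AWSCredentials getCredentials() {
        // A real provider would read these from a vault, environment variables, etc.
        return new BasicAWSCredentials("ak", "sk");
    }

    @Override
    public void refresh() {
        // Static credentials: nothing to refresh.
    }
}
```

With Hadoop's S3A connector, a custom provider generally needs a public no-argument constructor (as here) or a `(URI, Configuration)` constructor so it can be instantiated from `fs.s3a.aws.credentials.provider`.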
diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
index 3424eb8c29..2df5af1d6a 100644
--- a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
+++ b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md
@@ -54,8 +54,12 @@ CREATE CATALOG iceberg PROPERTIES (
 
 ### Creating a Catalog based on the Iceberg API
 
+
+
 Access metadata with the Iceberg API. Hive, REST, Glue, and other services can serve as the Iceberg catalog.
 
+
+
 - Hive Metastore as the metadata service
 
 ```sql
@@ -86,11 +90,12 @@ CREATE CATALOG iceberg PROPERTIES (
 
 If the data is stored on S3, the following parameters can be used in the properties
 
-```sql
-"AWS_ACCESS_KEY" = "username"
-"AWS_SECRET_KEY" = "password"
+```
+"AWS_ACCESS_KEY" = "ak"
+"AWS_SECRET_KEY" = "sk"
 "AWS_REGION" = "region-name"
 "AWS_ENDPOINT" = "http://endpoint-uri"
+"AWS_CREDENTIALS_PROVIDER" = "provider-class-name" // Optional. The default credentials class is based on BasicAWSCredentials.
 ```
 
 ## Column Type Mapping
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
index e1ace54e1b..50457ef3d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/S3Resource.java
@@ -27,6 +27,9 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -70,6 +73,7 @@ public class S3Resource extends Resource {
     public static final String S3_REGION = "AWS_REGION";
     public static final String S3_ACCESS_KEY = "AWS_ACCESS_KEY";
     public static final String S3_SECRET_KEY = "AWS_SECRET_KEY";
+    private static final String S3_CREDENTIALS_PROVIDER = "AWS_CREDENTIALS_PROVIDER";
     public static final List<String> REQUIRED_FIELDS =
             Arrays.asList(S3_ENDPOINT, S3_REGION, S3_ACCESS_KEY, S3_SECRET_KEY);
     // required by storage policy
@@ -222,41 +226,39 @@ public class S3Resource extends Resource {
     public static Map<String, String> getS3HadoopProperties(Map<String, String> properties) {
         Map<String, String> s3Properties = Maps.newHashMap();
         if (properties.containsKey(S3_ACCESS_KEY)) {
-            s3Properties.put("fs.s3a.access.key", properties.get(S3_ACCESS_KEY));
+            s3Properties.put(Constants.ACCESS_KEY, properties.get(S3_ACCESS_KEY));
         }
         if (properties.containsKey(S3Resource.S3_SECRET_KEY)) {
-            s3Properties.put("fs.s3a.secret.key", properties.get(S3_SECRET_KEY));
+            s3Properties.put(Constants.SECRET_KEY, properties.get(S3_SECRET_KEY));
         }
         if (properties.containsKey(S3Resource.S3_ENDPOINT)) {
-            s3Properties.put("fs.s3a.endpoint", properties.get(S3_ENDPOINT));
+            s3Properties.put(Constants.ENDPOINT, properties.get(S3_ENDPOINT));
         }
         if (properties.containsKey(S3Resource.S3_REGION)) {
-            s3Properties.put("fs.s3a.endpoint.region", properties.get(S3_REGION));
+            s3Properties.put(Constants.AWS_REGION, properties.get(S3_REGION));
         }
         if (properties.containsKey(S3Resource.S3_MAX_CONNECTIONS)) {
-            s3Properties.put("fs.s3a.connection.maximum", properties.get(S3_MAX_CONNECTIONS));
+            s3Properties.put(Constants.MAXIMUM_CONNECTIONS, properties.get(S3_MAX_CONNECTIONS));
        }
         if (properties.containsKey(S3Resource.S3_REQUEST_TIMEOUT_MS)) {
-            s3Properties.put("fs.s3a.connection.request.timeout", properties.get(S3_REQUEST_TIMEOUT_MS));
+            s3Properties.put(Constants.REQUEST_TIMEOUT, properties.get(S3_REQUEST_TIMEOUT_MS));
         }
         if (properties.containsKey(S3Resource.S3_CONNECTION_TIMEOUT_MS)) {
-            s3Properties.put("fs.s3a.connection.timeout", properties.get(S3_CONNECTION_TIMEOUT_MS));
+            s3Properties.put(Constants.SOCKET_TIMEOUT, properties.get(S3_CONNECTION_TIMEOUT_MS));
         }
+        s3Properties.put(Constants.MAX_ERROR_RETRIES, "2");
         s3Properties.put("fs.s3.impl.disable.cache", "true");
-        s3Properties.put("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
-        s3Properties.put("fs.s3a.attempts.maximum", "2");
+        s3Properties.put("fs.s3.impl", S3AFileSystem.class.getName());
 
-        if (Boolean.valueOf(properties.getOrDefault(S3Resource.USE_PATH_STYLE, "false")).booleanValue()) {
-            s3Properties.put("fs.s3a.path.style.access", "true");
-        } else {
-            s3Properties.put("fs.s3a.path.style.access", "false");
-        }
+        s3Properties.put(Constants.PATH_STYLE_ACCESS, properties.getOrDefault(S3Resource.USE_PATH_STYLE, "false"));
+        s3Properties.put(Constants.AWS_CREDENTIALS_PROVIDER, properties.getOrDefault(S3Resource.S3_CREDENTIALS_PROVIDER,
+                TemporaryAWSCredentialsProvider.class.getName()));
         if (properties.containsKey(S3Resource.S3_TOKEN)) {
-            s3Properties.put("fs.s3a.session.token", properties.get(S3_TOKEN));
+            s3Properties.put(Constants.SESSION_TOKEN, properties.get(S3_TOKEN));
             s3Properties.put("fs.s3a.aws.credentials.provider",
                     "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
-            s3Properties.put("fs.s3a.impl.disable.cache", "true");
             s3Properties.put("fs.s3.impl.disable.cache", "true");
+            s3Properties.put("fs.s3a.impl.disable.cache", "true");
         }
         for (Map.Entry<String, String> entry : properties.entrySet()) {
             if (entry.getKey().startsWith(S3Resource.S3_FS_PREFIX)) {