diff --git a/docs/en/docs/lakehouse/multi-catalog/iceberg.md b/docs/en/docs/lakehouse/multi-catalog/iceberg.md index 383a01b7b7..f316ee6bfe 100644 --- a/docs/en/docs/lakehouse/multi-catalog/iceberg.md +++ b/docs/en/docs/lakehouse/multi-catalog/iceberg.md @@ -106,6 +106,23 @@ If the data is stored on S3, the following parameters can be used in properties: "s3.credentials.provider" = "provider-class-name" // 可选,默认凭证类基于BasicAWSCredentials实现。 ``` +#### Google Dataproc Metastore 作为元数据服务 + +```sql +CREATE CATALOG iceberg PROPERTIES ( + "type"="iceberg", + "iceberg.catalog.type"="hms", + "hive.metastore.uris" = "thrift://172.21.0.1:9083", + "gs.endpoint" = "https://storage.googleapis.com", + "gs.region" = "us-east-1", + "gs.access_key" = "ak", + "gs.secret_key" = "sk", + "use_path_style" = "true" +); +``` + +`hive.metastore.uris`: Dataproc Metastore URI,See in Metastore Services :[Dataproc Metastore Services](https://console.cloud.google.com/dataproc/metastore). + ## Column type mapping Consistent with Hive Catalog, please refer to the **column type mapping** section in [Hive Catalog](./hive.md). diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md index 77b959c239..7fbab6ba13 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/iceberg.md @@ -106,6 +106,23 @@ CREATE CATALOG iceberg PROPERTIES ( "s3.credentials.provider" = "provider-class-name" // 可选,默认凭证类基于BasicAWSCredentials实现。 ``` +#### Google Dataproc Metastore 作为元数据服务 + +```sql +CREATE CATALOG iceberg PROPERTIES ( + "type"="iceberg", + "iceberg.catalog.type"="hms", + "hive.metastore.uris" = "thrift://172.21.0.1:9083", + "gs.endpoint" = "https://storage.googleapis.com", + "gs.region" = "us-east-1", + "gs.access_key" = "ak", + "gs.secret_key" = "sk", + "use_path_style" = "true" +); +``` + +`hive.metastore.uris`: Dataproc Metastore 服务开放的接口,在 Metastore 管理页面获取 :[Dataproc Metastore Services](https://console.cloud.google.com/dataproc/metastore). + ## 列类型映射 和 Hive Catalog 一致,可参阅 [Hive Catalog](./hive.md) 中 **列类型映射** 一节。 diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index eb06ef86f3..63f6b82143 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -626,6 +626,20 @@ under the License. + + + com.google.cloud.bigdataoss + gcs-connector + hadoop2-2.2.8 + shaded + + + * + * + + + + org.apache.ranger diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index ceb4220e42..d266d35c61 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -77,6 +77,7 @@ public class FeConstants { public static String FS_PREFIX_S3A = "s3a"; public static String FS_PREFIX_S3N = "s3n"; public static String FS_PREFIX_OSS = "oss"; + public static String FS_PREFIX_GCS = "gs"; public static String FS_PREFIX_BOS = "bos"; public static String FS_PREFIX_COS = "cos"; public static String FS_PREFIX_OBS = "obs"; @@ -87,14 +88,4 @@ public class FeConstants { public static String FS_PREFIX_FILE = "file"; public static final String INTERNAL_DB_NAME = "__internal_schema"; public static String TEMP_MATERIZLIZE_DVIEW_PREFIX = "internal_tmp_materialized_view_"; - - public static boolean isObjStorage(String location) { - return location.startsWith(FeConstants.FS_PREFIX_S3) - || location.startsWith(FeConstants.FS_PREFIX_S3A) - || location.startsWith(FeConstants.FS_PREFIX_S3N) - || location.startsWith(FeConstants.FS_PREFIX_BOS) - || location.startsWith(FeConstants.FS_PREFIX_COS) - || location.startsWith(FeConstants.FS_PREFIX_OSS) - || location.startsWith(FeConstants.FS_PREFIX_OBS); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PrintableMap.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PrintableMap.java index 9186695ffe..c0e731b997 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PrintableMap.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PrintableMap.java @@ -19,6 +19,7 @@ package org.apache.doris.common.util; import org.apache.doris.datasource.property.constants.CosProperties; import org.apache.doris.datasource.property.constants.DLFProperties; +import org.apache.doris.datasource.property.constants.GCSProperties; import org.apache.doris.datasource.property.constants.GlueProperties; import org.apache.doris.datasource.property.constants.ObsProperties; import org.apache.doris.datasource.property.constants.OssProperties; @@ -53,8 +54,9 @@ public class PrintableMap { SENSITIVE_KEY.add("bos_secret_accesskey"); SENSITIVE_KEY.add("jdbc.password"); SENSITIVE_KEY.add("elasticsearch.password"); - SENSITIVE_KEY.addAll(Arrays.asList(S3Properties.SECRET_KEY, ObsProperties.SECRET_KEY, OssProperties.SECRET_KEY, - CosProperties.SECRET_KEY, GlueProperties.SECRET_KEY, DLFProperties.SECRET_KEY)); + SENSITIVE_KEY.addAll(Arrays.asList(S3Properties.SECRET_KEY, ObsProperties.SECRET_KEY, OssProperties.SECRET_KEY, + GCSProperties.SECRET_KEY, CosProperties.SECRET_KEY, GlueProperties.SECRET_KEY, + DLFProperties.SECRET_KEY)); HIDDEN_KEY = Sets.newHashSet(); HIDDEN_KEY.addAll(S3Properties.Env.FS_KEYS); HIDDEN_KEY.addAll(GlueProperties.META_KEYS); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java new file mode 100644 index 0000000000..4620bdfa6c --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/S3Util.java @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import org.apache.doris.common.FeConstants; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class S3Util { + private static final Logger LOG = LogManager.getLogger(S3Util.class); + + public static boolean isObjStorage(String location) { + return location.startsWith(FeConstants.FS_PREFIX_S3) + || location.startsWith(FeConstants.FS_PREFIX_S3A) + || location.startsWith(FeConstants.FS_PREFIX_S3N) + || location.startsWith(FeConstants.FS_PREFIX_GCS) + || location.startsWith(FeConstants.FS_PREFIX_BOS) + || location.startsWith(FeConstants.FS_PREFIX_COS) + || location.startsWith(FeConstants.FS_PREFIX_OSS) + || location.startsWith(FeConstants.FS_PREFIX_OBS); + } + + public static String convertToS3IfNecessary(String location) { + LOG.debug("try convert location to s3 prefix: " + location); + if (isObjStorage(location)) { + int pos = location.indexOf("://"); + if (pos == -1) { + throw new RuntimeException("No '://' found in location: " + location); + } + return "s3" + location.substring(pos); + } + return location; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java index c0d10e22f2..4a61b5f8e9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java @@ -27,6 +27,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.S3Util; import org.apache.doris.datasource.CacheException; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.external.hive.util.HiveUtil; @@ -296,7 +297,7 @@ public class HiveMetaStoreCache { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader()); - String finalLocation = convertToS3IfNecessary(key.location); + String finalLocation = S3Util.convertToS3IfNecessary(key.location); JobConf jobConf = getJobConf(); // For Tez engine, it may generate subdirectories for "union" query. // So there may be files and directories in the table directory at the same time. eg: @@ -345,23 +346,6 @@ public class HiveMetaStoreCache { } } - // convert oss:// to s3:// - private String convertToS3IfNecessary(String location) { - LOG.debug("try convert location to s3 prefix: " + location); - if (location.startsWith(FeConstants.FS_PREFIX_COS) - || location.startsWith(FeConstants.FS_PREFIX_BOS) - || location.startsWith(FeConstants.FS_PREFIX_OSS) - || location.startsWith(FeConstants.FS_PREFIX_S3A) - || location.startsWith(FeConstants.FS_PREFIX_S3N)) { - int pos = location.indexOf("://"); - if (pos == -1) { - throw new RuntimeException("No '://' found in location: " + location); - } - return "s3" + location.substring(pos); - } - return location; - } - private JobConf getJobConf() { Configuration configuration = new HdfsConfiguration(); for (Map.Entry entry : catalog.getCatalogProperty().getHadoopProperties().entrySet()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java index 4ba4ef107e..fad5b64e79 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/PropertyConverter.java @@ -23,6 +23,7 @@ import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.datasource.property.constants.CosProperties; import org.apache.doris.datasource.property.constants.DLFProperties; +import org.apache.doris.datasource.property.constants.GCSProperties; import org.apache.doris.datasource.property.constants.GlueProperties; import org.apache.doris.datasource.property.constants.HMSProperties; import org.apache.doris.datasource.property.constants.ObsProperties; @@ -96,6 +97,8 @@ public class PropertyConverter { return convertToS3Properties(props, S3Properties.getCredential(props)); } else if (props.containsKey(ObsProperties.ENDPOINT)) { return convertToOBSProperties(props, ObsProperties.getCredential(props)); + } else if (props.containsKey(GCSProperties.ENDPOINT)) { + return convertToGCSProperties(props, GCSProperties.getCredential(props)); } else if (props.containsKey(OssProperties.ENDPOINT)) { return convertToOSSProperties(props, OssProperties.getCredential(props)); } else if (props.containsKey(CosProperties.ENDPOINT)) { @@ -215,6 +218,11 @@ public class PropertyConverter { } } + private static Map convertToGCSProperties(Map props, CloudCredential credential) { + // Now we use s3 client to access + return convertToS3Properties(S3Properties.prefixToS3(props), credential); + } + private static Map convertToOSSProperties(Map props, CloudCredential credential) { // Now we use s3 client to access return convertToS3Properties(S3Properties.prefixToS3(props), credential); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java index e195403aff..90efd12de1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/S3ClientBEProperties.java @@ -18,6 +18,7 @@ package org.apache.doris.datasource.property; import org.apache.doris.datasource.property.constants.CosProperties; +import org.apache.doris.datasource.property.constants.GCSProperties; import org.apache.doris.datasource.property.constants.ObsProperties; import org.apache.doris.datasource.property.constants.OssProperties; import org.apache.doris.datasource.property.constants.S3Properties; @@ -37,6 +38,7 @@ public class S3ClientBEProperties { return getBeAWSPropertiesFromS3(properties); } else if (properties.containsKey(ObsProperties.ENDPOINT) || properties.containsKey(OssProperties.ENDPOINT) + || properties.containsKey(GCSProperties.ENDPOINT) || properties.containsKey(CosProperties.ENDPOINT)) { return getBeAWSPropertiesFromS3(S3Properties.prefixToS3(properties)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/GCSProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/GCSProperties.java new file mode 100644 index 0000000000..8a42f417a3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/GCSProperties.java @@ -0,0 +1,40 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.property.constants; + +import org.apache.doris.datasource.credentials.CloudCredential; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +public class GCSProperties extends BaseProperties { + + public static final String GCS_PREFIX = "gs."; + + public static final String ENDPOINT = "gs.endpoint"; + public static final String REGION = "gs.region"; + public static final String ACCESS_KEY = "gs.access_key"; + public static final String SECRET_KEY = "gs.secret_key"; + public static final String SESSION_TOKEN = "gs.session_token"; + public static final List REQUIRED_FIELDS = Arrays.asList(ENDPOINT, ACCESS_KEY, SECRET_KEY); + + public static CloudCredential getCredential(Map props) { + return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java index 59badfe25c..a100982995 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/constants/S3Properties.java @@ -131,7 +131,10 @@ public class S3Properties extends BaseProperties { if (entry.getKey().startsWith(OssProperties.OSS_PREFIX)) { String s3Key = entry.getKey().replace(OssProperties.OSS_PREFIX, S3Properties.S3_PREFIX); s3Properties.put(s3Key, entry.getValue()); - } else if (entry.getKey().startsWith(CosProperties.COS_PREFIX)) { + } else if (entry.getKey().startsWith(GCSProperties.GCS_PREFIX)) { + String s3Key = entry.getKey().replace(GCSProperties.GCS_PREFIX, S3Properties.S3_PREFIX); + s3Properties.put(s3Key, entry.getValue()); + } else if (entry.getKey().startsWith(CosProperties.COS_PREFIX)) { String s3Key = entry.getKey().replace(CosProperties.COS_PREFIX, S3Properties.S3_PREFIX); s3Properties.put(s3Key, entry.getValue()); } else if (entry.getKey().startsWith(ObsProperties.OBS_PREFIX)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java index afcef39c29..ceeb0ea638 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java @@ -32,6 +32,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.BrokerUtil; +import org.apache.doris.common.util.S3Util; import org.apache.doris.common.util.Util; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.planner.PlanNodeId; @@ -352,7 +353,7 @@ public abstract class FileQueryScanNode extends FileScanNode { protected static Optional getTFileType(String location) { if (location != null && !location.isEmpty()) { - if (FeConstants.isObjStorage(location)) { + if (S3Util.isObjStorage(location)) { return Optional.of(TFileType.FILE_S3); } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) { return Optional.of(TFileType.FILE_HDFS); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java index f19a7a43de..c149f24525 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergScanNode.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.external.IcebergExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.S3Util; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; import org.apache.doris.external.iceberg.util.IcebergUtils; @@ -185,7 +186,8 @@ public class IcebergScanNode extends FileQueryScanNode { long fileSize = task.file().fileSizeInBytes(); for (FileScanTask splitTask : task.split(splitSize)) { String dataFilePath = splitTask.file().path().toString(); - IcebergSplit split = new IcebergSplit(new Path(dataFilePath), splitTask.start(), + String finalDataFilePath = S3Util.convertToS3IfNecessary(dataFilePath); + IcebergSplit split = new IcebergSplit(new Path(finalDataFilePath), splitTask.start(), splitTask.length(), fileSize, new String[0]); split.setFormatVersion(formatVersion); if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {