From 6c19e106ad1557e316bc7d0233832f5b30e0ab20 Mon Sep 17 00:00:00 2001
From: wuwenchi
Date: Thu, 21 Sep 2023 20:19:43 +0800
Subject: [PATCH] [fix](rest catalog)support set region for s3 (#24566)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Use the REST catalog to access S3, with support for setting the S3 region:

```
CREATE CATALOG iceberg_rest_s3 PROPERTIES (
    "type"="iceberg",
    "iceberg.catalog.type"="rest",
    "uri" = "http://127.0.0.1:8181",
    "s3.endpoint" = "http://127.0.0.1:8010",
    "s3.access_key" = "admin",
    "s3.secret_key" = "password",
    "s3.region" = "us-east-1"
);
```
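
Once the catalog is created, its tables can be queried like any other Doris catalog. A minimal usage sketch (the `my_db.my_table` name below is illustrative, not part of this change):

```
SWITCH iceberg_rest_s3;
SELECT * FROM my_db.my_table LIMIT 10;
```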
---
 .../docker-compose/iceberg/entrypoint.sh.tpl  |   2 +-
 .../docker-compose/iceberg/iceberg.yaml.tpl   |   2 +-
 .../iceberg/IcebergExternalCatalog.java       |   2 +-
 .../iceberg/IcebergRestExternalCatalog.java   |   7 +-
 .../rest/DorisIcebergRestResolvedIO.java      | 225 ++++++++++++++++++
 .../external/iceberg/IcebergApiSource.java    |   3 +-
 .../iceberg/IcebergMetadataCache.java         |  29 ++-
 7 files changed, 255 insertions(+), 15 deletions(-)
 create mode 100644 fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rest/DorisIcebergRestResolvedIO.java

diff --git a/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl b/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl
index e862778b63..642ffdb4c4 100755
--- a/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl
+++ b/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl
@@ -21,7 +21,7 @@ export SPARK_MASTER_HOST=doris--spark-iceberg
 start-master.sh -p 7077
 start-worker.sh spark://doris--spark-iceberg:7077
 start-history-server.sh
-start-thriftserver.sh
+start-thriftserver.sh --driver-java-options "-Dderby.system.home=/tmp/derby"
 
 # Entrypoint, for example notebook, pyspark or spark-sql
 if [[ $# -gt 0 ]]; then
diff --git a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
index fe8e29b77e..1cd3d4acf9 100644
--- a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
+++ b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
@@ -90,7 +90,7 @@ services:
       /usr/bin/mc rm -r --force minio/warehouse;
       /usr/bin/mc mb minio/warehouse;
       /usr/bin/mc policy set public minio/warehouse;
-      exit 0;
+      tail -f /dev/null
       "
     networks:
       doris--iceberg:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index 20e2a7ebfe..c8ff468ab2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -106,6 +106,6 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
         return Env.getCurrentEnv()
                 .getExtMetaCacheMgr()
                 .getIcebergMetadataCache()
-                .getIcebergTable(catalog, id, dbName, tblName);
+                .getIcebergTable(catalog, id, dbName, tblName, getProperties());
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
index b021f84da6..25e5488b65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergRestExternalCatalog.java
@@ -19,6 +19,7 @@ package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.datasource.CatalogProperty;
 import org.apache.doris.datasource.credentials.DataLakeAWSCredentialsProvider;
+import org.apache.doris.datasource.iceberg.rest.DorisIcebergRestResolvedIO;
 import org.apache.doris.datasource.property.PropertyConverter;
 
 import org.apache.hadoop.conf.Configuration;
@@ -44,8 +45,12 @@ public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
         Map<String, String> restProperties = new HashMap<>();
         String restUri = catalogProperty.getProperties().getOrDefault(CatalogProperties.URI, "");
         restProperties.put(CatalogProperties.URI, restUri);
-        RESTCatalog restCatalog = new RESTCatalog();
+        restProperties.put(CatalogProperties.FILE_IO_IMPL, DorisIcebergRestResolvedIO.class.getName());
+        restProperties.putAll(catalogProperty.getProperties());
+
         Configuration conf = replaceS3Properties(getConfiguration());
+
+        RESTCatalog restCatalog = new RESTCatalog();
         restCatalog.setConf(conf);
         restCatalog.initialize(icebergCatalogType, restProperties);
         catalog = restCatalog;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rest/DorisIcebergRestResolvedIO.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rest/DorisIcebergRestResolvedIO.java
new file mode 100644
index 0000000000..f55f304d73
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rest/DorisIcebergRestResolvedIO.java
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.rest;
+
+import org.apache.doris.common.util.S3Util;
+import org.apache.doris.datasource.credentials.CloudCredential;
+import org.apache.doris.datasource.property.constants.OssProperties;
+import org.apache.doris.datasource.property.constants.S3Properties;
+import org.apache.doris.datasource.property.constants.S3Properties.Env;
+
+import com.amazonaws.glue.catalog.util.AWSGlueConfig;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.hadoop.HadoopConfigurable;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.util.SerializableMap;
+import org.apache.iceberg.util.SerializableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * FileIO implementation that uses location scheme to choose the correct FileIO implementation.
+ * Copied from org.apache.iceberg.io.ResolvingFileIO; only the S3 FileIO is modified (to set the region).
+ */
+public class DorisIcebergRestResolvedIO implements FileIO, HadoopConfigurable {
+    private static final Logger LOG = LoggerFactory.getLogger(DorisIcebergRestResolvedIO.class);
+    private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
+    private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
+    private static final Map<String, String> SCHEME_TO_FILE_IO =
+            ImmutableMap.of(
+                    "s3", S3_FILE_IO_IMPL,
+                    "s3a", S3_FILE_IO_IMPL,
+                    "s3n", S3_FILE_IO_IMPL);
+
+    private final Map<String, FileIO> ioInstances = Maps.newHashMap();
+    private SerializableMap<String, String> properties;
+    private SerializableSupplier<Configuration> hadoopConf;
+
+    /**
+     * No-arg constructor to load the FileIO dynamically.
+     *
+     * <p>All fields are initialized by calling {@link DorisIcebergRestResolvedIO#initialize(Map)} later.
+     */
+    public DorisIcebergRestResolvedIO() {}
+
+    @Override
+    public InputFile newInputFile(String location) {
+        return io(location).newInputFile(location);
+    }
+
+    @Override
+    public InputFile newInputFile(String location, long length) {
+        return io(location).newInputFile(location, length);
+    }
+
+    @Override
+    public OutputFile newOutputFile(String location) {
+        return io(location).newOutputFile(location);
+    }
+
+    @Override
+    public void deleteFile(String location) {
+        io(location).deleteFile(location);
+    }
+
+    @Override
+    public Map<String, String> properties() {
+        return properties.immutableMap();
+    }
+
+    @Override
+    public void initialize(Map<String, String> newProperties) {
+        close(); // close and discard any existing FileIO instances
+        this.properties = SerializableMap.copyOf(newProperties);
+    }
+
+    @Override
+    public void close() {
+        List<FileIO> instances = Lists.newArrayList();
+
+        synchronized (ioInstances) {
+            instances.addAll(ioInstances.values());
+            ioInstances.clear();
+        }
+
+        for (FileIO io : instances) {
+            io.close();
+        }
+    }
+
+    @Override
+    public void serializeConfWith(
+            Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
+        this.hadoopConf = confSerializer.apply(hadoopConf.get());
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.hadoopConf = new SerializableConfiguration(conf)::get;
+    }
+
+    @Override
+    public Configuration getConf() {
+        return hadoopConf.get();
+    }
+
+    private FileIO io(String location) {
+        String impl = implFromLocation(location);
+        FileIO io = ioInstances.get(impl);
+        if (io != null) {
+            return io;
+        }
+
+        synchronized (ioInstances) {
+            // double check while holding the lock
+            io = ioInstances.get(impl);
+            if (io != null) {
+                return io;
+            }
+
+            Configuration conf = hadoopConf.get();
+
+            try {
+                if (impl.equals(S3_FILE_IO_IMPL)) {
+                    io = createS3FileIO(properties);
+                } else {
+                    io = CatalogUtil.loadFileIO(impl, properties, conf);
+                }
+            } catch (IllegalArgumentException e) {
+                if (impl.equals(FALLBACK_IMPL)) {
+                    // no implementation to fall back to, throw the exception
+                    throw e;
+                } else {
+                    // couldn't load the normal class, fall back to HadoopFileIO
+                    LOG.warn(
+                            "Failed to load FileIO implementation: {}, falling back to {}",
+                            impl,
+                            FALLBACK_IMPL,
+                            e);
+                    try {
+                        io = CatalogUtil.loadFileIO(FALLBACK_IMPL, properties, conf);
+                    } catch (IllegalArgumentException suppressed) {
+                        LOG.warn(
+                                "Failed to load FileIO implementation: {} (fallback)", FALLBACK_IMPL, suppressed);
+                        // both attempts failed, throw the original exception with the later exception
+                        // suppressed
+                        e.addSuppressed(suppressed);
+                        throw e;
+                    }
+                }
+            }
+
+            ioInstances.put(impl, io);
+        }
+
+        return io;
+    }
+
+    private static String implFromLocation(String location) {
+        return SCHEME_TO_FILE_IO.getOrDefault(scheme(location), FALLBACK_IMPL);
+    }
+
+    private static String scheme(String location) {
+        int colonPos = location.indexOf(":");
+        if (colonPos > 0) {
+            return location.substring(0, colonPos);
+        }
+
+        return null;
+    }
+
+    protected FileIO createS3FileIO(Map<String, String> properties) {
+
+        // get region
+        String region = properties.getOrDefault(S3Properties.REGION,
+                properties.getOrDefault(AWSGlueConfig.AWS_REGION, properties.get(Env.REGION)));
+
+        // get endpoint
+        String s3Endpoint = properties.getOrDefault(S3Properties.ENDPOINT, properties.get(Env.ENDPOINT));
+        URI endpointUri = URI.create(s3Endpoint);
+
+        // set credential
+        CloudCredential credential = new CloudCredential();
+        credential.setAccessKey(properties.getOrDefault(S3Properties.ACCESS_KEY,
+                properties.get(S3Properties.Env.ACCESS_KEY)));
+        credential.setSecretKey(properties.getOrDefault(S3Properties.SECRET_KEY,
+                properties.get(S3Properties.Env.SECRET_KEY)));
+        if (properties.containsKey(OssProperties.SESSION_TOKEN)
+                || properties.containsKey(S3Properties.Env.TOKEN)) {
+            credential.setSessionToken(properties.getOrDefault(OssProperties.SESSION_TOKEN,
+                    properties.get(S3Properties.Env.TOKEN)));
+        }
+
+        FileIO io = new S3FileIO(() -> S3Util.buildS3Client(endpointUri, region, credential));
+        io.initialize(properties);
+        return io;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
index 0e20747dfe..73ac8ed7b3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergApiSource.java
@@ -51,7 +51,8 @@ public class IcebergApiSource implements IcebergSource {
                 ((IcebergExternalCatalog) icebergExtTable.getCatalog()).getCatalog(),
                 icebergExtTable.getCatalog().getId(),
                 icebergExtTable.getDbName(),
-                icebergExtTable.getName());
+                icebergExtTable.getName(),
+                icebergExtTable.getCatalog().getProperties());
         this.desc = desc;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
index 1f8b226e45..5f79623ff4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/iceberg/IcebergMetadataCache.java
@@ -93,11 +93,12 @@ public class IcebergMetadataCache {
                     ctg.getHiveMetastoreUris(),
                     ctg.getCatalogProperty().getHadoopProperties(),
                     dbName,
-                    tbName);
+                    tbName,
+                    ctg.getProperties());
         } else if (catalog instanceof IcebergExternalCatalog) {
-            IcebergExternalCatalog icebergExternalCatalog = (IcebergExternalCatalog) catalog;
+            IcebergExternalCatalog extCatalog = (IcebergExternalCatalog) catalog;
             icebergTable = getIcebergTable(
-                    icebergExternalCatalog.getCatalog(), icebergExternalCatalog.getId(), dbName, tbName);
+                    extCatalog.getCatalog(), extCatalog.getId(), dbName, tbName, extCatalog.getProperties());
         } else {
             throw new UserException("Only support 'hms' and 'iceberg' type for iceberg table");
         }
@@ -124,7 +125,8 @@ public class IcebergMetadataCache {
         return icebergTable;
     }
 
-    public Table getIcebergTable(Catalog catalog, long catalogId, String dbName, String tbName) {
+    public Table getIcebergTable(Catalog catalog, long catalogId, String dbName, String tbName,
+            Map<String, String> props) {
         IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(
                 catalogId,
                 dbName,
@@ -133,9 +135,10 @@ public class IcebergMetadataCache {
         if (cacheTable != null) {
             return cacheTable;
         }
+
         Table table = HiveMetaStoreClientHelper.ugiDoAs(catalogId,
                 () -> catalog.loadTable(TableIdentifier.of(dbName, tbName)));
-        initIcebergTableFileIO(table);
+        initIcebergTableFileIO(table, props);
 
         tableCache.put(key, table);
 
@@ -187,7 +190,8 @@ public class IcebergMetadataCache {
         });
     }
 
-    private Table createIcebergTable(String uri, Map<String, String> hdfsConf, String db, String tbl) {
+    private Table createIcebergTable(String uri, Map<String, String> hdfsConf, String db, String tbl,
+            Map<String, String> props) {
         // set hdfs configure
         Configuration conf = new HdfsConfiguration();
         for (Map.Entry<String, String> entry : hdfsConf.entrySet()) {
@@ -204,7 +208,7 @@ public class IcebergMetadataCache {
 
         Table table = HiveMetaStoreClientHelper.ugiDoAs(conf,
                 () -> hiveCatalog.loadTable(TableIdentifier.of(db, tbl)));
-        initIcebergTableFileIO(table);
+        initIcebergTableFileIO(table, props);
 
         return table;
     }
 
@@ -213,17 +217,22 @@ public class IcebergMetadataCache {
         return createIcebergTable(hmsTable.getMetastoreUri(),
                 hmsTable.getHadoopProperties(),
                 hmsTable.getDbName(),
-                hmsTable.getName());
+                hmsTable.getName(),
+                hmsTable.getCatalogProperties());
     }
 
-    private void initIcebergTableFileIO(Table table) {
+    private void initIcebergTableFileIO(Table table, Map<String, String> props) {
         Map<String, String> ioConf = new HashMap<>();
         table.properties().forEach((key, value) -> {
             if (key.startsWith("io.")) {
                 ioConf.put(key, value);
             }
         });
-        table.io().initialize(ioConf);
+
+        // This `initialize` method overrides the properties as a whole, so we need to
+        // merge the table's io-related properties with Doris's catalog-related properties.
+        props.putAll(ioConf);
+        table.io().initialize(props);
     }
 
     static class IcebergMetadataCacheKey {