[enhance](iceberg)upgrade iceberg to 1.4.3 (#30799)
@@ -23,8 +23,8 @@ import org.apache.doris.datasource.property.constants.S3Properties;
 
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.aws.glue.GlueCatalog;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
 import org.apache.iceberg.catalog.Namespace;
 
 import java.util.List;
@@ -55,7 +55,7 @@ public class IcebergGlueExternalCatalog extends IcebergExternalCatalog {
         // read from converted s3 endpoint or default by BE s3 endpoint
         String endpoint = catalogProperties.getOrDefault(Constants.ENDPOINT,
                 catalogProperties.get(S3Properties.Env.ENDPOINT));
-        catalogProperties.putIfAbsent(AwsProperties.S3FILEIO_ENDPOINT, endpoint);
+        catalogProperties.putIfAbsent(S3FileIOProperties.ENDPOINT, endpoint);
 
         glueCatalog.initialize(icebergCatalogType, catalogProperties);
         catalog = glueCatalog;
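Note on the hunk above (IcebergGlueExternalCatalog): Iceberg 1.4.x moves the S3 FileIO configuration constants from AwsProperties to S3FileIOProperties, so the endpoint override now goes through S3FileIOProperties.ENDPOINT; the underlying property key is still the string "s3.endpoint". A minimal sketch of initializing a GlueCatalog with the new constant; the catalog name, warehouse path and endpoint value are illustrative, and AWS credentials/region are assumed to be resolvable from the environment:

import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.aws.s3.S3FileIOProperties;

import java.util.HashMap;
import java.util.Map;

public class GlueCatalogEndpointSketch {
    public static GlueCatalog build() {
        Map<String, String> catalogProperties = new HashMap<>();
        // illustrative warehouse location; replace with a real bucket/prefix
        catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3://example-bucket/warehouse");
        // Iceberg 1.4 style: S3FileIOProperties.ENDPOINT instead of AwsProperties.S3FILEIO_ENDPOINT
        catalogProperties.putIfAbsent(S3FileIOProperties.ENDPOINT, "https://s3.us-east-1.amazonaws.com");
        GlueCatalog glueCatalog = new GlueCatalog();
        glueCatalog.initialize("glue_catalog", catalogProperties);
        return glueCatalog;
    }
}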
@@ -18,14 +18,16 @@
 package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.datasource.CatalogProperty;
-import org.apache.doris.datasource.iceberg.rest.DorisIcebergRestResolvedIO;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.S3Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.aws.AwsClientProperties;
+import org.apache.iceberg.aws.s3.S3FileIO;
+import org.apache.iceberg.aws.s3.S3FileIOProperties;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -33,7 +35,7 @@ import java.util.Map;
 public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
 
     public IcebergRestExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
-            String comment) {
+                                      String comment) {
         super(catalogId, name, comment);
         props = PropertyConverter.convertToMetaProperties(props);
         catalogProperty = new CatalogProperty(resource, props);
@@ -42,18 +44,12 @@ public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
     @Override
     protected void initLocalObjectsImpl() {
         icebergCatalogType = ICEBERG_REST;
-        Map<String, String> restProperties = new HashMap<>();
-        String restUri = catalogProperty.getProperties().getOrDefault(CatalogProperties.URI, "");
-        restProperties.put(CatalogProperties.URI, restUri);
-        restProperties.put(CatalogProperties.FILE_IO_IMPL, DorisIcebergRestResolvedIO.class.getName());
-        restProperties.putAll(catalogProperty.getProperties());
 
         Configuration conf = replaceS3Properties(getConfiguration());
 
-        RESTCatalog restCatalog = new RESTCatalog();
-        restCatalog.setConf(conf);
-        restCatalog.initialize(icebergCatalogType, restProperties);
-        catalog = restCatalog;
+        catalog = CatalogUtil.buildIcebergCatalog(icebergCatalogType,
+                convertToRestCatalogProperties(),
+                conf);
     }
 
     private Configuration replaceS3Properties(Configuration conf) {
@@ -71,4 +67,30 @@ public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
                 catalogProperties.getOrDefault(Constants.S3GUARD_CONSISTENCY_RETRY_LIMIT, "1"));
         return conf;
     }
+
+    private Map<String, String> convertToRestCatalogProperties() {
+
+        Map<String, String> props = catalogProperty.getProperties();
+        Map<String, String> restProperties = new HashMap<>(props);
+        restProperties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
+        restProperties.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+        String restUri = props.getOrDefault(CatalogProperties.URI, "");
+        restProperties.put(CatalogProperties.URI, restUri);
+        if (props.containsKey(S3Properties.ENDPOINT)) {
+            restProperties.put(S3FileIOProperties.ENDPOINT, props.get(S3Properties.ENDPOINT));
+        }
+        if (props.containsKey(S3Properties.ACCESS_KEY)) {
+            restProperties.put(S3FileIOProperties.ACCESS_KEY_ID, props.get(S3Properties.ACCESS_KEY));
+        }
+        if (props.containsKey(S3Properties.SECRET_KEY)) {
+            restProperties.put(S3FileIOProperties.SECRET_ACCESS_KEY, props.get(S3Properties.SECRET_KEY));
+        }
+        if (props.containsKey(S3Properties.REGION)) {
+            restProperties.put(AwsClientProperties.CLIENT_REGION, props.get(S3Properties.REGION));
+        }
+        if (props.containsKey(PropertyConverter.USE_PATH_STYLE)) {
+            restProperties.put(S3FileIOProperties.PATH_STYLE_ACCESS, props.get(PropertyConverter.USE_PATH_STYLE));
+        }
+        return restProperties;
+    }
 }
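Note on the hunks above (IcebergRestExternalCatalog): rather than instantiating RESTCatalog directly with the custom DorisIcebergRestResolvedIO, the catalog is now created through CatalogUtil.buildIcebergCatalog, and convertToRestCatalogProperties() maps the Doris S3 properties onto Iceberg's S3FileIO keys. A self-contained sketch of that pattern, assuming the class name, URIs, credentials and region below are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.aws.AwsClientProperties;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.catalog.Catalog;

import java.util.HashMap;
import java.util.Map;

public class RestCatalogBuildSketch {
    public static Catalog build(Configuration conf) {
        Map<String, String> props = new HashMap<>();
        props.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
        props.put(CatalogProperties.URI, "http://rest-catalog-host:8181");      // placeholder REST endpoint
        props.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
        props.put(S3FileIOProperties.ENDPOINT, "http://s3-host:9000");          // placeholder S3 endpoint
        props.put(S3FileIOProperties.ACCESS_KEY_ID, "ak");                      // placeholder credentials
        props.put(S3FileIOProperties.SECRET_ACCESS_KEY, "sk");
        props.put(AwsClientProperties.CLIENT_REGION, "us-east-1");              // placeholder region
        props.put(S3FileIOProperties.PATH_STYLE_ACCESS, "true");
        // buildIcebergCatalog maps the "type" property to RESTCatalog, applies the
        // Hadoop conf, and calls initialize on the resulting catalog.
        return CatalogUtil.buildIcebergCatalog("iceberg_rest", props, conf);
    }
}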
@@ -19,21 +19,13 @@ package org.apache.doris.datasource.iceberg.dlf;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.iceberg.ClientPool;
-import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hive.HiveTableOperations;
 import org.apache.iceberg.io.FileIO;
 import shade.doris.hive.org.apache.thrift.TException;
 
 public class DLFTableOperations extends HiveTableOperations {
 
-    private final ClientPool<IMetaStoreClient, TException> metaClients;
-    private final String database;
-    private final String tableName;
-    private final int metadataRefreshMaxRetries;
-
     public DLFTableOperations(Configuration conf,
                               ClientPool<IMetaStoreClient, TException> metaClients,
                               FileIO fileIO,
@@ -41,31 +33,5 @@ public class DLFTableOperations extends HiveTableOperations {
                               String database,
                               String table) {
         super(conf, metaClients, fileIO, catalogName, database, table);
-        this.metaClients = metaClients;
-        this.database = database;
-        this.tableName = table;
-        metadataRefreshMaxRetries = conf.getInt(
-                "iceberg.hive.metadata-refresh-max-retries", 2);
     }
-
-    @Override
-    protected void doRefresh() {
-        String metadataLocation = null;
-        try {
-            Table table = metaClients.run(client -> client.getTable(database, tableName));
-            metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
-        } catch (NoSuchObjectException e) {
-            if (currentMetadataLocation() != null) {
-                throw new NoSuchTableException("No such table: %s.%s", database, tableName);
-            }
-        } catch (TException e) {
-            String errMsg = String.format("Failed to get table info from metastore %s.%s", database, tableName);
-            throw new RuntimeException(errMsg, e);
-
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new RuntimeException("Interrupted during refresh", e);
-        }
-        refreshFromMetadataLocation(metadataLocation, metadataRefreshMaxRetries);
-    }
 }
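Note on the hunks above (DLFTableOperations): the hand-rolled doRefresh() and the fields it needed are removed; with Iceberg 1.4.3 the inherited HiveTableOperations implementation performs the metastore lookup and metadata refresh. A minimal usage sketch (the helper class and method names here are hypothetical):

import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;

public class DlfRefreshSketch {
    // Refresh table metadata through the TableOperations contract; for DLFTableOperations
    // this now runs the refresh logic inherited from HiveTableOperations.
    public static String currentMetadataLocation(TableOperations ops) {
        TableMetadata metadata = ops.refresh();
        return metadata.metadataFileLocation();
    }
}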
@@ -1,225 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.iceberg.rest;
-
-import org.apache.doris.common.util.S3Util;
-import org.apache.doris.datasource.credentials.CloudCredential;
-import org.apache.doris.datasource.property.constants.OssProperties;
-import org.apache.doris.datasource.property.constants.S3Properties;
-import org.apache.doris.datasource.property.constants.S3Properties.Env;
-
-import com.amazonaws.glue.catalog.util.AWSGlueConfig;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogUtil;
-import org.apache.iceberg.aws.s3.S3FileIO;
-import org.apache.iceberg.hadoop.HadoopConfigurable;
-import org.apache.iceberg.hadoop.SerializableConfiguration;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.util.SerializableMap;
-import org.apache.iceberg.util.SerializableSupplier;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
-
-/**
- * FileIO implementation that uses location scheme to choose the correct FileIO implementation.
- * Copy from org.apache.iceberg.io.ResolvingFileIO and only modify the io for s3 (to set region)
- * */
-public class DorisIcebergRestResolvedIO implements FileIO, HadoopConfigurable {
-    private static final Logger LOG = LoggerFactory.getLogger(DorisIcebergRestResolvedIO.class);
-    private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
-    private static final String S3_FILE_IO_IMPL = "org.apache.iceberg.aws.s3.S3FileIO";
-    private static final Map<String, String> SCHEME_TO_FILE_IO =
-            ImmutableMap.of(
-                    "s3", S3_FILE_IO_IMPL,
-                    "s3a", S3_FILE_IO_IMPL,
-                    "s3n", S3_FILE_IO_IMPL);
-
-    private final Map<String, FileIO> ioInstances = Maps.newHashMap();
-    private SerializableMap<String, String> properties;
-    private SerializableSupplier<Configuration> hadoopConf;
-
-    /**
-     * No-arg constructor to load the FileIO dynamically.
-     *
-     * <p>All fields are initialized by calling {@link DorisIcebergRestResolvedIO#initialize(Map)} later.
-     */
-    public DorisIcebergRestResolvedIO() {}
-
-    @Override
-    public InputFile newInputFile(String location) {
-        return io(location).newInputFile(location);
-    }
-
-    @Override
-    public InputFile newInputFile(String location, long length) {
-        return io(location).newInputFile(location, length);
-    }
-
-    @Override
-    public OutputFile newOutputFile(String location) {
-        return io(location).newOutputFile(location);
-    }
-
-    @Override
-    public void deleteFile(String location) {
-        io(location).deleteFile(location);
-    }
-
-    @Override
-    public Map<String, String> properties() {
-        return properties.immutableMap();
-    }
-
-    @Override
-    public void initialize(Map<String, String> newProperties) {
-        close(); // close and discard any existing FileIO instances
-        this.properties = SerializableMap.copyOf(newProperties);
-    }
-
-    @Override
-    public void close() {
-        List<FileIO> instances = Lists.newArrayList();
-
-        synchronized (ioInstances) {
-            instances.addAll(ioInstances.values());
-            ioInstances.clear();
-        }
-
-        for (FileIO io : instances) {
-            io.close();
-        }
-    }
-
-    @Override
-    public void serializeConfWith(
-            Function<Configuration, SerializableSupplier<Configuration>> confSerializer) {
-        this.hadoopConf = confSerializer.apply(hadoopConf.get());
-    }
-
-    @Override
-    public void setConf(Configuration conf) {
-        this.hadoopConf = new SerializableConfiguration(conf)::get;
-    }
-
-    @Override
-    public Configuration getConf() {
-        return hadoopConf.get();
-    }
-
-    private FileIO io(String location) {
-        String impl = implFromLocation(location);
-        FileIO io = ioInstances.get(impl);
-        if (io != null) {
-            return io;
-        }
-
-        synchronized (ioInstances) {
-            // double check while holding the lock
-            io = ioInstances.get(impl);
-            if (io != null) {
-                return io;
-            }
-
-            Configuration conf = hadoopConf.get();
-
-            try {
-                if (impl.equals(S3_FILE_IO_IMPL)) {
-                    io = createS3FileIO(properties);
-                } else {
-                    io = CatalogUtil.loadFileIO(impl, properties, conf);
-                }
-            } catch (IllegalArgumentException e) {
-                if (impl.equals(FALLBACK_IMPL)) {
-                    // no implementation to fall back to, throw the exception
-                    throw e;
-                } else {
-                    // couldn't load the normal class, fall back to HadoopFileIO
-                    LOG.warn(
-                            "Failed to load FileIO implementation: {}, falling back to {}",
-                            impl,
-                            FALLBACK_IMPL,
-                            e);
-                    try {
-                        io = CatalogUtil.loadFileIO(FALLBACK_IMPL, properties, conf);
-                    } catch (IllegalArgumentException suppressed) {
-                        LOG.warn(
-                                "Failed to load FileIO implementation: {} (fallback)", FALLBACK_IMPL, suppressed);
-                        // both attempts failed, throw the original exception with the later exception
-                        // suppressed
-                        e.addSuppressed(suppressed);
-                        throw e;
-                    }
-                }
-            }
-
-            ioInstances.put(impl, io);
-        }
-
-        return io;
-    }
-
-    private static String implFromLocation(String location) {
-        return SCHEME_TO_FILE_IO.getOrDefault(scheme(location), FALLBACK_IMPL);
-    }
-
-    private static String scheme(String location) {
-        int colonPos = location.indexOf(":");
-        if (colonPos > 0) {
-            return location.substring(0, colonPos);
-        }
-
-        return null;
-    }
-
-    protected FileIO createS3FileIO(Map<String, String> properties) {
-
-        // get region
-        String region = properties.getOrDefault(S3Properties.REGION,
-                properties.getOrDefault(AWSGlueConfig.AWS_REGION, properties.get(Env.REGION)));
-
-        // get endpoint
-        String s3Endpoint = properties.getOrDefault(S3Properties.ENDPOINT, properties.get(Env.ENDPOINT));
-        URI endpointUri = URI.create(s3Endpoint);
-
-        // set credential
-        CloudCredential credential = new CloudCredential();
-        credential.setAccessKey(properties.getOrDefault(S3Properties.ACCESS_KEY,
-                properties.get(S3Properties.Env.ACCESS_KEY)));
-        credential.setSecretKey(properties.getOrDefault(S3Properties.SECRET_KEY,
-                properties.get(S3Properties.Env.SECRET_KEY)));
-        if (properties.containsKey(OssProperties.SESSION_TOKEN)
-                || properties.containsKey(S3Properties.Env.TOKEN)) {
-            credential.setSessionToken(properties.getOrDefault(OssProperties.SESSION_TOKEN,
-                    properties.get(S3Properties.Env.TOKEN)));
-        }
-
-        FileIO io = new S3FileIO(() -> S3Util.buildS3Client(endpointUri, region, credential));
-        io.initialize(properties);
-        return io;
-    }
-}
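Note on the deleted file above: DorisIcebergRestResolvedIO was a copy of Iceberg's ResolvingFileIO whose only change was building the S3 client with an explicit endpoint, region and credentials. After this upgrade the REST catalog points FILE_IO_IMPL at S3FileIO and passes those settings through S3FileIOProperties/AwsClientProperties in convertToRestCatalogProperties(), so the custom resolver is no longer needed. For reference, a short sketch of the upstream class the copy was based on; the properties map and location are whatever the catalog supplies:

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.ResolvingFileIO;

import java.util.Map;

public class ResolvingFileIOSketch {
    public static InputFile open(Map<String, String> catalogProps, String location) {
        // ResolvingFileIO routes s3://, s3a:// and s3n:// locations to S3FileIO and
        // falls back to HadoopFileIO for unrecognized schemes, which is the behavior
        // the removed Doris copy re-implemented.
        ResolvingFileIO io = new ResolvingFileIO();
        io.setConf(new Configuration());
        io.initialize(catalogProps);
        return io.newInputFile(location);
    }
}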