[fix](multi-catalog) fix iceberg defalut credentials proviers (#31704)
This commit is contained in:
@ -21,10 +21,14 @@ import org.apache.doris.datasource.ExternalCatalog;
|
||||
import org.apache.doris.datasource.InitCatalogLog;
|
||||
import org.apache.doris.datasource.SessionContext;
|
||||
import org.apache.doris.datasource.operations.ExternalMetadataOperations;
|
||||
import org.apache.doris.datasource.property.PropertyConverter;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.s3a.Constants;
|
||||
import org.apache.iceberg.catalog.Catalog;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public abstract class IcebergExternalCatalog extends ExternalCatalog {
|
||||
|
||||
@ -76,4 +80,9 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
|
||||
makeSureInitialized();
|
||||
return metadataOps.listTableNames(dbName);
|
||||
}
|
||||
|
||||
protected void initS3Param(Configuration conf) {
|
||||
Map<String, String> properties = catalogProperty.getHadoopProperties();
|
||||
conf.set(Constants.AWS_CREDENTIALS_PROVIDER, PropertyConverter.getAWSCredentialsProviders(properties));
|
||||
}
|
||||
}
|
||||
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.datasource.CatalogProperty;
|
||||
import org.apache.doris.datasource.property.PropertyConverter;
|
||||
import org.apache.doris.datasource.property.constants.S3Properties;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.s3a.Constants;
|
||||
import org.apache.iceberg.CatalogProperties;
|
||||
import org.apache.iceberg.aws.glue.GlueCatalog;
|
||||
@ -45,7 +46,9 @@ public class IcebergGlueExternalCatalog extends IcebergExternalCatalog {
|
||||
protected void initCatalog() {
|
||||
icebergCatalogType = ICEBERG_GLUE;
|
||||
GlueCatalog glueCatalog = new GlueCatalog();
|
||||
glueCatalog.setConf(getConfiguration());
|
||||
Configuration conf = getConfiguration();
|
||||
initS3Param(conf);
|
||||
glueCatalog.setConf(conf);
|
||||
// initialize glue catalog
|
||||
Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
|
||||
String warehouse = catalogProperty.getOrDefault(CatalogProperties.WAREHOUSE_LOCATION, CHECKED_WAREHOUSE);
|
||||
|
||||
@ -60,6 +60,7 @@ public class IcebergHMSExternalCatalog extends IcebergExternalCatalog {
|
||||
HadoopUGI.tryKrbLogin(this.getName(), AuthenticationConfig.getKerberosConfig(hiveConf,
|
||||
AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
|
||||
AuthenticationConfig.HADOOP_KERBEROS_KEYTAB));
|
||||
initS3Param(hiveConf);
|
||||
HMSCachedClient cachedClient = HiveMetadataOps.createCachedClient(hiveConf, 1, null);
|
||||
String location = cachedClient.getCatalogLocation("hive");
|
||||
catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, location);
|
||||
|
||||
@ -56,6 +56,7 @@ public class IcebergHadoopExternalCatalog extends IcebergExternalCatalog {
|
||||
icebergCatalogType = ICEBERG_HADOOP;
|
||||
HadoopCatalog hadoopCatalog = new HadoopCatalog();
|
||||
Configuration conf = getConfiguration();
|
||||
initS3Param(conf);
|
||||
// initialize hive catalog
|
||||
Map<String, String> catalogProperties = new HashMap<>();
|
||||
String warehouse = catalogProperty.getHadoopProperties().get(CatalogProperties.WAREHOUSE_LOCATION);
|
||||
|
||||
@ -54,8 +54,7 @@ public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
|
||||
|
||||
private Configuration replaceS3Properties(Configuration conf) {
|
||||
Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
|
||||
String defaultProviderList = String.join(",", S3Properties.AWS_CREDENTIALS_PROVIDERS);
|
||||
conf.set(Constants.AWS_CREDENTIALS_PROVIDER, defaultProviderList);
|
||||
initS3Param(conf);
|
||||
String usePahStyle = catalogProperties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "true");
|
||||
// Set path style
|
||||
conf.set(PropertyConverter.USE_PATH_STYLE, usePahStyle);
|
||||
|
||||
@ -263,9 +263,7 @@ public class PropertyConverter {
|
||||
s3Properties.put(Constants.MAX_ERROR_RETRIES, "2");
|
||||
s3Properties.put("fs.s3.impl.disable.cache", "true");
|
||||
s3Properties.putIfAbsent("fs.s3.impl", S3AFileSystem.class.getName());
|
||||
String defaultProviderList = String.join(",", S3Properties.AWS_CREDENTIALS_PROVIDERS);
|
||||
String credentialsProviders = properties
|
||||
.getOrDefault(S3Properties.CREDENTIALS_PROVIDER, defaultProviderList);
|
||||
String credentialsProviders = getAWSCredentialsProviders(properties);
|
||||
s3Properties.put(Constants.AWS_CREDENTIALS_PROVIDER, credentialsProviders);
|
||||
if (credential.isWhole()) {
|
||||
s3Properties.put(Constants.ACCESS_KEY, credential.getAccessKey());
|
||||
@ -285,6 +283,18 @@ public class PropertyConverter {
|
||||
}
|
||||
}
|
||||
|
||||
public static String getAWSCredentialsProviders(Map<String, String> properties) {
|
||||
String credentialsProviders;
|
||||
String hadoopCredProviders = properties.get(Constants.AWS_CREDENTIALS_PROVIDER);
|
||||
if (hadoopCredProviders != null) {
|
||||
credentialsProviders = hadoopCredProviders;
|
||||
} else {
|
||||
String defaultProviderList = String.join(",", S3Properties.AWS_CREDENTIALS_PROVIDERS);
|
||||
credentialsProviders = properties.getOrDefault(S3Properties.CREDENTIALS_PROVIDER, defaultProviderList);
|
||||
}
|
||||
return credentialsProviders;
|
||||
}
|
||||
|
||||
private static Map<String, String> convertToGCSProperties(Map<String, String> props, CloudCredential credential) {
|
||||
// Now we use s3 client to access
|
||||
return convertToS3Properties(S3Properties.prefixToS3(props), credential);
|
||||
|
||||
@ -26,6 +26,10 @@ import org.apache.doris.datasource.property.PropertyConverter;
|
||||
import org.apache.doris.thrift.TS3StorageParam;
|
||||
|
||||
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
|
||||
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
|
||||
import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
|
||||
import com.amazonaws.auth.WebIdentityTokenCredentialsProvider;
|
||||
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
|
||||
@ -66,6 +70,10 @@ public class S3Properties extends BaseProperties {
|
||||
TemporaryAWSCredentialsProvider.class.getName(),
|
||||
SimpleAWSCredentialsProvider.class.getName(),
|
||||
EnvironmentVariableCredentialsProvider.class.getName(),
|
||||
SystemPropertiesCredentialsProvider.class.getName(),
|
||||
ProfileCredentialsProvider.class.getName(),
|
||||
InstanceProfileCredentialsProvider.class.getName(),
|
||||
WebIdentityTokenCredentialsProvider.class.getName(),
|
||||
IAMInstanceCredentialsProvider.class.getName());
|
||||
|
||||
public static Map<String, String> credentialToMap(CloudCredentialWithEndpoint credential) {
|
||||
|
||||
Reference in New Issue
Block a user