[fix](glue)support access glue iceberg with credential list (#30473)
merge from #30292
This commit is contained in:
@ -21,8 +21,15 @@ import org.apache.doris.datasource.credentials.CloudCredential;
|
||||
|
||||
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
|
||||
import software.amazon.awssdk.auth.credentials.AwsCredentials;
|
||||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
|
||||
import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
|
||||
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
|
||||
import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
|
||||
import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
|
||||
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
|
||||
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
|
||||
import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
|
||||
import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider;
|
||||
import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
|
||||
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
|
||||
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
|
||||
@ -39,7 +46,7 @@ import java.time.Duration;
|
||||
public class S3Util {
|
||||
|
||||
public static S3Client buildS3Client(URI endpoint, String region, CloudCredential credential) {
|
||||
StaticCredentialsProvider scp;
|
||||
AwsCredentialsProvider scp;
|
||||
AwsCredentials awsCredential;
|
||||
if (!credential.isTemporary()) {
|
||||
awsCredential = AwsBasicCredentials.create(credential.getAccessKey(), credential.getSecretKey());
|
||||
@ -47,7 +54,16 @@ public class S3Util {
|
||||
awsCredential = AwsSessionCredentials.create(credential.getAccessKey(), credential.getSecretKey(),
|
||||
credential.getSessionToken());
|
||||
}
|
||||
scp = StaticCredentialsProvider.create(awsCredential);
|
||||
if (!credential.isWhole()) {
|
||||
scp = AwsCredentialsProviderChain.of(
|
||||
SystemPropertyCredentialsProvider.create(),
|
||||
EnvironmentVariableCredentialsProvider.create(),
|
||||
WebIdentityTokenFileCredentialsProvider.create(),
|
||||
ProfileCredentialsProvider.create(),
|
||||
InstanceProfileCredentialsProvider.create());
|
||||
} else {
|
||||
scp = StaticCredentialsProvider.create(awsCredential);
|
||||
}
|
||||
EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
|
||||
.builder()
|
||||
.baseDelay(Duration.ofSeconds(1))
|
||||
|
||||
@ -23,6 +23,7 @@ import org.apache.doris.datasource.property.PropertyConverter;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.iceberg.CatalogProperties;
|
||||
import org.apache.iceberg.hadoop.HadoopCatalog;
|
||||
|
||||
@ -54,10 +55,11 @@ public class IcebergHadoopExternalCatalog extends IcebergExternalCatalog {
|
||||
protected void initLocalObjectsImpl() {
|
||||
icebergCatalogType = ICEBERG_HADOOP;
|
||||
HadoopCatalog hadoopCatalog = new HadoopCatalog();
|
||||
hadoopCatalog.setConf(getConfiguration());
|
||||
Configuration conf = getConfiguration();
|
||||
// initialize hive catalog
|
||||
Map<String, String> catalogProperties = new HashMap<>();
|
||||
String warehouse = catalogProperty.getProperties().get(CatalogProperties.WAREHOUSE_LOCATION);
|
||||
String warehouse = catalogProperty.getHadoopProperties().get(CatalogProperties.WAREHOUSE_LOCATION);
|
||||
hadoopCatalog.setConf(conf);
|
||||
catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
|
||||
hadoopCatalog.initialize(icebergCatalogType, catalogProperties);
|
||||
catalog = hadoopCatalog;
|
||||
|
||||
@ -18,9 +18,9 @@
|
||||
package org.apache.doris.datasource.iceberg;
|
||||
|
||||
import org.apache.doris.datasource.CatalogProperty;
|
||||
import org.apache.doris.datasource.credentials.DataLakeAWSCredentialsProvider;
|
||||
import org.apache.doris.datasource.iceberg.rest.DorisIcebergRestResolvedIO;
|
||||
import org.apache.doris.datasource.property.PropertyConverter;
|
||||
import org.apache.doris.datasource.property.constants.S3Properties;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.s3a.Constants;
|
||||
@ -58,9 +58,8 @@ public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
|
||||
|
||||
private Configuration replaceS3Properties(Configuration conf) {
|
||||
Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
|
||||
String credentials = catalogProperties
|
||||
.getOrDefault(Constants.AWS_CREDENTIALS_PROVIDER, DataLakeAWSCredentialsProvider.class.getName());
|
||||
conf.set(Constants.AWS_CREDENTIALS_PROVIDER, credentials);
|
||||
String defaultProviderList = String.join(",", S3Properties.AWS_CREDENTIALS_PROVIDERS);
|
||||
conf.set(Constants.AWS_CREDENTIALS_PROVIDER, defaultProviderList);
|
||||
String usePahStyle = catalogProperties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "true");
|
||||
// Set path style
|
||||
conf.set(PropertyConverter.USE_PATH_STYLE, usePahStyle);
|
||||
|
||||
@ -56,7 +56,7 @@ public class S3Properties extends BaseProperties {
|
||||
public static final String ROOT_PATH = "s3.root.path";
|
||||
public static final String BUCKET = "s3.bucket";
|
||||
public static final String VALIDITY_CHECK = "s3_validity_check";
|
||||
public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT, ACCESS_KEY, SECRET_KEY);
|
||||
public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT);
|
||||
public static final List<String> TVF_REQUIRED_FIELDS = Arrays.asList(ACCESS_KEY, SECRET_KEY);
|
||||
public static final List<String> FS_KEYS = Arrays.asList(ENDPOINT, REGION, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN,
|
||||
ROOT_PATH, BUCKET, MAX_CONNECTIONS, REQUEST_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);
|
||||
@ -100,7 +100,7 @@ public class S3Properties extends BaseProperties {
|
||||
public static final String DEFAULT_MAX_CONNECTIONS = "50";
|
||||
public static final String DEFAULT_REQUEST_TIMEOUT_MS = "3000";
|
||||
public static final String DEFAULT_CONNECTION_TIMEOUT_MS = "1000";
|
||||
public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT, ACCESS_KEY, SECRET_KEY);
|
||||
public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT);
|
||||
public static final List<String> FS_KEYS = Arrays.asList(ENDPOINT, REGION, ACCESS_KEY, SECRET_KEY, TOKEN,
|
||||
ROOT_PATH, BUCKET, MAX_CONNECTIONS, REQUEST_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
@ -117,12 +117,7 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
|
||||
if (Strings.isNullOrEmpty(props.get(S3Properties.REGION))) {
|
||||
throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.REGION));
|
||||
}
|
||||
if (Strings.isNullOrEmpty(props.get(S3Properties.ACCESS_KEY))) {
|
||||
throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ACCESS_KEY));
|
||||
}
|
||||
if (Strings.isNullOrEmpty(props.get(S3Properties.SECRET_KEY))) {
|
||||
throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.SECRET_KEY));
|
||||
}
|
||||
// do not check ak and sk, because we can read them from system environment.
|
||||
}
|
||||
|
||||
private String getEndpointAndSetVirtualBucket(S3URI s3uri, Map<String, String> props)
|
||||
|
||||
Reference in New Issue
Block a user