[feature-wip](multi-catalog)add properties converter (#18005)

Refactor the properties of each cloud provider: use a property converter to translate user-supplied properties into the key spaces needed when accessing FE metadata and BE data.
User docs: #18287
Author: slothever
Date: 2023-04-06 09:55:30 +08:00
Committed by: GitHub
Parent: 66a0c090b8
Commit: d0219180a9
49 changed files with 1697 additions and 684 deletions
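
Editor's note on the shape of the change, before the per-file diff: user-facing cloud properties are now converted on demand into the key spaces that FE metadata access (catalog/metastore), FE file reads (Hadoop FS), and BE data access expect. The following is a minimal, self-contained sketch of that idea, not Doris code. The user-facing key names ("s3.endpoint" etc.) are assumptions; the "fs.s3a.*" targets are standard Hadoop S3A keys, and the "AWS_*" targets follow the legacy S3Resource constants removed elsewhere in this diff (whether the BE-facing converter emits exactly these is an assumption).

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the "properties converter" idea: one user-facing property map
// is translated into the different key spaces FE and BE need. Key names are assumptions.
public final class PropertyConverterSketch {
    // user-facing -> Hadoop s3a keys (what FE uses to read files)
    static Map<String, String> toHadoopFsProps(Map<String, String> user) {
        Map<String, String> out = new HashMap<>();
        putIfPresent(user, "s3.endpoint",   out, "fs.s3a.endpoint");
        putIfPresent(user, "s3.access_key", out, "fs.s3a.access.key");
        putIfPresent(user, "s3.secret_key", out, "fs.s3a.secret.key");
        return out;
    }

    // user-facing -> legacy AWS_* keys (the style the removed S3Resource constants used)
    static Map<String, String> toBeFsProps(Map<String, String> user) {
        Map<String, String> out = new HashMap<>();
        putIfPresent(user, "s3.endpoint",   out, "AWS_ENDPOINT");
        putIfPresent(user, "s3.access_key", out, "AWS_ACCESS_KEY");
        putIfPresent(user, "s3.secret_key", out, "AWS_SECRET_KEY");
        return out;
    }

    private static void putIfPresent(Map<String, String> src, String srcKey,
                                     Map<String, String> dst, String dstKey) {
        String v = src.get(srcKey);
        if (v != null) {
            dst.put(dstKey, v);
        }
    }

    public static void main(String[] args) {
        Map<String, String> user = new HashMap<>();
        user.put("s3.endpoint", "s3.us-east-1.amazonaws.com");
        user.put("s3.access_key", "ak");
        user.put("s3.secret_key", "sk");
        System.out.println(toHadoopFsProps(user)); // {fs.s3a.endpoint=..., ...}
        System.out.println(toBeFsProps(user));     // {AWS_ENDPOINT=..., ...}
    }
}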

View File

@ -22,6 +22,8 @@ import org.apache.doris.backup.BlobStorage;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.datasource.property.constants.BosProperties;
import org.apache.doris.thrift.TFileType;
import com.google.common.collect.Maps;
@ -47,6 +49,7 @@ public class BrokerDesc extends StorageDesc implements Writable {
// just for multi load
public static final String MULTI_LOAD_BROKER = "__DORIS_MULTI_LOAD_BROKER__";
public static final String MULTI_LOAD_BROKER_BACKEND_KEY = "__DORIS_MULTI_LOAD_BROKER_BACKEND__";
private boolean convertedToS3 = false;
// Only used for recovery
private BrokerDesc() {
@ -72,7 +75,11 @@ public class BrokerDesc extends StorageDesc implements Writable {
} else {
this.storageType = StorageBackend.StorageType.BROKER;
}
tryConvertToS3();
this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties, this.storageType);
if (this.convertedToS3) {
this.storageType = StorageBackend.StorageType.S3;
}
}
public BrokerDesc(String name, StorageBackend.StorageType storageType, Map<String, String> properties) {
@ -82,7 +89,15 @@ public class BrokerDesc extends StorageDesc implements Writable {
this.properties = Maps.newHashMap();
}
this.storageType = storageType;
tryConvertToS3();
this.properties.putAll(S3ClientBEProperties.getBeFSProperties(this.properties));
this.convertedToS3 = BosProperties.tryConvertBosToS3(this.properties, this.storageType);
if (this.convertedToS3) {
this.storageType = StorageBackend.StorageType.S3;
}
}
public String getFileLocation(String location) {
return this.convertedToS3 ? BosProperties.convertPathToS3(location) : location;
}
public static BrokerDesc createForStreamLoad() {
@ -90,18 +105,6 @@ public class BrokerDesc extends StorageDesc implements Writable {
return brokerDesc;
}
public String getName() {
return name;
}
public Map<String, String> getProperties() {
return properties;
}
public StorageBackend.StorageType getStorageType() {
return storageType;
}
public boolean isMultiLoadBroker() {
return this.name.equalsIgnoreCase(MULTI_LOAD_BROKER);
}
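
Editor's note: BrokerDesc.getFileLocation() above delegates the path rewrite to BosProperties.convertPathToS3(). A sketch of that rewrite, reconstructed from the convertPathToS3() body removed from StorageDesc later in this diff; the class name here is illustrative.

import java.net.URI;
import java.net.URISyntaxException;

// Swap the URI scheme to "s3" while keeping authority, path, query and fragment,
// mirroring the logic removed from StorageDesc.convertPathToS3().
public final class BosPathSketch {
    static String convertPathToS3(String path) {
        try {
            URI orig = new URI(path);
            URI s3url = new URI("s3", orig.getRawAuthority(),
                    orig.getRawPath(), orig.getRawQuery(), orig.getRawFragment());
            return s3url.toString();
        } catch (URISyntaxException e) {
            return path; // leave unparseable locations untouched
        }
    }

    public static void main(String[] args) {
        System.out.println(convertPathToS3("bos://bucket/dir/file.csv")); // s3://bucket/dir/file.csv
    }
}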

View File

@ -47,11 +47,11 @@ public class CreateRepositoryStmt extends DdlStmt {
}
public String getBrokerName() {
return storage.getStorageName();
return storage.getStorageDesc().getName();
}
public StorageBackend.StorageType getStorageType() {
return storage.getStorageType();
return storage.getStorageDesc().getStorageType();
}
public String getLocation() {
@ -59,7 +59,7 @@ public class CreateRepositoryStmt extends DdlStmt {
}
public Map<String, String> getProperties() {
return storage.getProperties();
return storage.getStorageDesc().getProperties();
}
@Override

View File

@ -407,8 +407,8 @@ public class LoadStmt extends DdlStmt {
}
if (brokerDesc != null && !brokerDesc.isMultiLoadBroker()) {
for (int i = 0; i < dataDescription.getFilePaths().size(); i++) {
dataDescription.getFilePaths().set(i,
brokerDesc.convertPathToS3(dataDescription.getFilePaths().get(i)));
String location = brokerDesc.getFileLocation(dataDescription.getFilePaths().get(i));
dataDescription.getFilePaths().set(i, location);
dataDescription.getFilePaths().set(i,
ExportStmt.checkPath(dataDescription.getFilePaths().get(i), brokerDesc.getStorageType()));
}

View File

@ -18,10 +18,8 @@
package org.apache.doris.analysis;
import org.apache.doris.backup.HdfsStorage;
import org.apache.doris.backup.S3Storage;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.S3Resource;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
@ -33,6 +31,8 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.ParseUtil;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TParquetCompressionType;
@ -633,7 +633,8 @@ public class OutFileClause {
if (entry.getKey().startsWith(BROKER_PROP_PREFIX) && !entry.getKey().equals(PROP_BROKER_NAME)) {
brokerProps.put(entry.getKey().substring(BROKER_PROP_PREFIX.length()), entry.getValue());
processedPropKeys.add(entry.getKey());
} else if (entry.getKey().toUpperCase().startsWith(S3Resource.S3_PROPERTIES_PREFIX)) {
} else if (entry.getKey().toLowerCase().startsWith(S3Properties.S3_PREFIX)
|| entry.getKey().toUpperCase().startsWith(S3Properties.Env.PROPERTIES_PREFIX)) {
brokerProps.put(entry.getKey(), entry.getValue());
processedPropKeys.add(entry.getKey());
} else if (entry.getKey().contains(HdfsResource.HADOOP_FS_NAME)
@ -648,11 +649,11 @@ public class OutFileClause {
}
}
if (storageType == StorageBackend.StorageType.S3) {
if (properties.containsKey(S3Resource.USE_PATH_STYLE)) {
brokerProps.put(S3Resource.USE_PATH_STYLE, properties.get(S3Resource.USE_PATH_STYLE));
processedPropKeys.add(S3Resource.USE_PATH_STYLE);
if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) {
brokerProps.put(PropertyConverter.USE_PATH_STYLE, properties.get(PropertyConverter.USE_PATH_STYLE));
processedPropKeys.add(PropertyConverter.USE_PATH_STYLE);
}
S3Storage.checkS3(brokerProps);
S3Properties.requiredS3Properties(brokerProps);
} else if (storageType == StorageBackend.StorageType.HDFS) {
if (!brokerProps.containsKey(HdfsResource.HADOOP_FS_NAME)) {
brokerProps.put(HdfsResource.HADOOP_FS_NAME, HdfsStorage.getFsName(filePath));

View File

@ -22,46 +22,37 @@ import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.property.constants.BosProperties;
import org.apache.doris.thrift.TStorageBackendType;
import com.google.common.base.Strings;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class StorageBackend extends StorageDesc implements ParseNode {
private static final Logger LOG = LoggerFactory.getLogger(StorageBackend.class);
public class StorageBackend implements ParseNode {
private String location;
private StorageType storageType;
private Map<String, String> properties;
private StorageDesc storageDesc;
public StorageBackend(String storageName, String location,
StorageType storageType, Map<String, String> properties) {
this.name = storageName;
this.storageDesc = new StorageDesc(storageName, storageType, properties);
this.location = location;
this.storageType = storageType;
this.properties = properties;
tryConvertToS3();
this.location = convertPathToS3(location);
boolean convertedToS3 = BosProperties.tryConvertBosToS3(properties, storageType);
if (convertedToS3) {
this.storageDesc.setStorageType(StorageBackend.StorageType.S3);
this.location = BosProperties.convertPathToS3(location);
} else {
this.location = location;
}
}
public StorageType getStorageType() {
return storageType;
public void setStorageDesc(StorageDesc storageDesc) {
this.storageDesc = storageDesc;
}
public void setStorageType(StorageType storageType) {
this.storageType = storageType;
}
public String getStorageName() {
return name;
}
public void setStorageName(String storageName) {
this.name = storageName;
public StorageDesc getStorageDesc() {
return storageDesc;
}
public String getLocation() {
@ -72,24 +63,17 @@ public class StorageBackend extends StorageDesc implements ParseNode {
this.location = location;
}
public Map<String, String> getProperties() {
return properties;
}
public void setProperties(Map<String, String> properties) {
this.properties = properties;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
if (this.storageType != StorageType.BROKER && StringUtils.isEmpty(name)) {
name = this.storageType.name();
StorageBackend.StorageType storageType = storageDesc.getStorageType();
if (storageType != StorageType.BROKER && StringUtils.isEmpty(storageDesc.getName())) {
storageDesc.setName(storageType.name());
}
if (this.storageType != StorageType.BROKER && this.storageType != StorageType.S3
&& this.storageType != StorageType.HDFS) {
throw new NotImplementedException(this.storageType.toString() + " is not support now.");
if (storageType != StorageType.BROKER && storageType != StorageType.S3
&& storageType != StorageType.HDFS) {
throw new NotImplementedException(storageType.toString() + " is not support now.");
}
FeNameFormat.checkCommonName("repository", name);
FeNameFormat.checkCommonName("repository", storageDesc.getName());
if (Strings.isNullOrEmpty(location)) {
throw new AnalysisException("You must specify a location on the repository");
@ -100,12 +84,14 @@ public class StorageBackend extends StorageDesc implements ParseNode {
@Override
public String toSql() {
StringBuilder sb = new StringBuilder();
StorageBackend.StorageType storageType = storageDesc.getStorageType();
sb.append(storageType.name());
if (storageType == StorageType.BROKER) {
sb.append(" `").append(name).append("`");
sb.append(" `").append(storageDesc.getName()).append("`");
}
sb.append(" ON LOCATION ").append(location).append(" PROPERTIES(")
.append(new PrintableMap<>(properties, " = ", true, false)).append(")");
.append(new PrintableMap<>(storageDesc.getProperties(), " = ", true, false))
.append(")");
return sb.toString();
}

View File

@ -17,77 +17,52 @@
package org.apache.doris.analysis;
import org.apache.doris.catalog.S3Resource;
import org.apache.doris.common.Config;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
public abstract class StorageDesc {
private static final Logger LOG = LoggerFactory.getLogger(StorageBackend.class);
// for bos
public static final String BOS_ENDPOINT = "bos_endpoint";
public static final String BOS_ACCESS_KEY = "bos_accesskey";
public static final String BOS_SECRET_ACCESS_KEY = "bos_secret_accesskey";
/**
* Describe storage properties
* The structure diagram is divided into three levels:
* StorageDesc
* / \
* BrokerDesc The other StorageBackend.StorageType desc
* |
* The broker's StorageBackend.StorageType desc
*/
public class StorageDesc {
protected String name;
protected StorageBackend.StorageType storageType;
protected Map<String, String> properties;
protected String name;
protected boolean convertedToS3 = false;
protected void tryConvertToS3() {
if (!Config.enable_access_file_without_broker || storageType != StorageBackend.StorageType.BROKER) {
return;
}
CaseInsensitiveMap ciProperties = new CaseInsensitiveMap();
ciProperties.putAll(properties);
if (StringUtils.isNotEmpty(ciProperties.get(BOS_ENDPOINT).toString())
&& StringUtils.isNotEmpty(ciProperties.get(BOS_ACCESS_KEY).toString())
&& StringUtils.isNotEmpty(ciProperties.get(BOS_SECRET_ACCESS_KEY).toString())) {
// bos endpoint like http[s]://gz.bcebos.com, we want to extract region gz,
// and convert to s3 endpoint http[s]://s3.gz.bcebos.com
String bosEndpiont = ciProperties.get(BOS_ENDPOINT).toString();
try {
URI uri = new URI(bosEndpiont);
String host = uri.getHost();
String[] hostSplit = host.split("\\.");
if (hostSplit.length < 3) {
return;
}
String region = hostSplit[0];
String s3Endpoint = new URIBuilder(uri).setHost("s3." + host).build().toString();
properties.clear();
properties.put(S3Resource.S3_ENDPOINT, s3Endpoint);
properties.put(S3Resource.S3_REGION, region);
properties.put(S3Resource.S3_ACCESS_KEY, ciProperties.get(BOS_ACCESS_KEY).toString());
properties.put(S3Resource.S3_SECRET_KEY, ciProperties.get(BOS_SECRET_ACCESS_KEY).toString());
storageType = StorageBackend.StorageType.S3;
convertedToS3 = true;
LOG.info("skip BROKER and access S3 directly.");
} catch (URISyntaxException e) {
LOG.warn(BOS_ENDPOINT + ": " + bosEndpiont + " is invalid.");
}
}
public StorageDesc() {}
public StorageDesc(String name, StorageBackend.StorageType storageType, Map<String, String> properties) {
this.name = name;
this.storageType = storageType;
this.properties = properties;
}
protected String convertPathToS3(String path) {
if (!convertedToS3) {
return path;
}
try {
URI orig = new URI(path);
URI s3url = new URI("s3", orig.getRawAuthority(),
orig.getRawPath(), orig.getRawQuery(), orig.getRawFragment());
return s3url.toString();
} catch (URISyntaxException e) {
return path;
}
public void setName(String name) {
this.name = name;
}
public void setStorageType(StorageBackend.StorageType storageType) {
this.storageType = storageType;
}
public void setProperties(Map<String, String> properties) {
this.properties = properties;
}
public String getName() {
return name;
}
public StorageBackend.StorageType getStorageType() {
return storageType;
}
public Map<String, String> getProperties() {
return properties;
}
}
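
Editor's note: the tryConvertToS3() body removed above is the basis for the new BosProperties.tryConvertBosToS3(). A sketch of the endpoint/region rewrite it performs, emitting the old AWS_* keys shown in this diff; the class name is illustrative, and where this sketch throws on an unexpected endpoint the original code simply skips the conversion.

import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

// BOS endpoint like http://gz.bcebos.com -> region "gz" and S3 endpoint http://s3.gz.bcebos.com.
public final class BosToS3Sketch {
    static Map<String, String> convert(String bosEndpoint, String ak, String sk) throws URISyntaxException {
        URI uri = new URI(bosEndpoint);
        String host = uri.getHost();
        String[] labels = host.split("\\.");
        if (labels.length < 3) {
            // the original implementation returns without converting in this case
            throw new IllegalArgumentException("unexpected BOS endpoint: " + bosEndpoint);
        }
        String region = labels[0];
        String s3Endpoint = uri.getScheme() + "://s3." + host;
        Map<String, String> props = new HashMap<>();
        props.put("AWS_ENDPOINT", s3Endpoint);
        props.put("AWS_REGION", region);
        props.put("AWS_ACCESS_KEY", ak);
        props.put("AWS_SECRET_KEY", sk);
        return props;
    }

    public static void main(String[] args) throws URISyntaxException {
        System.out.println(convert("http://gz.bcebos.com", "ak", "sk"));
    }
}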

View File

@ -18,9 +18,11 @@
package org.apache.doris.backup;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.S3Resource;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.commons.lang3.StringUtils;
@ -95,6 +97,16 @@ public class S3Storage extends BlobStorage {
public void setProperties(Map<String, String> properties) {
super.setProperties(properties);
caseInsensitiveProperties.putAll(properties);
if (!caseInsensitiveProperties.containsKey(S3Properties.ENDPOINT)) {
// try to get new properties from old version
// compatible with old version
S3Properties.convertToStdProperties(caseInsensitiveProperties);
}
try {
S3Properties.requiredS3Properties(properties);
} catch (DdlException e) {
throw new IllegalArgumentException(e);
}
// Virtual hosted-style is recommended in the s3 protocol.
// The path-style has been abandoned, but for some unexplainable reasons,
// the s3 client will determine whether the endpiont starts with `s3`
@ -113,29 +125,21 @@ public class S3Storage extends BlobStorage {
// That is, for S3 endpoint, ignore the `use_path_style` property, and the s3 client will automatically use
// virtual hosted-sytle.
// And for other endpoint, if `use_path_style` is true, use path style. Otherwise, use virtual hosted-sytle.
if (!caseInsensitiveProperties.get(S3Resource.S3_ENDPOINT).toLowerCase().startsWith("s3")) {
forceHostedStyle = !caseInsensitiveProperties.getOrDefault(S3Resource.USE_PATH_STYLE, "false")
if (!caseInsensitiveProperties.get(S3Properties.ENDPOINT).toLowerCase().startsWith("s3")) {
forceHostedStyle = !caseInsensitiveProperties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false")
.equalsIgnoreCase("true");
} else {
forceHostedStyle = false;
}
}
public static void checkS3(Map<String, String> properties) throws UserException {
for (String field : S3Resource.REQUIRED_FIELDS) {
if (!properties.containsKey(field)) {
throw new UserException(field + " not found.");
}
}
}
@Override
public FileSystem getFileSystem(String remotePath) throws UserException {
if (dfsFileSystem == null) {
checkS3(caseInsensitiveProperties);
S3Properties.requiredS3Properties(caseInsensitiveProperties);
Configuration conf = new Configuration();
System.setProperty("com.amazonaws.services.s3.enableV4", "true");
S3Resource.getS3HadoopProperties(caseInsensitiveProperties).forEach(conf::set);
PropertyConverter.convertToHadoopFSProperties(caseInsensitiveProperties).forEach(conf::set);
try {
dfsFileSystem = FileSystem.get(new URI(remotePath), conf);
} catch (Exception e) {
@ -147,19 +151,19 @@ public class S3Storage extends BlobStorage {
private S3Client getClient(String bucket) throws UserException {
if (client == null) {
checkS3(caseInsensitiveProperties);
URI tmpEndpoint = URI.create(caseInsensitiveProperties.get(S3Resource.S3_ENDPOINT));
S3Properties.requiredS3Properties(caseInsensitiveProperties);
URI tmpEndpoint = URI.create(caseInsensitiveProperties.get(S3Properties.ENDPOINT));
StaticCredentialsProvider scp;
if (!caseInsensitiveProperties.containsKey(S3Resource.S3_TOKEN)) {
if (!caseInsensitiveProperties.containsKey(S3Properties.SESSION_TOKEN)) {
AwsBasicCredentials awsBasic = AwsBasicCredentials.create(
caseInsensitiveProperties.get(S3Resource.S3_ACCESS_KEY),
caseInsensitiveProperties.get(S3Resource.S3_SECRET_KEY));
caseInsensitiveProperties.get(S3Properties.ACCESS_KEY),
caseInsensitiveProperties.get(S3Properties.SECRET_KEY));
scp = StaticCredentialsProvider.create(awsBasic);
} else {
AwsSessionCredentials awsSession = AwsSessionCredentials.create(
caseInsensitiveProperties.get(S3Resource.S3_ACCESS_KEY),
caseInsensitiveProperties.get(S3Resource.S3_SECRET_KEY),
caseInsensitiveProperties.get(S3Resource.S3_TOKEN));
caseInsensitiveProperties.get(S3Properties.ACCESS_KEY),
caseInsensitiveProperties.get(S3Properties.SECRET_KEY),
caseInsensitiveProperties.get(S3Properties.SESSION_TOKEN));
scp = StaticCredentialsProvider.create(awsSession);
}
EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
@ -185,7 +189,7 @@ public class S3Storage extends BlobStorage {
client = S3Client.builder()
.endpointOverride(endpoint)
.credentialsProvider(scp)
.region(Region.of(caseInsensitiveProperties.get(S3Resource.S3_REGION)))
.region(Region.of(caseInsensitiveProperties.get(S3Properties.REGION)))
.overrideConfiguration(clientConf)
// disable chunkedEncoding because of bos not supported
// use virtual hosted-style access
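
Editor's note on the getClient() changes above: the AWS SDK v2 builder chain is unchanged, only the property keys feeding it moved to S3Properties. A minimal sketch of that construction, without the retry/backoff and chunked-encoding configuration the real code applies.

import java.net.URI;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

// Build an S3 client from endpoint, region and static ak/sk, as S3Storage.getClient() does
// once the values have been pulled out of the case-insensitive property map.
public final class S3ClientSketch {
    static S3Client build(String endpoint, String region, String ak, String sk) {
        return S3Client.builder()
                .endpointOverride(URI.create(endpoint))
                .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(ak, sk)))
                .region(Region.of(region))
                .build();
    }
}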

View File

@ -19,19 +19,13 @@ package org.apache.doris.catalog;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.HMSProperties;
import com.amazonaws.glue.catalog.util.AWSGlueConfig;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@ -46,14 +40,6 @@ import java.util.Map;
* );
*/
public class HMSResource extends Resource {
private static final Logger LOG = LogManager.getLogger(HMSResource.class);
public static final String HIVE_METASTORE_TYPE = "hive.metastore.type";
public static final String DLF_TYPE = "dlf";
public static final String GLUE_TYPE = "glue";
public static final String HIVE_VERSION = "hive.version";
// required
public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
public static final List<String> REQUIRED_FIELDS = Collections.singletonList(HIVE_METASTORE_URIS);
@SerializedName(value = "properties")
private Map<String, String> properties;
@ -68,21 +54,19 @@ public class HMSResource extends Resource {
for (Map.Entry<String, String> kv : properties.entrySet()) {
replaceIfEffectiveValue(this.properties, kv.getKey(), kv.getValue());
}
this.properties = getPropertiesFromDLF(this.properties);
this.properties = getPropertiesFromGlue(this.properties);
this.properties = PropertyConverter.convertToMetaProperties(this.properties);
super.modifyProperties(this.properties);
}
@Override
protected void setProperties(Map<String, String> properties) throws DdlException {
for (String field : REQUIRED_FIELDS) {
for (String field : HMSProperties.REQUIRED_FIELDS) {
if (!properties.containsKey(field)) {
throw new DdlException("Missing [" + field + "] in properties.");
}
}
this.properties.putAll(properties);
this.properties = getPropertiesFromDLF(this.properties);
this.properties = getPropertiesFromGlue(this.properties);
this.properties = PropertyConverter.convertToMetaProperties(this.properties);
}
@Override
@ -97,106 +81,4 @@ public class HMSResource extends Resource {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
}
}
public static Map<String, String> getPropertiesFromDLF(Map<String, String> props) {
Map<String, String> res = new HashMap<>();
if (LOG.isDebugEnabled()) {
LOG.debug("Get properties from hive-site.xml");
}
// read properties from hive-site.xml.
HiveConf hiveConf = new HiveConf();
String metastoreType = hiveConf.get(HIVE_METASTORE_TYPE);
String propsMetastoreType = props.get(HIVE_METASTORE_TYPE);
if (!DLF_TYPE.equalsIgnoreCase(metastoreType) && !DLF_TYPE.equalsIgnoreCase(propsMetastoreType)) {
return props;
}
// get following properties from hive-site.xml
// 1. region and endpoint. eg: cn-beijing
String region = hiveConf.get("dlf.catalog.region");
if (!Strings.isNullOrEmpty(region)) {
// See: https://help.aliyun.com/document_detail/31837.html
// And add "-internal" to access oss within vpc
res.put(S3Resource.S3_REGION, "oss-" + region);
String publicAccess = hiveConf.get("dlf.catalog.accessPublic", "false");
res.put(S3Resource.S3_ENDPOINT, getDLFEndpont(region, Boolean.parseBoolean(publicAccess)));
}
// 2. ak and sk
String ak = hiveConf.get("dlf.catalog.accessKeyId");
String sk = hiveConf.get("dlf.catalog.accessKeySecret");
if (!Strings.isNullOrEmpty(ak)) {
res.put(S3Resource.S3_ACCESS_KEY, ak);
}
if (!Strings.isNullOrEmpty(sk)) {
res.put(S3Resource.S3_SECRET_KEY, sk);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Get properties for oss in hive-site.xml: {}", res);
}
loadPropertiesFromDLFProps(res, props);
return res;
}
private static void loadPropertiesFromDLFProps(Map<String, String> res, Map<String, String> props) {
// add rewritten properties
String ak = props.get("dlf.catalog.accessKeyId");
String sk = props.get("dlf.catalog.accessKeySecret");
if (!Strings.isNullOrEmpty(ak) && !Strings.isNullOrEmpty(sk)) {
res.put(S3Resource.S3_ACCESS_KEY, ak);
res.put(S3Resource.S3_SECRET_KEY, sk);
}
String ifSetPublic = res.getOrDefault("dlf.catalog.accessPublic", "false");
String publicAccess = props.getOrDefault("dlf.catalog.accessPublic", ifSetPublic);
String region = props.get("dlf.catalog.region");
if (!Strings.isNullOrEmpty(region)) {
res.put(S3Resource.S3_REGION, "oss-" + region);
res.put(S3Resource.S3_ENDPOINT, getDLFEndpont(region, Boolean.parseBoolean(publicAccess)));
}
// add remain properties
res.putAll(props);
}
private static String getDLFEndpont(String region, boolean publicAccess) {
String prefix = "http://oss-";
String suffix = ".aliyuncs.com";
if (!publicAccess) {
suffix = "-internal" + suffix;
}
return prefix + region + suffix;
}
public static Map<String, String> getPropertiesFromGlue(Map<String, String> res) {
String metastoreType = res.get(HIVE_METASTORE_TYPE);
if (!GLUE_TYPE.equalsIgnoreCase(metastoreType)) {
return res;
}
// https://docs.aws.amazon.com/general/latest/gr/s3.html
// Convert:
// (
// "aws.region" = "us-east-1",
// "aws.glue.access-key" = "xx",
// "aws.glue.secret-key" = "yy"
// )
// To:
// (
// "AWS_REGION" = "us-east-1",
// "AWS_ENDPOINT" = "s3.us-east-1.amazonaws.com"
// "AWS_ACCESS_KEY" = "xx",
// "AWS_SCRETE_KEY" = "yy"
// )
String region = res.get(AWSGlueConfig.AWS_REGION);
if (!Strings.isNullOrEmpty(region)) {
res.put(S3Resource.S3_REGION, region);
res.put(S3Resource.S3_ENDPOINT, "s3." + region + ".amazonaws.com");
}
String ak = res.get(AWSGlueConfig.AWS_GLUE_ACCESS_KEY);
String sk = res.get(AWSGlueConfig.AWS_GLUE_SECRET_KEY);
if (!Strings.isNullOrEmpty(ak) && !Strings.isNullOrEmpty(sk)) {
res.put(S3Resource.S3_ACCESS_KEY, ak);
res.put(S3Resource.S3_SECRET_KEY, sk);
}
return res;
}
}
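
Editor's note: the DLF and Glue handling removed from HMSResource above now sits behind PropertyConverter.convertToMetaProperties(). A sketch of the Glue -> S3 rewrite, reconstructed from the removed getPropertiesFromGlue() and its comment block; the class name here is illustrative.

import java.util.HashMap;
import java.util.Map;

// Map Glue-style keys ("aws.region", "aws.glue.access-key", "aws.glue.secret-key")
// onto the AWS_* keys used elsewhere, deriving the S3 endpoint from the region.
public final class GluePropsSketch {
    static Map<String, String> fromGlue(Map<String, String> in) {
        Map<String, String> out = new HashMap<>(in);
        String region = in.get("aws.region");
        if (region != null && !region.isEmpty()) {
            out.put("AWS_REGION", region);
            out.put("AWS_ENDPOINT", "s3." + region + ".amazonaws.com");
        }
        String ak = in.get("aws.glue.access-key");
        String sk = in.get("aws.glue.secret-key");
        if (ak != null && sk != null) {
            out.put("AWS_ACCESS_KEY", ak);
            out.put("AWS_SECRET_KEY", sk);
        }
        return out;
    }
}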

View File

@ -36,6 +36,7 @@ import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExprOpcode;
@ -148,7 +149,7 @@ public class HiveMetaStoreClientHelper {
hiveConf.set(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT.name(),
String.valueOf(Config.hive_metastore_client_timeout_second));
IMetaStoreClient metaStoreClient = null;
String type = hiveConf.get(HMSResource.HIVE_METASTORE_TYPE);
String type = hiveConf.get(HMSProperties.HIVE_METASTORE_TYPE);
try {
if ("dlf".equalsIgnoreCase(type)) {
// For aliyun DLF
@ -180,7 +181,7 @@ public class HiveMetaStoreClientHelper {
List<RemoteIterator<LocatedFileStatus>> remoteIterators = new ArrayList<>();
try {
if (remoteHiveTbl.getPartitionKeys().size() > 0) {
String metaStoreUris = hiveTable.getHiveProperties().get(HMSResource.HIVE_METASTORE_URIS);
String metaStoreUris = hiveTable.getHiveProperties().get(HMSProperties.HIVE_METASTORE_URIS);
// hive partitioned table, get file iterator from table partition sd info
List<Partition> hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl,
hivePartitionPredicate);
@ -285,7 +286,7 @@ public class HiveMetaStoreClientHelper {
}
public static Table getTable(HiveTable hiveTable) throws DdlException {
IMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HMSResource.HIVE_METASTORE_URIS));
IMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HMSProperties.HIVE_METASTORE_URIS));
Table table;
try {
table = client.getTable(hiveTable.getHiveDb(), hiveTable.getHiveTable());
@ -891,7 +892,7 @@ public class HiveMetaStoreClientHelper {
hiveCatalog.setConf(conf);
// initialize hive catalog
Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, metastoreUri);
catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, metastoreUri);
catalogProperties.put("uri", metastoreUri);
hiveCatalog.initialize("hive", catalogProperties);

View File

@ -19,6 +19,8 @@ package org.apache.doris.catalog;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
@ -97,13 +99,13 @@ public class HiveTable extends Table {
// check hive properties
// hive.metastore.uris
String hiveMetaStoreUris = copiedProps.get(HMSResource.HIVE_METASTORE_URIS);
String hiveMetaStoreUris = copiedProps.get(HMSProperties.HIVE_METASTORE_URIS);
if (Strings.isNullOrEmpty(hiveMetaStoreUris)) {
throw new DdlException(String.format(
PROPERTY_MISSING_MSG, HMSResource.HIVE_METASTORE_URIS, HMSResource.HIVE_METASTORE_URIS));
PROPERTY_MISSING_MSG, HMSProperties.HIVE_METASTORE_URIS, HMSProperties.HIVE_METASTORE_URIS));
}
copiedProps.remove(HMSResource.HIVE_METASTORE_URIS);
hiveProperties.put(HMSResource.HIVE_METASTORE_URIS, hiveMetaStoreUris);
copiedProps.remove(HMSProperties.HIVE_METASTORE_URIS);
hiveProperties.put(HMSProperties.HIVE_METASTORE_URIS, hiveMetaStoreUris);
// check auth type
String authType = copiedProps.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION);
@ -146,7 +148,9 @@ public class HiveTable extends Table {
while (iter.hasNext()) {
Map.Entry<String, String> entry = iter.next();
String key = entry.getKey();
if (key.startsWith(HdfsResource.HADOOP_FS_PREFIX) || key.startsWith(S3Resource.S3_PROPERTIES_PREFIX)) {
if (key.startsWith(HdfsResource.HADOOP_FS_PREFIX)
|| key.startsWith(S3Properties.S3_PREFIX)
|| key.startsWith(S3Properties.Env.PROPERTIES_PREFIX)) {
hiveProperties.put(key, entry.getValue());
iter.remove();
}

View File

@ -21,19 +21,15 @@ import org.apache.doris.backup.S3Storage;
import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.datasource.credentials.DataLakeAWSCredentialsProvider;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -70,39 +66,6 @@ import java.util.Optional;
*/
public class S3Resource extends Resource {
private static final Logger LOG = LogManager.getLogger(S3Resource.class);
public static final String S3_PROPERTIES_PREFIX = "AWS";
public static final String S3_FS_PREFIX = "fs.s3";
// required
public static final String S3_ENDPOINT = "AWS_ENDPOINT";
public static final String S3_REGION = "AWS_REGION";
public static final String S3_ACCESS_KEY = "AWS_ACCESS_KEY";
public static final String S3_SECRET_KEY = "AWS_SECRET_KEY";
private static final String S3_CREDENTIALS_PROVIDER = "AWS_CREDENTIALS_PROVIDER";
public static final List<String> REQUIRED_FIELDS =
Arrays.asList(S3_ENDPOINT, S3_REGION, S3_ACCESS_KEY, S3_SECRET_KEY);
// required by storage policy
public static final String S3_ROOT_PATH = "AWS_ROOT_PATH";
public static final String S3_BUCKET = "AWS_BUCKET";
private static final String S3_VALIDITY_CHECK = "s3_validity_check";
// optional
public static final String S3_TOKEN = "AWS_TOKEN";
public static final String USE_PATH_STYLE = "use_path_style";
public static final String S3_MAX_CONNECTIONS = "AWS_MAX_CONNECTIONS";
public static final String S3_REQUEST_TIMEOUT_MS = "AWS_REQUEST_TIMEOUT_MS";
public static final String S3_CONNECTION_TIMEOUT_MS = "AWS_CONNECTION_TIMEOUT_MS";
public static final String DEFAULT_S3_MAX_CONNECTIONS = "50";
public static final String DEFAULT_S3_REQUEST_TIMEOUT_MS = "3000";
public static final String DEFAULT_S3_CONNECTION_TIMEOUT_MS = "1000";
public static final List<String> DEFAULT_CREDENTIALS_PROVIDERS = Arrays.asList(
DataLakeAWSCredentialsProvider.class.getName(),
TemporaryAWSCredentialsProvider.class.getName(),
SimpleAWSCredentialsProvider.class.getName(),
EnvironmentVariableCredentialsProvider.class.getName(),
IAMInstanceCredentialsProvider.class.getName());
@SerializedName(value = "properties")
private Map<String, String> properties;
@ -123,43 +86,53 @@ public class S3Resource extends Resource {
@Override
protected void setProperties(Map<String, String> properties) throws DdlException {
Preconditions.checkState(properties != null);
this.properties = properties;
// check properties
// required
checkRequiredProperty(S3_ENDPOINT);
checkRequiredProperty(S3_REGION);
checkRequiredProperty(S3_ACCESS_KEY);
checkRequiredProperty(S3_SECRET_KEY);
checkRequiredProperty(S3_BUCKET);
// compatible with old version
S3Properties.convertToStdProperties(properties);
// check properties
S3Properties.requiredS3PingProperties(properties);
// default need check resource conf valid, so need fix ut and regression case
boolean needCheck = !properties.containsKey(S3_VALIDITY_CHECK)
|| Boolean.parseBoolean(properties.get(S3_VALIDITY_CHECK));
boolean needCheck = isNeedCheck(properties);
LOG.debug("s3 info need check validity : {}", needCheck);
// the endpoint for ping need add uri scheme.
String pingEndpoint = properties.get(S3Properties.ENDPOINT);
if (!pingEndpoint.startsWith("http://")) {
pingEndpoint = "http://" + properties.get(S3Properties.ENDPOINT);
properties.put(S3Properties.ENDPOINT, pingEndpoint);
properties.put(S3Properties.Env.ENDPOINT, pingEndpoint);
}
String region = S3Properties.getRegionOfEndpoint(pingEndpoint);
properties.putIfAbsent(S3Properties.REGION, region);
String ak = properties.get(S3Properties.ACCESS_KEY);
String sk = properties.get(S3Properties.SECRET_KEY);
CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk);
if (needCheck) {
boolean available = pingS3(this.properties);
String bucketName = properties.get(S3Properties.BUCKET);
String rootPath = properties.get(S3Properties.ROOT_PATH);
boolean available = pingS3(credential, bucketName, rootPath, properties);
if (!available) {
throw new DdlException("S3 can't use, please check your properties");
}
}
// optional
checkOptionalProperty(S3_MAX_CONNECTIONS, DEFAULT_S3_MAX_CONNECTIONS);
checkOptionalProperty(S3_REQUEST_TIMEOUT_MS, DEFAULT_S3_REQUEST_TIMEOUT_MS);
checkOptionalProperty(S3_CONNECTION_TIMEOUT_MS, DEFAULT_S3_CONNECTION_TIMEOUT_MS);
S3Properties.optionalS3Property(properties);
this.properties = properties;
}
private boolean pingS3(Map<String, String> properties) {
String bucket = "s3://" + properties.getOrDefault(S3_BUCKET, "") + "/";
private static boolean pingS3(CloudCredentialWithEndpoint credential, String bucketName, String rootPath,
Map<String, String> properties) {
String bucket = "s3://" + bucketName + "/";
Map<String, String> propertiesPing = new HashMap<>();
propertiesPing.put("AWS_ACCESS_KEY", properties.getOrDefault(S3_ACCESS_KEY, ""));
propertiesPing.put("AWS_SECRET_KEY", properties.getOrDefault(S3_SECRET_KEY, ""));
propertiesPing.put("AWS_ENDPOINT", "http://" + properties.getOrDefault(S3_ENDPOINT, ""));
propertiesPing.put("AWS_REGION", properties.getOrDefault(S3_REGION, ""));
propertiesPing.put(S3Resource.USE_PATH_STYLE, "false");
S3Storage storage = new S3Storage(propertiesPing);
String testFile = bucket + properties.getOrDefault(S3_ROOT_PATH, "") + "/test-object-valid.txt";
propertiesPing.put(S3Properties.Env.ACCESS_KEY, credential.getAccessKey());
propertiesPing.put(S3Properties.Env.SECRET_KEY, credential.getSecretKey());
propertiesPing.put(S3Properties.Env.ENDPOINT, credential.getEndpoint());
propertiesPing.put(S3Properties.Env.REGION, credential.getRegion());
propertiesPing.put(PropertyConverter.USE_PATH_STYLE, "false");
properties.putAll(propertiesPing);
S3Storage storage = new S3Storage(properties);
String testFile = bucket + rootPath + "/test-object-valid.txt";
String content = "doris will be better";
try {
Status status = storage.directUpload(content, testFile);
@ -179,50 +152,29 @@ public class S3Resource extends Resource {
return true;
}
private void checkRequiredProperty(String propertyKey) throws DdlException {
String value = properties.get(propertyKey);
if (Strings.isNullOrEmpty(value)) {
throw new DdlException("Missing [" + propertyKey + "] in properties.");
}
}
private void checkOptionalProperty(String propertyKey, String defaultValue) {
this.properties.putIfAbsent(propertyKey, defaultValue);
}
@Override
public void modifyProperties(Map<String, String> properties) throws DdlException {
if (references.containsValue(ReferenceType.POLICY)) {
// can't change, because remote fs use it info to find data.
List<String> cantChangeProperties = Arrays.asList(S3_ENDPOINT, S3_REGION, S3_ROOT_PATH, S3_BUCKET);
List<String> cantChangeProperties = Arrays.asList(S3Properties.ENDPOINT, S3Properties.REGION,
S3Properties.ROOT_PATH, S3Properties.BUCKET, S3Properties.Env.ENDPOINT, S3Properties.Env.REGION,
S3Properties.Env.ROOT_PATH, S3Properties.Env.BUCKET);
Optional<String> any = cantChangeProperties.stream().filter(properties::containsKey).findAny();
if (any.isPresent()) {
throw new DdlException("current not support modify property : " + any.get());
}
}
boolean needCheck = !this.properties.containsKey(S3_VALIDITY_CHECK)
|| Boolean.parseBoolean(this.properties.get(S3_VALIDITY_CHECK));
if (properties.containsKey(S3_VALIDITY_CHECK)) {
needCheck = Boolean.parseBoolean(properties.get(S3_VALIDITY_CHECK));
}
// compatible with old version
S3Properties.convertToStdProperties(properties);
boolean needCheck = isNeedCheck(properties);
LOG.debug("s3 info need check validity : {}", needCheck);
if (needCheck) {
Map<String, String> s3Properties = new HashMap<>();
s3Properties.put(S3_BUCKET, properties.containsKey(S3_BUCKET) ? properties.get(S3_BUCKET) :
this.properties.getOrDefault(S3_BUCKET, ""));
s3Properties.put(S3_ACCESS_KEY, properties.containsKey(S3_ACCESS_KEY) ? properties.get(S3_ACCESS_KEY) :
this.properties.getOrDefault(S3_ACCESS_KEY, ""));
s3Properties.put(S3_SECRET_KEY, properties.containsKey(S3_SECRET_KEY) ? properties.get(S3_SECRET_KEY) :
this.properties.getOrDefault(S3_SECRET_KEY, ""));
s3Properties.put(S3_ENDPOINT, properties.containsKey(S3_ENDPOINT) ? properties.get(S3_ENDPOINT) :
this.properties.getOrDefault(S3_ENDPOINT, ""));
s3Properties.put(S3_REGION, properties.containsKey(S3_REGION) ? properties.get(S3_REGION) :
this.properties.getOrDefault(S3_REGION, ""));
s3Properties.put(S3_ROOT_PATH, properties.containsKey(S3_ROOT_PATH) ? properties.get(S3_ROOT_PATH) :
this.properties.getOrDefault(S3_ROOT_PATH, ""));
boolean available = pingS3(s3Properties);
S3Properties.requiredS3PingProperties(this.properties);
String bucketName = properties.getOrDefault(S3Properties.BUCKET, this.properties.get(S3Properties.BUCKET));
String rootPath = properties.getOrDefault(S3Properties.ROOT_PATH,
this.properties.get(S3Properties.ROOT_PATH));
boolean available = pingS3(getS3PingCredentials(properties), bucketName, rootPath, properties);
if (!available) {
throw new DdlException("S3 can't use, please check your properties");
}
@ -238,6 +190,25 @@ public class S3Resource extends Resource {
super.modifyProperties(properties);
}
private CloudCredentialWithEndpoint getS3PingCredentials(Map<String, String> properties) {
String ak = properties.getOrDefault(S3Properties.ACCESS_KEY, this.properties.get(S3Properties.ACCESS_KEY));
String sk = properties.getOrDefault(S3Properties.SECRET_KEY, this.properties.get(S3Properties.SECRET_KEY));
String endpoint = properties.getOrDefault(S3Properties.ENDPOINT, this.properties.get(S3Properties.ENDPOINT));
String pingEndpoint = "http://" + endpoint;
String region = S3Properties.getRegionOfEndpoint(pingEndpoint);
properties.putIfAbsent(S3Properties.REGION, region);
return new CloudCredentialWithEndpoint(pingEndpoint, region, ak, sk);
}
private boolean isNeedCheck(Map<String, String> newProperties) {
boolean needCheck = !this.properties.containsKey(S3Properties.VALIDITY_CHECK)
|| Boolean.parseBoolean(this.properties.get(S3Properties.VALIDITY_CHECK));
if (newProperties != null && newProperties.containsKey(S3Properties.VALIDITY_CHECK)) {
needCheck = Boolean.parseBoolean(newProperties.get(S3Properties.VALIDITY_CHECK));
}
return needCheck;
}
@Override
public Map<String, String> getCopiedProperties() {
return Maps.newHashMap(properties);
@ -250,9 +221,13 @@ public class S3Resource extends Resource {
readLock();
result.addRow(Lists.newArrayList(name, lowerCaseType, "version", String.valueOf(version)));
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (PrintableMap.HIDDEN_KEY.contains(entry.getKey())) {
continue;
}
// it's dangerous to show password in show odbc resource,
// so we use empty string to replace the real password
if (entry.getKey().equals(S3_SECRET_KEY)) {
if (entry.getKey().equals(S3Properties.Env.SECRET_KEY)
|| entry.getKey().equals(S3Properties.SECRET_KEY)) {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), "******"));
} else {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
@ -260,52 +235,4 @@ public class S3Resource extends Resource {
}
readUnlock();
}
public static Map<String, String> getS3HadoopProperties(Map<String, String> properties) {
Map<String, String> s3Properties = Maps.newHashMap();
if (properties.containsKey(S3_ACCESS_KEY)) {
s3Properties.put(Constants.ACCESS_KEY, properties.get(S3_ACCESS_KEY));
}
if (properties.containsKey(S3Resource.S3_SECRET_KEY)) {
s3Properties.put(Constants.SECRET_KEY, properties.get(S3_SECRET_KEY));
}
if (properties.containsKey(S3Resource.S3_ENDPOINT)) {
s3Properties.put(Constants.ENDPOINT, properties.get(S3_ENDPOINT));
}
if (properties.containsKey(S3Resource.S3_REGION)) {
s3Properties.put(Constants.AWS_REGION, properties.get(S3_REGION));
}
if (properties.containsKey(S3Resource.S3_MAX_CONNECTIONS)) {
s3Properties.put(Constants.MAXIMUM_CONNECTIONS, properties.get(S3_MAX_CONNECTIONS));
}
if (properties.containsKey(S3Resource.S3_REQUEST_TIMEOUT_MS)) {
s3Properties.put(Constants.REQUEST_TIMEOUT, properties.get(S3_REQUEST_TIMEOUT_MS));
}
if (properties.containsKey(S3Resource.S3_CONNECTION_TIMEOUT_MS)) {
s3Properties.put(Constants.SOCKET_TIMEOUT, properties.get(S3_CONNECTION_TIMEOUT_MS));
}
s3Properties.put(Constants.MAX_ERROR_RETRIES, "2");
s3Properties.put("fs.s3.impl.disable.cache", "true");
s3Properties.put("fs.s3.impl", S3AFileSystem.class.getName());
String defaultProviderList = String.join(",", DEFAULT_CREDENTIALS_PROVIDERS);
String credentialsProviders = s3Properties
.getOrDefault("fs.s3a.aws.credentials.provider", defaultProviderList);
s3Properties.put("fs.s3a.aws.credentials.provider", credentialsProviders);
s3Properties.put(Constants.PATH_STYLE_ACCESS, properties.getOrDefault(S3Resource.USE_PATH_STYLE, "false"));
if (properties.containsKey(S3Resource.S3_TOKEN)) {
s3Properties.put(Constants.SESSION_TOKEN, properties.get(S3_TOKEN));
s3Properties.put("fs.s3a.aws.credentials.provider", TemporaryAWSCredentialsProvider.class.getName());
s3Properties.put("fs.s3.impl.disable.cache", "true");
s3Properties.put("fs.s3a.impl.disable.cache", "true");
}
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(S3Resource.S3_FS_PREFIX)) {
s3Properties.put(entry.getKey(), entry.getValue());
}
}
return s3Properties;
}
}
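
Editor's note: a sketch of the ping preparation done in S3Resource.setProperties() above: give the endpoint a scheme and derive a region before the probe upload (the probe itself goes through S3Storage.directUpload()). The region-derivation rule shown here is an assumption; the real rule is inside S3Properties.getRegionOfEndpoint(), which this diff does not include.

// Normalize "s3.us-east-1.amazonaws.com" to "http://s3.us-east-1.amazonaws.com"
// and guess the region from the second host label.
public final class S3PingSketch {
    static String normalizeEndpoint(String endpoint) {
        return endpoint.startsWith("http://") || endpoint.startsWith("https://")
                ? endpoint : "http://" + endpoint;
    }

    static String guessRegion(String endpoint) {
        String host = endpoint.replaceFirst("^https?://", "");
        String[] labels = host.split("\\.");
        return labels.length > 1 ? labels[1] : ""; // s3.us-east-1.amazonaws.com -> us-east-1
    }

    public static void main(String[] args) {
        String ep = normalizeEndpoint("s3.us-east-1.amazonaws.com");
        System.out.println(ep + " -> region " + guessRegion(ep));
    }
}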

View File

@ -17,11 +17,22 @@
package org.apache.doris.common.util;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.DLFProperties;
import org.apache.doris.datasource.property.constants.GlueProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
public class PrintableMap<K, V> {
private Map<K, V> map;
@ -32,6 +43,7 @@ public class PrintableMap<K, V> {
private String entryDelimiter = ",";
public static final Set<String> SENSITIVE_KEY;
public static final Set<String> HIDDEN_KEY;
public static final String PASSWORD_MASK = "*XXX";
static {
@ -41,6 +53,11 @@ public class PrintableMap<K, V> {
SENSITIVE_KEY.add("bos_secret_accesskey");
SENSITIVE_KEY.add("jdbc.password");
SENSITIVE_KEY.add("elasticsearch.password");
SENSITIVE_KEY.addAll(Arrays.asList(S3Properties.SECRET_KEY, ObsProperties.SECRET_KEY, OssProperties.SECRET_KEY,
CosProperties.SECRET_KEY, GlueProperties.SECRET_KEY, DLFProperties.SECRET_KEY));
HIDDEN_KEY = Sets.newHashSet();
HIDDEN_KEY.addAll(S3Properties.Env.FS_KEYS);
HIDDEN_KEY.addAll(GlueProperties.META_KEYS);
}
public PrintableMap(Map<K, V> map, String keyValueSeparator,
@ -64,40 +81,65 @@ public class PrintableMap<K, V> {
this.hidePassword = hidePassword;
}
public PrintableMap(Map<K, V> map, String keyValueSeparator,
boolean withQuotation, boolean wrap, boolean hidePassword, boolean sorted) {
this(sorted ? new TreeMap<>(map).descendingMap() : map, keyValueSeparator, withQuotation, wrap);
this.hidePassword = hidePassword;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Iterator<Map.Entry<K, V>> iter = map.entrySet().iterator();
Iterator<Map.Entry<K, V>> iter = showEntries().iterator();
while (iter.hasNext()) {
Map.Entry<K, V> entry = iter.next();
if (withQuotation) {
sb.append("\"");
}
sb.append(entry.getKey());
if (withQuotation) {
sb.append("\"");
}
sb.append(" ").append(keyValueSeparator).append(" ");
if (withQuotation) {
sb.append("\"");
}
if (hidePassword && SENSITIVE_KEY.contains(entry.getKey())) {
sb.append(PASSWORD_MASK);
} else {
sb.append(entry.getValue());
}
if (withQuotation) {
sb.append("\"");
}
appendEntry(sb, iter.next());
if (iter.hasNext()) {
sb.append(entryDelimiter);
if (wrap) {
sb.append("\n");
} else {
sb.append(" ");
}
appendDelimiter(sb);
}
}
return sb.toString();
}
private List<Map.Entry<K, V>> showEntries() {
Iterator<Map.Entry<K, V>> iter = map.entrySet().iterator();
List<Map.Entry<K, V>> entries = new ArrayList<>();
while (iter.hasNext()) {
Map.Entry<K, V> entry = iter.next();
if (!HIDDEN_KEY.contains(entry.getKey())) {
entries.add(entry);
}
}
return entries;
}
private void appendEntry(StringBuilder sb, Map.Entry<K, V> entry) {
if (withQuotation) {
sb.append("\"");
}
sb.append(entry.getKey());
if (withQuotation) {
sb.append("\"");
}
sb.append(" ").append(keyValueSeparator).append(" ");
if (withQuotation) {
sb.append("\"");
}
if (hidePassword && SENSITIVE_KEY.contains(entry.getKey())) {
sb.append(PASSWORD_MASK);
} else {
sb.append(entry.getValue());
}
if (withQuotation) {
sb.append("\"");
}
}
private void appendDelimiter(StringBuilder sb) {
sb.append(entryDelimiter);
if (wrap) {
sb.append("\n");
} else {
sb.append(" ");
}
}
}
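
Editor's note: a self-contained sketch of the filtering PrintableMap now applies when rendering properties: keys in HIDDEN_KEY are dropped entirely, keys in SENSITIVE_KEY are printed with the mask. The key names used here are illustrative.

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public final class PrintableMapSketch {
    static String print(Map<String, String> props, Set<String> hidden, Set<String> sensitive) {
        StringBuilder sb = new StringBuilder();
        Iterator<Map.Entry<String, String>> it = props.entrySet().stream()
                .filter(e -> !hidden.contains(e.getKey())).iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> e = it.next();
            String value = sensitive.contains(e.getKey()) ? "*XXX" : e.getValue(); // PASSWORD_MASK
            sb.append("\"").append(e.getKey()).append("\" = \"").append(value).append("\"");
            if (it.hasNext()) {
                sb.append(", ");
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("s3.endpoint", "s3.us-east-1.amazonaws.com");
        props.put("s3.secret_key", "sk");     // sensitive: shown masked
        props.put("fs.s3a.secret.key", "sk"); // hidden: not shown at all
        System.out.println(print(props, Set.of("fs.s3a.secret.key"), Set.of("s3.secret_key")));
    }
}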

View File

@ -394,6 +394,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
rows.add(Arrays.asList("resource", catalog.getResource()));
}
for (Map.Entry<String, String> elem : catalog.getProperties().entrySet()) {
if (PrintableMap.HIDDEN_KEY.contains(elem.getKey())) {
continue;
}
if (PrintableMap.SENSITIVE_KEY.contains(elem.getKey())) {
rows.add(Arrays.asList(elem.getKey(), PrintableMap.PASSWORD_MASK));
} else {
@ -421,7 +424,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
.append("`");
if (catalog.getProperties().size() > 0) {
sb.append(" PROPERTIES (\n");
sb.append(new PrintableMap<>(catalog.getProperties(), "=", true, true, true));
sb.append(new PrintableMap<>(catalog.getProperties(), "=", true, true, true, true));
sb.append("\n);");
}

View File

@ -18,11 +18,10 @@
package org.apache.doris.datasource;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.S3Resource;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.base.Strings;
@ -94,17 +93,12 @@ public class CatalogProperty implements Writable {
}
public void modifyCatalogProps(Map<String, String> props) {
props = HMSResource.getPropertiesFromGlue(props);
properties.putAll(props);
}
public Map<String, String> getS3HadoopProperties() {
return S3Resource.getS3HadoopProperties(getProperties());
properties.putAll(PropertyConverter.convertToMetaProperties(props));
}
public Map<String, String> getHadoopProperties() {
Map<String, String> hadoopProperties = getProperties();
hadoopProperties.putAll(getS3HadoopProperties());
hadoopProperties.putAll(PropertyConverter.convertToHadoopFSProperties(getProperties()));
return hadoopProperties;
}
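
Editor's note: getHadoopProperties() above keeps the raw catalog properties and layers the converted Hadoop FS settings on top, so converted keys win on conflict. A tiny sketch of that merge order, with the converter left as a placeholder.

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public final class HadoopPropsMergeSketch {
    static Map<String, String> hadoopProperties(Map<String, String> catalogProps,
            Function<Map<String, String>, Map<String, String>> converter) {
        Map<String, String> merged = new HashMap<>(catalogProps);
        merged.putAll(converter.apply(catalogProps)); // converted keys override raw ones
        return merged;
    }
}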

View File

@ -31,6 +31,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
@ -297,7 +298,7 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
@Override
public Map<String, String> getProperties() {
return catalogProperty.getProperties();
return PropertyConverter.convertToMetaProperties(catalogProperty.getProperties());
}
@Override

View File

@ -19,7 +19,6 @@ package org.apache.doris.datasource;
import org.apache.doris.catalog.AuthType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.catalog.external.HMSExternalDatabase;
@ -27,6 +26,8 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.HMSProperties;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@ -71,8 +72,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
public HMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) {
super(catalogId, name);
this.type = "hms";
props = HMSResource.getPropertiesFromDLF(props);
props = HMSResource.getPropertiesFromGlue(props);
props = PropertyConverter.convertToMetaProperties(props);
catalogProperty = new CatalogProperty(resource, props);
}
@ -119,7 +119,7 @@ public class HMSExternalCatalog extends ExternalCatalog {
}
public String getHiveMetastoreUris() {
return catalogProperty.getOrDefault(HMSResource.HIVE_METASTORE_URIS, "");
return catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
}
@Override
@ -163,7 +163,6 @@ public class HMSExternalCatalog extends ExternalCatalog {
for (Map.Entry<String, String> kv : catalogProperty.getHadoopProperties().entrySet()) {
hiveConf.set(kv.getKey(), kv.getValue());
}
String authentication = catalogProperty.getOrDefault(
HdfsResource.HADOOP_SECURITY_AUTHENTICATION, "");
if (AuthType.KERBEROS.getDesc().equals(authentication)) {

View File

@ -74,7 +74,6 @@ import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.HiveTable;
import org.apache.doris.catalog.IcebergTable;
@ -138,6 +137,7 @@ import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.external.elasticsearch.EsRepository;
import org.apache.doris.external.hudi.HudiProperty;
import org.apache.doris.external.hudi.HudiTable;
@ -2419,8 +2419,8 @@ public class InternalCatalog implements CatalogIf<Database> {
hiveTable.setComment(stmt.getComment());
// check hive table whether exists in hive database
HiveConf hiveConf = new HiveConf();
hiveConf.set(HMSResource.HIVE_METASTORE_URIS,
hiveTable.getHiveProperties().get(HMSResource.HIVE_METASTORE_URIS));
hiveConf.set(HMSProperties.HIVE_METASTORE_URIS,
hiveTable.getHiveProperties().get(HMSProperties.HIVE_METASTORE_URIS));
PooledHiveMetaStoreClient client = new PooledHiveMetaStoreClient(hiveConf, 1);
if (!client.tableExists(hiveTable.getHiveDb(), hiveTable.getHiveTable())) {
throw new DdlException(String.format("Table [%s] dose not exist in Hive.", hiveTable.getHiveDbTable()));
@ -2442,7 +2442,7 @@ public class InternalCatalog implements CatalogIf<Database> {
HudiUtils.validateCreateTable(hudiTable);
// check hudi table whether exists in hive database
HiveConf hiveConf = new HiveConf();
hiveConf.set(HMSResource.HIVE_METASTORE_URIS,
hiveConf.set(HMSProperties.HIVE_METASTORE_URIS,
hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS));
PooledHiveMetaStoreClient client = new PooledHiveMetaStoreClient(hiveConf, 1);
if (!client.tableExists(hudiTable.getHmsDatabaseName(), hudiTable.getHmsTableName())) {

View File

@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.credentials;
import org.apache.commons.lang3.StringUtils;
public class CloudCredential {
private String accessKey;
private String secretKey;
private String sessionToken;
public CloudCredential() {}
public String getAccessKey() {
return accessKey;
}
public void setAccessKey(String accessKey) {
this.accessKey = accessKey;
}
public String getSecretKey() {
return secretKey;
}
public void setSecretKey(String secretKey) {
this.secretKey = secretKey;
}
public String getSessionToken() {
return sessionToken;
}
public void setSessionToken(String sessionToken) {
this.sessionToken = sessionToken;
}
public boolean isWhole() {
return !StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey);
}
public boolean isTemporary() {
return !StringUtils.isEmpty(sessionToken);
}
}

View File

@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.credentials;
public class CloudCredentialWithEndpoint extends CloudCredential {
private String endpoint;
private String region;
public CloudCredentialWithEndpoint(String endpoint, String region, CloudCredential credential) {
this.endpoint = endpoint;
this.region = region;
setAccessKey(credential.getAccessKey());
setSecretKey(credential.getSecretKey());
setSessionToken(credential.getSessionToken());
}
public CloudCredentialWithEndpoint(String endpoint, String region, String accessKey, String secretKey) {
this(endpoint, region, accessKey, secretKey, null);
}
public CloudCredentialWithEndpoint(String endpoint, String region, String accessKey,
String secretKey, String token) {
this.endpoint = endpoint;
this.region = region;
setAccessKey(accessKey);
setSecretKey(secretKey);
setSessionToken(token);
}
public String getEndpoint() {
return endpoint;
}
public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}
public String getRegion() {
return region;
}
public void setRegion(String region) {
this.region = region;
}
}
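
Editor's note: a short usage sketch for the two credential classes added above, written against the constructors and accessors shown in this diff; the endpoint and keys are placeholders.

public final class CredentialExample {
    public static void main(String[] args) {
        CloudCredential cred = new CloudCredential();
        cred.setAccessKey("ak");
        cred.setSecretKey("sk");
        System.out.println(cred.isWhole());      // true: both ak and sk are set
        System.out.println(cred.isTemporary());  // false: no session token

        CloudCredentialWithEndpoint pingCred =
                new CloudCredentialWithEndpoint("http://s3.us-east-1.amazonaws.com", "us-east-1", cred);
        System.out.println(pingCred.getEndpoint() + " / " + pingCred.getRegion());
    }
}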

View File

@ -17,10 +17,10 @@
package org.apache.doris.datasource.hive;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.HMSClientException;
import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
import org.apache.doris.datasource.property.constants.HMSProperties;
import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;
@ -178,11 +178,11 @@ public class PooledHiveMetaStoreClient {
private final IMetaStoreClient client;
private CachedClient(HiveConf hiveConf) throws MetaException {
String type = hiveConf.get(HMSResource.HIVE_METASTORE_TYPE);
if (HMSResource.DLF_TYPE.equalsIgnoreCase(type)) {
String type = hiveConf.get(HMSProperties.HIVE_METASTORE_TYPE);
if (HMSProperties.DLF_TYPE.equalsIgnoreCase(type)) {
client = RetryingMetaStoreClient.getProxy(hiveConf, DUMMY_HOOK_LOADER,
ProxyMetaStoreClient.class.getName());
} else if (HMSResource.GLUE_TYPE.equalsIgnoreCase(type)) {
} else if (HMSProperties.GLUE_TYPE.equalsIgnoreCase(type)) {
client = RetryingMetaStoreClient.getProxy(hiveConf, DUMMY_HOOK_LOADER,
AWSCatalogMetastoreClient.class.getName());
} else {

View File

@ -17,9 +17,12 @@
package org.apache.doris.datasource.iceberg;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.iceberg.dlf.DLFCatalog;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.HMSProperties;
import com.aliyun.datalake.metastore.common.DataLakeConfig;
import java.util.Map;
@ -27,8 +30,8 @@ public class IcebergDLFExternalCatalog extends IcebergExternalCatalog {
public IcebergDLFExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) {
super(catalogId, name);
props.put(HMSResource.HIVE_METASTORE_TYPE, "dlf");
props = HMSResource.getPropertiesFromDLF(props);
props.put(HMSProperties.HIVE_METASTORE_TYPE, "dlf");
props = PropertyConverter.convertToMetaProperties(props);
catalogProperty = new CatalogProperty(resource, props);
}
@ -39,7 +42,7 @@ public class IcebergDLFExternalCatalog extends IcebergExternalCatalog {
dlfCatalog.setConf(getConfiguration());
// initialize catalog
Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
String dlfUid = catalogProperties.get("dlf.catalog.uid");
String dlfUid = catalogProperties.get(DataLakeConfig.CATALOG_USER_ID);
dlfCatalog.initialize(dlfUid, catalogProperties);
catalog = dlfCatalog;
}

View File

@ -18,10 +18,12 @@
package org.apache.doris.datasource.iceberg;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.aws.glue.GlueCatalog;
import org.apache.iceberg.catalog.Namespace;
@ -31,8 +33,12 @@ import java.util.stream.Collectors;
public class IcebergGlueExternalCatalog extends IcebergExternalCatalog {
// A default placeholder: the path is only used for 'create table'; query statements will not use it.
private static final String CHECKED_WAREHOUSE = "s3://doris";
public IcebergGlueExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) {
super(catalogId, name);
props = PropertyConverter.convertToMetaProperties(props);
catalogProperty = new CatalogProperty(resource, props);
}
@ -40,25 +46,20 @@ public class IcebergGlueExternalCatalog extends IcebergExternalCatalog {
protected void initLocalObjectsImpl() {
icebergCatalogType = ICEBERG_GLUE;
GlueCatalog glueCatalog = new GlueCatalog();
// AWSGlueAsync glueClient;
Configuration conf = setGlueProperties(getConfiguration());
glueCatalog.setConf(conf);
glueCatalog.setConf(getConfiguration());
// initialize glue catalog
Map<String, String> catalogProperties = catalogProperty.getProperties();
// check AwsProperties.GLUE_CATALOG_ENDPOINT
String metastoreUris = catalogProperty.getOrDefault(CatalogProperties.WAREHOUSE_LOCATION, "");
if (StringUtils.isEmpty(metastoreUris)) {
throw new IllegalArgumentException("Missing glue properties 'warehouse'.");
}
catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, metastoreUris);
Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
String warehouse = catalogProperty.getOrDefault(CatalogProperties.WAREHOUSE_LOCATION, CHECKED_WAREHOUSE);
catalogProperties.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
// read from the converted s3 endpoint, falling back to the BE s3 endpoint
String endpoint = catalogProperties.getOrDefault(Constants.ENDPOINT,
catalogProperties.get(S3Properties.Env.ENDPOINT));
catalogProperties.putIfAbsent(AwsProperties.S3FILEIO_ENDPOINT, endpoint);
glueCatalog.initialize(icebergCatalogType, catalogProperties);
catalog = glueCatalog;
}
private Configuration setGlueProperties(Configuration configuration) {
return configuration;
}
@Override
protected List<String> listDatabaseNames() {
return nsCatalog.listNamespaces().stream()

View File

@ -17,8 +17,9 @@
package org.apache.doris.datasource.iceberg;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.hive.HiveCatalog;
@ -30,6 +31,7 @@ public class IcebergHMSExternalCatalog extends IcebergExternalCatalog {
public IcebergHMSExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) {
super(catalogId, name);
props = PropertyConverter.convertToMetaProperties(props);
catalogProperty = new CatalogProperty(resource, props);
}
@ -40,7 +42,7 @@ public class IcebergHMSExternalCatalog extends IcebergExternalCatalog {
hiveCatalog.setConf(getConfiguration());
// initialize hive catalog
Map<String, String> catalogProperties = new HashMap<>();
String metastoreUris = catalogProperty.getOrDefault(HMSResource.HIVE_METASTORE_URIS, "");
String metastoreUris = catalogProperty.getOrDefault(HMSProperties.HIVE_METASTORE_URIS, "");
catalogProperties.put(CatalogProperties.URI, metastoreUris);
hiveCatalog.initialize(icebergCatalogType, catalogProperties);

View File

@ -17,9 +17,9 @@
package org.apache.doris.datasource.iceberg;
import org.apache.doris.catalog.S3Resource;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.credentials.DataLakeAWSCredentialsProvider;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Constants;
@ -33,6 +33,7 @@ public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
public IcebergRestExternalCatalog(long catalogId, String name, String resource, Map<String, String> props) {
super(catalogId, name);
props = PropertyConverter.convertToMetaProperties(props);
catalogProperty = new CatalogProperty(resource, props);
}
@ -50,13 +51,13 @@ public class IcebergRestExternalCatalog extends IcebergExternalCatalog {
}
private Configuration replaceS3Properties(Configuration conf) {
Map<String, String> catalogProperties = catalogProperty.getProperties();
Map<String, String> catalogProperties = catalogProperty.getHadoopProperties();
String credentials = catalogProperties
.getOrDefault(Constants.AWS_CREDENTIALS_PROVIDER, DataLakeAWSCredentialsProvider.class.getName());
conf.set(Constants.AWS_CREDENTIALS_PROVIDER, credentials);
String usePahStyle = catalogProperties.getOrDefault(S3Resource.USE_PATH_STYLE, "true");
String usePathStyle = catalogProperties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "true");
// Set path style
conf.set(S3Resource.USE_PATH_STYLE, usePahStyle);
conf.set(PropertyConverter.USE_PATH_STYLE, usePathStyle);
conf.set(Constants.PATH_STYLE_ACCESS, usePathStyle);
// Get AWS client retry limit
conf.set(Constants.RETRY_LIMIT, catalogProperties.getOrDefault(Constants.RETRY_LIMIT, "1"));

View File

@ -0,0 +1,378 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property;
import org.apache.doris.datasource.credentials.CloudCredential;
import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.DLFProperties;
import org.apache.doris.datasource.property.constants.GlueProperties;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import com.aliyun.datalake.metastore.common.DataLakeConfig;
import com.amazonaws.glue.catalog.util.AWSGlueConfig;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.obs.OBSConstants;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.Map;
public class PropertyConverter {
private static final Logger LOG = LogManager.getLogger(PropertyConverter.class);
public static final String USE_PATH_STYLE = "use_path_style";
/**
* Convert properties defined in Doris to the metadata properties of each cloud.
* Step 1: convert and set cloud metadata properties and s3 properties
* example:
* glue.endpoint -> aws.glue.endpoint for Glue
* -> s3.endpoint for S3
* glue.access_key -> aws.glue.access-key for Glue
* -> s3.access_key for S3
* Step 2: convert props to BE properties, and put them all into metaProperties
* example:
* s3.endpoint -> AWS_ENDPOINT
* s3.access_key -> AWS_ACCESS_KEY
* These properties are used for catalogs/resources and persisted to the catalog/resource properties.
*/
public static Map<String, String> convertToMetaProperties(Map<String, String> props) {
Map<String, String> metaProperties = new HashMap<>();
CloudCredential credential = GlueProperties.getCredential(props);
if (!credential.isWhole()) {
credential = GlueProperties.getCompatibleCredential(props);
}
if (props.containsKey(GlueProperties.ENDPOINT)
|| props.containsKey(AWSGlueConfig.AWS_GLUE_ENDPOINT)) {
metaProperties = convertToGlueProperties(props, credential);
} else if (props.containsKey(DLFProperties.ENDPOINT)
|| props.containsKey(DataLakeConfig.CATALOG_ENDPOINT)) {
metaProperties = convertToDLFProperties(props, credential);
}
metaProperties.putAll(props);
metaProperties.putAll(S3ClientBEProperties.getBeFSProperties(props));
return metaProperties;
}
/**
* Convert properties defined in Doris to FE S3 client properties.
* Other cloud clients can be supported here as well.
*/
public static Map<String, String> convertToHadoopFSProperties(Map<String, String> props) {
if (props.containsKey(S3Properties.ENDPOINT)) {
return convertToS3Properties(props, S3Properties.getCredential(props));
} else if (props.containsKey(ObsProperties.ENDPOINT)) {
return convertToOBSProperties(props, ObsProperties.getCredential(props));
} else if (props.containsKey(OssProperties.ENDPOINT)) {
return convertToOSSProperties(props, OssProperties.getCredential(props));
} else if (props.containsKey(CosProperties.ENDPOINT)) {
return convertToCOSProperties(props, CosProperties.getCredential(props));
} else if (props.containsKey(S3Properties.Env.ENDPOINT)) {
// check the env-style properties last,
// to stay compatible with s3, obs, oss and cos when they use the aws client.
return convertToS3EnvProperties(props, S3Properties.getEnvironmentCredentialWithEndpoint(props));
}
return props;
}
private static Map<String, String> convertToOBSProperties(Map<String, String> props,
CloudCredential credential) {
Map<String, String> obsProperties = Maps.newHashMap();
obsProperties.put(OBSConstants.ENDPOINT, props.get(ObsProperties.ENDPOINT));
obsProperties.put(ObsProperties.FS.IMPL_DISABLE_CACHE, "true");
if (credential.isWhole()) {
obsProperties.put(OBSConstants.ACCESS_KEY, credential.getAccessKey());
obsProperties.put(OBSConstants.SECRET_KEY, credential.getSecretKey());
}
if (credential.isTemporary()) {
obsProperties.put(ObsProperties.FS.SESSION_TOKEN, credential.getSessionToken());
}
for (Map.Entry<String, String> entry : props.entrySet()) {
if (entry.getKey().startsWith(ObsProperties.OBS_FS_PREFIX)) {
obsProperties.put(entry.getKey(), entry.getValue());
}
}
return obsProperties;
}
private static Map<String, String> convertToS3EnvProperties(Map<String, String> properties,
CloudCredentialWithEndpoint credential) {
// Old properties to new properties
properties.put(S3Properties.ENDPOINT, credential.getEndpoint());
properties.put(S3Properties.REGION, credential.getRegion());
properties.put(S3Properties.ACCESS_KEY, credential.getAccessKey());
properties.put(S3Properties.SECRET_KEY, credential.getSecretKey());
properties.put(S3Properties.SESSION_TOKEN, credential.getSessionToken());
if (properties.containsKey(S3Properties.Env.MAX_CONNECTIONS)) {
properties.put(S3Properties.MAX_CONNECTIONS, properties.get(S3Properties.Env.MAX_CONNECTIONS));
}
if (properties.containsKey(S3Properties.Env.REQUEST_TIMEOUT_MS)) {
properties.put(S3Properties.REQUEST_TIMEOUT_MS, properties.get(S3Properties.Env.REQUEST_TIMEOUT_MS));
}
if (properties.containsKey(S3Properties.Env.CONNECTION_TIMEOUT_MS)) {
properties.put(S3Properties.CONNECTION_TIMEOUT_MS, properties.get(S3Properties.Env.CONNECTION_TIMEOUT_MS));
}
return convertToS3Properties(properties, credential);
}
private static Map<String, String> convertToS3Properties(Map<String, String> properties,
CloudCredential credential) {
Map<String, String> s3Properties = Maps.newHashMap();
String endpoint = properties.get(S3Properties.ENDPOINT);
s3Properties.put(Constants.ENDPOINT, endpoint);
s3Properties.put(Constants.AWS_REGION, S3Properties.getRegionOfEndpoint(endpoint));
if (properties.containsKey(S3Properties.MAX_CONNECTIONS)) {
s3Properties.put(Constants.MAXIMUM_CONNECTIONS, properties.get(S3Properties.MAX_CONNECTIONS));
}
if (properties.containsKey(S3Properties.REQUEST_TIMEOUT_MS)) {
s3Properties.put(Constants.REQUEST_TIMEOUT, properties.get(S3Properties.REQUEST_TIMEOUT_MS));
}
if (properties.containsKey(S3Properties.CONNECTION_TIMEOUT_MS)) {
s3Properties.put(Constants.SOCKET_TIMEOUT, properties.get(S3Properties.CONNECTION_TIMEOUT_MS));
}
setS3FsAccess(s3Properties, properties, credential);
return s3Properties;
}
private static void setS3FsAccess(Map<String, String> s3Properties, Map<String, String> properties,
CloudCredential credential) {
s3Properties.put(Constants.MAX_ERROR_RETRIES, "2");
s3Properties.put("fs.s3.impl.disable.cache", "true");
s3Properties.put("fs.s3.impl", S3AFileSystem.class.getName());
String defaultProviderList = String.join(",", S3Properties.AWS_CREDENTIALS_PROVIDERS);
String credentialsProviders = s3Properties
.getOrDefault(S3Properties.CREDENTIALS_PROVIDER, defaultProviderList);
s3Properties.put(Constants.AWS_CREDENTIALS_PROVIDER, credentialsProviders);
if (credential.isWhole()) {
s3Properties.put(Constants.ACCESS_KEY, credential.getAccessKey());
s3Properties.put(Constants.SECRET_KEY, credential.getSecretKey());
}
if (credential.isTemporary()) {
s3Properties.put(Constants.SESSION_TOKEN, credential.getSessionToken());
s3Properties.put(Constants.AWS_CREDENTIALS_PROVIDER, TemporaryAWSCredentialsProvider.class.getName());
s3Properties.put("fs.s3.impl.disable.cache", "true");
s3Properties.put("fs.s3a.impl.disable.cache", "true");
}
s3Properties.put(Constants.PATH_STYLE_ACCESS, properties.getOrDefault(USE_PATH_STYLE, "false"));
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(S3Properties.S3_FS_PREFIX)) {
s3Properties.put(entry.getKey(), entry.getValue());
}
}
}
private static Map<String, String> convertToOSSProperties(Map<String, String> props, CloudCredential credential) {
// Currently the S3 client is used to access OSS.
return convertToS3Properties(S3Properties.prefixToS3(props), credential);
}
private static Map<String, String> convertToCOSProperties(Map<String, String> props, CloudCredential credential) {
// Currently the S3 client is used to access COS.
return convertToS3Properties(S3Properties.prefixToS3(props), credential);
}
private static Map<String, String> convertToDLFProperties(Map<String, String> props, CloudCredential credential) {
getPropertiesFromDLFConf(props);
// if DLF properties are configured in the catalog properties, use them to override the config in hive-site.xml
getPropertiesFromDLFProps(props, credential);
return props;
}
public static void getPropertiesFromDLFConf(Map<String, String> props) {
if (LOG.isDebugEnabled()) {
LOG.debug("Get properties from hive-site.xml");
}
// read properties from hive-site.xml.
HiveConf hiveConf = new HiveConf();
String metastoreType = hiveConf.get(HMSProperties.HIVE_METASTORE_TYPE);
if (!HMSProperties.DLF_TYPE.equalsIgnoreCase(metastoreType)) {
return;
}
String uid = props.get(DataLakeConfig.CATALOG_USER_ID);
if (Strings.isNullOrEmpty(uid)) {
throw new IllegalArgumentException("Required dlf property: " + DataLakeConfig.CATALOG_USER_ID);
}
// access OSS by AWS client, so set s3 parameters
getAWSPropertiesFromDLFConf(props, hiveConf);
}
private static void getAWSPropertiesFromDLFConf(Map<String, String> props, HiveConf hiveConf) {
// get the following properties from hive-site.xml:
// 1. region and endpoint. eg: cn-beijing
String region = hiveConf.get(DataLakeConfig.CATALOG_REGION_ID);
if (!Strings.isNullOrEmpty(region)) {
// See: https://help.aliyun.com/document_detail/31837.html
// And add "-internal" to access oss within vpc
props.put(S3Properties.REGION, "oss-" + region);
String publicAccess = hiveConf.get("dlf.catalog.accessPublic", "false");
props.put(S3Properties.ENDPOINT, getDLFEndpoint(region, Boolean.parseBoolean(publicAccess)));
}
// 2. ak and sk
String ak = hiveConf.get(DataLakeConfig.CATALOG_ACCESS_KEY_ID);
String sk = hiveConf.get(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET);
if (!Strings.isNullOrEmpty(ak)) {
props.put(S3Properties.ACCESS_KEY, ak);
}
if (!Strings.isNullOrEmpty(sk)) {
props.put(S3Properties.SECRET_KEY, sk);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Get properties for oss in hive-site.xml: {}", props);
}
}
private static void getPropertiesFromDLFProps(Map<String, String> props,
CloudCredential credential) {
String metastoreType = props.get(HMSProperties.HIVE_METASTORE_TYPE);
if (!HMSProperties.DLF_TYPE.equalsIgnoreCase(metastoreType)) {
return;
}
// convert to DLF client properties; skip the conversion if the original key is already present.
if (!props.containsKey(DataLakeConfig.CATALOG_USER_ID)) {
props.put(DataLakeConfig.CATALOG_USER_ID, props.get(DLFProperties.UID));
String uid = props.get(DLFProperties.UID);
if (Strings.isNullOrEmpty(uid)) {
throw new IllegalArgumentException("Required dlf property: " + DLFProperties.UID);
}
props.put(DataLakeConfig.CATALOG_ENDPOINT, props.get(DLFProperties.ENDPOINT));
props.put(DataLakeConfig.CATALOG_PROXY_MODE, props.getOrDefault(DLFProperties.PROXY_MODE, "DLF_ONLY"));
props.put(DataLakeConfig.CATALOG_ACCESS_KEY_ID, credential.getAccessKey());
props.put(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET, credential.getSecretKey());
props.put(DLFProperties.Site.ACCESS_PUBLIC, props.get(DLFProperties.ACCESS_PUBLIC));
}
String uid = props.get(DataLakeConfig.CATALOG_USER_ID);
if (Strings.isNullOrEmpty(uid)) {
throw new IllegalArgumentException("Required dlf property: " + DataLakeConfig.CATALOG_USER_ID);
}
// convert to s3 client property
if (props.containsKey(DataLakeConfig.CATALOG_ACCESS_KEY_ID)) {
props.put(S3Properties.ACCESS_KEY, props.get(DataLakeConfig.CATALOG_ACCESS_KEY_ID));
}
if (props.containsKey(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET)) {
props.put(S3Properties.SECRET_KEY, props.get(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET));
}
String publicAccess = props.getOrDefault(DLFProperties.Site.ACCESS_PUBLIC, "false");
String region = props.get(DataLakeConfig.CATALOG_REGION_ID);
String endpoint = props.getOrDefault(DataLakeConfig.CATALOG_ENDPOINT,
getDLFEndpoint(region, Boolean.parseBoolean(publicAccess)));
if (!Strings.isNullOrEmpty(region)) {
props.put(S3Properties.REGION, "oss-" + region);
props.put(S3Properties.ENDPOINT, endpoint);
}
}
private static String getDLFEndpoint(String region, boolean publicAccess) {
String prefix = "http://oss-";
String suffix = ".aliyuncs.com";
if (!publicAccess) {
suffix = "-internal" + suffix;
}
return prefix + region + suffix;
}
private static Map<String, String> convertToGlueProperties(Map<String, String> props, CloudCredential credential) {
// convert Doris glue properties to Glue client properties, S3 client properties and BE properties
String metastoreType = props.get(HMSProperties.HIVE_METASTORE_TYPE);
String icebergType = props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE);
boolean isGlueIceberg = IcebergExternalCatalog.ICEBERG_GLUE.equals(icebergType);
if (!HMSProperties.GLUE_TYPE.equalsIgnoreCase(metastoreType) && !isGlueIceberg) {
return props;
}
if (isGlueIceberg) {
// glue ak sk for iceberg
props.putIfAbsent(GlueProperties.ACCESS_KEY, credential.getAccessKey());
props.putIfAbsent(GlueProperties.SECRET_KEY, credential.getSecretKey());
}
// set glue client metadata
if (props.containsKey(GlueProperties.ENDPOINT)) {
String endpoint = props.get(GlueProperties.ENDPOINT);
props.put(AWSGlueConfig.AWS_GLUE_ENDPOINT, endpoint);
String region = S3Properties.getRegionOfEndpoint(endpoint);
props.put(GlueProperties.REGION, region);
props.put(AWSGlueConfig.AWS_REGION, region);
if (credential.isWhole()) {
props.put(AWSGlueConfig.AWS_GLUE_ACCESS_KEY, credential.getAccessKey());
props.put(AWSGlueConfig.AWS_GLUE_SECRET_KEY, credential.getSecretKey());
}
if (credential.isTemporary()) {
props.put(AWSGlueConfig.AWS_GLUE_SESSION_TOKEN, credential.getSessionToken());
}
} else {
// compatible with the old version; will be deprecated in a future release
// put GlueProperties into the map if the original keys are found.
if (props.containsKey(AWSGlueConfig.AWS_GLUE_ENDPOINT)) {
String endpoint = props.get(AWSGlueConfig.AWS_GLUE_ENDPOINT);
props.put(GlueProperties.ENDPOINT, endpoint);
if (props.containsKey(AWSGlueConfig.AWS_GLUE_ACCESS_KEY)) {
props.put(GlueProperties.ACCESS_KEY, props.get(AWSGlueConfig.AWS_GLUE_ACCESS_KEY));
}
if (props.containsKey(AWSGlueConfig.AWS_GLUE_SECRET_KEY)) {
props.put(GlueProperties.SECRET_KEY, props.get(AWSGlueConfig.AWS_GLUE_SECRET_KEY));
}
if (props.containsKey(AWSGlueConfig.AWS_GLUE_SESSION_TOKEN)) {
props.put(GlueProperties.SESSION_TOKEN, props.get(AWSGlueConfig.AWS_GLUE_SESSION_TOKEN));
}
}
}
// set s3 client credential
// https://docs.aws.amazon.com/general/latest/gr/s3.html
// Convert:
// (
// "glue.region" = "us-east-1",
// "glue.access_key" = "xx",
// "glue.secret_key" = "yy"
// )
// To:
// (
// "s3.region" = "us-east-1",
// "s3.endpoint" = "s3.us-east-1.amazonaws.com"
// "s3.access_key" = "xx",
// "s3.secret_key" = "yy"
// )
String endpoint = props.get(GlueProperties.ENDPOINT);
String region = props.getOrDefault(GlueProperties.REGION, S3Properties.getRegionOfEndpoint(endpoint));
if (!Strings.isNullOrEmpty(region)) {
props.put(S3Properties.REGION, region);
String s3Endpoint = "s3." + region + ".amazonaws.com";
if (isGlueIceberg) {
s3Endpoint = "https://" + s3Endpoint;
}
props.put(S3Properties.ENDPOINT, s3Endpoint);
}
if (credential.isWhole()) {
props.put(S3Properties.ACCESS_KEY, credential.getAccessKey());
props.put(S3Properties.SECRET_KEY, credential.getSecretKey());
}
if (credential.isTemporary()) {
props.put(S3Properties.SESSION_TOKEN, credential.getSessionToken());
}
return props;
}
}
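
A hedged sketch of how the converter above is expected to be driven for a Glue-backed catalog. The demo class name, endpoint and keys are placeholders and not part of this commit; the comments describe the expected output keys under that assumption.

import java.util.HashMap;
import java.util.Map;

import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.GlueProperties;
import org.apache.doris.datasource.property.constants.HMSProperties;

public class PropertyConverterExample {   // hypothetical demo class
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(HMSProperties.HIVE_METASTORE_TYPE, "glue");
        props.put(GlueProperties.ENDPOINT, "glue.us-east-1.amazonaws.com");
        props.put(GlueProperties.ACCESS_KEY, "ak-placeholder");
        props.put(GlueProperties.SECRET_KEY, "sk-placeholder");

        // Step 1 + 2: adds aws.glue.* keys for the Glue client, s3.* keys for the FE S3 client,
        // and AWS_* keys for BE, alongside the original glue.* properties.
        Map<String, String> metaProps = PropertyConverter.convertToMetaProperties(props);

        // Translates the s3.* keys into Hadoop S3A settings such as
        // fs.s3a.endpoint, fs.s3a.access.key and fs.s3a.path.style.access.
        Map<String, String> hadoopProps = PropertyConverter.convertToHadoopFSProperties(metaProps);

        System.out.println(metaProps);
        System.out.println(hadoopProps);
    }
}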

View File

@ -0,0 +1,69 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import java.util.HashMap;
import java.util.Map;
public class S3ClientBEProperties {
/**
* Convert FE properties to BE S3 client properties.
* BE expects its properties in the AWS_XXX form.
*/
public static Map<String, String> getBeFSProperties(Map<String, String> properties) {
if (properties.containsKey(S3Properties.ENDPOINT)) {
// s3,oss,cos,obs use this.
return getBeAWSPropertiesFromS3(properties);
} else if (properties.containsKey(ObsProperties.ENDPOINT)
|| properties.containsKey(OssProperties.ENDPOINT)
|| properties.containsKey(CosProperties.ENDPOINT)) {
return getBeAWSPropertiesFromS3(S3Properties.prefixToS3(properties));
}
return properties;
}
private static Map<String, String> getBeAWSPropertiesFromS3(Map<String, String> properties) {
Map<String, String> beProperties = new HashMap<>();
String endpoint = properties.get(S3Properties.ENDPOINT);
beProperties.put(S3Properties.Env.ENDPOINT, endpoint);
String region = S3Properties.getRegionOfEndpoint(endpoint);
beProperties.put(S3Properties.Env.REGION, properties.getOrDefault(S3Properties.REGION, region));
if (properties.containsKey(S3Properties.ACCESS_KEY)) {
beProperties.put(S3Properties.Env.ACCESS_KEY, properties.get(S3Properties.ACCESS_KEY));
}
if (properties.containsKey(S3Properties.SECRET_KEY)) {
beProperties.put(S3Properties.Env.SECRET_KEY, properties.get(S3Properties.SECRET_KEY));
}
if (properties.containsKey(S3Properties.SESSION_TOKEN)) {
beProperties.put(S3Properties.Env.TOKEN, properties.get(S3Properties.SESSION_TOKEN));
}
if (properties.containsKey(S3Properties.ROOT_PATH)) {
beProperties.put(S3Properties.Env.ROOT_PATH, properties.get(S3Properties.ROOT_PATH));
}
if (properties.containsKey(S3Properties.BUCKET)) {
beProperties.put(S3Properties.Env.BUCKET, properties.get(S3Properties.BUCKET));
}
return beProperties;
}
}
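
A small hypothetical demo of the FE-to-BE property mapping above; the demo class name, endpoint and keys are placeholders, not part of this commit.

import java.util.HashMap;
import java.util.Map;

import org.apache.doris.datasource.property.S3ClientBEProperties;
import org.apache.doris.datasource.property.constants.S3Properties;

public class BeFsPropertiesExample {   // hypothetical demo class
    public static void main(String[] args) {
        Map<String, String> feProps = new HashMap<>();
        feProps.put(S3Properties.ENDPOINT, "s3.us-east-1.amazonaws.com");
        feProps.put(S3Properties.ACCESS_KEY, "ak-placeholder");
        feProps.put(S3Properties.SECRET_KEY, "sk-placeholder");

        // Expected output keys, per the mapping above:
        // AWS_ENDPOINT, AWS_REGION (derived from the endpoint), AWS_ACCESS_KEY, AWS_SECRET_KEY.
        Map<String, String> beProps = S3ClientBEProperties.getBeFSProperties(feProps);
        System.out.println(beProps);
    }
}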

View File

@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property.constants;
import org.apache.doris.datasource.credentials.CloudCredential;
import java.util.Map;
public class BaseProperties {
public static CloudCredential getCloudCredential(Map<String, String> props,
String accessKeyName,
String secretKeyName,
String sessionTokenName) {
CloudCredential credential = new CloudCredential();
credential.setAccessKey(props.getOrDefault(accessKeyName, ""));
credential.setSecretKey(props.getOrDefault(secretKeyName, ""));
credential.setSessionToken(props.getOrDefault(sessionTokenName, ""));
return credential;
}
public static CloudCredential getCompatibleCredential(Map<String, String> props) {
// Compatible with older versions.
return getCloudCredential(props, S3Properties.Env.ACCESS_KEY, S3Properties.Env.SECRET_KEY,
S3Properties.Env.TOKEN);
}
}
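
An illustrative sketch of how the per-cloud subclasses later in this patch are expected to build credentials through getCloudCredential; the demo class name, key names and values below are placeholders.

import java.util.HashMap;
import java.util.Map;

import org.apache.doris.datasource.credentials.CloudCredential;
import org.apache.doris.datasource.property.constants.BaseProperties;

public class BasePropertiesExample {   // hypothetical demo class
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("s3.access_key", "ak-placeholder");
        props.put("s3.secret_key", "sk-placeholder");

        // Missing keys default to ""; here both keys are present, so the credential is whole
        // but not temporary (no session token was supplied).
        CloudCredential credential = BaseProperties.getCloudCredential(
                props, "s3.access_key", "s3.secret_key", "s3.session_token");
        System.out.println(credential.isWhole());     // true
        System.out.println(credential.isTemporary()); // false
    }
}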

View File

@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property.constants;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.common.Config;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Objects;
public class BosProperties {
private static final Logger LOG = LoggerFactory.getLogger(BosProperties.class);
private static final String BOS_ENDPOINT = "bos_endpoint";
private static final String BOS_ACCESS_KEY = "bos_accesskey";
private static final String BOS_SECRET_ACCESS_KEY = "bos_secret_accesskey";
public static boolean tryConvertBosToS3(Map<String, String> properties, StorageBackend.StorageType storageType) {
if (!Config.enable_access_file_without_broker || storageType != StorageBackend.StorageType.BROKER) {
return false;
}
CaseInsensitiveMap ciProperties = new CaseInsensitiveMap();
ciProperties.putAll(properties);
// read the bos_* properties null-safely; a missing key becomes an empty string
String bosEndpoint = Objects.toString(ciProperties.get(BOS_ENDPOINT), "");
String bosAccessKey = Objects.toString(ciProperties.get(BOS_ACCESS_KEY), "");
String bosSecretKey = Objects.toString(ciProperties.get(BOS_SECRET_ACCESS_KEY), "");
if (StringUtils.isNotEmpty(bosEndpoint)
&& StringUtils.isNotEmpty(bosAccessKey)
&& StringUtils.isNotEmpty(bosSecretKey)) {
// a bos endpoint looks like http[s]://gz.bcebos.com; extract the region (gz)
// and convert it to the s3 endpoint http[s]://s3.gz.bcebos.com
try {
URI uri = new URI(bosEndpoint);
String host = uri.getHost();
String[] hostSplit = host.split("\\.");
if (hostSplit.length < 3) {
return false;
}
String region = hostSplit[0];
String s3Endpoint = new URIBuilder(uri).setHost("s3." + host).build().toString();
properties.clear();
properties.put(S3Properties.Env.ENDPOINT, s3Endpoint);
properties.put(S3Properties.Env.REGION, region);
properties.put(S3Properties.Env.ACCESS_KEY, bosAccessKey);
properties.put(S3Properties.Env.SECRET_KEY, bosSecretKey);
LOG.info("skip BROKER and access S3 directly.");
return true;
} catch (URISyntaxException e) {
LOG.warn(BOS_ENDPOINT + ": " + bosEndpoint + " is invalid.");
}
}
return false;
}
public static String convertPathToS3(String path) {
try {
URI orig = new URI(path);
URI s3url = new URI("s3", orig.getRawAuthority(),
orig.getRawPath(), orig.getRawQuery(), orig.getRawFragment());
return s3url.toString();
} catch (URISyntaxException e) {
return path;
}
}
}
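
A hedged demo of the BOS-to-S3 conversion above. It assumes Config.enable_access_file_without_broker is enabled; the demo class name, endpoint and credentials are placeholders, not part of this commit.

import java.util.HashMap;
import java.util.Map;

import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.datasource.property.constants.BosProperties;

public class BosPropertiesExample {   // hypothetical demo class
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("bos_endpoint", "http://gz.bcebos.com");
        props.put("bos_accesskey", "ak-placeholder");
        props.put("bos_secret_accesskey", "sk-placeholder");

        // On success the map is rewritten to AWS_ENDPOINT=http://s3.gz.bcebos.com,
        // AWS_REGION=gz, AWS_ACCESS_KEY and AWS_SECRET_KEY, and the caller switches to S3.
        boolean converted = BosProperties.tryConvertBosToS3(props, StorageBackend.StorageType.BROKER);
        System.out.println(converted + " " + props);

        // Paths keep their bucket and object key; only the scheme changes.
        System.out.println(BosProperties.convertPathToS3("bos://my-bucket/path/file.csv"));
        // -> s3://my-bucket/path/file.csv
    }
}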

View File

@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property.constants;
import org.apache.doris.datasource.credentials.CloudCredential;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class CosProperties extends BaseProperties {
public static final String COS_PREFIX = "cos.";
public static final String COS_FS_PREFIX = "fs.cos";
public static final String ENDPOINT = "cos.endpoint";
public static final String ACCESS_KEY = "cos.access_key";
public static final String SECRET_KEY = "cos.secret_key";
public static final String REGION = "cos.region";
public static final String SESSION_TOKEN = "cos.session_token";
public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT, ACCESS_KEY, SECRET_KEY);
public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
}
}

View File

@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property.constants;
import org.apache.doris.datasource.credentials.CloudCredential;
import java.util.Map;
public class DLFProperties extends BaseProperties {
public static final String ACCESS_PUBLIC = "dlf.access.public";
public static final String UID = "dlf.uid";
public static final String PROXY_MODE = "dlf.proxy.mode";
public static final String ENDPOINT = "dlf.endpoint";
public static final String ACCESS_KEY = "dlf.access_key";
public static final String SECRET_KEY = "dlf.secret_key";
public static final String REGION = "dlf.region";
public static final String SESSION_TOKEN = "dlf.session_token";
public static class Site {
public static final String ACCESS_PUBLIC = "dlf.catalog.accessPublic";
}
public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
}
}

View File

@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property.constants;
import org.apache.doris.datasource.credentials.CloudCredential;
import com.amazonaws.glue.catalog.util.AWSGlueConfig;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class GlueProperties extends BaseProperties {
public static final String ENDPOINT = "glue.endpoint";
public static final String REGION = "glue.region";
public static final String ACCESS_KEY = "glue.access_key";
public static final String SECRET_KEY = "glue.secret_key";
public static final String SESSION_TOKEN = "glue.session_token";
public static final List<String> META_KEYS = Arrays.asList(AWSGlueConfig.AWS_GLUE_ENDPOINT,
AWSGlueConfig.AWS_REGION, AWSGlueConfig.AWS_GLUE_ACCESS_KEY, AWSGlueConfig.AWS_GLUE_SECRET_KEY,
AWSGlueConfig.AWS_GLUE_SESSION_TOKEN);
public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
}
}

View File

@ -0,0 +1,32 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property.constants;
import java.util.Collections;
import java.util.List;
public class HMSProperties {
public static final String HIVE_METASTORE_TYPE = "hive.metastore.type";
public static final String DLF_TYPE = "dlf";
public static final String GLUE_TYPE = "glue";
public static final String HIVE_VERSION = "hive.version";
// required
public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
public static final List<String> REQUIRED_FIELDS = Collections.singletonList(HMSProperties.HIVE_METASTORE_URIS);
}

View File

@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property.constants;
import org.apache.doris.datasource.credentials.CloudCredential;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class ObsProperties extends BaseProperties {
public static final String OBS_PREFIX = "obs.";
public static final String OBS_FS_PREFIX = "fs.obs";
public static final String ENDPOINT = "obs.endpoint";
public static final String REGION = "obs.region";
public static final String ACCESS_KEY = "obs.access_key";
public static final String SECRET_KEY = "obs.secret_key";
public static final String SESSION_TOKEN = "obs.session_token";
public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT, ACCESS_KEY, SECRET_KEY);
public static class FS {
public static final String SESSION_TOKEN = "fs.obs.session.token";
public static final String IMPL_DISABLE_CACHE = "fs.obs.impl.disable.cache";
}
public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
}
}

View File

@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property.constants;
import org.apache.doris.datasource.credentials.CloudCredential;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class OssProperties extends BaseProperties {
public static final String OSS_PREFIX = "oss.";
public static final String OSS_FS_PREFIX = "fs.oss";
public static final String ENDPOINT = "oss.endpoint";
public static final String REGION = "oss.region";
public static final String ACCESS_KEY = "oss.access_key";
public static final String SECRET_KEY = "oss.secret_key";
public static final String SESSION_TOKEN = "oss.session_token";
public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT, ACCESS_KEY, SECRET_KEY);
public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
}
}

View File

@ -0,0 +1,234 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.property.constants;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.credentials.CloudCredential;
import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.datasource.credentials.DataLakeAWSCredentialsProvider;
import org.apache.doris.thrift.TS3StorageParam;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class S3Properties extends BaseProperties {
public static final String S3_PREFIX = "s3.";
public static final String S3_FS_PREFIX = "fs.s3";
public static final String CREDENTIALS_PROVIDER = "s3.credentials.provider";
public static final String ENDPOINT = "s3.endpoint";
public static final String REGION = "s3.region";
public static final String ACCESS_KEY = "s3.access_key";
public static final String SECRET_KEY = "s3.secret_key";
public static final String SESSION_TOKEN = "s3.session_token";
public static final String MAX_CONNECTIONS = "s3.connection.maximum";
public static final String REQUEST_TIMEOUT_MS = "s3.connection.request.timeout";
public static final String CONNECTION_TIMEOUT_MS = "s3.connection.timeout";
// required by storage policy
public static final String ROOT_PATH = "s3.root.path";
public static final String BUCKET = "s3.bucket";
public static final String VALIDITY_CHECK = "s3_validity_check";
public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT, ACCESS_KEY, SECRET_KEY);
public static final List<String> TVF_REQUIRED_FIELDS = Arrays.asList(ACCESS_KEY, SECRET_KEY);
public static final List<String> AWS_CREDENTIALS_PROVIDERS = Arrays.asList(
DataLakeAWSCredentialsProvider.class.getName(),
TemporaryAWSCredentialsProvider.class.getName(),
SimpleAWSCredentialsProvider.class.getName(),
EnvironmentVariableCredentialsProvider.class.getName(),
IAMInstanceCredentialsProvider.class.getName());
public static Map<String, String> credentialToMap(CloudCredentialWithEndpoint credential) {
Map<String, String> resMap = new HashMap<>();
resMap.put(S3Properties.ENDPOINT, credential.getEndpoint());
resMap.put(S3Properties.REGION, credential.getRegion());
if (credential.isWhole()) {
resMap.put(S3Properties.ACCESS_KEY, credential.getAccessKey());
resMap.put(S3Properties.SECRET_KEY, credential.getSecretKey());
}
if (credential.isTemporary()) {
resMap.put(S3Properties.SESSION_TOKEN, credential.getSessionToken());
}
return resMap;
}
public static class Env {
public static final String PROPERTIES_PREFIX = "AWS";
// required
public static final String ENDPOINT = "AWS_ENDPOINT";
public static final String REGION = "AWS_REGION";
public static final String ACCESS_KEY = "AWS_ACCESS_KEY";
public static final String SECRET_KEY = "AWS_SECRET_KEY";
public static final String TOKEN = "AWS_TOKEN";
// required by storage policy
public static final String ROOT_PATH = "AWS_ROOT_PATH";
public static final String BUCKET = "AWS_BUCKET";
// optional
public static final String MAX_CONNECTIONS = "AWS_MAX_CONNECTIONS";
public static final String REQUEST_TIMEOUT_MS = "AWS_REQUEST_TIMEOUT_MS";
public static final String CONNECTION_TIMEOUT_MS = "AWS_CONNECTION_TIMEOUT_MS";
public static final String DEFAULT_MAX_CONNECTIONS = "50";
public static final String DEFAULT_REQUEST_TIMEOUT_MS = "3000";
public static final String DEFAULT_CONNECTION_TIMEOUT_MS = "1000";
public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT, REGION, ACCESS_KEY, SECRET_KEY);
public static final List<String> FS_KEYS = Arrays.asList(ENDPOINT, REGION, ACCESS_KEY, SECRET_KEY, TOKEN,
ROOT_PATH, BUCKET, MAX_CONNECTIONS, REQUEST_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);
}
public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
}
public static CloudCredentialWithEndpoint getEnvironmentCredentialWithEndpoint(Map<String, String> props) {
CloudCredential credential = getCloudCredential(props, Env.ACCESS_KEY, Env.SECRET_KEY,
Env.TOKEN);
if (!props.containsKey(Env.ENDPOINT)) {
throw new IllegalArgumentException("Missing 'AWS_ENDPOINT' property. ");
}
String endpoint = props.get(Env.ENDPOINT);
String region = props.getOrDefault(S3Properties.REGION, S3Properties.getRegionOfEndpoint(endpoint));
return new CloudCredentialWithEndpoint(endpoint, region, credential);
}
public static String getRegionOfEndpoint(String endpoint) {
String[] endpointSplit = endpoint.split("\\.");
if (endpointSplit.length < 2) {
return null;
}
return endpointSplit[1];
}
public static Map<String, String> prefixToS3(Map<String, String> properties) {
Map<String, String> s3Properties = Maps.newHashMap();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(OssProperties.OSS_PREFIX)) {
String s3Key = entry.getKey().replace(OssProperties.OSS_PREFIX, S3Properties.S3_PREFIX);
s3Properties.put(s3Key, entry.getValue());
} else if (entry.getKey().startsWith(CosProperties.COS_PREFIX)) {
String s3Key = entry.getKey().replace(CosProperties.COS_PREFIX, S3Properties.S3_PREFIX);
s3Properties.put(s3Key, entry.getValue());
} else if (entry.getKey().startsWith(ObsProperties.OBS_PREFIX)) {
String s3Key = entry.getKey().replace(ObsProperties.OBS_PREFIX, S3Properties.S3_PREFIX);
s3Properties.put(s3Key, entry.getValue());
}
}
return s3Properties;
}
public static Map<String, String> requiredS3TVFProperties(Map<String, String> properties)
throws AnalysisException {
try {
for (String field : S3Properties.TVF_REQUIRED_FIELDS) {
checkRequiredProperty(properties, field);
}
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}
return properties;
}
public static void requiredS3Properties(Map<String, String> properties) throws DdlException {
for (String field : S3Properties.REQUIRED_FIELDS) {
checkRequiredProperty(properties, field);
}
}
public static void requiredS3PingProperties(Map<String, String> properties) throws DdlException {
requiredS3Properties(properties);
checkRequiredProperty(properties, S3Properties.BUCKET);
}
public static void checkRequiredProperty(Map<String, String> properties, String propertyKey)
throws DdlException {
String value = properties.get(propertyKey);
if (Strings.isNullOrEmpty(value)) {
throw new DdlException("Missing [" + propertyKey + "] in properties.");
}
}
public static void optionalS3Property(Map<String, String> properties) {
properties.putIfAbsent(S3Properties.MAX_CONNECTIONS, S3Properties.Env.DEFAULT_MAX_CONNECTIONS);
properties.putIfAbsent(S3Properties.REQUEST_TIMEOUT_MS, S3Properties.Env.DEFAULT_REQUEST_TIMEOUT_MS);
properties.putIfAbsent(S3Properties.CONNECTION_TIMEOUT_MS, S3Properties.Env.DEFAULT_CONNECTION_TIMEOUT_MS);
// compatible with old version
properties.putIfAbsent(S3Properties.Env.MAX_CONNECTIONS, S3Properties.Env.DEFAULT_MAX_CONNECTIONS);
properties.putIfAbsent(S3Properties.Env.REQUEST_TIMEOUT_MS, S3Properties.Env.DEFAULT_REQUEST_TIMEOUT_MS);
properties.putIfAbsent(S3Properties.Env.CONNECTION_TIMEOUT_MS, S3Properties.Env.DEFAULT_CONNECTION_TIMEOUT_MS);
}
public static void convertToStdProperties(Map<String, String> properties) {
properties.putIfAbsent(S3Properties.ENDPOINT, properties.get(S3Properties.Env.ENDPOINT));
properties.putIfAbsent(S3Properties.REGION, properties.get(S3Properties.Env.REGION));
properties.putIfAbsent(S3Properties.ACCESS_KEY, properties.get(S3Properties.Env.ACCESS_KEY));
properties.putIfAbsent(S3Properties.SECRET_KEY, properties.get(S3Properties.Env.SECRET_KEY));
if (properties.containsKey(S3Properties.Env.TOKEN)) {
properties.putIfAbsent(S3Properties.SESSION_TOKEN, properties.get(S3Properties.Env.TOKEN));
}
if (properties.containsKey(S3Properties.Env.MAX_CONNECTIONS)) {
properties.putIfAbsent(S3Properties.MAX_CONNECTIONS, properties.get(S3Properties.Env.MAX_CONNECTIONS));
}
if (properties.containsKey(S3Properties.Env.REQUEST_TIMEOUT_MS)) {
properties.putIfAbsent(S3Properties.REQUEST_TIMEOUT_MS,
properties.get(S3Properties.Env.REQUEST_TIMEOUT_MS));
}
if (properties.containsKey(S3Properties.Env.CONNECTION_TIMEOUT_MS)) {
properties.putIfAbsent(S3Properties.CONNECTION_TIMEOUT_MS,
properties.get(S3Properties.Env.CONNECTION_TIMEOUT_MS));
}
if (properties.containsKey(S3Properties.Env.ROOT_PATH)) {
properties.putIfAbsent(S3Properties.ROOT_PATH, properties.get(S3Properties.Env.ROOT_PATH));
}
if (properties.containsKey(S3Properties.Env.BUCKET)) {
properties.putIfAbsent(S3Properties.BUCKET, properties.get(S3Properties.Env.BUCKET));
}
}
public static TS3StorageParam getS3TStorageParam(Map<String, String> properties) {
TS3StorageParam s3Info = new TS3StorageParam();
s3Info.setEndpoint(properties.get(S3Properties.ENDPOINT));
s3Info.setRegion(properties.get(S3Properties.REGION));
s3Info.setAk(properties.get(S3Properties.ACCESS_KEY));
s3Info.setSk(properties.get(S3Properties.SECRET_KEY));
s3Info.setRootPath(properties.get(S3Properties.ROOT_PATH));
s3Info.setBucket(properties.get(S3Properties.BUCKET));
String maxConnections = properties.get(S3Properties.MAX_CONNECTIONS);
s3Info.setMaxConn(Integer.parseInt(maxConnections == null
? S3Properties.Env.DEFAULT_MAX_CONNECTIONS : maxConnections));
// assumes TS3StorageParam exposes request/connection timeout setters, matching its thrift fields
String requestTimeoutMs = properties.get(S3Properties.REQUEST_TIMEOUT_MS);
s3Info.setRequestTimeoutMs(Integer.parseInt(requestTimeoutMs == null
? S3Properties.Env.DEFAULT_REQUEST_TIMEOUT_MS : requestTimeoutMs));
String connTimeoutMs = properties.get(S3Properties.CONNECTION_TIMEOUT_MS);
s3Info.setConnTimeoutMs(Integer.parseInt(connTimeoutMs == null
? S3Properties.Env.DEFAULT_CONNECTION_TIMEOUT_MS : connTimeoutMs));
return s3Info;
}
}
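
A short hypothetical demo of the helpers above: prefixToS3 unifies the per-cloud oss./cos./obs. prefixes onto s3.*, and getRegionOfEndpoint takes the second dot-separated component of an endpoint. The demo class name, endpoint and keys are placeholders.

import java.util.HashMap;
import java.util.Map;

import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;

public class S3PropertiesExample {   // hypothetical demo class
    public static void main(String[] args) {
        Map<String, String> ossProps = new HashMap<>();
        ossProps.put(OssProperties.ENDPOINT, "oss-cn-beijing-internal.aliyuncs.com");
        ossProps.put(OssProperties.ACCESS_KEY, "ak-placeholder");
        ossProps.put(OssProperties.SECRET_KEY, "sk-placeholder");

        // oss.endpoint/oss.access_key/oss.secret_key -> s3.endpoint/s3.access_key/s3.secret_key
        Map<String, String> s3Props = S3Properties.prefixToS3(ossProps);
        System.out.println(s3Props);

        // "s3.us-west-2.amazonaws.com".split("\\.")[1] -> "us-west-2"
        System.out.println(S3Properties.getRegionOfEndpoint("s3.us-west-2.amazonaws.com"));
    }
}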

View File

@ -21,11 +21,11 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Resource.ReferenceType;
import org.apache.doris.catalog.S3Resource;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.qe.ShowResultSetMetaData;
import com.google.common.base.Strings;
@ -193,13 +193,13 @@ public class StoragePolicy extends Policy {
throw new AnalysisException("current storage policy just support resource type S3_COOLDOWN");
}
Map<String, String> properties = resource.getCopiedProperties();
if (!properties.containsKey(S3Resource.S3_ROOT_PATH)) {
if (!properties.containsKey(S3Properties.ROOT_PATH)) {
throw new AnalysisException(String.format(
"Missing [%s] in '%s' resource", S3Resource.S3_ROOT_PATH, storageResource));
"Missing [%s] in '%s' resource", S3Properties.ROOT_PATH, storageResource));
}
if (!properties.containsKey(S3Resource.S3_BUCKET)) {
if (!properties.containsKey(S3Properties.BUCKET)) {
throw new AnalysisException(String.format(
"Missing [%s] in '%s' resource", S3Resource.S3_BUCKET, storageResource));
"Missing [%s] in '%s' resource", S3Properties.BUCKET, storageResource));
}
return resource;
}

View File

@ -17,8 +17,8 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;
@ -73,7 +73,7 @@ public class IcebergAnalysisTask extends HMSAnalysisTask {
}
hiveCatalog.setConf(conf);
Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, table.getMetastoreUri());
catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, table.getMetastoreUri());
catalogProperties.put("uri", table.getMetastoreUri());
hiveCatalog.initialize("hive", catalogProperties);
return hiveCatalog.loadTable(TableIdentifier.of(table.getDbName(), table.getName()));

View File

@ -20,7 +20,6 @@ package org.apache.doris.tablefunction;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
@ -29,6 +28,7 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.external.ExternalFileScanNode;
@ -153,7 +153,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
if (fileType == TFileType.FILE_HDFS) {
return locationProperties.get(HdfsTableValuedFunction.HADOOP_FS_NAME);
} else if (fileType == TFileType.FILE_S3) {
return locationProperties.get(S3TableValuedFunction.S3_ENDPOINT);
return locationProperties.get(S3Properties.ENDPOINT);
}
return "";
}
@ -324,7 +324,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
// get one BE address
TNetworkAddress address = null;
columns = Lists.newArrayList();
for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) {
for (Backend be : org.apache.doris.catalog.Env.getCurrentSystemInfo().getIdToBackend().values()) {
if (be.isAlive()) {
address = new TNetworkAddress(be.getIp(), be.getBrpcPort());
break;

View File

@ -48,10 +48,7 @@ public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
private static final String TABLE = "table";
private static final String QUERY_TYPE = "query_type";
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(TABLE)
.add(QUERY_TYPE)
.build();
private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.of(TABLE, QUERY_TYPE);
private TIcebergQueryType queryType;

View File

@ -19,11 +19,11 @@ package org.apache.doris.tablefunction;
import org.apache.doris.alter.DecommissionType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.cluster.Cluster;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TBackendsMetadataParams;
@ -348,7 +348,7 @@ public class MetadataGenerator {
}
hiveCatalog.setConf(conf);
Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, catalog.getHiveMetastoreUris());
catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, catalog.getHiveMetastoreUris());
catalogProperties.put("uri", catalog.getHiveMetastoreUris());
hiveCatalog.initialize("hive", catalogProperties);
return hiveCatalog.loadTable(TableIdentifier.of(db, tbl));

View File

@ -22,14 +22,14 @@ import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;
import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.thrift.TFileType;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.Map;
/**
@ -44,56 +44,68 @@ import java.util.Map;
* https://bucket.us-east-1.amazonaws.com/csv/taxi.csv with "use_path_style"="false"
*/
public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
private static final Logger LOG = LogManager.getLogger(S3TableValuedFunction.class);
public static final String NAME = "s3";
public static final String S3_URI = "uri";
public static final String S3_AK = "AWS_ACCESS_KEY";
public static final String S3_SK = "AWS_SECRET_KEY";
public static final String S3_ENDPOINT = "AWS_ENDPOINT";
public static final String S3_REGION = "AWS_REGION";
private static final String AK = "access_key";
private static final String SK = "secret_key";
private static final String USE_PATH_STYLE = "use_path_style";
private static final String REGION = "region";
private static final ImmutableSet<String> DEPRECATED_KEYS =
ImmutableSet.of("access_key", "secret_key", "session_token", "region", S3Properties.REGION);
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(S3_URI)
.add(AK)
.add(SK)
.add(USE_PATH_STYLE)
.add(REGION)
.build();
private S3URI s3uri;
private String s3AK;
private String s3SK;
private String endPoint;
private static final ImmutableSet<String> OPTIONAL_KEYS =
ImmutableSet.of(S3Properties.SESSION_TOKEN, PropertyConverter.USE_PATH_STYLE);
private static final ImmutableSet<String> PROPERTIES_SET = ImmutableSet.<String>builder()
.add(S3_URI)
.addAll(DEPRECATED_KEYS)
.addAll(S3Properties.TVF_REQUIRED_FIELDS)
.addAll(OPTIONAL_KEYS)
.build();
private final S3URI s3uri;
private final boolean forceVirtualHosted;
private String virtualBucket;
private boolean forceVirtualHosted;
public S3TableValuedFunction(Map<String, String> params) throws AnalysisException {
Map<String, String> validParams = new CaseInsensitiveMap();
for (String key : params.keySet()) {
if (!PROPERTIES_SET.contains(key.toLowerCase()) && !FILE_FORMAT_PROPERTIES.contains(key.toLowerCase())) {
throw new AnalysisException(key + " is invalid property");
Map<String, String> tvfParams = getValidParams(params);
forceVirtualHosted = isVirtualHosted(tvfParams);
s3uri = getS3Uri(tvfParams);
String endpoint = getEndpointFromUri();
CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(endpoint,
S3Properties.getRegionOfEndpoint(endpoint),
tvfParams.get(S3Properties.ACCESS_KEY),
tvfParams.get(S3Properties.SECRET_KEY));
if (tvfParams.containsKey(S3Properties.SESSION_TOKEN)) {
credential.setSessionToken(tvfParams.get(S3Properties.SESSION_TOKEN));
}
parseProperties(tvfParams);
// set S3 location properties
// these location properties are all required; none of them can be omitted.
locationProperties = S3Properties.credentialToMap(credential);
String usePathStyle = tvfParams.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false");
locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle);
parseFile();
}
private static Map<String, String> getValidParams(Map<String, String> params) throws AnalysisException {
if (!params.containsKey(S3_URI)) {
throw new AnalysisException("Missing required property: " + S3_URI);
}
Map<String, String> validParams = new HashMap<>();
for (Map.Entry<String, String> entry : params.entrySet()) {
String key = entry.getKey();
String lowerKey = key.toLowerCase();
if (!PROPERTIES_SET.contains(lowerKey) && !FILE_FORMAT_PROPERTIES.contains(lowerKey)) {
throw new AnalysisException("Invalid property: " + key);
}
validParams.put(key, params.get(key));
if (DEPRECATED_KEYS.contains(lowerKey)) {
lowerKey = S3Properties.S3_PREFIX + lowerKey;
}
validParams.put(lowerKey, entry.getValue());
}
return S3Properties.requiredS3TVFProperties(validParams);
}
String originUri = validParams.getOrDefault(S3_URI, "");
if (originUri.toLowerCase().startsWith("s3")) {
// s3 protocol, default virtual-hosted style
forceVirtualHosted = true;
} else {
// not s3 protocol, forceVirtualHosted is determined by USE_PATH_STYLE.
forceVirtualHosted = !Boolean.valueOf(validParams.get(USE_PATH_STYLE)).booleanValue();
}
try {
s3uri = S3URI.create(validParams.get(S3_URI), forceVirtualHosted);
} catch (UserException e) {
throw new AnalysisException("parse s3 uri failed, uri = " + validParams.get(S3_URI), e);
}
private String getEndpointFromUri() throws AnalysisException {
if (forceVirtualHosted) {
// s3uri.getVirtualBucket() is virtualBucket.endpoint, e.g.:
// uri: http://my_bucket.cos.ap-beijing.myqcloud.com/file.txt
@ -102,29 +114,32 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
String[] fields = s3uri.getVirtualBucket().split("\\.", 2);
virtualBucket = fields[0];
if (fields.length > 1) {
endPoint = fields[1];
return fields[1];
} else {
throw new AnalysisException("cannot parse the endpoint from the uri, please check the uri.");
}
}
} else {
endPoint = s3uri.getBucketScheme();
return s3uri.getBucketScheme();
}
s3AK = validParams.getOrDefault(AK, "");
s3SK = validParams.getOrDefault(SK, "");
String usePathStyle = validParams.getOrDefault(USE_PATH_STYLE, "false");
}
parseProperties(validParams);
private boolean isVirtualHosted(Map<String, String> validParams) {
String originUri = validParams.getOrDefault(S3_URI, "");
if (originUri.toLowerCase().startsWith("s3")) {
// s3 protocol, default virtual-hosted style
return true;
} else {
// not s3 protocol, forceVirtualHosted is determined by USE_PATH_STYLE.
return !Boolean.parseBoolean(validParams.get(PropertyConverter.USE_PATH_STYLE));
}
}
// set S3 location properties
// these five properties is necessary, no one can be lost.
locationProperties = Maps.newHashMap();
locationProperties.put(S3_ENDPOINT, endPoint);
locationProperties.put(S3_AK, s3AK);
locationProperties.put(S3_SK, s3SK);
locationProperties.put(S3_REGION, validParams.getOrDefault(REGION, ""));
locationProperties.put(USE_PATH_STYLE, usePathStyle);
parseFile();
private S3URI getS3Uri(Map<String, String> validParams) throws AnalysisException {
try {
return S3URI.create(validParams.get(S3_URI), forceVirtualHosted);
} catch (UserException e) {
throw new AnalysisException("parse s3 uri failed, uri = " + validParams.get(S3_URI), e);
}
}
// =========== implement abstract methods of ExternalFileTableValuedFunction =================
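Editor's note: putting the pieces above together, deprecated lower-case keys are re-prefixed by getValidParams before validation. A hedged usage sketch, assuming S3Properties.S3_PREFIX is the prefix applied there and that "format" is one of FILE_FORMAT_PROPERTIES; the endpoint and credential values are made up, and the uri reuses the example from the class javadoc.
Map<String, String> params = new HashMap<>();
params.put("uri", "https://my_bucket.cos.ap-beijing.myqcloud.com/file.txt");
params.put("access_key", "ak_example"); // deprecated key, re-prefixed with S3Properties.S3_PREFIX
params.put("secret_key", "sk_example"); // deprecated key, re-prefixed with S3Properties.S3_PREFIX
params.put("format", "csv");            // assumed to be a FILE_FORMAT_PROPERTIES key
S3TableValuedFunction tvf = new S3TableValuedFunction(params);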

View File

@ -20,11 +20,10 @@ package org.apache.doris.task;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.Resource.ResourceType;
import org.apache.doris.catalog.S3Resource;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.thrift.TPushStoragePolicyReq;
import org.apache.doris.thrift.TS3StorageParam;
import org.apache.doris.thrift.TStoragePolicy;
import org.apache.doris.thrift.TStorageResource;
import org.apache.doris.thrift.TTaskType;
@ -86,25 +85,8 @@ public class PushStoragePolicyTask extends AgentTask {
item.setId(r.getId());
item.setName(r.getName());
item.setVersion(r.getVersion());
TS3StorageParam s3Info = new TS3StorageParam();
S3Resource s3Resource = (S3Resource) r;
s3Info.setEndpoint(s3Resource.getProperty(S3Resource.S3_ENDPOINT));
s3Info.setRegion(s3Resource.getProperty(S3Resource.S3_REGION));
s3Info.setAk(s3Resource.getProperty(S3Resource.S3_ACCESS_KEY));
s3Info.setSk(s3Resource.getProperty(S3Resource.S3_SECRET_KEY));
s3Info.setRootPath(s3Resource.getProperty(S3Resource.S3_ROOT_PATH));
s3Info.setBucket(s3Resource.getProperty(S3Resource.S3_BUCKET));
String maxConnections = s3Resource.getProperty(S3Resource.S3_MAX_CONNECTIONS);
s3Info.setMaxConn(Integer.parseInt(maxConnections == null
? S3Resource.DEFAULT_S3_MAX_CONNECTIONS : maxConnections));
String requestTimeoutMs = s3Resource.getProperty(S3Resource.S3_REQUEST_TIMEOUT_MS);
s3Info.setMaxConn(Integer.parseInt(requestTimeoutMs == null
? S3Resource.DEFAULT_S3_REQUEST_TIMEOUT_MS : requestTimeoutMs));
String connTimeoutMs = s3Resource.getProperty(S3Resource.S3_CONNECTION_TIMEOUT_MS);
s3Info.setMaxConn(Integer.parseInt(connTimeoutMs == null
? S3Resource.DEFAULT_S3_CONNECTION_TIMEOUT_MS : connTimeoutMs));
item.setS3StorageParam(S3Properties.getS3TStorageParam(r.getCopiedProperties()));
r.readUnlock();
item.setS3StorageParam(s3Info);
tStorageResources.add(item);
});
ret.setResource(tStorageResources);
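Editor's note: the per-field setter calls that used to live here are now delegated to S3Properties.getS3TStorageParam, whose implementation is not shown in this hunk. The following is only a sketch of the kind of mapping such a helper could perform, reusing the TS3StorageParam setters from the removed block; the S3Properties keys and the "50" default are assumptions.
// hypothetical sketch, not the actual S3Properties.getS3TStorageParam implementation
static TS3StorageParam toS3StorageParam(Map<String, String> props) {
TS3StorageParam s3Info = new TS3StorageParam();
s3Info.setEndpoint(props.get(S3Properties.ENDPOINT));
s3Info.setRegion(props.get(S3Properties.REGION));
s3Info.setAk(props.get(S3Properties.ACCESS_KEY));
s3Info.setSk(props.get(S3Properties.SECRET_KEY));
s3Info.setRootPath(props.get(S3Properties.ROOT_PATH));
s3Info.setBucket(props.get(S3Properties.BUCKET));
s3Info.setMaxConn(Integer.parseInt(props.getOrDefault(S3Properties.MAX_CONNECTIONS, "50")));
// timeout fields omitted: their exact thrift setter names are not visible in this hunk
return s3Info;
}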

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.datasource.hive.HiveVersionUtil;
import org.apache.doris.datasource.hive.HiveVersionUtil.HiveVersion;
import org.apache.doris.datasource.property.constants.HMSProperties;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@ -340,7 +341,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
this.conf = new Configuration(conf);
}
hiveVersion = HiveVersionUtil.getVersion(conf.get(HMSResource.HIVE_VERSION));
hiveVersion = HiveVersionUtil.getVersion(conf.get(HMSProperties.HIVE_VERSION));
version = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) ? TEST_VERSION : VERSION;
filterHook = loadFilterHooks();

View File

@ -17,7 +17,7 @@
package org.apache.doris.backup;
import org.apache.doris.catalog.S3Resource;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.commons.codec.digest.DigestUtils;
import org.junit.Assert;
@ -58,7 +58,7 @@ public class S3StorageTest {
properties.put("AWS_ACCESS_KEY", System.getenv().getOrDefault("AWS_AK", ""));
properties.put("AWS_SECRET_KEY", System.getenv().getOrDefault("AWS_SK", ""));
properties.put("AWS_ENDPOINT", "http://s3.bj.bcebos.com");
properties.put(S3Resource.USE_PATH_STYLE, "false");
properties.put(PropertyConverter.USE_PATH_STYLE, "false");
properties.put("AWS_REGION", "bj");
storage = new S3Storage(properties);

View File

@ -23,6 +23,7 @@ import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.PrivPredicate;
@ -64,7 +65,7 @@ public class S3ResourceTest {
public void setUp() {
name = "s3";
type = "s3";
s3Endpoint = "aaa";
s3Endpoint = "http://aaa";
s3Region = "bj";
s3RootPath = "/path/to/root";
s3AccessKey = "xxx";
@ -104,34 +105,34 @@ public class S3ResourceTest {
S3Resource s3Resource = (S3Resource) Resource.fromStmt(stmt);
Assert.assertEquals(name, s3Resource.getName());
Assert.assertEquals(type, s3Resource.getType().name().toLowerCase());
Assert.assertEquals(s3Endpoint, s3Resource.getProperty("AWS_ENDPOINT"));
Assert.assertEquals(s3Region, s3Resource.getProperty("AWS_REGION"));
Assert.assertEquals(s3RootPath, s3Resource.getProperty("AWS_ROOT_PATH"));
Assert.assertEquals(s3AccessKey, s3Resource.getProperty("AWS_ACCESS_KEY"));
Assert.assertEquals(s3SecretKey, s3Resource.getProperty("AWS_SECRET_KEY"));
Assert.assertEquals(s3MaxConnections, s3Resource.getProperty("AWS_MAX_CONNECTIONS"));
Assert.assertEquals(s3ReqTimeoutMs, s3Resource.getProperty("AWS_REQUEST_TIMEOUT_MS"));
Assert.assertEquals(s3ConnTimeoutMs, s3Resource.getProperty("AWS_CONNECTION_TIMEOUT_MS"));
Assert.assertEquals(s3Endpoint, s3Resource.getProperty(S3Properties.ENDPOINT));
Assert.assertEquals(s3Region, s3Resource.getProperty(S3Properties.REGION));
Assert.assertEquals(s3RootPath, s3Resource.getProperty(S3Properties.ROOT_PATH));
Assert.assertEquals(s3AccessKey, s3Resource.getProperty(S3Properties.ACCESS_KEY));
Assert.assertEquals(s3SecretKey, s3Resource.getProperty(S3Properties.SECRET_KEY));
Assert.assertEquals(s3MaxConnections, s3Resource.getProperty(S3Properties.MAX_CONNECTIONS));
Assert.assertEquals(s3ReqTimeoutMs, s3Resource.getProperty(S3Properties.REQUEST_TIMEOUT_MS));
Assert.assertEquals(s3ConnTimeoutMs, s3Resource.getProperty(S3Properties.CONNECTION_TIMEOUT_MS));
// with no default settings
s3Properties.put("AWS_MAX_CONNECTIONS", "100");
s3Properties.put("AWS_REQUEST_TIMEOUT_MS", "2000");
s3Properties.put("AWS_CONNECTION_TIMEOUT_MS", "2000");
s3Properties.put("s3_validity_check", "false");
s3Properties.put(S3Properties.MAX_CONNECTIONS, "100");
s3Properties.put(S3Properties.REQUEST_TIMEOUT_MS, "2000");
s3Properties.put(S3Properties.CONNECTION_TIMEOUT_MS, "2000");
s3Properties.put(S3Properties.VALIDITY_CHECK, "false");
stmt = new CreateResourceStmt(true, false, name, s3Properties);
stmt.analyze(analyzer);
s3Resource = (S3Resource) Resource.fromStmt(stmt);
Assert.assertEquals(name, s3Resource.getName());
Assert.assertEquals(type, s3Resource.getType().name().toLowerCase());
Assert.assertEquals(s3Endpoint, s3Resource.getProperty("AWS_ENDPOINT"));
Assert.assertEquals(s3Region, s3Resource.getProperty("AWS_REGION"));
Assert.assertEquals(s3RootPath, s3Resource.getProperty("AWS_ROOT_PATH"));
Assert.assertEquals(s3AccessKey, s3Resource.getProperty("AWS_ACCESS_KEY"));
Assert.assertEquals(s3SecretKey, s3Resource.getProperty("AWS_SECRET_KEY"));
Assert.assertEquals("100", s3Resource.getProperty("AWS_MAX_CONNECTIONS"));
Assert.assertEquals("2000", s3Resource.getProperty("AWS_REQUEST_TIMEOUT_MS"));
Assert.assertEquals("2000", s3Resource.getProperty("AWS_CONNECTION_TIMEOUT_MS"));
Assert.assertEquals(s3Endpoint, s3Resource.getProperty(S3Properties.ENDPOINT));
Assert.assertEquals(s3Region, s3Resource.getProperty(S3Properties.REGION));
Assert.assertEquals(s3RootPath, s3Resource.getProperty(S3Properties.ROOT_PATH));
Assert.assertEquals(s3AccessKey, s3Resource.getProperty(S3Properties.ACCESS_KEY));
Assert.assertEquals(s3SecretKey, s3Resource.getProperty(S3Properties.SECRET_KEY));
Assert.assertEquals("100", s3Resource.getProperty(S3Properties.MAX_CONNECTIONS));
Assert.assertEquals("2000", s3Resource.getProperty(S3Properties.REQUEST_TIMEOUT_MS));
Assert.assertEquals("2000", s3Resource.getProperty(S3Properties.CONNECTION_TIMEOUT_MS));
}
@Test(expected = DdlException.class)
@ -158,7 +159,8 @@ public class S3ResourceTest {
metaContext.setThreadLocalInfo();
// 1. write
Path path = Files.createFile(Paths.get("./s3Resource"));
// Path path = Files.createFile(Paths.get("./s3Resource"));
Path path = Paths.get("./s3Resource");
DataOutputStream s3Dos = new DataOutputStream(Files.newOutputStream(path));
S3Resource s3Resource1 = new S3Resource("s3_1");
@ -187,14 +189,14 @@ public class S3ResourceTest {
Assert.assertEquals("s3_1", rS3Resource1.getName());
Assert.assertEquals("s3_2", rS3Resource2.getName());
Assert.assertEquals(rS3Resource2.getProperty("AWS_ENDPOINT"), "aaa");
Assert.assertEquals(rS3Resource2.getProperty("AWS_REGION"), "bbb");
Assert.assertEquals(rS3Resource2.getProperty("AWS_ROOT_PATH"), "/path/to/root");
Assert.assertEquals(rS3Resource2.getProperty("AWS_ACCESS_KEY"), "xxx");
Assert.assertEquals(rS3Resource2.getProperty("AWS_SECRET_KEY"), "yyy");
Assert.assertEquals(rS3Resource2.getProperty("AWS_MAX_CONNECTIONS"), "50");
Assert.assertEquals(rS3Resource2.getProperty("AWS_REQUEST_TIMEOUT_MS"), "3000");
Assert.assertEquals(rS3Resource2.getProperty("AWS_CONNECTION_TIMEOUT_MS"), "1000");
Assert.assertEquals(rS3Resource2.getProperty(S3Properties.ENDPOINT), "http://aaa");
Assert.assertEquals(rS3Resource2.getProperty(S3Properties.REGION), "bbb");
Assert.assertEquals(rS3Resource2.getProperty(S3Properties.ROOT_PATH), "/path/to/root");
Assert.assertEquals(rS3Resource2.getProperty(S3Properties.ACCESS_KEY), "xxx");
Assert.assertEquals(rS3Resource2.getProperty(S3Properties.SECRET_KEY), "yyy");
Assert.assertEquals(rS3Resource2.getProperty(S3Properties.MAX_CONNECTIONS), "50");
Assert.assertEquals(rS3Resource2.getProperty(S3Properties.REQUEST_TIMEOUT_MS), "3000");
Assert.assertEquals(rS3Resource2.getProperty(S3Properties.CONNECTION_TIMEOUT_MS), "1000");
// 3. delete
s3Dis.close();