[fix](muti-catalog)convert to s3 path when use aws endpoint (#22784)
Convert to s3 path when use aws endpoint For compatibility, we can also use s3 client to access other cloud by setting s3 endpoint properties
This commit is contained in:
@ -20,6 +20,7 @@ package org.apache.doris.common.util;
|
||||
import org.apache.doris.catalog.HdfsResource;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.datasource.credentials.CloudCredential;
|
||||
import org.apache.doris.datasource.property.constants.S3Properties;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -67,6 +68,12 @@ public class S3Util {
|
||||
|| location.startsWith(FeConstants.FS_PREFIX_BOS);
|
||||
}
|
||||
|
||||
private static boolean isS3EndPoint(String location, Map<String, String> props) {
|
||||
// wide check range for the compatibility of s3 properties
|
||||
return (props.containsKey(S3Properties.ENDPOINT) || props.containsKey(S3Properties.Env.ENDPOINT))
|
||||
&& isObjStorage(location);
|
||||
}
|
||||
|
||||
/**
|
||||
* The converted path is used for FE to get metadata
|
||||
* @param location origin location
|
||||
@ -74,7 +81,8 @@ public class S3Util {
|
||||
*/
|
||||
public static String convertToS3IfNecessary(String location, Map<String, String> props) {
|
||||
LOG.debug("try convert location to s3 prefix: " + location);
|
||||
if (isObjStorageUseS3Client(location)) {
|
||||
// include the check for multi locations and in a table, such as both s3 and hdfs are in a table.
|
||||
if (isS3EndPoint(location, props) || isObjStorageUseS3Client(location)) {
|
||||
int pos = location.indexOf("://");
|
||||
if (pos == -1) {
|
||||
throw new RuntimeException("No '://' found in location: " + location);
|
||||
|
||||
@ -123,11 +123,24 @@ public class PropertyConverter {
|
||||
} else if (props.containsKey(MinioProperties.ENDPOINT)) {
|
||||
return convertToMinioProperties(props, MinioProperties.getCredential(props));
|
||||
} else if (props.containsKey(S3Properties.ENDPOINT)) {
|
||||
CloudCredential credential = S3Properties.getCredential(props);
|
||||
String s3CliEndpoint = props.get(S3Properties.ENDPOINT);
|
||||
if (s3CliEndpoint.contains(CosProperties.COS_PREFIX)) {
|
||||
props.putIfAbsent(CosProperties.ENDPOINT, s3CliEndpoint);
|
||||
// CosN is not compatible with S3, when use s3 properties, will convert to cosn properties.
|
||||
return convertToCOSProperties(props, credential);
|
||||
}
|
||||
return convertToS3Properties(props, S3Properties.getCredential(props));
|
||||
} else if (props.containsKey(S3Properties.Env.ENDPOINT)) {
|
||||
// checkout env in the end
|
||||
// compatible with the s3,obs,oss,cos when they use aws client.
|
||||
return convertToS3EnvProperties(props, S3Properties.getEnvironmentCredentialWithEndpoint(props), false);
|
||||
CloudCredentialWithEndpoint envCredentials = S3Properties.getEnvironmentCredentialWithEndpoint(props);
|
||||
if (envCredentials.getEndpoint().contains(CosProperties.COS_PREFIX)) {
|
||||
props.putIfAbsent(CosProperties.ENDPOINT, envCredentials.getEndpoint());
|
||||
// CosN is not compatible with S3, when use s3 properties, will convert to cosn properties.
|
||||
return convertToCOSProperties(props, envCredentials);
|
||||
}
|
||||
return convertToS3EnvProperties(props, envCredentials, false);
|
||||
}
|
||||
return props;
|
||||
}
|
||||
|
||||
@ -464,12 +464,12 @@ public abstract class FileQueryScanNode extends FileScanNode {
|
||||
return Optional.of(TFileType.FILE_S3);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
|
||||
return Optional.of(TFileType.FILE_HDFS);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_COSN)) {
|
||||
return Optional.of(TFileType.FILE_HDFS);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
|
||||
return Optional.of(TFileType.FILE_LOCAL);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
|
||||
return Optional.of(TFileType.FILE_BROKER);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_COSN)) {
|
||||
return Optional.of(TFileType.FILE_BROKER);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) {
|
||||
return Optional.of(TFileType.FILE_BROKER);
|
||||
} else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
|
||||
|
||||
Reference in New Issue
Block a user