[fix](multi-catalog)fix minio default region and throw minio error msg, support s3 bucket root path (#21994)
1. check minio region, set default region if user region is not provided, and throw minio error msg 2. support read root path s3://bucket1 3. fix max compute public access
This commit is contained in:
@ -43,11 +43,11 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
|
||||
private String secretKey;
|
||||
@SerializedName(value = "publicAccess")
|
||||
private boolean enablePublicAccess;
|
||||
private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun.com/api";
|
||||
private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun-inc.com/api";
|
||||
private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun-inc.com";
|
||||
|
||||
public MaxComputeExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
|
||||
String comment) {
|
||||
String comment) {
|
||||
super(catalogId, name, InitCatalogLog.Type.MAX_COMPUTE, comment);
|
||||
catalogProperty = new CatalogProperty(resource, props);
|
||||
}
|
||||
@ -77,9 +77,13 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
|
||||
secretKey = credential.getSecretKey();
|
||||
Account account = new AliyunAccount(accessKey, secretKey);
|
||||
this.odps = new Odps(account);
|
||||
odps.setEndpoint(odpsUrlTemplate.replace("{}", region));
|
||||
odps.setDefaultProject(defaultProject);
|
||||
enablePublicAccess = Boolean.parseBoolean(props.getOrDefault(MCProperties.PUBLIC_ACCESS, "false"));
|
||||
String odpsUrl = odpsUrlTemplate.replace("{}", region);
|
||||
if (enablePublicAccess) {
|
||||
odpsUrl = odpsUrl.replace("-inc", "");
|
||||
}
|
||||
odps.setEndpoint(odpsUrl);
|
||||
odps.setDefaultProject(defaultProject);
|
||||
}
|
||||
|
||||
public long getTotalRows(String project, String table) throws TunnelException {
|
||||
@ -87,7 +91,7 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
|
||||
TableTunnel tunnel = new TableTunnel(odps);
|
||||
String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
|
||||
if (enablePublicAccess) {
|
||||
tunnelUrl = tunnelUrlTemplate.replace("-inc", "");
|
||||
tunnelUrl = tunnelUrl.replace("-inc", "");
|
||||
}
|
||||
tunnel.setEndpoint(tunnelUrl);
|
||||
return tunnel.createDownloadSession(project, table).getRecordCount();
|
||||
|
||||
@ -309,8 +309,9 @@ public class PropertyConverter {
|
||||
}
|
||||
|
||||
private static Map<String, String> convertToMinioProperties(Map<String, String> props, CloudCredential credential) {
|
||||
// minio does not have region, use an arbitrary one.
|
||||
props.put(MinioProperties.REGION, "us-east-1");
|
||||
if (!props.containsKey(MinioProperties.REGION)) {
|
||||
props.put(MinioProperties.REGION, MinioProperties.DEFAULT_REGION);
|
||||
}
|
||||
return convertToS3Properties(S3Properties.prefixToS3(props), credential);
|
||||
}
|
||||
|
||||
|
||||
@ -28,15 +28,15 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class S3ClientBEProperties {
|
||||
|
||||
/**
|
||||
* convert FE properties to BE S3 client properties
|
||||
* On BE, should use properties like AWS_XXX.
|
||||
*/
|
||||
public static Map<String, String> getBeFSProperties(Map<String, String> properties) {
|
||||
if (properties.containsKey(MinioProperties.ENDPOINT)) {
|
||||
// minio does not have region, use an arbitrary one.
|
||||
properties.put(MinioProperties.REGION, "us-east-1");
|
||||
if (!properties.containsKey(MinioProperties.REGION)) {
|
||||
properties.put(MinioProperties.REGION, MinioProperties.DEFAULT_REGION);
|
||||
}
|
||||
return getBeAWSPropertiesFromS3(S3Properties.prefixToS3(properties));
|
||||
} else if (properties.containsKey(S3Properties.ENDPOINT)) {
|
||||
// s3,oss,cos,obs use this.
|
||||
|
||||
@ -31,6 +31,7 @@ public class MinioProperties extends BaseProperties {
|
||||
public static final String ACCESS_KEY = "minio.access_key";
|
||||
public static final String SECRET_KEY = "minio.secret_key";
|
||||
public static final String SESSION_TOKEN = "minio.session_token";
|
||||
public static final String DEFAULT_REGION = "us-east-1";
|
||||
public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT, ACCESS_KEY, SECRET_KEY, REGION);
|
||||
|
||||
public static CloudCredential getCredential(Map<String, String> props) {
|
||||
|
||||
@ -126,7 +126,7 @@ public class S3Properties extends BaseProperties {
|
||||
if (endpointSplit.length < 2) {
|
||||
return null;
|
||||
}
|
||||
if (endpointSplit[0].startsWith("oss-")) {
|
||||
if (endpointSplit[0].contains("oss-")) {
|
||||
// compatible with the endpoint: oss-cn-bejing.aliyuncs.com
|
||||
return endpointSplit[0];
|
||||
}
|
||||
|
||||
@ -23,6 +23,7 @@ import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.datasource.property.PropertyConverter;
|
||||
import org.apache.doris.fs.obj.S3ObjStorage;
|
||||
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
@ -86,6 +87,17 @@ public class S3FileSystem extends ObjFileSystem {
|
||||
LOG.info("file not found: " + e.getMessage());
|
||||
return new Status(Status.ErrCode.NOT_FOUND, "file not found: " + e.getMessage());
|
||||
} catch (Exception e) {
|
||||
if (e.getCause() instanceof AmazonS3Exception) {
|
||||
// process minio error msg
|
||||
AmazonS3Exception ea = (AmazonS3Exception) e.getCause();
|
||||
Map<String, String> callbackHeaders = ea.getHttpHeaders();
|
||||
if (callbackHeaders != null && !callbackHeaders.isEmpty()) {
|
||||
String minioErrMsg = callbackHeaders.get("X-Minio-Error-Desc");
|
||||
if (minioErrMsg != null) {
|
||||
return new Status(Status.ErrCode.COMMON_ERROR, "Minio request error: " + minioErrMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG.error("errors while get file status ", e);
|
||||
return new Status(Status.ErrCode.COMMON_ERROR, "errors while get file status " + e.getMessage());
|
||||
}
|
||||
|
||||
@ -67,7 +67,7 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
|
||||
|
||||
private final S3URI s3uri;
|
||||
private final boolean forceVirtualHosted;
|
||||
private String virtualBucket;
|
||||
private String virtualBucket = "";
|
||||
private String virtualKey;
|
||||
|
||||
public S3TableValuedFunction(Map<String, String> params) throws AnalysisException {
|
||||
@ -77,8 +77,12 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
|
||||
final String endpoint = forceVirtualHosted
|
||||
? getEndpointAndSetVirtualBucket(params)
|
||||
: s3uri.getBucketScheme();
|
||||
if (!tvfParams.containsKey(S3Properties.REGION)) {
|
||||
String region = S3Properties.getRegionOfEndpoint(endpoint);
|
||||
tvfParams.put(S3Properties.REGION, region);
|
||||
}
|
||||
CloudCredentialWithEndpoint credential = new CloudCredentialWithEndpoint(endpoint,
|
||||
tvfParams.getOrDefault(S3Properties.REGION, S3Properties.getRegionOfEndpoint(endpoint)),
|
||||
tvfParams.get(S3Properties.REGION),
|
||||
tvfParams.get(S3Properties.ACCESS_KEY),
|
||||
tvfParams.get(S3Properties.SECRET_KEY));
|
||||
if (tvfParams.containsKey(S3Properties.SESSION_TOKEN)) {
|
||||
|
||||
Reference in New Issue
Block a user