[fix](multi-catalog) fix obj file cache and DLF Iceberg catalog (#21238)

1. Fix the storage prefix handling for the object file cache: oss/cos/obs locations no longer need to be converted to the s3 prefix; the conversion is now done only when creating the scan-range split for the BE.
2. DLF Iceberg catalog: support DLF Iceberg tables, using the S3 file IO.
slothever
2023-07-02 21:08:41 +08:00
committed by GitHub
parent f74e635aa5
commit f5af735fa6
10 changed files with 174 additions and 84 deletions
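In short: the FE keeps the native oss://, cos:// and obs:// prefixes when reading metadata, and rewrites the prefix to s3:// only when building scan-range splits for the BE, which always goes through its S3 client. A minimal sketch of the intended behavior of the two helpers changed below (bucket and paths are placeholders):

// FE metadata access: oss/cos/obs prefixes are kept as-is; only
// s3/s3a/s3n/gcs/bos prefixes are normalized to "s3".
String metaLocation = S3Util.convertToS3IfNecessary("oss://my-bucket/db/tbl");
// -> "oss://my-bucket/db/tbl"

// BE scan ranges: every object-storage prefix is rewritten to "s3".
Path scanLocation = S3Util.toScanRangeLocation("oss://my-bucket/db/tbl/part-0.parquet");
// -> new Path("s3://my-bucket/db/tbl/part-0.parquet")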

View File

@ -18,30 +18,54 @@
package org.apache.doris.common.util;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.credentials.CloudCredential;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
import java.net.URI;
import java.time.Duration;
public class S3Util {
private static final Logger LOG = LogManager.getLogger(S3Util.class);
public static boolean isObjStorage(String location) {
-return isS3CompatibleObjStorage(location) || location.startsWith(FeConstants.FS_PREFIX_OBS);
+return isObjStorageUseS3Client(location)
+|| location.startsWith(FeConstants.FS_PREFIX_COS)
+|| location.startsWith(FeConstants.FS_PREFIX_OSS)
+|| location.startsWith(FeConstants.FS_PREFIX_OBS);
}
-private static boolean isS3CompatibleObjStorage(String location) {
+private static boolean isObjStorageUseS3Client(String location) {
return location.startsWith(FeConstants.FS_PREFIX_S3)
|| location.startsWith(FeConstants.FS_PREFIX_S3A)
|| location.startsWith(FeConstants.FS_PREFIX_S3N)
|| location.startsWith(FeConstants.FS_PREFIX_GCS)
-|| location.startsWith(FeConstants.FS_PREFIX_BOS)
-|| location.startsWith(FeConstants.FS_PREFIX_COS)
-|| location.startsWith(FeConstants.FS_PREFIX_OSS);
+|| location.startsWith(FeConstants.FS_PREFIX_BOS);
}
-public static String convertToS3IfNecessary(String location) {
+/**
+ * The converted path is used by the FE to get metadata.
+ * @param location the original location
+ * @return the metadata location path; only converted when the storage is compatible with the S3 client.
+ */
+public static String convertToS3IfNecessary(String location) {
LOG.debug("try convert location to s3 prefix: " + location);
-if (isS3CompatibleObjStorage(location)) {
+if (isObjStorageUseS3Client(location)) {
int pos = location.indexOf("://");
if (pos == -1) {
throw new RuntimeException("No '://' found in location: " + location);
@ -51,4 +75,63 @@ public class S3Util {
return location;
}
/**
* The converted path is used by the BE.
* @param location the original split path
* @return the BE scan range path
*/
public static Path toScanRangeLocation(String location) {
// All object storage is accessed via the S3 client on the BE.
if (isObjStorage(location)) {
int pos = location.indexOf("://");
if (pos == -1) {
throw new RuntimeException("No '://' found in location: " + location);
}
location = "s3" + location.substring(pos);
}
return new Path(location);
}
public static S3Client buildS3Client(URI endpoint, String region, CloudCredential credential) {
StaticCredentialsProvider scp;
AwsCredentials awsCredential;
if (!credential.isTemporary()) {
awsCredential = AwsBasicCredentials.create(credential.getAccessKey(), credential.getSecretKey());
} else {
awsCredential = AwsSessionCredentials.create(credential.getAccessKey(), credential.getSecretKey(),
credential.getSessionToken());
}
scp = StaticCredentialsProvider.create(awsCredential);
EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
.builder()
.baseDelay(Duration.ofSeconds(1))
.maxBackoffTime(Duration.ofMinutes(1))
.build();
// retry 3 times with equal-jitter backoff
RetryPolicy retryPolicy = RetryPolicy
.builder()
.numRetries(3)
.backoffStrategy(backoffStrategy)
.build();
ClientOverrideConfiguration clientConf = ClientOverrideConfiguration
.builder()
// set retry policy
.retryPolicy(retryPolicy)
// using AwsS3V4Signer
.putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create())
.build();
return S3Client.builder()
.httpClient(UrlConnectionHttpClient.create())
.endpointOverride(endpoint)
.credentialsProvider(scp)
.region(Region.of(region))
.overrideConfiguration(clientConf)
// disable chunked encoding because BOS does not support it
// use virtual hosted-style access
.serviceConfiguration(S3Configuration.builder()
.chunkedEncodingEnabled(false)
.pathStyleAccessEnabled(false)
.build())
.build();
}
}
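For reference, a usage sketch of the new shared builder; the endpoint, region and keys below are placeholders, not values from this commit. Both S3ObjStorage and DLFCatalog in the hunks below delegate to this helper.

CloudCredential credential = new CloudCredential();
credential.setAccessKey("<access-key>");   // placeholder
credential.setSecretKey("<secret-key>");   // placeholder
// credential.setSessionToken("<token>");  // only needed for temporary credentials
S3Client client = S3Util.buildS3Client(
        URI.create("https://oss-cn-beijing.aliyuncs.com"),  // placeholder endpoint
        "oss-cn-beijing",                                   // placeholder region
        credential);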

View File

@ -309,7 +309,14 @@ public class HiveMetaStoreCache {
// So we need to recursively list data location.
// https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, true);
-locatedFiles.files().forEach(result::addFile);
+for (RemoteFile remoteFile : locatedFiles.files()) {
+Path srcPath = remoteFile.getPath();
+Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString());
+if (!convertedPath.toString().equals(srcPath.toString())) {
+remoteFile.setPath(convertedPath);
+}
+result.addFile(remoteFile);
+}
} catch (Exception e) {
// A user may manually remove a partition directory on HDFS; in this case,
// Hive is not aware that the removed partition is missing.
@ -362,7 +369,8 @@ public class HiveMetaStoreCache {
for (int i = 0; i < splits.length; i++) {
org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
// todo: get modification time
-result.addSplit(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(), -1, null, null));
+Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString());
+result.addSplit(new FileSplit(splitFilePath, fs.getStart(), fs.getLength(), -1, null, null));
}
}

View File

@ -24,13 +24,13 @@ import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ClientPool;
-import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import shade.doris.hive.org.apache.thrift.TException;
@ -57,7 +57,11 @@ public abstract class HiveCompatibleCatalog extends BaseMetastoreCatalog impleme
protected FileIO initializeFileIO(Map<String, String> properties, Configuration hadoopConf) {
String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
if (fileIOImpl == null) {
-FileIO io = new S3FileIO();
+/* S3FileIO needs some custom configuration, so the superclass
+ * defaults to HadoopFileIO; derived classes can provide a better
+ * implementation, as DLFCatalog does.
+ */
+FileIO io = new HadoopFileIO(hadoopConf);
io.initialize(properties);
return io;
} else {

View File

@ -17,12 +17,21 @@
package org.apache.doris.datasource.iceberg.dlf;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.datasource.credentials.CloudCredential;
import org.apache.doris.datasource.iceberg.HiveCompatibleCatalog;
import org.apache.doris.datasource.iceberg.dlf.client.DLFCachedClientPool;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.aliyun.oss.Constants;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import java.net.URI;
import java.util.Map;
public class DLFCatalog extends HiveCompatibleCatalog {
@ -38,4 +47,26 @@ public class DLFCatalog extends HiveCompatibleCatalog {
String tableName = tableIdentifier.name();
return new DLFTableOperations(this.conf, this.clients, this.fileIO, this.uid, dbName, tableName);
}
protected FileIO initializeFileIO(Map<String, String> properties, Configuration hadoopConf) {
// read from the converted OSS properties, falling back to the legacy AWS S3 properties
String endpoint = properties.getOrDefault(Constants.ENDPOINT_KEY, properties.get(S3Properties.Env.ENDPOINT));
CloudCredential credential = new CloudCredential();
credential.setAccessKey(properties.getOrDefault(OssProperties.ACCESS_KEY,
properties.get(S3Properties.Env.ACCESS_KEY)));
credential.setSecretKey(properties.getOrDefault(OssProperties.SECRET_KEY,
properties.get(S3Properties.Env.SECRET_KEY)));
if (properties.containsKey(OssProperties.SESSION_TOKEN)
|| properties.containsKey(S3Properties.Env.TOKEN)) {
credential.setSessionToken(properties.getOrDefault(OssProperties.SESSION_TOKEN,
properties.get(S3Properties.Env.TOKEN)));
}
String region = properties.getOrDefault(OssProperties.REGION, properties.get(S3Properties.Env.REGION));
// S3FileIO only supports s3-style endpoints
String s3Endpoint = endpoint.replace(region, "s3." + region);
URI endpointUri = URI.create(s3Endpoint);
FileIO io = new S3FileIO(() -> S3Util.buildS3Client(endpointUri, region, credential));
io.initialize(properties);
return io;
}
}
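To illustrate the endpoint rewrite above with hypothetical values (the real endpoint and region come from the converted DLF/OSS properties):

String region = "oss-cn-beijing";                          // hypothetical region value
String endpoint = "oss-cn-beijing-internal.aliyuncs.com";  // hypothetical OSS endpoint
String s3Endpoint = endpoint.replace(region, "s3." + region);
// -> "s3.oss-cn-beijing-internal.aliyuncs.com", an s3-style endpoint usable by S3FileIO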

View File

@ -314,29 +314,28 @@ public class PropertyConverter {
if (Strings.isNullOrEmpty(uid)) {
throw new IllegalArgumentException("Required dlf property: " + DataLakeConfig.CATALOG_USER_ID);
}
-// access OSS by AWS client, so set s3 parameters
-getAWSPropertiesFromDLFConf(props, hiveConf);
+getOSSPropertiesFromDLFConf(props, hiveConf);
}
-private static void getAWSPropertiesFromDLFConf(Map<String, String> props, HiveConf hiveConf) {
+private static void getOSSPropertiesFromDLFConf(Map<String, String> props, HiveConf hiveConf) {
// get following properties from hive-site.xml
// 1. region and endpoint. eg: cn-beijing
String region = hiveConf.get(DataLakeConfig.CATALOG_REGION_ID);
if (!Strings.isNullOrEmpty(region)) {
// See: https://help.aliyun.com/document_detail/31837.html
// And add "-internal" to access oss within vpc
-props.put(S3Properties.REGION, "oss-" + region);
+props.put(OssProperties.REGION, "oss-" + region);
String publicAccess = hiveConf.get("dlf.catalog.accessPublic", "false");
-props.put(S3Properties.ENDPOINT, getOssEndpoint(region, Boolean.parseBoolean(publicAccess)));
+props.put(OssProperties.ENDPOINT, getOssEndpoint(region, Boolean.parseBoolean(publicAccess)));
}
// 2. ak and sk
String ak = hiveConf.get(DataLakeConfig.CATALOG_ACCESS_KEY_ID);
String sk = hiveConf.get(DataLakeConfig.CATALOG_ACCESS_KEY_SECRET);
if (!Strings.isNullOrEmpty(ak)) {
-props.put(S3Properties.ACCESS_KEY, ak);
+props.put(OssProperties.ACCESS_KEY, ak);
}
if (!Strings.isNullOrEmpty(sk)) {
-props.put(S3Properties.SECRET_KEY, sk);
+props.put(OssProperties.SECRET_KEY, sk);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Get properties for oss in hive-site.xml: {}", props);
@ -369,19 +368,19 @@ public class PropertyConverter {
if (Strings.isNullOrEmpty(uid)) {
throw new IllegalArgumentException("Required dlf property: " + DataLakeConfig.CATALOG_USER_ID);
}
-// convert to s3 client property
+// convert to oss property
if (credential.isWhole()) {
-props.put(S3Properties.ACCESS_KEY, credential.getAccessKey());
-props.put(S3Properties.SECRET_KEY, credential.getSecretKey());
+props.put(OssProperties.ACCESS_KEY, credential.getAccessKey());
+props.put(OssProperties.SECRET_KEY, credential.getSecretKey());
}
if (credential.isTemporary()) {
-props.put(S3Properties.SESSION_TOKEN, credential.getSessionToken());
+props.put(OssProperties.SESSION_TOKEN, credential.getSessionToken());
}
String publicAccess = props.getOrDefault(DLFProperties.Site.ACCESS_PUBLIC, "false");
String region = props.getOrDefault(DataLakeConfig.CATALOG_REGION_ID, props.get(DLFProperties.REGION));
if (!Strings.isNullOrEmpty(region)) {
-props.put(S3Properties.REGION, "oss-" + region);
-props.put(S3Properties.ENDPOINT, getOssEndpoint(region, Boolean.parseBoolean(publicAccess)));
+props.put(OssProperties.REGION, "oss-" + region);
+props.put(OssProperties.ENDPOINT, getOssEndpoint(region, Boolean.parseBoolean(publicAccess)));
}
}
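Putting the two hunks together, a rough sketch of the DLF to OSS property conversion (region and keys are placeholders; getOssEndpoint is the existing private helper in PropertyConverter):

Map<String, String> props = new HashMap<>();
String region = "cn-beijing";                        // from DataLakeConfig.CATALOG_REGION_ID
props.put(OssProperties.REGION, "oss-" + region);    // -> "oss-cn-beijing"
props.put(OssProperties.ENDPOINT, getOssEndpoint(region, /* accessPublic */ false));
props.put(OssProperties.ACCESS_KEY, "<ak>");         // from DataLakeConfig.CATALOG_ACCESS_KEY_ID
props.put(OssProperties.SECRET_KEY, "<sk>");         // from DataLakeConfig.CATALOG_ACCESS_KEY_SECRET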

View File

@ -126,6 +126,10 @@ public class S3Properties extends BaseProperties {
if (endpointSplit.length < 2) {
return null;
}
if (endpointSplit[0].startsWith("oss-")) {
// compatible with OSS endpoints such as oss-cn-beijing.aliyuncs.com
return endpointSplit[0];
}
return endpointSplit[1];
}
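A hedged illustration of why the extra branch is needed, assuming the endpoint is split on '.':

String[] endpointSplit = "oss-cn-beijing.aliyuncs.com".split("\\.");  // hypothetical OSS endpoint
// endpointSplit[0] = "oss-cn-beijing" -> the region for OSS-style endpoints;
// endpointSplit[1] = "aliyuncs"       -> would be wrong, hence the new startsWith("oss-") check.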

View File

@ -56,7 +56,10 @@ public class FileSystemFactory {
// TODO: optimize this method; the conf is converted many times.
Map<String, String> properties = new HashMap<>();
conf.iterator().forEachRemaining(e -> properties.put(e.getKey(), e.getValue()));
-if (location.startsWith(FeConstants.FS_PREFIX_S3) || location.startsWith(FeConstants.FS_PREFIX_OBS)) {
+if (location.startsWith(FeConstants.FS_PREFIX_S3)
+|| location.startsWith(FeConstants.FS_PREFIX_OSS)
+|| location.startsWith(FeConstants.FS_PREFIX_COS)
+|| location.startsWith(FeConstants.FS_PREFIX_OBS)) {
return new S3FileSystem(properties);
} else if (location.startsWith(FeConstants.FS_PREFIX_HDFS) || location.startsWith(FeConstants.FS_PREFIX_GFS)) {
return new DFSFileSystem(properties);

View File

@ -21,6 +21,8 @@ import org.apache.doris.backup.Status;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.datasource.credentials.CloudCredential;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
@ -31,19 +33,8 @@ import org.apache.http.client.utils.URIBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
-import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
-import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
-import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.auth.signer.AwsS3V4Signer;
-import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
-import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
-import software.amazon.awssdk.core.retry.RetryPolicy;
-import software.amazon.awssdk.core.retry.backoff.EqualJitterBackoffStrategy;
import software.amazon.awssdk.core.sync.RequestBody;
-import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
-import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
@ -61,7 +52,6 @@ import software.amazon.awssdk.services.s3.model.S3Object;
import java.io.File;
import java.net.URI;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -132,52 +122,15 @@ public class S3ObjStorage implements ObjStorage<S3Client> {
public S3Client getClient(String bucket) throws UserException {
if (client == null) {
URI tmpEndpoint = URI.create(properties.get(S3Properties.ENDPOINT));
-StaticCredentialsProvider scp;
-if (!properties.containsKey(S3Properties.SESSION_TOKEN)) {
-AwsBasicCredentials awsBasic = AwsBasicCredentials.create(
-properties.get(S3Properties.ACCESS_KEY),
-properties.get(S3Properties.SECRET_KEY));
-scp = StaticCredentialsProvider.create(awsBasic);
-} else {
-AwsSessionCredentials awsSession = AwsSessionCredentials.create(
-properties.get(S3Properties.ACCESS_KEY),
-properties.get(S3Properties.SECRET_KEY),
-properties.get(S3Properties.SESSION_TOKEN));
-scp = StaticCredentialsProvider.create(awsSession);
-}
-EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
-.builder()
-.baseDelay(Duration.ofSeconds(1))
-.maxBackoffTime(Duration.ofMinutes(1))
-.build();
-// retry 3 time with Equal backoff
-RetryPolicy retryPolicy = RetryPolicy
-.builder()
-.numRetries(3)
-.backoffStrategy(backoffStrategy)
-.build();
-ClientOverrideConfiguration clientConf = ClientOverrideConfiguration
-.builder()
-// set retry policy
-.retryPolicy(retryPolicy)
-// using AwsS3V4Signer
-.putAdvancedOption(SdkAdvancedClientOption.SIGNER, AwsS3V4Signer.create())
-.build();
URI endpoint = StringUtils.isEmpty(bucket) ? tmpEndpoint :
URI.create(new URIBuilder(tmpEndpoint).setHost(bucket + "." + tmpEndpoint.getHost()).toString());
-client = S3Client.builder()
-.httpClient(UrlConnectionHttpClient.create())
-.endpointOverride(endpoint)
-.credentialsProvider(scp)
-.region(Region.of(properties.get(S3Properties.REGION)))
-.overrideConfiguration(clientConf)
-// disable chunkedEncoding because of bos not supported
-// use virtual hosted-style access
-.serviceConfiguration(S3Configuration.builder()
-.chunkedEncodingEnabled(false)
-.pathStyleAccessEnabled(false)
-.build())
-.build();
+CloudCredential credential = new CloudCredential();
+credential.setAccessKey(properties.get(S3Properties.ACCESS_KEY));
+credential.setSecretKey(properties.get(S3Properties.SECRET_KEY));
+if (properties.containsKey(S3Properties.SESSION_TOKEN)) {
+credential.setSessionToken(properties.get(S3Properties.SESSION_TOKEN));
+}
+client = S3Util.buildS3Client(endpoint, properties.get(S3Properties.REGION), credential);
}
return client;
}

View File

@ -75,6 +75,10 @@ public class RemoteFile {
return path;
}
public void setPath(Path path) {
this.path = path;
}
public boolean isFile() {
return isFile;
}

View File

@ -120,7 +120,8 @@ public class IcebergScanNode extends FileQueryScanNode {
} else {
for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
-deleteFileDesc.setPath(filter.getDeleteFilePath());
+String deleteFilePath = filter.getDeleteFilePath();
+deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath).toString());
if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
fileDesc.setContent(FileContent.POSITION_DELETES.id());
IcebergDeleteFileFilter.PositionDelete positionDelete =
@ -182,8 +183,8 @@ public class IcebergScanNode extends FileQueryScanNode {
long fileSize = task.file().fileSizeInBytes();
for (FileScanTask splitTask : task.split(splitSize)) {
String dataFilePath = splitTask.file().path().toString();
-String finalDataFilePath = S3Util.convertToS3IfNecessary(dataFilePath);
-IcebergSplit split = new IcebergSplit(new Path(finalDataFilePath), splitTask.start(),
+Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath);
+IcebergSplit split = new IcebergSplit(finalDataFilePath, splitTask.start(),
splitTask.length(), fileSize, new String[0]);
split.setFormatVersion(formatVersion);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {