[BugFix](MultiCatalog) Fix oss file location is not available in iceberg hadoop catalog (#30761)

1. Create an Iceberg hadoop catalog like below:
CREATE CATALOG iceberg_catalog PROPERTIES (
"warehouse" = "s3a://xxx/xxx",
"type" = "iceberg",
"s3.secret_key" = "*XXX",
"s3.region" = "region",
"s3.endpoint" = "http://xxx.jd.local",
"s3.bucket" = "xxx-test",
"s3.access_key" = "xxxxx",
"iceberg.catalog.type" = "hadoop",
"fs.s3a.impl" = "org.apache.hadoop.fs.s3a.S3AFileSystem",
"create_time" = "2024-02-02 11:15:28.570"
);

2. Run: select * from iceberg_catalog.table limit 1;

will get errCode = 2, detailMessage = Unknown file location nullnulls3a:/xxxx

expect:
OK

Also needs to be backported to branch-2.0.
This commit is contained in:
GoGoWen
2024-02-06 08:31:59 +08:00
committed by yiguolei
parent 92226c986a
commit 8e147f4c93
3 changed files with 127 additions and 87 deletions

View File

@ -39,6 +39,8 @@ import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.InvalidPathException;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
@ -49,7 +51,7 @@ public class LocationPath {
private final LocationType locationType;
private final String location;
enum LocationType {
public enum LocationType {
HDFS,
LOCAL, // Local File
BOS, // Baidu
@ -66,7 +68,8 @@ public class LocationPath {
S3A,
S3N,
VIEWFS,
UNKNOWN
UNKNOWN,
NOSCHEME // no scheme info
}
private LocationPath(String location) {
@ -75,107 +78,123 @@ public class LocationPath {
public LocationPath(String location, Map<String, String> props) {
String scheme = parseScheme(location).toLowerCase();
switch (scheme) {
case FeConstants.FS_PREFIX_HDFS:
locationType = LocationType.HDFS;
// Need add hdfs host to location
String host = props.get(HdfsResource.DSF_NAMESERVICES);
this.location = normalizedHdfsPath(location, host);
break;
case FeConstants.FS_PREFIX_S3:
locationType = LocationType.S3;
this.location = location;
break;
case FeConstants.FS_PREFIX_S3A:
locationType = LocationType.S3A;
this.location = convertToS3(location);
break;
case FeConstants.FS_PREFIX_S3N:
// include the check for multi locations and in a table, such as both s3 and hdfs are in a table.
locationType = LocationType.S3N;
this.location = convertToS3(location);
break;
case FeConstants.FS_PREFIX_BOS:
locationType = LocationType.BOS;
// use s3 client to access
this.location = convertToS3(location);
break;
case FeConstants.FS_PREFIX_GCS:
locationType = LocationType.GCS;
// use s3 client to access
this.location = convertToS3(location);
break;
case FeConstants.FS_PREFIX_OSS:
if (isHdfsOnOssEndpoint(location)) {
locationType = LocationType.OSS_HDFS;
if (scheme.isEmpty()) {
locationType = LocationType.NOSCHEME;
this.location = location;
} else {
switch (scheme) {
case FeConstants.FS_PREFIX_HDFS:
locationType = LocationType.HDFS;
// Need add hdfs host to location
String host = props.get(HdfsResource.DSF_NAMESERVICES);
this.location = normalizedHdfsPath(location, host);
break;
case FeConstants.FS_PREFIX_S3:
locationType = LocationType.S3;
this.location = location;
} else {
break;
case FeConstants.FS_PREFIX_S3A:
locationType = LocationType.S3A;
this.location = convertToS3(location);
break;
case FeConstants.FS_PREFIX_S3N:
// include the check for multi locations and in a table, such as both s3 and hdfs are in a table.
locationType = LocationType.S3N;
this.location = convertToS3(location);
break;
case FeConstants.FS_PREFIX_BOS:
locationType = LocationType.BOS;
// use s3 client to access
this.location = convertToS3(location);
break;
case FeConstants.FS_PREFIX_GCS:
locationType = LocationType.GCS;
// use s3 client to access
this.location = convertToS3(location);
break;
case FeConstants.FS_PREFIX_OSS:
if (isHdfsOnOssEndpoint(location)) {
locationType = LocationType.OSS_HDFS;
this.location = location;
} else {
if (useS3EndPoint(props)) {
this.location = convertToS3(location);
} else {
this.location = location;
}
locationType = LocationType.OSS;
}
break;
case FeConstants.FS_PREFIX_COS:
if (useS3EndPoint(props)) {
this.location = convertToS3(location);
} else {
this.location = location;
}
locationType = LocationType.OSS;
}
break;
case FeConstants.FS_PREFIX_COS:
if (useS3EndPoint(props)) {
this.location = convertToS3(location);
} else {
locationType = LocationType.COS;
break;
case FeConstants.FS_PREFIX_OBS:
if (useS3EndPoint(props)) {
this.location = convertToS3(location);
} else {
this.location = location;
}
locationType = LocationType.OBS;
break;
case FeConstants.FS_PREFIX_OFS:
locationType = LocationType.OFS;
this.location = location;
}
locationType = LocationType.COS;
break;
case FeConstants.FS_PREFIX_OBS:
if (useS3EndPoint(props)) {
this.location = convertToS3(location);
} else {
break;
case FeConstants.FS_PREFIX_JFS:
locationType = LocationType.JFS;
this.location = location;
}
locationType = LocationType.OBS;
break;
case FeConstants.FS_PREFIX_OFS:
locationType = LocationType.OFS;
this.location = location;
break;
case FeConstants.FS_PREFIX_JFS:
locationType = LocationType.JFS;
this.location = location;
break;
case FeConstants.FS_PREFIX_GFS:
locationType = LocationType.GFS;
this.location = location;
break;
case FeConstants.FS_PREFIX_COSN:
// if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues
locationType = LocationType.COSN;
this.location = location;
break;
case FeConstants.FS_PREFIX_VIEWFS:
locationType = LocationType.VIEWFS;
this.location = location;
break;
case FeConstants.FS_PREFIX_FILE:
locationType = LocationType.LOCAL;
this.location = location;
break;
default:
locationType = LocationType.UNKNOWN;
this.location = location;
break;
case FeConstants.FS_PREFIX_GFS:
locationType = LocationType.GFS;
this.location = location;
break;
case FeConstants.FS_PREFIX_COSN:
// if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues
locationType = LocationType.COSN;
this.location = location;
break;
case FeConstants.FS_PREFIX_VIEWFS:
locationType = LocationType.VIEWFS;
this.location = location;
break;
case FeConstants.FS_PREFIX_FILE:
locationType = LocationType.LOCAL;
this.location = location;
break;
default:
locationType = LocationType.UNKNOWN;
this.location = location;
}
}
}
private static String parseScheme(String location) {
String scheme = "";
String[] schemeSplit = location.split(SCHEME_DELIM);
if (schemeSplit.length > 1) {
return schemeSplit[0];
scheme = schemeSplit[0];
} else {
schemeSplit = location.split(NONSTANDARD_SCHEME_DELIM);
if (schemeSplit.length > 1) {
return schemeSplit[0];
scheme = schemeSplit[0];
}
throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + location);
}
// if not get scheme, need consider /path/to/local to no scheme
if (scheme.isEmpty()) {
try {
Paths.get(location);
} catch (InvalidPathException exception) {
throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + location);
}
}
return scheme;
}
private boolean useS3EndPoint(Map<String, String> props) {
@ -196,6 +215,7 @@ public class LocationPath {
/**
* The converted path is used for FE to get metadata
*
* @param location origin location
* @return metadata location path. just convert when storage is compatible with s3 client.
*/
@ -219,7 +239,7 @@ public class LocationPath {
// Need to encode these characters before creating URI.
// But doesn't encode '/' and ':' so that we can get the correct uri host.
location = URLEncoder.encode(location, StandardCharsets.UTF_8.name())
.replace("%2F", "/").replace("%3A", ":");
.replace("%2F", "/").replace("%3A", ":");
URI normalizedUri = new URI(location);
// compatible with 'hdfs:///' or 'hdfs:/'
if (StringUtils.isEmpty(normalizedUri.getHost())) {
@ -336,6 +356,7 @@ public class LocationPath {
/**
* The converted path is used for BE
*
* @return BE scan range path
*/
public Path toScanRangeLocation() {

View File

@ -352,11 +352,16 @@ public class IcebergScanNode extends FileQueryScanNode {
private String normalizeLocation(String location) {
Map<String, String> props = source.getCatalog().getProperties();
LocationPath locationPath = new LocationPath(location, props);
String icebergCatalogType = props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE);
if ("hadoop".equalsIgnoreCase(icebergCatalogType)) {
if (!location.startsWith(HdfsResource.HDFS_PREFIX)) {
// if no scheme info, fill with HADOOP_FS_NAME
// if no HADOOP_FS_NAME, then should be local file system
if (locationPath.getLocationType() == LocationPath.LocationType.NOSCHEME) {
String fsName = props.get(HdfsResource.HADOOP_FS_NAME);
location = fsName + location;
if (fsName != null) {
location = fsName + location;
}
}
}
return location;