[fix](multi-catalog)fix compatible with hdfs HA empty prefix (#22424)
This commit is contained in:
@ -53,7 +53,7 @@ public class HdfsResource extends Resource {
|
||||
public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
|
||||
public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
|
||||
public static String DSF_NAMESERVICES = "dfs.nameservices";
|
||||
public static final String HDFS_PREFIX = "hdfs://";
|
||||
public static final String HDFS_PREFIX = "hdfs:";
|
||||
|
||||
@SerializedName(value = "properties")
|
||||
private Map<String, String> properties;
|
||||
|
||||
@ -81,20 +81,8 @@ public class S3Util {
|
||||
|
||||
private static String normalizedLocation(String location, Map<String, String> props) {
|
||||
try {
|
||||
URI normalizedUri = new URI(location);
|
||||
if (StringUtils.isEmpty(normalizedUri.getHost()) && location.startsWith(HdfsResource.HDFS_PREFIX)) {
|
||||
// Need add hdfs host to location
|
||||
String host = props.get(HdfsResource.DSF_NAMESERVICES);
|
||||
if (StringUtils.isNotEmpty(host)) {
|
||||
// Replace 'hdfs://' to 'hdfs://name_service', for example: hdfs:///abc to hdfs://name_service/abc
|
||||
return location.replace(HdfsResource.HDFS_PREFIX, HdfsResource.HDFS_PREFIX + host);
|
||||
} else {
|
||||
// If no hadoop HA config
|
||||
if (location.startsWith(HdfsResource.HDFS_PREFIX + '/')) {
|
||||
// Do not support hdfs:///location
|
||||
throw new RuntimeException("Invalid location with empty host: " + location);
|
||||
}
|
||||
}
|
||||
if (location.startsWith(HdfsResource.HDFS_PREFIX)) {
|
||||
return normalizedHdfsPath(location, props);
|
||||
}
|
||||
return location;
|
||||
} catch (URISyntaxException e) {
|
||||
@ -102,6 +90,35 @@ public class S3Util {
|
||||
}
|
||||
}
|
||||
|
||||
private static String normalizedHdfsPath(String location, Map<String, String> props) throws URISyntaxException {
|
||||
URI normalizedUri = new URI(location);
|
||||
// compatible with 'hdfs:///' or 'hdfs:/'
|
||||
if (StringUtils.isEmpty(normalizedUri.getHost())) {
|
||||
String normalizedPrefix = HdfsResource.HDFS_PREFIX + "//";
|
||||
String brokenPrefix = HdfsResource.HDFS_PREFIX + "/";
|
||||
if (location.startsWith(brokenPrefix) && !location.startsWith(normalizedPrefix)) {
|
||||
location = location.replace(brokenPrefix, normalizedPrefix);
|
||||
}
|
||||
// Need add hdfs host to location
|
||||
String host = props.get(HdfsResource.DSF_NAMESERVICES);
|
||||
if (StringUtils.isNotEmpty(host)) {
|
||||
// Replace 'hdfs://key/' to 'hdfs://name_service/key/'
|
||||
// Or hdfs:///abc to hdfs://name_service/abc
|
||||
return location.replace(normalizedPrefix, normalizedPrefix + host + "/");
|
||||
} else {
|
||||
// 'hdfs://null/' equals the 'hdfs:///'
|
||||
if (location.startsWith(HdfsResource.HDFS_PREFIX + "///")) {
|
||||
// Do not support hdfs:///location
|
||||
throw new RuntimeException("Invalid location with empty host: " + location);
|
||||
} else {
|
||||
// Replace 'hdfs://key/' to '/key/', try access local NameNode on BE.
|
||||
return location.replace(normalizedPrefix, "/");
|
||||
}
|
||||
}
|
||||
}
|
||||
return location;
|
||||
}
|
||||
|
||||
/**
|
||||
* The converted path is used for BE
|
||||
* @param location origin split path
|
||||
|
||||
Reference in New Issue
Block a user