[Improvement](multi-catalog) Make location handling easier to modify by decoupling all storage into a single location class (#27874)
Decouple all storage path handling into a single LocationPath class.
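In short, call sites that previously mixed S3Util helpers with FeConstants prefix checks now go through the new LocationPath class. A minimal, illustrative sketch of the new API (the sample path and the catalogProps map are placeholders, not taken from the patch):

    Map<String, String> catalogProps = new HashMap<>();           // e.g. catalog.getProperties()
    LocationPath path = new LocationPath("cosn://bucket/warehouse/part-0.orc", catalogProps);
    Path scanRangeLocation = path.toScanRangeLocation();          // location handed to BE scan ranges
    TFileType fileType = LocationPath.getTFileType(path.get());   // client type the BE should use
    Pair<FileSystemType, String> fsIdentity = LocationPath.getFSIdentity(path.get(), null);  // FE file system cache key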
@@ -64,21 +64,21 @@ public class FeConstants {
    // use \N to indicate NULL
    public static String null_string = "\\N";

    public static String FS_PREFIX_S3 = "s3";
    public static String FS_PREFIX_S3A = "s3a";
    public static String FS_PREFIX_S3N = "s3n";
    public static String FS_PREFIX_OSS = "oss";
    public static String FS_PREFIX_GCS = "gs";
    public static String FS_PREFIX_BOS = "bos";
    public static String FS_PREFIX_COS = "cos";
    public static String FS_PREFIX_COSN = "cosn";
    public static String FS_PREFIX_OBS = "obs";
    public static String FS_PREFIX_OFS = "ofs";
    public static String FS_PREFIX_GFS = "gfs";
    public static String FS_PREFIX_JFS = "jfs";
    public static String FS_PREFIX_HDFS = "hdfs";
    public static String FS_PREFIX_VIEWFS = "viewfs";
    public static String FS_PREFIX_FILE = "file";
    public static final String FS_PREFIX_S3 = "s3";
    public static final String FS_PREFIX_S3A = "s3a";
    public static final String FS_PREFIX_S3N = "s3n";
    public static final String FS_PREFIX_OSS = "oss";
    public static final String FS_PREFIX_GCS = "gs";
    public static final String FS_PREFIX_BOS = "bos";
    public static final String FS_PREFIX_COS = "cos";
    public static final String FS_PREFIX_COSN = "cosn";
    public static final String FS_PREFIX_OBS = "obs";
    public static final String FS_PREFIX_OFS = "ofs";
    public static final String FS_PREFIX_GFS = "gfs";
    public static final String FS_PREFIX_JFS = "jfs";
    public static final String FS_PREFIX_HDFS = "hdfs";
    public static final String FS_PREFIX_VIEWFS = "viewfs";
    public static final String FS_PREFIX_FILE = "file";

    public static final String INTERNAL_DB_NAME = "__internal_schema";
    public static String TEMP_MATERIZLIZE_DVIEW_PREFIX = "internal_tmp_materialized_view_";

@@ -0,0 +1,380 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.util;

import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.FileSystemType;
import org.apache.doris.thrift.TFileType;

import com.google.common.base.Strings;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class LocationPath {
    private static final Logger LOG = LogManager.getLogger(LocationPath.class);
    private static final String SCHEME_DELIM = "://";
    private static final String NONSTANDARD_SCHEME_DELIM = ":/";
    private final LocationType locationType;
    private final String location;

    enum LocationType {
        HDFS,
        LOCAL, // Local File
        BOS, // Baidu
        GCS, // Google,
        OBS, // Huawei,
        COS, // Tencent
        COSN, // Tencent
        OFS, // Tencent CHDFS
        GFS, // Tencent GooseFs,
        OSS, // Alibaba,
        OSS_HDFS, // JindoFS on OSS
        JFS, // JuiceFS,
        S3,
        S3A,
        S3N,
        VIEWFS,
        UNKNOWN
    }

    private LocationPath(String location) {
        this(location, new HashMap<>());
    }

    public LocationPath(String location, Map<String, String> props) {
        String scheme = parseScheme(location).toLowerCase();
        switch (scheme) {
            case FeConstants.FS_PREFIX_HDFS:
                locationType = LocationType.HDFS;
                // Need add hdfs host to location
                String host = props.get(HdfsResource.DSF_NAMESERVICES);
                this.location = normalizedHdfsPath(location, host);
                break;
            case FeConstants.FS_PREFIX_S3:
                locationType = LocationType.S3;
                this.location = location;
                break;
            case FeConstants.FS_PREFIX_S3A:
                locationType = LocationType.S3A;
                this.location = convertToS3(location);
                break;
            case FeConstants.FS_PREFIX_S3N:
                // include the check for multi locations and in a table, such as both s3 and hdfs are in a table.
                locationType = LocationType.S3N;
                this.location = convertToS3(location);
                break;
            case FeConstants.FS_PREFIX_BOS:
                locationType = LocationType.BOS;
                // use s3 client to access
                this.location = convertToS3(location);
                break;
            case FeConstants.FS_PREFIX_GCS:
                locationType = LocationType.GCS;
                // use s3 client to access
                this.location = convertToS3(location);
                break;
            case FeConstants.FS_PREFIX_OSS:
                if (isHdfsOnOssEndpoint(location)) {
                    locationType = LocationType.OSS_HDFS;
                    this.location = location;
                } else {
                    if (useS3EndPoint(props)) {
                        this.location = convertToS3(location);
                    } else {
                        this.location = location;
                    }
                    locationType = LocationType.OSS;
                }
                break;
            case FeConstants.FS_PREFIX_COS:
                if (useS3EndPoint(props)) {
                    this.location = convertToS3(location);
                } else {
                    this.location = location;
                }
                locationType = LocationType.COS;
                break;
            case FeConstants.FS_PREFIX_OBS:
                if (useS3EndPoint(props)) {
                    this.location = convertToS3(location);
                } else {
                    this.location = location;
                }
                locationType = LocationType.OBS;
                break;
            case FeConstants.FS_PREFIX_OFS:
                locationType = LocationType.OFS;
                this.location = location;
                break;
            case FeConstants.FS_PREFIX_JFS:
                locationType = LocationType.JFS;
                this.location = location;
                break;
            case FeConstants.FS_PREFIX_GFS:
                locationType = LocationType.GFS;
                this.location = location;
                break;
            case FeConstants.FS_PREFIX_COSN:
                // if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues
                locationType = LocationType.COSN;
                this.location = location;
                break;
            case FeConstants.FS_PREFIX_VIEWFS:
                locationType = LocationType.VIEWFS;
                this.location = location;
                break;
            case FeConstants.FS_PREFIX_FILE:
                locationType = LocationType.LOCAL;
                this.location = location;
                break;
            default:
                locationType = LocationType.UNKNOWN;
                this.location = location;
        }
    }

    private static String parseScheme(String location) {
        String[] schemeSplit = location.split(SCHEME_DELIM);
        if (schemeSplit.length > 1) {
            return schemeSplit[0];
        } else {
            schemeSplit = location.split(NONSTANDARD_SCHEME_DELIM);
            if (schemeSplit.length > 1) {
                return schemeSplit[0];
            }
            throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + location);
        }
    }

    private boolean useS3EndPoint(Map<String, String> props) {
        if (props.containsKey(ObsProperties.ENDPOINT)
                || props.containsKey(OssProperties.ENDPOINT)
                || props.containsKey(CosProperties.ENDPOINT)) {
            return false;
        }
        // wide check range for the compatibility of s3 properties
        return (props.containsKey(S3Properties.ENDPOINT) || props.containsKey(S3Properties.Env.ENDPOINT));
    }

    public static boolean isHdfsOnOssEndpoint(String location) {
        // example: cn-shanghai.oss-dls.aliyuncs.com contains the "oss-dls.aliyuncs".
        // https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
        return location.contains("oss-dls.aliyuncs");
    }

    /**
     * The converted path is used for FE to get metadata
     * @param location origin location
     * @return metadata location path. just convert when storage is compatible with s3 client.
     */
    private static String convertToS3(String location) {
        LOG.debug("try convert location to s3 prefix: " + location);
        int pos = findDomainPos(location);
        return "s3" + location.substring(pos);
    }

    private static int findDomainPos(String rangeLocation) {
        int pos = rangeLocation.indexOf("://");
        if (pos == -1) {
            throw new RuntimeException("No '://' found in location: " + rangeLocation);
        }
        return pos;
    }

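    // Illustrative examples of the normalization below (assuming props contain 'dfs.nameservices' = "ns"):
    //   hdfs://nn:8020/user/hive/warehouse  -> returned as is (host already present, only URL-decoded)
    //   hdfs:/user/hive/warehouse           -> hdfs://ns/user/hive/warehouse
    // Without a configured nameservice, 'hdfs:/key/...' becomes '/key/...' so BE falls back to the local NameNode.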
    private static String normalizedHdfsPath(String location, String host) {
        try {
            // Hive partition may contain special characters such as ' ', '<', '>' and so on.
            // Need to encode these characters before creating URI.
            // But doesn't encode '/' and ':' so that we can get the correct uri host.
            location = URLEncoder.encode(location, StandardCharsets.UTF_8.name())
                    .replace("%2F", "/").replace("%3A", ":");
            URI normalizedUri = new URI(location);
            // compatible with 'hdfs:///' or 'hdfs:/'
            if (StringUtils.isEmpty(normalizedUri.getHost())) {
                location = URLDecoder.decode(location, StandardCharsets.UTF_8.name());
                String normalizedPrefix = HdfsResource.HDFS_PREFIX + "//";
                String brokenPrefix = HdfsResource.HDFS_PREFIX + "/";
                if (location.startsWith(brokenPrefix) && !location.startsWith(normalizedPrefix)) {
                    location = location.replace(brokenPrefix, normalizedPrefix);
                }
                if (StringUtils.isNotEmpty(host)) {
                    // Replace 'hdfs://key/' to 'hdfs://name_service/key/'
                    // Or hdfs:///abc to hdfs://name_service/abc
                    return location.replace(normalizedPrefix, normalizedPrefix + host + "/");
                } else {
                    // 'hdfs://null/' equals the 'hdfs:///'
                    if (location.startsWith(HdfsResource.HDFS_PREFIX + "///")) {
                        // Do not support hdfs:///location
                        throw new RuntimeException("Invalid location with empty host: " + location);
                    } else {
                        // Replace 'hdfs://key/' to '/key/', try access local NameNode on BE.
                        return location.replace(normalizedPrefix, "/");
                    }
                }
            }
            return URLDecoder.decode(location, StandardCharsets.UTF_8.name());
        } catch (URISyntaxException | UnsupportedEncodingException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
        LocationPath locationPath = new LocationPath(location);
        FileSystemType fsType = (bindBrokerName != null) ? FileSystemType.BROKER : locationPath.getFileSystemType();
        URI uri = locationPath.getPath().toUri();
        String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority());
        return Pair.of(fsType, fsIdent);
    }

    private FileSystemType getFileSystemType() {
        FileSystemType fsType;
        switch (locationType) {
            case S3:
            case S3A:
            case S3N:
            case COS:
            case OSS:
            case OBS:
            case BOS:
            case GCS:
                // All storage will use s3 client to access on BE, so need convert to s3
                fsType = FileSystemType.S3;
                break;
            case COSN:
            case OFS:
                // ofs:// and cosn:// use the same underlying file system: Tencent Cloud HDFS, aka CHDFS)) {
                fsType = FileSystemType.OFS;
                break;
            case HDFS:
            case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib to access oss.
            case VIEWFS:
            case GFS:
                fsType = FileSystemType.DFS;
                break;
            case JFS:
                fsType = FileSystemType.JFS;
                break;
            case LOCAL:
                fsType = FileSystemType.FILE;
                break;
            default:
                throw new UnsupportedOperationException("Unknown file system for location: " + location);
        }
        return fsType;
    }

    /**
     * provide file type for BE.
     * @param location the location is from fs.listFile
     * @return on BE, we will use TFileType to get the suitable client to access storage.
     */
    public static TFileType getTFileType(String location) {
        if (location == null || location.isEmpty()) {
            return null;
        }
        LocationPath locationPath = new LocationPath(location);
        switch (locationPath.getLocationType()) {
            case S3:
            case S3A:
            case S3N:
            case COS:
            case OSS:
            case OBS:
            case BOS:
            case GCS:
                // now we only support S3 client for object storage on BE
                return TFileType.FILE_S3;
            case HDFS:
            case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib to access oss.
            case VIEWFS:
            case COSN:
                return TFileType.FILE_HDFS;
            case GFS:
            case JFS:
            case OFS:
                return TFileType.FILE_BROKER;
            case LOCAL:
                return TFileType.FILE_LOCAL;
            default:
                return null;
        }
    }

    /**
     * The converted path is used for BE
     * @return BE scan range path
     */
    public Path toScanRangeLocation() {
        switch (locationType) {
            case S3:
            case S3A:
            case S3N:
            case COS:
            case OSS:
            case OBS:
            case BOS:
            case GCS:
                // All storage will use s3 client to access on BE, so need convert to s3
                return new Path(convertToS3(location));
            case HDFS:
            case OSS_HDFS:
            case VIEWFS:
            case COSN:
            case GFS:
            case JFS:
            case OFS:
            case LOCAL:
            default:
                return getPath();
        }
    }

    public LocationType getLocationType() {
        return locationType;
    }

    public String get() {
        return location;
    }

    public Path getPath() {
        return new Path(location);
    }

    @Override
    public String toString() {
        return get();
    }
}
@@ -17,18 +17,8 @@

package org.apache.doris.common.util;

import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.credentials.CloudCredential;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
@@ -43,138 +33,10 @@ import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;

import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;

public class S3Util {
    private static final Logger LOG = LogManager.getLogger(S3Util.class);

    public static boolean isObjStorage(String location) {
        return isObjStorageUseS3Client(location)
                // if treat cosn(tencent hadoop-cos) as a s3 file system, may bring incompatible issues
                || (location.startsWith(FeConstants.FS_PREFIX_COS) && !location.startsWith(FeConstants.FS_PREFIX_COSN))
                || location.startsWith(FeConstants.FS_PREFIX_OSS)
                || location.startsWith(FeConstants.FS_PREFIX_OBS);
    }

    private static boolean isObjStorageUseS3Client(String location) {
        return location.startsWith(FeConstants.FS_PREFIX_S3)
                || location.startsWith(FeConstants.FS_PREFIX_S3A)
                || location.startsWith(FeConstants.FS_PREFIX_S3N)
                || location.startsWith(FeConstants.FS_PREFIX_GCS)
                || location.startsWith(FeConstants.FS_PREFIX_BOS);
    }

    private static boolean isS3EndPoint(String location, Map<String, String> props) {
        if (props.containsKey(ObsProperties.ENDPOINT)
                || props.containsKey(OssProperties.ENDPOINT)
                || props.containsKey(CosProperties.ENDPOINT)) {
            return false;
        }
        // wide check range for the compatibility of s3 properties
        return (props.containsKey(S3Properties.ENDPOINT) || props.containsKey(S3Properties.Env.ENDPOINT))
                && isObjStorage(location);
    }

    /**
     * The converted path is used for FE to get metadata
     * @param location origin location
     * @return metadata location path. just convert when storage is compatible with s3 client.
     */
    public static String convertToS3IfNecessary(String location, Map<String, String> props) {
        LOG.debug("try convert location to s3 prefix: " + location);
        // include the check for multi locations and in a table, such as both s3 and hdfs are in a table.
        if (isS3EndPoint(location, props) || isObjStorageUseS3Client(location)) {
            int pos = location.indexOf("://");
            if (pos == -1) {
                throw new RuntimeException("No '://' found in location: " + location);
            }
            return "s3" + location.substring(pos);
        }
        return normalizedLocation(location, props);
    }

    private static String normalizedLocation(String location, Map<String, String> props) {
        try {
            if (location.startsWith(HdfsResource.HDFS_PREFIX)) {
                return normalizedHdfsPath(location, props);
            }
            return location;
        } catch (URISyntaxException | UnsupportedEncodingException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    private static String normalizedHdfsPath(String location, Map<String, String> props)
            throws URISyntaxException, UnsupportedEncodingException {
        // Hive partition may contain special characters such as ' ', '<', '>' and so on.
        // Need to encode these characters before creating URI.
        // But doesn't encode '/' and ':' so that we can get the correct uri host.
        location = URLEncoder.encode(location, StandardCharsets.UTF_8.name()).replace("%2F", "/").replace("%3A", ":");
        URI normalizedUri = new URI(location);
        // compatible with 'hdfs:///' or 'hdfs:/'
        if (StringUtils.isEmpty(normalizedUri.getHost())) {
            location = URLDecoder.decode(location, StandardCharsets.UTF_8.name());
            String normalizedPrefix = HdfsResource.HDFS_PREFIX + "//";
            String brokenPrefix = HdfsResource.HDFS_PREFIX + "/";
            if (location.startsWith(brokenPrefix) && !location.startsWith(normalizedPrefix)) {
                location = location.replace(brokenPrefix, normalizedPrefix);
            }
            // Need add hdfs host to location
            String host = props.get(HdfsResource.DSF_NAMESERVICES);
            if (StringUtils.isNotEmpty(host)) {
                // Replace 'hdfs://key/' to 'hdfs://name_service/key/'
                // Or hdfs:///abc to hdfs://name_service/abc
                // TODO: check host in path when the 'dfs.nameservices' has multiple hosts
                return location.replace(normalizedPrefix, normalizedPrefix + host + "/");
            } else {
                // 'hdfs://null/' equals the 'hdfs:///'
                if (location.startsWith(HdfsResource.HDFS_PREFIX + "///")) {
                    // Do not support hdfs:///location
                    throw new RuntimeException("Invalid location with empty host: " + location);
                } else {
                    // Replace 'hdfs://key/' to '/key/', try access local NameNode on BE.
                    return location.replace(normalizedPrefix, "/");
                }
            }
        }
        return URLDecoder.decode(location, StandardCharsets.UTF_8.name());
    }

    /**
     * The converted path is used for BE
     * @param location origin split path
     * @return BE scan range path
     */
    public static Path toScanRangeLocation(String location, Map<String, String> props) {
        // All storage will use s3 client on BE.
        if (isObjStorage(location)) {
            int pos = location.indexOf("://");
            if (pos == -1) {
                throw new RuntimeException("No '://' found in location: " + location);
            }
            if (isHdfsOnOssEndpoint(location)) {
                // if hdfs service is enabled on oss, use oss location
                // example: oss://examplebucket.cn-shanghai.oss-dls.aliyuncs.com/dir/file/0000.orc
                location = "oss" + location.substring(pos);
            } else {
                location = "s3" + location.substring(pos);
            }
        }
        return new Path(normalizedLocation(location, props));
    }

    public static boolean isHdfsOnOssEndpoint(String location) {
        // example: cn-shanghai.oss-dls.aliyuncs.com contains the "oss-dls.aliyuncs".
        // https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
        return location.contains("oss-dls.aliyuncs");
    }

    public static S3Client buildS3Client(URI endpoint, String region, CloudCredential credential) {
        StaticCredentialsProvider scp;

@@ -34,14 +34,13 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.CacheBulkLoader;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.external.hive.util.HiveUtil;
import org.apache.doris.fs.FileSystemCache;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.RemoteFiles;
import org.apache.doris.fs.remote.RemoteFile;
import org.apache.doris.fs.remote.RemoteFileSystem;
@@ -361,7 +360,7 @@ public class HiveMetaStoreCache {
            String bindBrokerName) throws UserException {
        FileCacheValue result = new FileCacheValue();
        RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                new FileSystemCache.FileSystemCacheKey(FileSystemFactory.getFSIdentity(
                new FileSystemCache.FileSystemCacheKey(LocationPath.getFSIdentity(
                        location, bindBrokerName), jobConf, bindBrokerName));
        result.setSplittable(HiveUtil.isSplittable(fs, inputFormat, location, jobConf));
        try {
@@ -374,9 +373,10 @@ public class HiveMetaStoreCache {
            // https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
            RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, true);
            for (RemoteFile remoteFile : locatedFiles.files()) {
                Path srcPath = remoteFile.getPath();
                Path convertedPath = S3Util.toScanRangeLocation(srcPath.toString(), catalog.getProperties());
                if (!convertedPath.toString().equals(srcPath.toString())) {
                String srcPath = remoteFile.getPath().toString();
                LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties());
                Path convertedPath = locationPath.toScanRangeLocation();
                if (!convertedPath.toString().equals(srcPath)) {
                    remoteFile.setPath(convertedPath);
                }
                result.addFile(remoteFile);
@@ -400,13 +400,12 @@ public class HiveMetaStoreCache {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
            String finalLocation = S3Util.convertToS3IfNecessary(key.location,
                    catalog.getCatalogProperty().getProperties());
            Map<String, String> props = catalog.getCatalogProperty().getProperties();
            LocationPath finalLocation = new LocationPath(key.location, props);
            // disable the fs cache in FileSystem, or it will always from new FileSystem
            // and save it in cache when calling FileInputFormat.setInputPaths().
            try {
                Path path = new Path(finalLocation);
                URI uri = path.toUri();
                URI uri = finalLocation.getPath().toUri();
                if (uri.getScheme() != null) {
                    String scheme = uri.getScheme();
                    updateJobConf("fs." + scheme + ".impl.disable.cache", "true");
@@ -419,13 +418,13 @@ public class HiveMetaStoreCache {
            } catch (Exception e) {
                LOG.warn("unknown scheme in path: " + finalLocation, e);
            }
            FileInputFormat.setInputPaths(jobConf, finalLocation);
            FileInputFormat.setInputPaths(jobConf, finalLocation.get());
            try {
                FileCacheValue result;
                InputFormat<?, ?> inputFormat = HiveUtil.getInputFormat(jobConf, key.inputFormat, false);
                // TODO: This is a temp config, will remove it after the HiveSplitter is stable.
                if (key.useSelfSplitter) {
                    result = getFileCache(finalLocation, inputFormat, jobConf,
                    result = getFileCache(finalLocation.get(), inputFormat, jobConf,
                            key.getPartitionValues(), key.bindBrokerName);
                } else {
                    InputSplit[] splits;
@@ -442,8 +441,9 @@ public class HiveMetaStoreCache {
                    for (int i = 0; i < splits.length; i++) {
                        org.apache.hadoop.mapred.FileSplit fs = ((org.apache.hadoop.mapred.FileSplit) splits[i]);
                        // todo: get modification time
                        Path splitFilePath = S3Util.toScanRangeLocation(fs.getPath().toString(),
                                catalog.getProperties());
                        String dataFilePath = fs.getPath().toString();
                        LocationPath locationPath = new LocationPath(dataFilePath, catalog.getProperties());
                        Path splitFilePath = locationPath.toScanRangeLocation();
                        result.addSplit(new FileSplit(splitFilePath, fs.getStart(), fs.getLength(), -1, null, null));
                    }
                }
@@ -812,7 +812,7 @@ public class HiveMetaStoreCache {
        String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
        RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                new FileSystemCache.FileSystemCacheKey(
                        FileSystemFactory.getFSIdentity(baseOrDeltaPath.toUri().toString(),
                        LocationPath.getFSIdentity(baseOrDeltaPath.toUri().toString(),
                                bindBrokerName), jobConf, bindBrokerName));
        Status status = fs.exists(acidVersionPath);
        if (status != Status.OK) {
@@ -835,7 +835,7 @@ public class HiveMetaStoreCache {
                String location = delta.getPath().toString();
                RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                        new FileSystemCache.FileSystemCacheKey(
                                FileSystemFactory.getFSIdentity(location, bindBrokerName),
                                LocationPath.getFSIdentity(location, bindBrokerName),
                                jobConf, bindBrokerName));
                RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
                if (delta.isDeleteDelta()) {
@@ -855,7 +855,7 @@ public class HiveMetaStoreCache {
                String location = directory.getBaseDirectory().toString();
                RemoteFileSystem fs = Env.getCurrentEnv().getExtMetaCacheMgr().getFsCache().getRemoteFileSystem(
                        new FileSystemCache.FileSystemCacheKey(
                                FileSystemFactory.getFSIdentity(location, bindBrokerName),
                                LocationPath.getFSIdentity(location, bindBrokerName),
                                jobConf, bindBrokerName));
                RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
                locatedFiles.files().stream().filter(

@@ -17,7 +17,7 @@

package org.apache.doris.datasource.property;

import org.apache.doris.common.util.S3Util;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.InitCatalogLog.Type;
@@ -301,7 +301,7 @@ public class PropertyConverter {
        ossProperties.put("fs.oss.impl.disable.cache", "true");
        ossProperties.put("fs.oss.impl", getHadoopFSImplByScheme("oss"));
        boolean hdfsEnabled = Boolean.parseBoolean(props.getOrDefault(OssProperties.OSS_HDFS_ENABLED, "false"));
        if (S3Util.isHdfsOnOssEndpoint(endpoint) || hdfsEnabled) {
        if (LocationPath.isHdfsOnOssEndpoint(endpoint) || hdfsEnabled) {
            // use endpoint or enable hdfs
            rewriteHdfsOnOssProperties(ossProperties, endpoint);
        }
@@ -321,7 +321,7 @@ public class PropertyConverter {
    }

    private static void rewriteHdfsOnOssProperties(Map<String, String> ossProperties, String endpoint) {
        if (!S3Util.isHdfsOnOssEndpoint(endpoint)) {
        if (!LocationPath.isHdfsOnOssEndpoint(endpoint)) {
            // just for robustness here, avoid wrong endpoint when oss-hdfs is enabled.
            // convert "oss-cn-beijing.aliyuncs.com" to "cn-beijing.oss-dls.aliyuncs.com"
            // reference link: https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen

@@ -18,9 +18,6 @@
package org.apache.doris.fs;

import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.RemoteFileSystem;
import org.apache.doris.fs.remote.S3FileSystem;
@@ -28,12 +25,10 @@ import org.apache.doris.fs.remote.dfs.DFSFileSystem;
import org.apache.doris.fs.remote.dfs.JFSFileSystem;
import org.apache.doris.fs.remote.dfs.OFSFileSystem;

import com.google.common.base.Strings;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

@@ -56,35 +51,6 @@ public class FileSystemFactory {
        }
    }

    public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
        FileSystemType fsType;
        if (bindBrokerName != null) {
            fsType = FileSystemType.BROKER;
        } else if (S3Util.isObjStorage(location)) {
            if (S3Util.isHdfsOnOssEndpoint(location)) {
                // if hdfs service is enabled on oss, use hdfs lib to access oss.
                fsType = FileSystemType.DFS;
            } else {
                fsType = FileSystemType.S3;
            }
        } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS) || location.startsWith(FeConstants.FS_PREFIX_GFS)
                || location.startsWith(FeConstants.FS_PREFIX_VIEWFS)) {
            fsType = FileSystemType.DFS;
        } else if (location.startsWith(FeConstants.FS_PREFIX_OFS) || location.startsWith(FeConstants.FS_PREFIX_COSN)) {
            // ofs:// and cosn:// use the same underlying file system: Tencent Cloud HDFS, aka CHDFS)) {
            fsType = FileSystemType.OFS;
        } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
            fsType = FileSystemType.JFS;
        } else {
            throw new UnsupportedOperationException("Unknown file system for location: " + location);
        }

        Path path = new Path(location);
        URI uri = path.toUri();
        String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority());
        return Pair.of(fsType, fsIdent);
    }

    public static RemoteFileSystem getRemoteFileSystem(FileSystemType type, Configuration conf,
            String bindBrokerName) {
        Map<String, String> properties = new HashMap<>();

@@ -30,11 +30,9 @@ import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.AcidInfo;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
@@ -83,7 +81,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
@@ -492,33 +489,4 @@ public abstract class FileQueryScanNode extends FileScanNode {
    protected abstract TableIf getTargetTable() throws UserException;

    protected abstract Map<String, String> getLocationProperties() throws UserException;

    protected static Optional<TFileType> getTFileType(String location) {
        if (location != null && !location.isEmpty()) {
            if (S3Util.isObjStorage(location)) {
                if (S3Util.isHdfsOnOssEndpoint(location)) {
                    // if hdfs service is enabled on oss, use hdfs lib to access oss.
                    return Optional.of(TFileType.FILE_HDFS);
                }
                return Optional.of(TFileType.FILE_S3);
            } else if (location.startsWith(FeConstants.FS_PREFIX_HDFS)) {
                return Optional.of(TFileType.FILE_HDFS);
            } else if (location.startsWith(FeConstants.FS_PREFIX_VIEWFS)) {
                return Optional.of(TFileType.FILE_HDFS);
            } else if (location.startsWith(FeConstants.FS_PREFIX_COSN)) {
                return Optional.of(TFileType.FILE_HDFS);
            } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
                return Optional.of(TFileType.FILE_LOCAL);
            } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
                return Optional.of(TFileType.FILE_BROKER);
            } else if (location.startsWith(FeConstants.FS_PREFIX_GFS)) {
                return Optional.of(TFileType.FILE_BROKER);
            } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
                return Optional.of(TFileType.FILE_BROKER);
            }
        }
        return Optional.empty();
    }
}

@@ -32,6 +32,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
@@ -66,6 +67,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.stream.Collectors;

@@ -334,7 +336,7 @@ public class HiveScanNode extends FileQueryScanNode {
        if (bindBrokerName != null) {
            return TFileType.FILE_BROKER;
        }
        return getTFileType(location).orElseThrow(() ->
        return Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() ->
                new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
    }

@@ -26,7 +26,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
@@ -43,7 +43,6 @@ import org.apache.doris.thrift.THudiFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -287,9 +286,11 @@ public class HudiScanNode extends HiveScanNode {
                    noLogsSplitNum.incrementAndGet();
                    String filePath = baseFile.getPath();
                    long fileSize = baseFile.getFileSize();
                    splits.add(new FileSplit(S3Util.toScanRangeLocation(filePath, Maps.newHashMap()),
                            0, fileSize, fileSize, new String[0],
                            partition.getPartitionValues()));
                    // Need add hdfs host to location
                    LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
                    Path splitFilePath = locationPath.toScanRangeLocation();
                    splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
                            new String[0], partition.getPartitionValues()));
                });
            } else {
                fileSystemView.getLatestMergedFileSlicesBeforeOrOn(partitionName, queryInstant).forEach(fileSlice -> {

@@ -31,7 +31,7 @@ import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.catalog.external.IcebergExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.external.iceberg.util.IcebergUtils;
@@ -141,7 +141,9 @@ public class IcebergScanNode extends FileQueryScanNode {
            for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
                TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
                String deleteFilePath = filter.getDeleteFilePath();
                deleteFileDesc.setPath(S3Util.toScanRangeLocation(deleteFilePath, icebergSplit.getConfig()).toString());
                LocationPath locationPath = new LocationPath(deleteFilePath, icebergSplit.getConfig());
                Path splitDeletePath = locationPath.toScanRangeLocation();
                deleteFileDesc.setPath(splitDeletePath.toString());
                if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
                    fileDesc.setContent(FileContent.POSITION_DELETES.id());
                    IcebergDeleteFileFilter.PositionDelete positionDelete =
@@ -221,8 +223,8 @@ public class IcebergScanNode extends FileQueryScanNode {
                // Counts the number of partitions read
                partitionPathSet.add(structLike.toString());
            }

            Path finalDataFilePath = S3Util.toScanRangeLocation(dataFilePath, source.getCatalog().getProperties());
            LocationPath locationPath = new LocationPath(dataFilePath, source.getCatalog().getProperties());
            Path finalDataFilePath = locationPath.toScanRangeLocation();
            IcebergSplit split = new IcebergSplit(
                    finalDataFilePath,
                    splitTask.start(),
@@ -323,7 +325,7 @@ public class IcebergScanNode extends FileQueryScanNode {
    @Override
    public TFileType getLocationType(String location) throws UserException {
        final String fLocation = normalizeLocation(location);
        return getTFileType(fLocation).orElseThrow(() ->
        return Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() ->
                new DdlException("Unknown file location " + fLocation + " for iceberg table " + icebergTable.name()));
    }

@@ -0,0 +1,178 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.util;

import org.apache.doris.fs.FileSystemType;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

public class LocationPathTest {

    @Test
    public void testHdfsLocationConvert() {
        // non HA
        Map<String, String> rangeProps = new HashMap<>();
        LocationPath locationPath = new LocationPath("hdfs://dir/file.path", rangeProps);
        Assertions.assertTrue(locationPath.get().startsWith("hdfs://"));

        String beLocation = locationPath.toScanRangeLocation().toString();
        Assertions.assertTrue(beLocation.startsWith("hdfs://"));
        Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS);

        // HA props
        Map<String, String> props = new HashMap<>();
        props.put("dfs.nameservices", "ns");
        locationPath = new LocationPath("hdfs:///dir/file.path", props);
        Assertions.assertTrue(locationPath.get().startsWith("hdfs://")
                && !locationPath.get().startsWith("hdfs:///"));

        beLocation = locationPath.toScanRangeLocation().toString();
        Assertions.assertTrue(beLocation.startsWith("hdfs://") && !beLocation.startsWith("hdfs:///"));

        // nonstandard '/' for hdfs path
        locationPath = new LocationPath("hdfs:/dir/file.path", props);
        Assertions.assertTrue(locationPath.get().startsWith("hdfs://"));

        beLocation = locationPath.toScanRangeLocation().toString();
        Assertions.assertTrue(beLocation.startsWith("hdfs://"));

        // empty ha nameservices
        props.put("dfs.nameservices", "");
        locationPath = new LocationPath("hdfs:/dir/file.path", props);

        beLocation = locationPath.toScanRangeLocation().toString();
        Assertions.assertTrue(locationPath.get().startsWith("/dir")
                && !locationPath.get().startsWith("hdfs://"));
        Assertions.assertTrue(beLocation.startsWith("/dir") && !beLocation.startsWith("hdfs://"));
    }

    @Test
    public void testJFSLocationConvert() {
        String loc;
        Map<String, String> rangeProps = new HashMap<>();

        LocationPath locationPath = new LocationPath("jfs://test.com", rangeProps);
        // FE
        Assertions.assertTrue(locationPath.get().startsWith("jfs://"));
        // BE
        loc = locationPath.toScanRangeLocation().toString();
        Assertions.assertTrue(loc.startsWith("jfs://"));
        Assertions.assertEquals(LocationPath.getFSIdentity(loc, null).first, FileSystemType.JFS);
    }

    @Test
    public void testGSLocationConvert() {
        Map<String, String> rangeProps = new HashMap<>();

        // use s3 client to access gs
        LocationPath locationPath = new LocationPath("gs://test.com", rangeProps);
        // FE
        Assertions.assertTrue(locationPath.get().startsWith("s3://"));
        // BE
        String beLoc = locationPath.toScanRangeLocation().toString();
        Assertions.assertTrue(beLoc.startsWith("s3://"));
        Assertions.assertEquals(LocationPath.getFSIdentity(beLoc, null).first, FileSystemType.S3);
    }

    @Test
    public void testOSSLocationConvert() {
        Map<String, String> rangeProps = new HashMap<>();
        LocationPath locationPath = new LocationPath("oss://test.com", rangeProps);
        // FE
        Assertions.assertTrue(locationPath.get().startsWith("oss://"));
        // BE
        String beLocation = locationPath.toScanRangeLocation().toString();
        Assertions.assertTrue(beLocation.startsWith("s3://"));
        Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3);

        locationPath = new LocationPath("oss://test.oss-dls.aliyuncs.com/path", rangeProps);
        // FE
        Assertions.assertTrue(locationPath.get().startsWith("oss://test.oss-dls.aliyuncs"));
        // BE
        beLocation = locationPath.toScanRangeLocation().toString();
        Assertions.assertTrue(beLocation.startsWith("oss://test.oss-dls.aliyuncs"));
        Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS);
    }

    @Test
    public void testCOSLocationConvert() {
        Map<String, String> rangeProps = new HashMap<>();
        LocationPath locationPath = new LocationPath("cos://test.com", rangeProps);
        // FE
        Assertions.assertTrue(locationPath.get().startsWith("cos://"));
        String beLocation = locationPath.toScanRangeLocation().toString();
        // BE
        Assertions.assertTrue(beLocation.startsWith("s3://"));
        Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3);

        locationPath = new LocationPath("cosn://test.com", rangeProps);
        // FE
        Assertions.assertTrue(locationPath.get().startsWith("cosn://"));
        // BE
        beLocation = locationPath.toScanRangeLocation().toString();
        Assertions.assertTrue(beLocation.startsWith("cosn://"));
        Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.OFS);

        locationPath = new LocationPath("ofs://test.com", rangeProps);
        // FE
        Assertions.assertTrue(locationPath.get().startsWith("ofs://"));
        // BE
        beLocation = locationPath.toScanRangeLocation().toString();
        Assertions.assertTrue(beLocation.startsWith("ofs://"));
        Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.OFS);

        // GFS is now equals to DFS
        locationPath = new LocationPath("gfs://test.com", rangeProps);
        // FE
        Assertions.assertTrue(locationPath.get().startsWith("gfs://"));
        // BE
        beLocation = locationPath.toScanRangeLocation().toString();
        Assertions.assertTrue(beLocation.startsWith("gfs://"));
        Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.DFS);
    }

    @Test
    public void testOBSLocationConvert() {
        Map<String, String> rangeProps = new HashMap<>();
        LocationPath locationPath = new LocationPath("obs://test.com", rangeProps);
        // FE
        Assertions.assertTrue(locationPath.get().startsWith("obs://"));
        // BE
        String beLocation = locationPath.toScanRangeLocation().toString();
        Assertions.assertTrue(beLocation.startsWith("s3://"));
        Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3);
    }

    @Test
    public void testUnsupportedLocationConvert() {
        // when use unknown location, pass to BE
        Map<String, String> rangeProps = new HashMap<>();
        LocationPath locationPath = new LocationPath("unknown://test.com", rangeProps);
        // FE
        Assertions.assertTrue(locationPath.get().startsWith("unknown://"));
        // BE
        String beLocation = locationPath.toScanRangeLocation().toString();
        Assertions.assertTrue(beLocation.startsWith("unknown://"));
    }
}
@@ -1,104 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.util;

import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.FileSystemType;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

public class S3UtilTest {
    @Test
    public void testLocationConvert() {
        String loc;
        loc = S3Util.convertToS3IfNecessary("hdfs://dir/file.path", new HashMap<>());
        Assertions.assertTrue(loc.startsWith("hdfs://"));

        Map<String, String> props = new HashMap<>();
        props.put("dfs.nameservices", "ns");
        loc = S3Util.convertToS3IfNecessary("hdfs:///dir/file.path", props);
        Assertions.assertTrue(loc.startsWith("hdfs://") && !loc.startsWith("hdfs:///"));
        loc = S3Util.convertToS3IfNecessary("hdfs:/dir/file.path", props);
        Assertions.assertTrue(loc.startsWith("hdfs://"));
        props.put("dfs.nameservices", "");
        loc = S3Util.convertToS3IfNecessary("hdfs:/dir/file.path", props);
        Assertions.assertTrue(loc.startsWith("/dir") && !loc.startsWith("hdfs://"));

        loc = S3Util.convertToS3IfNecessary("oss://test.com", props);
        Assertions.assertTrue(loc.startsWith("oss://"));

        loc = S3Util.convertToS3IfNecessary("gcs://test.com", props);
        Assertions.assertTrue(loc.startsWith("gcs://"));

        loc = S3Util.convertToS3IfNecessary("cos://test.com", props);
        Assertions.assertTrue(loc.startsWith("cos://"));

        loc = S3Util.convertToS3IfNecessary("cosn://test.com", props);
        Assertions.assertTrue(loc.startsWith("cosn://"));

        loc = S3Util.convertToS3IfNecessary("obs://test.com", props);
        Assertions.assertTrue(loc.startsWith("obs://"));
    }

    @Test
    public void testScanRangeLocationConvert() throws Exception {
        String loc;
        Map<String, String> rangeProps = new HashMap<>();
        loc = S3Util.toScanRangeLocation("hdfs://dir/file.path", rangeProps).toString();
        Assertions.assertTrue(loc.startsWith("hdfs://"));
        Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.DFS);

        Map<String, String> props = new HashMap<>();
        props.put("dfs.nameservices", "ns");
        loc = S3Util.toScanRangeLocation("hdfs:///dir/file.path", props).toString();
        Assertions.assertTrue(loc.startsWith("hdfs://") && !loc.startsWith("hdfs:///"));
        loc = S3Util.toScanRangeLocation("hdfs:/dir/file.path", props).toString();
        Assertions.assertTrue(loc.startsWith("hdfs://"));
        props.put("dfs.nameservices", "");
        loc = S3Util.toScanRangeLocation("hdfs:/dir/file.path", props).toString();
        Assertions.assertTrue(loc.startsWith("/dir") && !loc.startsWith("hdfs://"));

        loc = S3Util.toScanRangeLocation("oss://test.com", rangeProps).toString();
        Assertions.assertTrue(loc.startsWith("s3://"));
        Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.S3);

        loc = S3Util.toScanRangeLocation("oss://test.oss-dls.aliyuncs.com/path", rangeProps).toString();
        Assertions.assertTrue(loc.startsWith("oss://test.oss-dls.aliyuncs"));
        Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.DFS);

        loc = S3Util.toScanRangeLocation("cos://test.com", rangeProps).toString();
        Assertions.assertTrue(loc.startsWith("s3://"));
        Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.S3);

        loc = S3Util.toScanRangeLocation("cosn://test.com", rangeProps).toString();
        Assertions.assertTrue(loc.startsWith("cosn://"));
        Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.OFS);

        loc = S3Util.toScanRangeLocation("obs://test.com", rangeProps).toString();
        Assertions.assertTrue(loc.startsWith("s3://"));
        Assertions.assertEquals(FileSystemFactory.getFSIdentity(loc, null).first, FileSystemType.S3);

        loc = S3Util.toScanRangeLocation("unknown://test.com", rangeProps).toString();
        Assertions.assertTrue(loc.startsWith("unknown://"));
    }
}