[fix](catalog) refactor location path and support default fs #39116 (#39203)

Author: Mingyu Chen
Date: 2024-08-24 16:05:13 +08:00
Committed by: GitHub
Parent: 76596e5f73
Commit: 9640e2de44
24 changed files with 433 additions and 530 deletions
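
The headline change: when a location carries no scheme, LocationPath now prepends the catalog's default filesystem (HdfsResource.HADOOP_FS_NAME from the catalog properties), and object-store schemes are still normalized for the BE S3 client. A minimal sketch of that behavior, based on the constructor changed below; the property value, paths, and commented results are illustrative assumptions, not verified output:

import java.util.Map;

import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.util.LocationPath;

import com.google.common.collect.Maps;

public class DefaultFsSketch {
    public static void main(String[] args) {
        // Catalog properties; the default fs value here is made up.
        Map<String, String> props = Maps.newHashMap();
        props.put(HdfsResource.HADOOP_FS_NAME, "hdfs://nameservice1");

        // No scheme in the path, so the default fs from the catalog properties is prepended.
        LocationPath p1 = new LocationPath("/user/hive/warehouse/t1/part-0.parquet", props);
        System.out.println(p1.getScheme()); // HDFS
        System.out.println(p1.get());       // hdfs://nameservice1/user/hive/warehouse/t1/part-0.parquet

        // Fully qualified paths are unaffected by the default fs; s3a:// is still converted to s3://.
        LocationPath p2 = new LocationPath("s3a://bucket/key/part-0.parquet", props);
        System.out.println(p2.get());       // s3://bucket/key/part-0.parquet
    }
}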

View File

@ -20,6 +20,7 @@ package org.apache.doris.common.util;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
@ -27,7 +28,9 @@ import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.fs.FileSystemType;
import org.apache.doris.thrift.TFileType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
@ -49,10 +52,11 @@ public class LocationPath {
private static final Logger LOG = LogManager.getLogger(LocationPath.class);
private static final String SCHEME_DELIM = "://";
private static final String NONSTANDARD_SCHEME_DELIM = ":/";
private final LocationType locationType;
private final Scheme scheme;
private final String location;
private final boolean isBindBroker;
public enum LocationType {
public enum Scheme {
HDFS,
LOCAL, // Local File
BOS, // Baidu
@ -74,122 +78,230 @@ public class LocationPath {
NOSCHEME // no scheme info
}
private LocationPath(String location) {
this(location, Collections.emptyMap(), true);
@VisibleForTesting
public LocationPath(String location) {
this(location, Maps.newHashMap(), true);
}
public LocationPath(String location, Map<String, String> props) {
this(location, props, true);
}
public LocationPath(String location, Map<String, String> props, boolean convertPath) {
String scheme = parseScheme(location).toLowerCase();
if (scheme.isEmpty()) {
locationType = LocationType.NOSCHEME;
this.location = location;
} else {
switch (scheme) {
case FeConstants.FS_PREFIX_HDFS:
locationType = LocationType.HDFS;
// Need to add the hdfs host to the location
String host = props.get(HdfsResource.DSF_NAMESERVICES);
this.location = convertPath ? normalizedHdfsPath(location, host) : location;
break;
case FeConstants.FS_PREFIX_S3:
locationType = LocationType.S3;
this.location = location;
break;
case FeConstants.FS_PREFIX_S3A:
locationType = LocationType.S3A;
this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_S3N:
// Also covers the multi-location case, e.g. a table that contains both s3 and hdfs locations.
locationType = LocationType.S3N;
this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_BOS:
locationType = LocationType.BOS;
// use s3 client to access
this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_GCS:
locationType = LocationType.GCS;
// use s3 client to access
this.location = convertPath ? convertToS3(location) : location;
break;
case FeConstants.FS_PREFIX_OSS:
if (isHdfsOnOssEndpoint(location)) {
locationType = LocationType.OSS_HDFS;
this.location = location;
} else {
if (useS3EndPoint(props)) {
this.location = convertPath ? convertToS3(location) : location;
} else {
this.location = location;
}
locationType = LocationType.OSS;
}
break;
case FeConstants.FS_PREFIX_COS:
private LocationPath(String originLocation, Map<String, String> props, boolean convertPath) {
isBindBroker = props.containsKey(HMSExternalCatalog.BIND_BROKER_NAME);
String tmpLocation = originLocation;
if (!originLocation.contains(SCHEME_DELIM)) {
// Sometimes the file path does not contain a scheme, so the default fs needs to be prepended,
// e.g. /path/to/file.parquet -> hdfs://nn/path/to/file.parquet.
// The default fs comes from the catalog properties.
String defaultFS = props.getOrDefault(HdfsResource.HADOOP_FS_NAME, "");
tmpLocation = defaultFS + originLocation;
}
String scheme = parseScheme(tmpLocation).toLowerCase();
switch (scheme) {
case "":
this.scheme = Scheme.NOSCHEME;
break;
case FeConstants.FS_PREFIX_HDFS:
this.scheme = Scheme.HDFS;
// Need to add the hdfs host to the location
String host = props.get(HdfsResource.DSF_NAMESERVICES);
tmpLocation = convertPath ? normalizedHdfsPath(tmpLocation, host) : tmpLocation;
break;
case FeConstants.FS_PREFIX_S3:
this.scheme = Scheme.S3;
break;
case FeConstants.FS_PREFIX_S3A:
this.scheme = Scheme.S3A;
tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
break;
case FeConstants.FS_PREFIX_S3N:
// Also covers the multi-location case, e.g. a table that contains both s3 and hdfs locations.
this.scheme = Scheme.S3N;
tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
break;
case FeConstants.FS_PREFIX_BOS:
this.scheme = Scheme.BOS;
// use s3 client to access
tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
break;
case FeConstants.FS_PREFIX_GCS:
this.scheme = Scheme.GCS;
// use s3 client to access
tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
break;
case FeConstants.FS_PREFIX_OSS:
if (isHdfsOnOssEndpoint(tmpLocation)) {
this.scheme = Scheme.OSS_HDFS;
} else {
if (useS3EndPoint(props)) {
this.location = convertPath ? convertToS3(location) : location;
} else {
this.location = location;
tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
}
locationType = LocationType.COS;
break;
case FeConstants.FS_PREFIX_OBS:
if (useS3EndPoint(props)) {
this.location = convertPath ? convertToS3(location) : location;
} else {
this.location = location;
}
locationType = LocationType.OBS;
break;
case FeConstants.FS_PREFIX_OFS:
locationType = LocationType.OFS;
this.location = location;
break;
case FeConstants.FS_PREFIX_JFS:
locationType = LocationType.JFS;
this.location = location;
break;
case FeConstants.FS_PREFIX_GFS:
locationType = LocationType.GFS;
this.location = location;
break;
case FeConstants.FS_PREFIX_COSN:
// Treating cosn (Tencent hadoop-cos) as an s3 file system may introduce compatibility issues.
locationType = LocationType.COSN;
this.location = location;
break;
case FeConstants.FS_PREFIX_LAKEFS:
locationType = LocationType.COSN;
this.location = normalizedLakefsPath(location);
break;
case FeConstants.FS_PREFIX_VIEWFS:
locationType = LocationType.VIEWFS;
this.location = location;
break;
case FeConstants.FS_PREFIX_FILE:
locationType = LocationType.LOCAL;
this.location = location;
break;
default:
locationType = LocationType.UNKNOWN;
this.location = location;
}
this.scheme = Scheme.OSS;
}
break;
case FeConstants.FS_PREFIX_COS:
if (useS3EndPoint(props)) {
tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
}
this.scheme = Scheme.COS;
break;
case FeConstants.FS_PREFIX_OBS:
if (useS3EndPoint(props)) {
tmpLocation = convertPath ? convertToS3(tmpLocation) : tmpLocation;
}
this.scheme = Scheme.OBS;
break;
case FeConstants.FS_PREFIX_OFS:
this.scheme = Scheme.OFS;
break;
case FeConstants.FS_PREFIX_JFS:
this.scheme = Scheme.JFS;
break;
case FeConstants.FS_PREFIX_GFS:
this.scheme = Scheme.GFS;
break;
case FeConstants.FS_PREFIX_COSN:
// Treating cosn (Tencent hadoop-cos) as an s3 file system may introduce compatibility issues.
this.scheme = Scheme.COSN;
break;
case FeConstants.FS_PREFIX_LAKEFS:
this.scheme = Scheme.COSN;
tmpLocation = normalizedLakefsPath(tmpLocation);
break;
case FeConstants.FS_PREFIX_VIEWFS:
this.scheme = Scheme.VIEWFS;
break;
case FeConstants.FS_PREFIX_FILE:
this.scheme = Scheme.LOCAL;
break;
default:
this.scheme = Scheme.UNKNOWN;
break;
}
this.location = tmpLocation;
}
// Return true if this location is on oss-hdfs
public static boolean isHdfsOnOssEndpoint(String location) {
// example: cn-shanghai.oss-dls.aliyuncs.com contains the "oss-dls.aliyuncs".
// https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
return location.contains("oss-dls.aliyuncs");
}
// Return the file system type and the file system identity.
// The file system identity is the scheme and authority of the URI, eg. "hdfs://host:port" or "s3://bucket".
public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), true);
FileSystemType fsType = (bindBrokerName != null) ? FileSystemType.BROKER : locationPath.getFileSystemType();
URI uri = locationPath.getPath().toUri();
String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority());
return Pair.of(fsType, fsIdent);
}
/**
* provide file type for BE.
*
* @param location the location is from fs.listFile
* @return on BE, we will use TFileType to get the suitable client to access storage.
*/
public static TFileType getTFileTypeForBE(String location) {
if (location == null || location.isEmpty()) {
return null;
}
LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), false);
return locationPath.getTFileTypeForBE();
}
public static String getTempWritePath(String loc, String prefix) {
Path tempRoot = new Path(loc, prefix);
Path tempPath = new Path(tempRoot, UUID.randomUUID().toString().replace("-", ""));
return tempPath.toString();
}
public TFileType getTFileTypeForBE() {
switch (scheme) {
case S3:
case S3A:
case S3N:
case COS:
case OSS:
case OBS:
case BOS:
case GCS:
// ATTN, for COSN, on FE side, use HadoopFS to access, but on BE, use S3 client to access.
case COSN:
case LAKEFS:
// now we only support S3 client for object storage on BE
return TFileType.FILE_S3;
case HDFS:
case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib to access oss.
case VIEWFS:
return TFileType.FILE_HDFS;
case GFS:
case JFS:
case OFS:
return TFileType.FILE_BROKER;
case LOCAL:
return TFileType.FILE_LOCAL;
default:
return null;
}
}
private static String parseScheme(String location) {
/**
* The converted path is used for BE
*
* @return BE scan range path
*/
public Path toStorageLocation() {
switch (scheme) {
case S3:
case S3A:
case S3N:
case COS:
case OSS:
case OBS:
case BOS:
case GCS:
case COSN:
// All of these storage types are accessed with the s3 client on BE, so convert the path to s3
return new Path(convertToS3(location));
case HDFS:
case OSS_HDFS:
case VIEWFS:
case GFS:
case JFS:
case OFS:
case LOCAL:
default:
return getPath();
}
}
public Scheme getScheme() {
return scheme;
}
public String get() {
return location;
}
public Path getPath() {
return new Path(location);
}
public boolean isBindBroker() {
return isBindBroker;
}
private static String parseScheme(String finalLocation) {
String scheme = "";
String[] schemeSplit = location.split(SCHEME_DELIM);
String[] schemeSplit = finalLocation.split(SCHEME_DELIM);
if (schemeSplit.length > 1) {
scheme = schemeSplit[0];
} else {
schemeSplit = location.split(NONSTANDARD_SCHEME_DELIM);
schemeSplit = finalLocation.split(NONSTANDARD_SCHEME_DELIM);
if (schemeSplit.length > 1) {
scheme = schemeSplit[0];
}
@ -198,9 +310,9 @@ public class LocationPath {
// If no scheme was parsed, a path like /path/to/local is treated as having no scheme
if (scheme.isEmpty()) {
try {
Paths.get(location);
Paths.get(finalLocation);
} catch (InvalidPathException exception) {
throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + location);
throw new IllegalArgumentException("Fail to parse scheme, invalid location: " + finalLocation);
}
}
@ -217,14 +329,9 @@ public class LocationPath {
return (props.containsKey(S3Properties.ENDPOINT) || props.containsKey(S3Properties.Env.ENDPOINT));
}
public static boolean isHdfsOnOssEndpoint(String location) {
// example: cn-shanghai.oss-dls.aliyuncs.com contains the "oss-dls.aliyuncs".
// https://www.alibabacloud.com/help/en/e-mapreduce/latest/oss-kusisurumen
return location.contains("oss-dls.aliyuncs");
}
/**
* The converted path is used for FE to get metadata
* The converted path is used for FE to get metadata.
* Change http://xxxx to s3://xxxx
*
* @param location origin location
* @return metadata location path. just convert when storage is compatible with s3 client.
@ -291,17 +398,9 @@ public class LocationPath {
}
}
public static Pair<FileSystemType, String> getFSIdentity(String location, String bindBrokerName) {
LocationPath locationPath = new LocationPath(location);
FileSystemType fsType = (bindBrokerName != null) ? FileSystemType.BROKER : locationPath.getFileSystemType();
URI uri = locationPath.getPath().toUri();
String fsIdent = Strings.nullToEmpty(uri.getScheme()) + "://" + Strings.nullToEmpty(uri.getAuthority());
return Pair.of(fsType, fsIdent);
}
private FileSystemType getFileSystemType() {
FileSystemType fsType;
switch (locationType) {
switch (scheme) {
case S3:
case S3A:
case S3N:
@ -339,98 +438,6 @@ public class LocationPath {
return fsType;
}
/**
* provide file type for BE.
*
* @param location the location is from fs.listFile
* @return on BE, we will use TFileType to get the suitable client to access storage.
*/
public static TFileType getTFileTypeForBE(String location) {
if (location == null || location.isEmpty()) {
return null;
}
LocationPath locationPath = new LocationPath(location, Collections.emptyMap(), false);
return locationPath.getTFileTypeForBE();
}
public TFileType getTFileTypeForBE() {
switch (this.getLocationType()) {
case S3:
case S3A:
case S3N:
case COS:
case OSS:
case OBS:
case BOS:
case GCS:
// ATTN, for COSN, on FE side, use HadoopFS to access, but on BE, use S3 client to access.
case COSN:
case LAKEFS:
// now we only support S3 client for object storage on BE
return TFileType.FILE_S3;
case HDFS:
case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib to access oss.
case VIEWFS:
return TFileType.FILE_HDFS;
case GFS:
case JFS:
case OFS:
return TFileType.FILE_BROKER;
case LOCAL:
return TFileType.FILE_LOCAL;
default:
return null;
}
}
/**
* The converted path is used for BE
*
* @return BE scan range path
*/
public Path toStorageLocation() {
switch (locationType) {
case S3:
case S3A:
case S3N:
case COS:
case OSS:
case OBS:
case BOS:
case GCS:
case COSN:
// All of these storage types are accessed with the s3 client on BE, so convert the path to s3
return new Path(convertToS3(location));
case HDFS:
case OSS_HDFS:
case VIEWFS:
case GFS:
case JFS:
case OFS:
case LOCAL:
default:
return getPath();
}
}
public LocationType getLocationType() {
return locationType;
}
public String get() {
return location;
}
public Path getPath() {
return new Path(location);
}
public static String getTempWritePath(String loc, String prefix) {
Path tempRoot = new Path(loc, prefix);
Path tempPath = new Path(tempRoot, UUID.randomUUID().toString().replace("-", ""));
return tempPath.toString();
}
@Override
public String toString() {
return get();
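
The static helpers above (getFSIdentity, getTFileTypeForBE, getTempWritePath) and the instance methods (toStorageLocation, getPath, isBindBroker) now all operate on the same parsed LocationPath. A hedged usage sketch of that surface; the locations are made up and the commented results reflect a reading of the switch statements above rather than verified output:

import java.util.Collections;

import org.apache.doris.common.Pair;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.fs.FileSystemType;
import org.apache.doris.thrift.TFileType;

public class LocationPathUsageSketch {
    public static void main(String[] args) {
        // File system identity: scheme plus authority, used on FE to identify a file system.
        Pair<FileSystemType, String> ident =
                LocationPath.getFSIdentity("hdfs://nameservice1/warehouse/t1", null);
        System.out.println(ident.second); // hdfs://nameservice1

        // File type for BE: object stores map to FILE_S3, hdfs/viewfs to FILE_HDFS, gfs/jfs/ofs to FILE_BROKER.
        TFileType type = LocationPath.getTFileTypeForBE("cosn://bucket/path/file.orc");
        System.out.println(type); // FILE_S3 (COSN uses HadoopFS on FE but the S3 client on BE)

        // Storage location for a BE scan range: object-store schemes are rewritten to s3.
        LocationPath cosn = new LocationPath("cosn://bucket/path/file.orc", Collections.emptyMap());
        System.out.println(cosn.toStorageLocation()); // s3://bucket/path/file.orc
    }
}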

View File

@ -35,10 +35,8 @@ import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.hive.AcidInfo;
import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hive.source.HiveSplit;
import org.apache.doris.datasource.iceberg.source.IcebergSplit;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
@ -268,7 +266,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
boolean isWal = fileFormatType == TFileFormatType.FORMAT_WAL;
if (isCsvOrJson || isWal) {
params.setFileAttributes(getFileAttributes());
if (getLocationType() == TFileType.FILE_STREAM) {
if (isFileStreamType()) {
params.setFileType(TFileType.FILE_STREAM);
FunctionGenTable table = (FunctionGenTable) this.desc.getTable();
ExternalFileTableValuedFunction tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf();
@ -309,19 +307,13 @@ public abstract class FileQueryScanNode extends FileScanNode {
if (ConnectContext.get().getExecutor() != null) {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
if (splitAssignment.getSampleSplit() == null && !(getLocationType() == TFileType.FILE_STREAM)) {
if (splitAssignment.getSampleSplit() == null && !isFileStreamType()) {
return;
}
selectedSplitNum = numApproximateSplits();
TFileType locationType;
FileSplit fileSplit = (FileSplit) splitAssignment.getSampleSplit();
if (fileSplit instanceof IcebergSplit
&& ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
locationType = TFileType.FILE_BROKER;
} else {
locationType = getLocationType(fileSplit.getPath().toString());
}
TFileType locationType = fileSplit.getLocationType();
totalFileSize = fileSplit.getLength() * selectedSplitNum;
long maxWaitTime = ConnectContext.get().getSessionVariable().getFetchSplitsMaxWaitTime();
// Not accurate, only used to estimate concurrency.
@ -351,7 +343,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
ConnectContext.get().getExecutor().getSummaryProfile().setGetSplitsFinishTime();
}
selectedSplitNum = inputSplits.size();
if (inputSplits.isEmpty() && !(getLocationType() == TFileType.FILE_STREAM)) {
if (inputSplits.isEmpty() && !isFileStreamType()) {
return;
}
Multimap<Backend, Split> assignment = backendPolicy.computeScanRangeAssignment(inputSplits);
@ -379,14 +371,6 @@ public abstract class FileQueryScanNode extends FileScanNode {
Split split,
List<String> pathPartitionKeys) throws UserException {
FileSplit fileSplit = (FileSplit) split;
TFileType locationType;
if (fileSplit instanceof IcebergSplit
&& ((IcebergSplit) fileSplit).getConfig().containsKey(HMSExternalCatalog.BIND_BROKER_NAME)) {
locationType = TFileType.FILE_BROKER;
} else {
locationType = getLocationType(fileSplit.getPath().toString());
}
TScanRangeLocations curLocations = newLocations();
// If fileSplit has partition values, use the values collected from hive partitions.
// Otherwise, use the values in file path.
@ -396,41 +380,42 @@ public abstract class FileQueryScanNode extends FileScanNode {
isACID = hiveSplit.isACID();
}
List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys,
? BrokerUtil.parseColumnsFromPath(fileSplit.getPathString(), pathPartitionKeys,
false, isACID) : fileSplit.getPartitionValues();
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys,
locationType);
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
TFileCompressType fileCompressType = getFileCompressType(fileSplit);
rangeDesc.setCompressType(fileCompressType);
if (isACID) {
HiveSplit hiveSplit = (HiveSplit) fileSplit;
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
deleteDeltaDescs.add(deleteDeltaDesc);
if (fileSplit instanceof HiveSplit) {
if (isACID) {
HiveSplit hiveSplit = (HiveSplit) fileSplit;
hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
deleteDeltaDescs.add(deleteDeltaDesc);
}
transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
} else {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value());
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
} else if (fileSplit instanceof HiveSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(TableFormatType.HIVE.value());
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
setScanParams(rangeDesc, fileSplit);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
TScanRangeLocation location = new TScanRangeLocation();
setLocationPropertiesIfNecessary(backend, locationType, locationProperties);
setLocationPropertiesIfNecessary(backend, fileSplit.getLocationType(), locationProperties);
location.setBackendId(backend.getId());
location.setServer(new TNetworkAddress(backend.getHost(), backend.getBePort()));
curLocations.addToLocations(location);
@ -493,8 +478,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
}
private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
List<String> columnsFromPathKeys, TFileType locationType)
throws UserException {
List<String> columnsFromPathKeys) {
TFileRangeDesc rangeDesc = new TFileRangeDesc();
rangeDesc.setStartOffset(fileSplit.getStart());
rangeDesc.setSize(fileSplit.getLength());
@ -504,10 +488,10 @@ public abstract class FileQueryScanNode extends FileScanNode {
rangeDesc.setColumnsFromPath(columnsFromPath);
rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
rangeDesc.setFileType(locationType);
rangeDesc.setPath(fileSplit.getPath().toString());
if (locationType == TFileType.FILE_HDFS) {
URI fileUri = fileSplit.getPath().toUri();
rangeDesc.setFileType(fileSplit.getLocationType());
rangeDesc.setPath(fileSplit.getPath().toStorageLocation().toString());
if (fileSplit.getLocationType() == TFileType.FILE_HDFS) {
URI fileUri = fileSplit.getPath().getPath().toUri();
rangeDesc.setFsName(fileUri.getScheme() + "://" + fileUri.getAuthority());
}
rangeDesc.setModificationTime(fileSplit.getModificationTime());
@ -555,14 +539,16 @@ public abstract class FileQueryScanNode extends FileScanNode {
return scanRangeLocations.size();
}
protected abstract TFileType getLocationType() throws UserException;
protected abstract TFileType getLocationType(String location) throws UserException;
// Return true if this is a TFileType.FILE_STREAM type.
// Currently, only TVFScanNode may be TFileType.FILE_STREAM type.
protected boolean isFileStreamType() throws UserException {
return false;
}
protected abstract TFileFormatType getFileFormatType() throws UserException;
protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
return Util.inferFileCompressTypeByPath(fileSplit.getPath().toString());
return Util.inferFileCompressTypeByPath(fileSplit.getPathString());
}
protected TFileAttributes getFileAttributes() throws UserException {

View File

@ -25,6 +25,7 @@ import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.qe.ConnectContext;
@ -46,7 +47,6 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -241,14 +241,14 @@ public abstract class FileScanNode extends ExternalScanNode {
}
}
protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
protected List<Split> splitFile(LocationPath path, long blockSize, BlockLocation[] blockLocations, long length,
long modificationTime, boolean splittable, List<String> partitionValues, SplitCreator splitCreator)
throws IOException {
if (blockLocations == null) {
blockLocations = new BlockLocation[0];
}
List<Split> result = Lists.newArrayList();
TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.toString());
TFileCompressType compressType = Util.inferFileCompressTypeByPath(path.get());
if (!splittable || compressType != TFileCompressType.PLAIN) {
if (LOG.isDebugEnabled()) {
LOG.debug("Path {} is not splittable.", path);

View File

@ -17,16 +17,17 @@
package org.apache.doris.datasource;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.spi.Split;
import org.apache.doris.thrift.TFileType;
import lombok.Data;
import org.apache.hadoop.fs.Path;
import java.util.List;
@Data
public class FileSplit implements Split {
public Path path;
public LocationPath path;
public long start;
// length of this split, in bytes
public long length;
@ -43,27 +44,30 @@ public class FileSplit implements Split {
public List<String> partitionValues;
public List<String> alternativeHosts;
// the location type for BE, eg: HDFS, LOCAL, S3
protected TFileType locationType;
public FileSplit(Path path, long start, long length, long fileLength,
public FileSplit(LocationPath path, long start, long length, long fileLength,
long modificationTime, String[] hosts, List<String> partitionValues) {
this.path = path;
this.start = start;
this.length = length;
this.fileLength = fileLength;
this.modificationTime = modificationTime;
// BE requires modification time to be non-negative.
this.modificationTime = modificationTime < 0 ? 0 : modificationTime;
this.hosts = hosts == null ? new String[0] : hosts;
this.partitionValues = partitionValues;
}
public FileSplit(Path path, long start, long length, long fileLength,
String[] hosts, List<String> partitionValues) {
this(path, start, length, fileLength, 0, hosts, partitionValues);
this.locationType = path.isBindBroker() ? TFileType.FILE_BROKER : path.getTFileTypeForBE();
}
public String[] getHosts() {
return hosts;
}
public TFileType getLocationType() {
return locationType;
}
@Override
public Object getInfo() {
return null;
@ -79,7 +83,8 @@ public class FileSplit implements Split {
public static final FileSplitCreator DEFAULT = new FileSplitCreator();
@Override
public Split create(Path path, long start, long length, long fileLength, long modificationTime, String[] hosts,
public Split create(LocationPath path, long start, long length, long fileLength,
long modificationTime, String[] hosts,
List<String> partitionValues) {
return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
}
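
With this refactor a FileSplit resolves its BE file type once, at construction time: FILE_BROKER when the LocationPath is broker-bound, otherwise LocationPath.getTFileTypeForBE(). A small sketch assuming the constructor above; the path is illustrative:

import java.util.Collections;

import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileSplit;

public class FileSplitTypeSketch {
    public static void main(String[] args) {
        LocationPath path = new LocationPath("hdfs://nameservice1/warehouse/t1/part-0.orc",
                Collections.emptyMap());

        // locationType is resolved here, in the constructor: FILE_BROKER if the path is broker-bound,
        // otherwise whatever LocationPath.getTFileTypeForBE() returns (FILE_HDFS for this path).
        FileSplit split = new FileSplit(path, /*start*/ 0, /*length*/ 1024, /*fileLength*/ 1024,
                /*modificationTime*/ 0, new String[0], Collections.emptyList());
        System.out.println(split.getLocationType()); // FILE_HDFS
    }
}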

View File

@ -17,13 +17,12 @@
package org.apache.doris.datasource;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.spi.Split;
import org.apache.hadoop.fs.Path;
import java.util.List;
public interface SplitCreator {
Split create(Path path, long start, long length, long fileLength,
Split create(LocationPath path, long start, long length, long fileLength,
long modificationTime, String[] hosts, List<String> partitionValues);
}

View File

@ -369,11 +369,7 @@ public class HiveMetaStoreCache {
for (RemoteFile remoteFile : remoteFiles) {
String srcPath = remoteFile.getPath().toString();
LocationPath locationPath = new LocationPath(srcPath, catalog.getProperties());
Path convertedPath = locationPath.toStorageLocation();
if (!convertedPath.toString().equals(srcPath)) {
remoteFile.setPath(convertedPath);
}
result.addFile(remoteFile);
result.addFile(remoteFile, locationPath);
}
} else if (status.getErrCode().equals(ErrCode.NOT_FOUND)) {
// User may manually remove partition under HDFS, in this case,
@ -813,14 +809,17 @@ public class HiveMetaStoreCache {
if (status.ok()) {
if (delta.isDeleteDelta()) {
List<String> deleteDeltaFileNames = remoteFiles.stream().map(f -> f.getName()).filter(
name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.collect(Collectors.toList());
deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames));
continue;
}
remoteFiles.stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(fileCacheValue::addFile);
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX)).forEach(file -> {
LocationPath path = new LocationPath(file.getPath().toString(),
catalog.getProperties());
fileCacheValue.addFile(file, path);
});
} else {
throw new RuntimeException(status.getErrMsg());
}
@ -837,8 +836,12 @@ public class HiveMetaStoreCache {
Status status = fs.listFiles(location, false, remoteFiles);
if (status.ok()) {
remoteFiles.stream().filter(
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(fileCacheValue::addFile);
f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(file -> {
LocationPath path = new LocationPath(file.getPath().toString(),
catalog.getProperties());
fileCacheValue.addFile(file, path);
});
} else {
throw new RuntimeException(status.getErrMsg());
}
@ -998,11 +1001,11 @@ public class HiveMetaStoreCache {
private AcidInfo acidInfo;
public void addFile(RemoteFile file) {
public void addFile(RemoteFile file, LocationPath locationPath) {
if (isFileVisible(file.getPath())) {
HiveFileStatus status = new HiveFileStatus();
status.setBlockLocations(file.getBlockLocations());
status.setPath(file.getPath());
status.setPath(locationPath);
status.length = file.getSize();
status.blockSize = file.getBlockSize();
status.modificationTime = file.getModificationTime();
@ -1014,7 +1017,6 @@ public class HiveMetaStoreCache {
return partitionValues == null ? 0 : partitionValues.size();
}
public AcidInfo getAcidInfo() {
return acidInfo;
}
@ -1062,7 +1064,7 @@ public class HiveMetaStoreCache {
@Data
public static class HiveFileStatus {
BlockLocation[] blockLocations;
Path path;
LocationPath path;
long length;
long blockSize;
long modificationTime;

View File

@ -27,10 +27,8 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.FileSplit;
@ -52,7 +50,6 @@ import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TFileType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@ -343,7 +340,7 @@ public class HiveScanNode extends FileQueryScanNode {
allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(),
status.getBlockLocations(), status.getLength(), status.getModificationTime(),
status.isSplittable(), status.getPartitionValues(),
new HiveSplitCreator(status.getAcidInfo())));
new HiveSplitCreator(status.getAcidInfo())));
}
}
@ -409,21 +406,6 @@ public class HiveScanNode extends FileQueryScanNode {
return hmsTable;
}
@Override
protected TFileType getLocationType() throws UserException {
return getLocationType(hmsTable.getRemoteTable().getSd().getLocation());
}
@Override
protected TFileType getLocationType(String location) throws UserException {
String bindBrokerName = hmsTable.getCatalog().bindBrokerName();
if (bindBrokerName != null) {
return TFileType.FILE_BROKER;
}
return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() ->
new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
}
@Override
public TFileFormatType getFileFormatType() throws UserException {
TFileFormatType type = null;

View File

@ -17,18 +17,17 @@
package org.apache.doris.datasource.hive.source;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.SplitCreator;
import org.apache.doris.datasource.hive.AcidInfo;
import org.apache.doris.spi.Split;
import org.apache.hadoop.fs.Path;
import java.util.List;
public class HiveSplit extends FileSplit {
public HiveSplit(Path path, long start, long length, long fileLength,
private HiveSplit(LocationPath path, long start, long length, long fileLength,
long modificationTime, String[] hosts, List<String> partitionValues, AcidInfo acidInfo) {
super(path, start, length, fileLength, modificationTime, hosts, partitionValues);
this.acidInfo = acidInfo;
@ -53,12 +52,9 @@ public class HiveSplit extends FileSplit {
this.acidInfo = acidInfo;
}
public HiveSplitCreator() {
this(null);
}
@Override
public Split create(Path path, long start, long length, long fileLength, long modificationTime, String[] hosts,
public Split create(LocationPath path, long start, long length, long fileLength,
long modificationTime, String[] hosts,
List<String> partitionValues) {
return new HiveSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues, acidInfo);
}

View File

@ -17,6 +17,7 @@
package org.apache.doris.datasource.hudi.source;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.spi.Split;
@ -210,14 +211,16 @@ public class COWIncrementalRelation implements IncrementalRelation {
: Collections.emptyList();
for (String baseFile : filteredMetaBootstrapFullPaths) {
HoodieWriteStat stat = fileToWriteStat.get(baseFile);
splits.add(new FileSplit(new Path(baseFile), 0, stat.getFileSizeInBytes(), stat.getFileSizeInBytes(),
new String[0],
splits.add(new FileSplit(new LocationPath(baseFile, optParams), 0,
stat.getFileSizeInBytes(), stat.getFileSizeInBytes(),
0, new String[0],
HudiPartitionProcessor.parsePartitionValues(partitionNames, stat.getPartitionPath())));
}
for (String baseFile : filteredRegularFullPaths) {
HoodieWriteStat stat = fileToWriteStat.get(baseFile);
splits.add(new FileSplit(new Path(baseFile), 0, stat.getFileSizeInBytes(), stat.getFileSizeInBytes(),
new String[0],
splits.add(new FileSplit(new LocationPath(baseFile, optParams), 0,
stat.getFileSizeInBytes(), stat.getFileSizeInBytes(),
0, new String[0],
HudiPartitionProcessor.parsePartitionValues(partitionNames, stat.getPartitionPath())));
}
return splits;

View File

@ -240,7 +240,7 @@ public class HudiScanNode extends HiveScanNode {
}
}
public void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) {
private void setHudiParams(TFileRangeDesc rangeDesc, HudiSplit hudiSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(hudiSplit.getTableFormatType().value());
THudiFileDesc fileDesc = new THudiFileDesc();
@ -351,8 +351,7 @@ public class HudiScanNode extends HiveScanNode {
long fileSize = baseFile.getFileSize();
// Need to add the hdfs host to the location
LocationPath locationPath = new LocationPath(filePath, hmsTable.getCatalogProperties());
Path splitFilePath = locationPath.toStorageLocation();
splits.add(new FileSplit(splitFilePath, 0, fileSize, fileSize,
splits.add(new FileSplit(locationPath, 0, fileSize, fileSize, 0,
new String[0], partition.getPartitionValues()));
});
} else {
@ -362,7 +361,7 @@ public class HudiScanNode extends HiveScanNode {
}
}
private void getPartitionSplits(List<HivePartition> partitions, List<Split> splits) {
private void getPartitionsSplits(List<HivePartition> partitions, List<Split> splits) {
Executor executor = Env.getCurrentEnv().getExtMetaCacheMgr().getFileListingExecutor();
CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
AtomicReference<Throwable> throwable = new AtomicReference<>();
@ -397,7 +396,7 @@ public class HudiScanNode extends HiveScanNode {
partitionInit = true;
}
List<Split> splits = Collections.synchronizedList(new ArrayList<>());
getPartitionSplits(prunedPartitions, splits);
getPartitionsSplits(prunedPartitions, splits);
return splits;
}
@ -482,8 +481,8 @@ public class HudiScanNode extends HiveScanNode {
// no base file, use log file to parse file type
String agencyPath = filePath.isEmpty() ? logs.get(0) : filePath;
HudiSplit split = new HudiSplit(new Path(agencyPath), 0, fileSize, fileSize,
new String[0], partitionValues);
HudiSplit split = new HudiSplit(new LocationPath(agencyPath, hmsTable.getCatalogProperties()),
0, fileSize, fileSize, new String[0], partitionValues);
split.setTableFormatType(TableFormatType.HUDI);
split.setDataFilePath(filePath);
split.setHudiDeltaLogs(logs);

View File

@ -17,18 +17,18 @@
package org.apache.doris.datasource.hudi.source;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileSplit;
import lombok.Data;
import org.apache.hadoop.fs.Path;
import java.util.List;
@Data
public class HudiSplit extends FileSplit {
public HudiSplit(Path file, long start, long length, long fileLength, String[] hosts,
public HudiSplit(LocationPath file, long start, long length, long fileLength, String[] hosts,
List<String> partitionValues) {
super(file, start, length, fileLength, hosts, partitionValues);
super(file, start, length, fileLength, 0, hosts, partitionValues);
}
private String instantTime;

View File

@ -22,7 +22,6 @@ import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.TableSnapshot;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
@ -42,7 +41,6 @@ import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TIcebergDeleteFileDesc;
import org.apache.doris.thrift.TIcebergFileDesc;
import org.apache.doris.thrift.TPlanNode;
@ -51,7 +49,6 @@ import org.apache.doris.thrift.TTableFormatFileDesc;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DeleteFile;
@ -133,7 +130,7 @@ public class IcebergScanNode extends FileQueryScanNode {
}
}
public void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) {
private void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
TIcebergFileDesc fileDesc = new TIcebergFileDesc();
@ -147,8 +144,7 @@ public class IcebergScanNode extends FileQueryScanNode {
TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
String deleteFilePath = filter.getDeleteFilePath();
LocationPath locationPath = new LocationPath(deleteFilePath, icebergSplit.getConfig());
Path splitDeletePath = locationPath.toStorageLocation();
deleteFileDesc.setPath(splitDeletePath.toString());
deleteFileDesc.setPath(locationPath.toStorageLocation().toString());
if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
IcebergDeleteFileFilter.PositionDelete positionDelete =
(IcebergDeleteFileFilter.PositionDelete) filter;
@ -211,8 +207,6 @@ public class IcebergScanNode extends FileQueryScanNode {
try (CloseableIterable<CombinedScanTask> combinedScanTasks =
TableScanUtil.planTasks(fileScanTasks, fileSplitSize, 1, 0)) {
combinedScanTasks.forEach(taskGrp -> taskGrp.files().forEach(splitTask -> {
String dataFilePath = normalizeLocation(splitTask.file().path().toString());
List<String> partitionValues = new ArrayList<>();
if (isPartitionedTable) {
StructLike structLike = splitTask.file().partition();
@ -238,10 +232,10 @@ public class IcebergScanNode extends FileQueryScanNode {
// Counts the number of partitions read
partitionPathSet.add(structLike.toString());
}
LocationPath locationPath = new LocationPath(dataFilePath, source.getCatalog().getProperties());
Path finalDataFilePath = locationPath.toStorageLocation();
String originalPath = splitTask.file().path().toString();
LocationPath locationPath = new LocationPath(originalPath, source.getCatalog().getProperties());
IcebergSplit split = new IcebergSplit(
finalDataFilePath,
locationPath,
splitTask.start(),
splitTask.length(),
splitTask.file().fileSizeInBytes(),
@ -249,7 +243,7 @@ public class IcebergScanNode extends FileQueryScanNode {
formatVersion,
source.getCatalog().getProperties(),
partitionValues,
splitTask.file().path().toString());
originalPath);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
}
@ -311,36 +305,6 @@ public class IcebergScanNode extends FileQueryScanNode {
return filters;
}
@Override
public TFileType getLocationType() throws UserException {
String location = icebergTable.location();
return getLocationType(location);
}
@Override
public TFileType getLocationType(String location) throws UserException {
final String fLocation = normalizeLocation(location);
return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() ->
new DdlException("Unknown file location " + fLocation + " for iceberg table " + icebergTable.name()));
}
private String normalizeLocation(String location) {
Map<String, String> props = source.getCatalog().getProperties();
LocationPath locationPath = new LocationPath(location, props);
String icebergCatalogType = props.get(IcebergExternalCatalog.ICEBERG_CATALOG_TYPE);
if ("hadoop".equalsIgnoreCase(icebergCatalogType)) {
// if no scheme info, fill with HADOOP_FS_NAME
// if no HADOOP_FS_NAME, then should be local file system
if (locationPath.getLocationType() == LocationPath.LocationType.NOSCHEME) {
String fsName = props.get(HdfsResource.HADOOP_FS_NAME);
if (fsName != null) {
location = fsName + location;
}
}
}
return location;
}
@Override
public TFileFormatType getFileFormatType() throws UserException {
TFileFormatType type;

View File

@ -17,10 +17,10 @@
package org.apache.doris.datasource.iceberg.source;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileSplit;
import lombok.Data;
import org.apache.hadoop.fs.Path;
import java.util.List;
import java.util.Map;
@ -28,21 +28,23 @@ import java.util.Map;
@Data
public class IcebergSplit extends FileSplit {
// Doris converts the scheme of the data file path so that it can read the file natively.
// For example, s3a:// will be converted to s3://.
// Iceberg position delete files record the full path of the data file, including the original scheme,
// so when comparing a data file with its position deletes the converted path cannot be used;
// the original data file path must be used.
private final String originalPath;
private Integer formatVersion;
private List<IcebergDeleteFileFilter> deleteFileFilters;
private Map<String, String> config;
// File path will be changed if the file is modified, so there's no need to get modification time.
public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts,
public IcebergSplit(LocationPath file, long start, long length, long fileLength, String[] hosts,
Integer formatVersion, Map<String, String> config,
List<String> partitionList, String originalPath) {
super(file, start, length, fileLength, hosts, partitionList);
super(file, start, length, fileLength, 0, hosts, partitionList);
this.formatVersion = formatVersion;
this.config = config;
this.originalPath = originalPath;
}
private Integer formatVersion;
private List<IcebergDeleteFileFilter> deleteFileFilters;
private Map<String, String> config;
}
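
As the comment at the top of IcebergSplit explains, the split keeps originalPath next to the converted LocationPath: position delete files record the data file path with its original scheme, so matching must use the unconverted string. A toy illustration with made-up paths:

public class IcebergOriginalPathSketch {
    public static void main(String[] args) {
        // Hypothetical paths, for illustration only.
        String originalPath  = "s3a://bucket/warehouse/t1/data/00000-0-abc.parquet"; // as recorded in position delete files
        String convertedPath = "s3://bucket/warehouse/t1/data/00000-0-abc.parquet";  // what BE reads via the S3 client

        // Matching a data file against its position deletes must use the original path;
        // the converted path never equals the path stored inside the delete file.
        System.out.println(convertedPath.equals(originalPath)); // false
    }
}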

View File

@ -23,11 +23,10 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
@ -35,13 +34,12 @@ import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TMaxComputeFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
import com.aliyun.odps.Table;
import com.aliyun.odps.tunnel.TunnelException;
import org.apache.hadoop.fs.Path;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collection;
@ -53,8 +51,8 @@ import java.util.Map;
public class MaxComputeScanNode extends FileQueryScanNode {
private final MaxComputeExternalTable table;
private final MaxComputeExternalCatalog catalog;
public static final int MIN_SPLIT_SIZE = 4096;
private static final int MIN_SPLIT_SIZE = 4096;
private static final LocationPath VIRTUAL_SLICE_PART = new LocationPath("/virtual_slice_part", Maps.newHashMap());
public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE, needCheckColumnPriv);
@ -64,7 +62,6 @@ public class MaxComputeScanNode extends FileQueryScanNode {
StatisticalType statisticalType, boolean needCheckColumnPriv) {
super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
table = (MaxComputeExternalTable) desc.getTable();
catalog = (MaxComputeExternalCatalog) table.getCatalog();
}
@Override
@ -74,7 +71,7 @@ public class MaxComputeScanNode extends FileQueryScanNode {
}
}
public void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeSplit) {
private void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(TableFormatType.MAX_COMPUTE.value());
TMaxComputeFileDesc fileDesc = new TMaxComputeFileDesc();
@ -85,16 +82,6 @@ public class MaxComputeScanNode extends FileQueryScanNode {
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
@Override
protected TFileType getLocationType() throws UserException {
return getLocationType(null);
}
@Override
protected TFileType getLocationType(String location) throws UserException {
return TFileType.FILE_NET;
}
@Override
public TFileFormatType getFileFormatType() {
return TFileFormatType.FORMAT_JNI;
@ -144,10 +131,8 @@ public class MaxComputeScanNode extends FileQueryScanNode {
private static void addPartitionSplits(List<Split> result, Table odpsTable, String partitionSpec) {
long modificationTime = odpsTable.getLastDataModifiedTime().getTime();
// use '-1' to read whole partition, avoid expending too much time on calling table.getTotalRows()
Pair<Long, Long> range = Pair.of(0L, -1L);
FileSplit rangeSplit = new FileSplit(new Path("/virtual_slice_part"),
range.first, range.second, -1, modificationTime, null, Collections.emptyList());
result.add(new MaxComputeSplit(partitionSpec, rangeSplit));
result.add(new MaxComputeSplit(VIRTUAL_SLICE_PART,
0, -1L, -1, modificationTime, null, Collections.emptyList(), null));
}
private static void addBatchSplits(List<Split> result, Table odpsTable, long totalRows) {
@ -171,9 +156,8 @@ public class MaxComputeScanNode extends FileQueryScanNode {
if (!sliceRange.isEmpty()) {
for (int i = 0; i < sliceRange.size(); i++) {
Pair<Long, Long> range = sliceRange.get(i);
FileSplit rangeSplit = new FileSplit(new Path("/virtual_slice_" + i),
range.first, range.second, totalRows, modificationTime, null, Collections.emptyList());
result.add(new MaxComputeSplit(rangeSplit));
result.add(new MaxComputeSplit(new LocationPath("/virtual_slice_" + i, Maps.newHashMap()),
range.first, range.second, totalRows, modificationTime, null, Collections.emptyList(), null));
}
}
}

View File

@ -17,23 +17,22 @@
package org.apache.doris.datasource.maxcompute.source;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.thrift.TFileType;
import java.util.List;
import java.util.Optional;
public class MaxComputeSplit extends FileSplit {
private final Optional<String> partitionSpec;
public MaxComputeSplit(FileSplit rangeSplit) {
super(rangeSplit.path, rangeSplit.start, rangeSplit.length, rangeSplit.fileLength,
rangeSplit.hosts, rangeSplit.partitionValues);
this.partitionSpec = Optional.empty();
}
public MaxComputeSplit(String partitionSpec, FileSplit rangeSplit) {
super(rangeSplit.path, rangeSplit.start, rangeSplit.length, rangeSplit.fileLength,
rangeSplit.hosts, rangeSplit.partitionValues);
this.partitionSpec = Optional.of(partitionSpec);
public MaxComputeSplit(LocationPath path, long start, long length, long fileLength,
long modificationTime, String[] hosts, List<String> partitionValues, String partitionSpec) {
super(path, start, length, fileLength, modificationTime, hosts, partitionValues);
this.partitionSpec = Optional.ofNullable(partitionSpec);
// MC always uses the FILE_NET type
this.locationType = TFileType.FILE_NET;
}
public Optional<String> getPartitionSpec() {

View File

@ -35,19 +35,16 @@ import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TPaimonDeletionFileDesc;
import org.apache.doris.thrift.TPaimonFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.hadoop.fs.Path;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.RawFile;
@ -147,7 +144,7 @@ public class PaimonScanNode extends FileQueryScanNode {
}
}
public void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
private void setPaimonParams(TFileRangeDesc rangeDesc, PaimonSplit paimonSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(paimonSplit.getTableFormatType().value());
TPaimonFileDesc fileDesc = new TPaimonFileDesc();
@ -214,10 +211,9 @@ public class PaimonScanNode extends FileQueryScanNode {
DeletionFile deletionFile = deletionFiles.get(i);
LocationPath locationPath = new LocationPath(file.path(),
source.getCatalog().getProperties());
Path finalDataFilePath = locationPath.toStorageLocation();
try {
List<Split> dorisSplits = splitFile(
finalDataFilePath,
locationPath,
0,
null,
file.length(),
@ -242,11 +238,10 @@ public class PaimonScanNode extends FileQueryScanNode {
for (RawFile file : rawFiles) {
LocationPath locationPath = new LocationPath(file.path(),
source.getCatalog().getProperties());
Path finalDataFilePath = locationPath.toStorageLocation();
try {
splits.addAll(
splitFile(
finalDataFilePath,
locationPath,
0,
null,
file.length(),
@ -286,17 +281,6 @@ public class PaimonScanNode extends FileQueryScanNode {
}
}
@Override
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
return getLocationType(((FileStoreTable) source.getPaimonTable()).location().toString());
}
@Override
public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() ->
new DdlException("Unknown file location " + location + " for paimon table "));
}
@Override
public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
return TFileFormatType.FORMAT_JNI;

View File

@ -33,7 +33,6 @@ import java.util.Map;
public class PaimonSource {
private final PaimonExternalTable paimonExtTable;
private final Table originTable;
private final TupleDescriptor desc;
public PaimonSource(PaimonExternalTable table, TupleDescriptor desc,

View File

@ -17,11 +17,12 @@
package org.apache.doris.datasource.paimon.source;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.datasource.SplitCreator;
import org.apache.doris.datasource.TableFormatType;
import org.apache.hadoop.fs.Path;
import com.google.common.collect.Maps;
import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.table.source.Split;
@ -29,20 +30,21 @@ import java.util.List;
import java.util.Optional;
public class PaimonSplit extends FileSplit {
private static final LocationPath DUMMY_PATH = new LocationPath("/dummyPath", Maps.newHashMap());
private Split split;
private TableFormatType tableFormatType;
private Optional<DeletionFile> optDeletionFile;
public PaimonSplit(Split split) {
super(new Path("hdfs://dummyPath"), 0, 0, 0, null, null);
super(DUMMY_PATH, 0, 0, 0, 0, null, null);
this.split = split;
this.tableFormatType = TableFormatType.PAIMON;
this.optDeletionFile = Optional.empty();
}
public PaimonSplit(Path file, long start, long length, long fileLength, String[] hosts,
List<String> partitionList) {
super(file, start, length, fileLength, hosts, partitionList);
private PaimonSplit(LocationPath file, long start, long length, long fileLength, long modificationTime,
String[] hosts, List<String> partitionList) {
super(file, start, length, fileLength, modificationTime, hosts, partitionList);
this.tableFormatType = TableFormatType.PAIMON;
this.optDeletionFile = Optional.empty();
}
@ -51,10 +53,6 @@ public class PaimonSplit extends FileSplit {
return split;
}
public void setSplit(Split split) {
this.split = split;
}
public TableFormatType getTableFormatType() {
return tableFormatType;
}
@ -76,14 +74,14 @@ public class PaimonSplit extends FileSplit {
static final PaimonSplitCreator DEFAULT = new PaimonSplitCreator();
@Override
public org.apache.doris.spi.Split create(Path path,
public org.apache.doris.spi.Split create(LocationPath path,
long start,
long length,
long fileLength,
long modificationTime,
String[] hosts,
List<String> partitionValues) {
return new PaimonSplit(path, start, length, fileLength, hosts, partitionValues);
return new PaimonSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
}
}
}

View File

@ -24,6 +24,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.FileSplit;
@ -41,7 +42,7 @@ import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.Path;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -101,21 +102,16 @@ public class TVFScanNode extends FileQueryScanNode {
@Override
protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
TFileCompressType fileCompressType = tableValuedFunction.getTFileCompressType();
return Util.getOrInferCompressType(fileCompressType, fileSplit.getPath().toString());
return Util.getOrInferCompressType(fileCompressType, fileSplit.getPathString());
}
@Override
public TFileType getLocationType() throws DdlException, MetaNotFoundException {
return getLocationType(null);
protected boolean isFileStreamType() {
return tableValuedFunction.getTFileType() == TFileType.FILE_STREAM;
}
@Override
public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
return tableValuedFunction.getTFileType();
}
@Override
public Map<String, String> getLocationProperties() throws MetaNotFoundException, DdlException {
public Map<String, String> getLocationProperties() {
return tableValuedFunction.getLocationProperties();
}
@ -137,13 +133,14 @@ public class TVFScanNode extends FileQueryScanNode {
}
List<TBrokerFileStatus> fileStatuses = tableValuedFunction.getFileStatuses();
for (TBrokerFileStatus fileStatus : fileStatuses) {
Path path = new Path(fileStatus.getPath());
Map<String, String> prop = Maps.newHashMap();
try {
splits.addAll(splitFile(path, fileStatus.getBlockSize(), null, fileStatus.getSize(),
splits.addAll(splitFile(new LocationPath(fileStatus.getPath(), prop), fileStatus.getBlockSize(),
null, fileStatus.getSize(),
fileStatus.getModificationTime(), fileStatus.isSplitable, null,
FileSplitCreator.DEFAULT));
} catch (IOException e) {
LOG.warn("get file split failed for TVF: {}", path, e);
LOG.warn("get file split failed for TVF: {}", fileStatus.getPath(), e);
throw new UserException(e);
}
}

View File

@ -118,7 +118,7 @@ public class HiveTableSink extends BaseExternalTableDataSink {
THiveLocationParams locationParams = new THiveLocationParams();
LocationPath locationPath = new LocationPath(sd.getLocation(), targetTable.getHadoopProperties());
String location = locationPath.toString();
String location = locationPath.getPath().toString();
String storageLocation = locationPath.toStorageLocation().toString();
TFileType fileType = locationPath.getTFileTypeForBE();
if (fileType == TFileType.FILE_S3) {

View File

@ -133,7 +133,7 @@ public class IcebergTableSink extends BaseExternalTableDataSink {
// location
LocationPath locationPath = new LocationPath(IcebergUtils.dataLocation(icebergTable), catalogProps);
tSink.setOutputPath(locationPath.toStorageLocation().toString());
tSink.setOriginalOutputPath(locationPath.toString());
tSink.setOriginalOutputPath(locationPath.getPath().toString());
tSink.setFileType(locationPath.getTFileTypeForBE());
if (insertCtx.isPresent()) {

View File

@ -71,7 +71,6 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTextSerdeType;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
@ -97,23 +96,6 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
public static final String PROP_TABLE_ID = "table_id";
protected static final ImmutableSet<String> FILE_FORMAT_PROPERTIES = new ImmutableSet.Builder<String>()
.add(FileFormatConstants.PROP_FORMAT)
.add(FileFormatConstants.PROP_JSON_ROOT)
.add(FileFormatConstants.PROP_JSON_PATHS)
.add(FileFormatConstants.PROP_STRIP_OUTER_ARRAY)
.add(FileFormatConstants.PROP_READ_JSON_BY_LINE)
.add(FileFormatConstants.PROP_NUM_AS_STRING)
.add(FileFormatConstants.PROP_FUZZY_PARSE)
.add(FileFormatConstants.PROP_COLUMN_SEPARATOR)
.add(FileFormatConstants.PROP_LINE_DELIMITER)
.add(FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES)
.add(FileFormatConstants.PROP_SKIP_LINES)
.add(FileFormatConstants.PROP_CSV_SCHEMA)
.add(FileFormatConstants.PROP_COMPRESS_TYPE)
.add(FileFormatConstants.PROP_PATH_PARTITION_KEYS)
.build();
// Columns got from file and path(if has)
protected List<Column> columns = null;
// User specified csv columns, it will override columns got from file