[Fix](multi-catalog) Fix read error in mixed partition locations. (#21399)

Issue Number: close #20948

Fix read errors on tables with mixed partition locations (for example, some partition locations are on S3 while others are on HDFS) by resolving `getLocationType` at the file-split level instead of the table level.
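To illustrate the idea, here is a standalone sketch (assumed names such as resolveFileType and FileType; this is not Doris code): resolving the storage type once from the table root mis-classifies splits whose partitions live on a different file system, so the type must come from each split's own path.

    import java.net.URI;
    import java.util.List;

    // Standalone sketch with assumed names; not part of this commit.
    public class MixedLocationDemo {
        enum FileType { FILE_S3, FILE_HDFS, FILE_LOCAL }

        // Classify a file location by its URI scheme, analogous in spirit
        // to the per-split getLocationType(location) added by this commit.
        static FileType resolveFileType(String location) {
            String scheme = URI.create(location).getScheme();
            if (scheme == null) {
                return FileType.FILE_LOCAL;
            }
            switch (scheme.toLowerCase()) {
                case "s3": case "s3a": case "s3n": case "oss": case "cos": case "obs":
                    return FileType.FILE_S3;
                case "hdfs": case "viewfs":
                    return FileType.FILE_HDFS;
                default:
                    return FileType.FILE_LOCAL;
            }
        }

        public static void main(String[] args) {
            // One table, two partitions on different storage systems. Resolving
            // the type once from the table root (HDFS) would wrongly treat the
            // S3 partition as HDFS; resolving per split path classifies both.
            List<String> splitPaths = List.of(
                    "hdfs://nameservice1/warehouse/t/part=1/000000_0",
                    "s3://bucket/warehouse/t/part=2/000000_0");
            for (String path : splitPaths) {
                System.out.println(path + " -> " + resolveFileType(path));
            }
        }
    }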
@@ -230,51 +230,42 @@ public abstract class FileQueryScanNode extends FileScanNode {
         if (inputSplits.isEmpty()) {
             return;
         }
-        FileSplit inputSplit = (FileSplit) inputSplits.get(0);
-        TFileType locationType = getLocationType();
-        params.setFileType(locationType);
         TFileFormatType fileFormatType = getFileFormatType();
         params.setFormatType(fileFormatType);
-        TFileCompressType fileCompressType = getFileCompressType(inputSplit);
-        params.setCompressType(fileCompressType);
-        boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
-        if (isCsvOrJson) {
-            params.setFileAttributes(getFileAttributes());
-        }
-
-        // set hdfs params for hdfs file type.
-        Map<String, String> locationProperties = getLocationProperties();
-        if (fileFormatType == TFileFormatType.FORMAT_JNI || locationType == TFileType.FILE_S3) {
-            params.setProperties(locationProperties);
-        }
-        if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
-            String fsName = getFsName(inputSplit);
-            THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
-            tHdfsParams.setFsName(fsName);
-            params.setHdfsParams(tHdfsParams);
-
-            if (locationType == TFileType.FILE_BROKER) {
-                FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
-                if (broker == null) {
-                    throw new UserException("No alive broker.");
-                }
-                params.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
-            }
-        }
-
         List<String> pathPartitionKeys = getPathPartitionKeys();
         for (Split split : inputSplits) {
+            TFileScanRangeParams scanRangeParams = new TFileScanRangeParams(params);
             FileSplit fileSplit = (FileSplit) split;
-
-            TFileScanRangeParams scanRangeParams;
-            if (!isCsvOrJson) {
-                scanRangeParams = params;
-            } else {
-                // If fileFormatType is csv/json format, uncompressed files may be coexists with compressed files
-                // So we need set compressType separately
-                scanRangeParams = new TFileScanRangeParams(params);
-                scanRangeParams.setCompressType(getFileCompressType(fileSplit));
-            }
+            TFileType locationType = getLocationType(fileSplit.getPath().toString());
+            scanRangeParams.setFileType(locationType);
+            TFileCompressType fileCompressType = getFileCompressType(fileSplit);
+            scanRangeParams.setCompressType(fileCompressType);
+            boolean isCsvOrJson = Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON;
+            if (isCsvOrJson) {
+                scanRangeParams.setFileAttributes(getFileAttributes());
+            }
+
+            // set hdfs params for hdfs file type.
+            Map<String, String> locationProperties = getLocationProperties();
+            if (fileFormatType == TFileFormatType.FORMAT_JNI) {
+                scanRangeParams.setProperties(locationProperties);
+            } else if (locationType == TFileType.FILE_HDFS || locationType == TFileType.FILE_BROKER) {
+                String fsName = getFsName(fileSplit);
+                THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
+                tHdfsParams.setFsName(fsName);
+                scanRangeParams.setHdfsParams(tHdfsParams);
+
+                if (locationType == TFileType.FILE_BROKER) {
+                    FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
+                    if (broker == null) {
+                        throw new UserException("No alive broker.");
+                    }
+                    scanRangeParams.addToBrokerAddresses(new TNetworkAddress(broker.host, broker.port));
+                }
+            } else if (locationType == TFileType.FILE_S3) {
+                scanRangeParams.setProperties(locationProperties);
+            }

             TScanRangeLocations curLocations = newLocations(scanRangeParams);

             // If fileSplit has partition values, use the values collected from hive partitions.
@@ -288,7 +279,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
                     ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID)
                     : fileSplit.getPartitionValues();

-            TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
+            TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys,
+                    locationType);
             if (isACID) {
                 HiveSplit hiveSplit = (HiveSplit) split;
                 hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
@@ -354,7 +346,7 @@ public abstract class FileQueryScanNode extends FileScanNode {
     }

     private TFileRangeDesc createFileRangeDesc(FileSplit fileSplit, List<String> columnsFromPath,
-            List<String> columnsFromPathKeys)
+            List<String> columnsFromPathKeys, TFileType locationType)
             throws UserException {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
         rangeDesc.setStartOffset(fileSplit.getStart());
@@ -365,11 +357,11 @@ public abstract class FileQueryScanNode extends FileScanNode {
         rangeDesc.setColumnsFromPath(columnsFromPath);
         rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);

-        if (getLocationType() == TFileType.FILE_HDFS) {
+        if (locationType == TFileType.FILE_HDFS) {
             rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
-        } else if (getLocationType() == TFileType.FILE_S3
-                || getLocationType() == TFileType.FILE_BROKER
-                || getLocationType() == TFileType.FILE_NET) {
+        } else if (locationType == TFileType.FILE_S3
+                || locationType == TFileType.FILE_BROKER
+                || locationType == TFileType.FILE_NET) {
             // need full path
             rangeDesc.setPath(fileSplit.getPath().toString());
         }
@@ -379,6 +371,8 @@ public abstract class FileQueryScanNode extends FileScanNode {

     protected abstract TFileType getLocationType() throws UserException;

+    protected abstract TFileType getLocationType(String location) throws UserException;
+
     protected abstract TFileFormatType getFileFormatType() throws UserException;

     protected TFileCompressType getFileCompressType(FileSplit fileSplit) throws UserException {
@@ -230,7 +230,11 @@ public class HiveScanNode extends FileQueryScanNode {

     @Override
     protected TFileType getLocationType() throws UserException {
-        String location = hmsTable.getRemoteTable().getSd().getLocation();
+        return getLocationType(hmsTable.getRemoteTable().getSd().getLocation());
+    }
+
+    @Override
+    protected TFileType getLocationType(String location) throws UserException {
         return getTFileType(location).orElseThrow(() ->
                 new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
     }
@@ -53,6 +53,11 @@ public class MaxComputeScanNode extends FileQueryScanNode {

     @Override
     protected TFileType getLocationType() throws UserException {
+        return getLocationType(null);
+    }
+
+    @Override
+    protected TFileType getLocationType(String location) throws UserException {
         return TFileType.FILE_NET;
     }

@@ -84,6 +84,11 @@ public class TVFScanNode extends FileQueryScanNode {

     @Override
     public TFileType getLocationType() throws DdlException, MetaNotFoundException {
+        return getLocationType(null);
+    }
+
+    @Override
+    public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
         return tableValuedFunction.getTFileType();
     }

@@ -244,6 +244,12 @@ public class IcebergScanNode extends FileQueryScanNode {
     public TFileType getLocationType() throws UserException {
         Table icebergTable = source.getIcebergTable();
         String location = icebergTable.location();
+        return getLocationType(location);
+    }
+
+    @Override
+    public TFileType getLocationType(String location) throws UserException {
+        Table icebergTable = source.getIcebergTable();
         return getTFileType(location).orElseThrow(() ->
                 new DdlException("Unknown file location " + location + " for iceberg table " + icebergTable.name()));
     }
@@ -141,7 +141,11 @@ public class PaimonScanNode extends FileQueryScanNode {

     @Override
     public TFileType getLocationType() throws DdlException, MetaNotFoundException {
-        String location = ((AbstractFileStoreTable) source.getPaimonTable()).location().toString();
+        return getLocationType(((AbstractFileStoreTable) source.getPaimonTable()).location().toString());
+    }
+
+    @Override
+    public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
         if (location != null && !location.isEmpty()) {
             if (S3Util.isObjStorage(location)) {
                 return TFileType.FILE_S3;
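Every scan node follows the same refactoring pattern, sketched below with simplified, assumed types (the real methods return TFileType and declare checked exceptions): the existing table-level getLocationType() becomes a thin wrapper that delegates to the new per-location overload, so table-level and split-level callers share one classification path.

    // Simplified sketch of the pattern; not the actual Doris classes.
    abstract class LocationAwareScanNode {
        enum FileType { FILE_S3, FILE_HDFS, FILE_NET }

        // Table-level entry point, kept for callers that have no split in hand.
        protected FileType getLocationType() {
            return getLocationType(tableRootLocation());
        }

        // New split-level entry point: classify one concrete file location.
        protected abstract FileType getLocationType(String location);

        protected abstract String tableRootLocation();
    }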