[fix](cosn) use s3 client to read cosn on BE side (#30835)
This commit is contained in:
@ -297,10 +297,11 @@ public class LocationPath {
|
||||
|
||||
/**
|
||||
* provide file type for BE.
|
||||
*
|
||||
* @param location the location is from fs.listFile
|
||||
* @return on BE, we will use TFileType to get the suitable client to access storage.
|
||||
*/
|
||||
public static TFileType getTFileType(String location) {
|
||||
public static TFileType getTFileTypeForBE(String location) {
|
||||
if (location == null || location.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
@ -314,12 +315,13 @@ public class LocationPath {
|
||||
case OBS:
|
||||
case BOS:
|
||||
case GCS:
|
||||
// ATTN, for COSN, on FE side, use HadoopFS to access, but on BE, use S3 client to access.
|
||||
case COSN:
|
||||
// now we only support S3 client for object storage on BE
|
||||
return TFileType.FILE_S3;
|
||||
case HDFS:
|
||||
case OSS_HDFS: // if hdfs service is enabled on oss, use hdfs lib to access oss.
|
||||
case VIEWFS:
|
||||
case COSN:
|
||||
return TFileType.FILE_HDFS;
|
||||
case GFS:
|
||||
case JFS:
|
||||
@ -346,12 +348,12 @@ public class LocationPath {
|
||||
case OBS:
|
||||
case BOS:
|
||||
case GCS:
|
||||
case COSN:
|
||||
// All storage will use s3 client to access on BE, so need convert to s3
|
||||
return new Path(convertToS3(location));
|
||||
case HDFS:
|
||||
case OSS_HDFS:
|
||||
case VIEWFS:
|
||||
case COSN:
|
||||
case GFS:
|
||||
case JFS:
|
||||
case OFS:
|
||||
|
||||
@ -333,8 +333,8 @@ public class HiveScanNode extends FileQueryScanNode {
|
||||
if (bindBrokerName != null) {
|
||||
return TFileType.FILE_BROKER;
|
||||
}
|
||||
return Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() ->
|
||||
new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
|
||||
return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() ->
|
||||
new DdlException("Unknown file location " + location + " for hms table " + hmsTable.getName()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -346,7 +346,7 @@ public class IcebergScanNode extends FileQueryScanNode {
|
||||
@Override
|
||||
public TFileType getLocationType(String location) throws UserException {
|
||||
final String fLocation = normalizeLocation(location);
|
||||
return Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() ->
|
||||
return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() ->
|
||||
new DdlException("Unknown file location " + fLocation + " for iceberg table " + icebergTable.name()));
|
||||
}
|
||||
|
||||
|
||||
@ -213,8 +213,8 @@ public class PaimonScanNode extends FileQueryScanNode {
|
||||
|
||||
@Override
|
||||
public TFileType getLocationType(String location) throws DdlException, MetaNotFoundException {
|
||||
return Optional.ofNullable(LocationPath.getTFileType(location)).orElseThrow(() ->
|
||||
new DdlException("Unknown file location " + location + " for paimon table "));
|
||||
return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() ->
|
||||
new DdlException("Unknown file location " + location + " for paimon table "));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -131,8 +131,8 @@ public class LocationPathTest {
|
||||
Assertions.assertTrue(locationPath.get().startsWith("cosn://"));
|
||||
// BE
|
||||
beLocation = locationPath.toScanRangeLocation().toString();
|
||||
Assertions.assertTrue(beLocation.startsWith("cosn://"));
|
||||
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.OFS);
|
||||
Assertions.assertTrue(beLocation.startsWith("s3://"));
|
||||
Assertions.assertEquals(LocationPath.getFSIdentity(beLocation, null).first, FileSystemType.S3);
|
||||
|
||||
locationPath = new LocationPath("ofs://test.com", rangeProps);
|
||||
// FE
|
||||
|
||||
Reference in New Issue
Block a user