[fix](s3) fix invalid s3 properties checking logic (#35757)
Introduced from #35747; picks part of #35762.
This commit is contained in:
@ -23,7 +23,6 @@ import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.proc.BaseProcResult;
|
||||
import org.apache.doris.common.util.PrintableMap;
|
||||
import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint;
|
||||
import org.apache.doris.datasource.property.PropertyConverter;
|
||||
import org.apache.doris.datasource.property.constants.S3Properties;
|
||||
import org.apache.doris.fs.remote.S3FileSystem;
|
||||
|
||||
@ -121,15 +120,6 @@ public class S3Resource extends Resource {
|
||||
private static void pingS3(CloudCredentialWithEndpoint credential, String bucketName, String rootPath,
|
||||
Map<String, String> properties) throws DdlException {
|
||||
String bucket = "s3://" + bucketName + "/";
|
||||
Map<String, String> propertiesPing = new HashMap<>();
|
||||
propertiesPing.put(S3Properties.Env.ACCESS_KEY, credential.getAccessKey());
|
||||
propertiesPing.put(S3Properties.Env.SECRET_KEY, credential.getSecretKey());
|
||||
propertiesPing.put(S3Properties.Env.TOKEN, credential.getSessionToken());
|
||||
propertiesPing.put(S3Properties.Env.ENDPOINT, credential.getEndpoint());
|
||||
propertiesPing.put(S3Properties.Env.REGION, credential.getRegion());
|
||||
propertiesPing.put(PropertyConverter.USE_PATH_STYLE,
|
||||
properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false"));
|
||||
properties.putAll(propertiesPing);
|
||||
S3FileSystem fileSystem = new S3FileSystem(properties);
|
||||
String testFile = bucket + rootPath + "/test-object-valid.txt";
|
||||
String content = "doris will be better";
|
||||
@ -142,14 +132,14 @@ public class S3Resource extends Resource {
|
||||
if (status != Status.OK) {
|
||||
throw new DdlException(
|
||||
"ping s3 failed(upload), status: " + status + ", properties: " + new PrintableMap<>(
|
||||
propertiesPing, "=", true, false, true, false));
|
||||
properties, "=", true, false, true, false));
|
||||
}
|
||||
} finally {
|
||||
if (status.ok()) {
|
||||
Status delete = fileSystem.delete(testFile);
|
||||
if (delete != Status.OK) {
|
||||
LOG.warn("delete test file failed, status: {}, properties: {}", delete, new PrintableMap<>(
|
||||
propertiesPing, "=", true, false, true, false));
|
||||
properties, "=", true, false, true, false));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -250,3 +240,4 @@ public class S3Resource extends Resource {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -60,7 +60,10 @@ public class S3FileSystem extends ObjFileSystem {
|
||||
if (dfsFileSystem == null) {
|
||||
Configuration conf = new Configuration();
|
||||
System.setProperty("com.amazonaws.services.s3.enableV4", "true");
|
||||
PropertyConverter.convertToHadoopFSProperties(properties).forEach(conf::set);
|
||||
// the entry value in properties may be null, and
|
||||
PropertyConverter.convertToHadoopFSProperties(properties).entrySet().stream()
|
||||
.filter(entry -> entry.getKey() != null && entry.getValue() != null)
|
||||
.forEach(entry -> conf.set(entry.getKey(), entry.getValue()));
|
||||
try {
|
||||
dfsFileSystem = FileSystem.get(new Path(remotePath).toUri(), conf);
|
||||
} catch (Exception e) {
|
||||
|
||||
@ -45,8 +45,7 @@ public class Hdfs extends TableValuedFunction {
|
||||
Map<String, String> arguments = getTVFProperties().getMap();
|
||||
return new HdfsTableValuedFunction(arguments);
|
||||
} catch (Throwable t) {
|
||||
throw new AnalysisException("Can not build HdfsTableValuedFunction by "
|
||||
+ this + ": " + t.getMessage(), t);
|
||||
throw new AnalysisException("Can not build hdfs(): " + t.getMessage(), t);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -46,8 +46,7 @@ public class Local extends TableValuedFunction {
|
||||
Map<String, String> arguments = getTVFProperties().getMap();
|
||||
return new LocalTableValuedFunction(arguments);
|
||||
} catch (Throwable t) {
|
||||
throw new AnalysisException("Can not build LocalTableValuedFunction by "
|
||||
+ this + ": " + t.getMessage(), t);
|
||||
throw new AnalysisException("Can not build local(): " + t.getMessage(), t);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -44,8 +44,7 @@ public class S3 extends TableValuedFunction {
|
||||
Map<String, String> arguments = getTVFProperties().getMap();
|
||||
return new S3TableValuedFunction(arguments);
|
||||
} catch (Throwable t) {
|
||||
throw new AnalysisException("Can not build S3TableValuedFunction by "
|
||||
+ this + ": " + t.getMessage(), t);
|
||||
throw new AnalysisException("Can not build s3(): " + t.getMessage(), t);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -73,8 +73,10 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
|
||||
|
||||
S3URI s3uri = getS3Uri(uriStr, Boolean.parseBoolean(usePathStyle.toLowerCase()),
|
||||
Boolean.parseBoolean(forceParsingByStandardUri.toLowerCase()));
|
||||
String endpoint = getOrDefaultAndRemove(otherProps, S3Properties.ENDPOINT, s3uri.getEndpoint().orElseThrow(() ->
|
||||
new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ENDPOINT))));
|
||||
String endpoint = getOrDefaultAndRemove(otherProps, S3Properties.ENDPOINT, s3uri.getEndpoint().orElse(""));
|
||||
if (Strings.isNullOrEmpty(endpoint)) {
|
||||
throw new AnalysisException(String.format("Properties '%s' is required.", S3Properties.ENDPOINT));
|
||||
}
|
||||
if (!otherProps.containsKey(S3Properties.REGION)) {
|
||||
String region = s3uri.getRegion().orElseThrow(() ->
|
||||
new AnalysisException(String.format("Properties '%s' is required.", S3Properties.REGION)));
|
||||
@ -151,3 +153,4 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
|
||||
return "S3TableValuedFunction";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user