[refactor](resource) unified resource user interface (#14842)

At present, there are multiple user interfaces for accessing HDFS and S3.
Each interface has its own configuration and behaves differently, which confuses users.
`CREATE RESOURCE` already supports remote storage resources and resource permission management,
but only the `spark`/`odbc_catalog` resource types use it.
Cloud storage resources should also be created and managed uniformly through `CREATE RESOURCE`.

This PR contains the following changes:

1. Add `s3`, `hdfs` and `hms` resources; each resource carries its own configuration items, and the configuration items previously scattered across other classes are removed.

2. Use a `resource` to create a `storage` tool, and use the `storage` tool to access the remote file system (a usage sketch follows below).
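
As a rough sketch of change 2 (not a verbatim excerpt from this PR), the snippet below follows the `BlobStorage.create(name, type, properties)` pattern that `HiveMetaStoreClientHelper` uses in this diff, with the S3 property keys taken from the new `S3Resource` constants; the endpoint, credentials and path are placeholders.

```java
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.BlobStorage;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import org.apache.doris.catalog.S3Resource;
import org.apache.doris.common.UserException;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;

public class UnifiedStorageSketch {
    public static void main(String[] args) throws UserException {
        // Connection settings now live behind the S3Resource constants
        // (AWS_ENDPOINT, AWS_REGION, AWS_ACCESS_KEY, AWS_SECRET_KEY) instead of
        // being redefined in S3Storage, HiveTable, etc.
        Map<String, String> props = Maps.newHashMap();
        props.put(S3Resource.S3_ENDPOINT, "http://s3.us-east-1.amazonaws.com"); // placeholder
        props.put(S3Resource.S3_REGION, "us-east-1");                           // placeholder
        props.put(S3Resource.S3_ACCESS_KEY, "my_ak");                           // placeholder
        props.put(S3Resource.S3_SECRET_KEY, "my_sk");                           // placeholder

        // Create the storage tool from the storage type plus resource properties,
        // then access the remote file system through the unified BlobStorage API.
        BlobStorage storage = BlobStorage.create("example", StorageBackend.StorageType.S3, props);
        List<RemoteFile> files = Lists.newArrayList();
        Status status = storage.list("s3://my-bucket/path/*", files, false /* fileNameOnly */);
        System.out.println("list returned " + status + " with " + files.size() + " file(s)");
    }
}
```

The same call shape applies to HDFS: swap the storage type for `StorageBackend.StorageType.HDFS` and use the `HdfsResource` keys (`fs.defaultFS`, `hadoop.username`).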
Author: Ashin Gau
Date: 2022-12-08 20:37:10 +08:00
Committed by: GitHub
Parent: 244bf84483
Commit: e8becaa562
48 changed files with 995 additions and 940 deletions

View File

@ -19,7 +19,9 @@ package org.apache.doris.analysis;
import org.apache.doris.backup.HdfsStorage;
import org.apache.doris.backup.S3Storage;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.S3Resource;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
@ -28,7 +30,6 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.ParseUtil;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.Util;
@ -46,7 +47,6 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -630,16 +630,14 @@ public class OutFileClause {
}
Map<String, String> brokerProps = Maps.newHashMap();
Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, String> entry = iter.next();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(BROKER_PROP_PREFIX) && !entry.getKey().equals(PROP_BROKER_NAME)) {
brokerProps.put(entry.getKey().substring(BROKER_PROP_PREFIX.length()), entry.getValue());
processedPropKeys.add(entry.getKey());
} else if (entry.getKey().toUpperCase().startsWith(S3Storage.S3_PROPERTIES_PREFIX)) {
} else if (entry.getKey().toUpperCase().startsWith(S3Resource.S3_PROPERTIES_PREFIX)) {
brokerProps.put(entry.getKey(), entry.getValue());
processedPropKeys.add(entry.getKey());
} else if (entry.getKey().contains(BrokerUtil.HADOOP_FS_NAME)
} else if (entry.getKey().contains(HdfsResource.HADOOP_FS_NAME)
&& storageType == StorageBackend.StorageType.HDFS) {
brokerProps.put(entry.getKey(), entry.getValue());
processedPropKeys.add(entry.getKey());
@ -651,9 +649,9 @@ public class OutFileClause {
}
}
if (storageType == StorageBackend.StorageType.S3) {
S3Storage.checkS3(new CaseInsensitiveMap(brokerProps));
S3Storage.checkS3(brokerProps);
} else if (storageType == StorageBackend.StorageType.HDFS) {
HdfsStorage.checkHDFS(new CaseInsensitiveMap(brokerProps));
HdfsStorage.checkHDFS(brokerProps);
}
brokerDesc = new BrokerDesc(brokerName, storageType, brokerProps);

View File

@ -17,7 +17,7 @@
package org.apache.doris.analysis;
import org.apache.doris.backup.S3Storage;
import org.apache.doris.catalog.S3Resource;
import org.apache.doris.common.Config;
import org.apache.commons.collections.map.CaseInsensitiveMap;
@ -32,25 +32,10 @@ import java.util.Map;
public abstract class StorageDesc {
private static final Logger LOG = LoggerFactory.getLogger(StorageBackend.class);
// for dfs
public static final String HADOOP_JOB_UGI = "hadoop.job.ugi";
public static final String HADOOP_JOB_GROUP_NAME = "hadoop.job.group.name";
public static final String USER_NAME_KEY = "username";
public static final String PASSWORD_KEY = "password";
public static final String FS_DEFAULT_NAME = "fs.default.name";
public static final String FS_HDFS_IMPL = "fs.hdfs.impl";
public static final String FS_AFS_IMPL = "fs.afs.impl";
public static final String DFS_AGENT_PORT = "dfs.agent.port";
public static final String DFS_CLIENT_AUTH_METHOD = "dfs.client.auth.method";
// for bos
public static final String BOS_ENDPOINT = "bos_endpoint";
public static final String BOS_ACCESS_KEY = "bos_accesskey";
public static final String BOS_SECRET_ACCESS_KEY = "bos_secret_accesskey";
public static final String FS_BOS_IMPL = "fs.bos.impl";
public static final String FS_BOS_ACCESS_KEY = "fs.bos.access.key";
public static final String FS_BOS_SECRET_ACCESS_KEY = "fs.bos.secret.access.key";
public static final String FS_BOS_ENDPOINT = "fs.bos.endpoint";
public static final String FS_BOS_MULTIPART_UPLOADS_BLOCK_SIZE = "fs.bos.multipart.uploads.block.size";
protected StorageBackend.StorageType storageType;
protected Map<String, String> properties;
@ -79,10 +64,10 @@ public abstract class StorageDesc {
String region = hostSplit[0];
String s3Endpoint = new URIBuilder(uri).setHost("s3." + host).build().toString();
properties.clear();
properties.put(S3Storage.S3_ENDPOINT, s3Endpoint);
properties.put(S3Storage.S3_REGION, region);
properties.put(S3Storage.S3_AK, ciProperties.get(BOS_ACCESS_KEY).toString());
properties.put(S3Storage.S3_SK, ciProperties.get(BOS_SECRET_ACCESS_KEY).toString());
properties.put(S3Resource.S3_ENDPOINT, s3Endpoint);
properties.put(S3Resource.S3_REGION, region);
properties.put(S3Resource.S3_ACCESS_KEY, ciProperties.get(BOS_ACCESS_KEY).toString());
properties.put(S3Resource.S3_SECRET_KEY, ciProperties.get(BOS_SECRET_ACCESS_KEY).toString());
storageType = StorageBackend.StorageType.S3;
convertedToS3 = true;
LOG.info("skip BROKER and access S3 directly.");

View File

@ -19,11 +19,15 @@ package org.apache.doris.backup;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.service.FrontendOptions;
import com.google.common.collect.Maps;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import java.io.DataInput;
import java.io.DataOutput;
@ -104,6 +108,10 @@ public abstract class BlobStorage implements Writable {
this.properties = properties;
}
public FileSystem getFileSystem(String remotePath) throws UserException {
throw new UserException("Not support to getFileSystem.");
}
public abstract Status downloadWithFileSize(String remoteFilePath, String localFilePath, long fileSize);
// directly upload the content to remote file
@ -115,10 +123,17 @@ public abstract class BlobStorage implements Writable {
public abstract Status delete(String remotePath);
// only for hdfs and s3
public RemoteIterator<LocatedFileStatus> listLocatedStatus(String remotePath) throws UserException {
throw new UserException("Not support to listLocatedStatus.");
}
// List files in remotePath
// The remote file name will only contains file name only(Not full path)
public abstract Status list(String remotePath, List<RemoteFile> result);
public abstract Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly);
public abstract Status makeDir(String remotePath);
public abstract Status checkPathExist(String remotePath);

View File

@ -506,10 +506,15 @@ public class BrokerStorage extends BlobStorage {
return Status.OK;
}
@Override
public Status list(String remotePath, List<RemoteFile> result) {
return list(remotePath, result, true);
}
// List files in remotePath
// The remote file name will only contains file name only(Not full path)
@Override
public Status list(String remotePath, List<RemoteFile> result) {
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
// get a proper broker
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBroker();
if (pair == null) {

View File

@ -19,8 +19,8 @@ package org.apache.doris.backup;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.AuthType;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.URI;
import org.apache.commons.collections.map.CaseInsensitiveMap;
@ -29,7 +29,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
@ -76,22 +78,25 @@ public class HdfsStorage extends BlobStorage {
}
public static void checkHDFS(Map<String, String> properties) throws UserException {
if (!properties.containsKey(BrokerUtil.HADOOP_FS_NAME)) {
throw new UserException(
String.format("The properties of hdfs is invalid. %s are needed", BrokerUtil.HADOOP_FS_NAME));
for (String field : HdfsResource.REQUIRED_FIELDS) {
if (!properties.containsKey(field)) {
throw new UserException(
String.format("The properties of hdfs is invalid. %s are needed", field));
}
}
}
private FileSystem getFileSystem(String remotePath) throws UserException {
@Override
public FileSystem getFileSystem(String remotePath) throws UserException {
if (dfsFileSystem == null) {
checkHDFS(caseInsensitiveProperties);
String hdfsFsName = caseInsensitiveProperties.get(BrokerUtil.HADOOP_FS_NAME).toString();
String username = caseInsensitiveProperties.get(BrokerUtil.HADOOP_USER_NAME).toString();
String hdfsFsName = caseInsensitiveProperties.get(HdfsResource.HADOOP_FS_NAME);
String username = caseInsensitiveProperties.get(HdfsResource.HADOOP_USER_NAME);
Configuration conf = new HdfsConfiguration();
boolean isSecurityEnabled = false;
for (Map.Entry<String, String> propEntry : caseInsensitiveProperties.entrySet()) {
conf.set(propEntry.getKey(), propEntry.getValue());
if (propEntry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
if (propEntry.getKey().equals(HdfsResource.HADOOP_SECURITY_AUTHENTICATION)
&& propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) {
isSecurityEnabled = true;
}
@ -101,8 +106,8 @@ public class HdfsStorage extends BlobStorage {
if (isSecurityEnabled) {
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(
caseInsensitiveProperties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
caseInsensitiveProperties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
caseInsensitiveProperties.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL),
caseInsensitiveProperties.get(HdfsResource.HADOOP_KERBEROS_KEYTAB));
}
dfsFileSystem = FileSystem.get(java.net.URI.create(hdfsFsName), conf, username);
} catch (Exception e) {
@ -487,6 +492,16 @@ public class HdfsStorage extends BlobStorage {
return Status.OK;
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(String remotePath) throws UserException {
FileSystem fileSystem = getFileSystem(remotePath);
try {
return fileSystem.listLocatedStatus(new Path(remotePath));
} catch (IOException e) {
throw new UserException("Failed to list located status for path: " + remotePath, e);
}
}
@Override
public Status list(String remotePath, List<RemoteFile> result) {
return list(remotePath, result, true);
@ -500,6 +515,7 @@ public class HdfsStorage extends BlobStorage {
* @param fileNameOnly means get file only in remotePath if true.
* @return Status.OK if success.
*/
@Override
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
try {
URI pathUri = URI.create(remotePath);

View File

@ -18,6 +18,7 @@
package org.apache.doris.backup;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.catalog.S3Resource;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;
@ -26,6 +27,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.http.HttpStatus;
import org.apache.http.client.utils.URIBuilder;
import org.apache.logging.log4j.LogManager;
@ -65,15 +68,9 @@ import java.util.List;
import java.util.Map;
public class S3Storage extends BlobStorage {
public static final String S3_PROPERTIES_PREFIX = "AWS";
public static final String S3_AK = "AWS_ACCESS_KEY";
public static final String S3_SK = "AWS_SECRET_KEY";
public static final String S3_ENDPOINT = "AWS_ENDPOINT";
public static final String S3_REGION = "AWS_REGION";
public static final String USE_PATH_STYLE = "use_path_style";
private static final Logger LOG = LogManager.getLogger(S3Storage.class);
private final CaseInsensitiveMap caseInsensitiveProperties;
private FileSystem dfsFileSystem = null;
private final Map<String, String> caseInsensitiveProperties;
private S3Client client;
// false: the s3 client will automatically convert endpoint to virtual-hosted style, eg:
// endpoint: http://s3.us-east-2.amazonaws.com
@ -115,39 +112,65 @@ public class S3Storage extends BlobStorage {
// That is, for S3 endpoint, ignore the `use_path_style` property, and the s3 client will automatically use
// virtual hosted-sytle.
// And for other endpoint, if `use_path_style` is true, use path style. Otherwise, use virtual hosted-sytle.
if (!caseInsensitiveProperties.get(S3_ENDPOINT).toString().toLowerCase().startsWith("s3")) {
if (caseInsensitiveProperties.getOrDefault(USE_PATH_STYLE, "false").toString().equalsIgnoreCase("true")) {
forceHostedStyle = false;
} else {
forceHostedStyle = true;
}
if (!caseInsensitiveProperties.get(S3Resource.S3_ENDPOINT).toLowerCase().startsWith("s3")) {
forceHostedStyle = !caseInsensitiveProperties.getOrDefault(S3Resource.USE_PATH_STYLE, "false")
.equalsIgnoreCase("true");
} else {
forceHostedStyle = false;
}
}
public static void checkS3(CaseInsensitiveMap caseInsensitiveProperties) throws UserException {
if (!caseInsensitiveProperties.containsKey(S3_REGION)) {
throw new UserException("AWS_REGION not found.");
public static void checkS3(Map<String, String> properties) throws UserException {
for (String field : S3Resource.REQUIRED_FIELDS) {
if (!properties.containsKey(field)) {
throw new UserException(field + " not found.");
}
}
if (!caseInsensitiveProperties.containsKey(S3_ENDPOINT)) {
throw new UserException("AWS_ENDPOINT not found.");
}
if (!caseInsensitiveProperties.containsKey(S3_AK)) {
throw new UserException("AWS_ACCESS_KEY not found.");
}
if (!caseInsensitiveProperties.containsKey(S3_SK)) {
throw new UserException("AWS_SECRET_KEY not found.");
}
@Override
public FileSystem getFileSystem(String remotePath) throws UserException {
if (dfsFileSystem == null) {
checkS3(caseInsensitiveProperties);
Configuration conf = new Configuration();
System.setProperty("com.amazonaws.services.s3.enableV4", "true");
conf.set("fs.s3a.access.key", caseInsensitiveProperties.get(S3Resource.S3_ACCESS_KEY));
conf.set("fs.s3a.secret.key", caseInsensitiveProperties.get(S3Resource.S3_SECRET_KEY));
conf.set("fs.s3a.endpoint", caseInsensitiveProperties.get(S3Resource.S3_ENDPOINT));
conf.set("fs.s3a.endpoint.region", caseInsensitiveProperties.get(S3Resource.S3_REGION));
if (caseInsensitiveProperties.containsKey(S3Resource.S3_MAX_CONNECTIONS)) {
conf.set("fs.s3a.connection.maximum",
caseInsensitiveProperties.get(S3Resource.S3_MAX_CONNECTIONS));
}
if (caseInsensitiveProperties.containsKey(S3Resource.S3_REQUEST_TIMEOUT_MS)) {
conf.set("fs.s3a.connection.request.timeout",
caseInsensitiveProperties.get(S3Resource.S3_REQUEST_TIMEOUT_MS));
}
if (caseInsensitiveProperties.containsKey(S3Resource.S3_CONNECTION_TIMEOUT_MS)) {
conf.set("fs.s3a.connection.timeout",
caseInsensitiveProperties.get(S3Resource.S3_CONNECTION_TIMEOUT_MS));
}
conf.set("fs.s3.impl.disable.cache", "true");
conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
// introducing in hadoop aws 2.8.0
conf.set("fs.s3a.path.style.access", forceHostedStyle ? "false" : "true");
conf.set("fs.s3a.attempts.maximum", "2");
try {
dfsFileSystem = FileSystem.get(new URI(remotePath), conf);
} catch (Exception e) {
throw new UserException("Failed to get S3 FileSystem for " + e.getMessage(), e);
}
}
return dfsFileSystem;
}
private S3Client getClient(String bucket) throws UserException {
if (client == null) {
checkS3(caseInsensitiveProperties);
URI tmpEndpoint = URI.create(caseInsensitiveProperties.get(S3_ENDPOINT).toString());
URI tmpEndpoint = URI.create(caseInsensitiveProperties.get(S3Resource.S3_ENDPOINT));
AwsBasicCredentials awsBasic = AwsBasicCredentials.create(
caseInsensitiveProperties.get(S3_AK).toString(),
caseInsensitiveProperties.get(S3_SK).toString());
caseInsensitiveProperties.get(S3Resource.S3_ACCESS_KEY),
caseInsensitiveProperties.get(S3Resource.S3_SECRET_KEY));
StaticCredentialsProvider scp = StaticCredentialsProvider.create(awsBasic);
EqualJitterBackoffStrategy backoffStrategy = EqualJitterBackoffStrategy
.builder()
@ -172,7 +195,7 @@ public class S3Storage extends BlobStorage {
client = S3Client.builder()
.endpointOverride(endpoint)
.credentialsProvider(scp)
.region(Region.of(caseInsensitiveProperties.get(S3_REGION).toString()))
.region(Region.of(caseInsensitiveProperties.get(S3Resource.S3_REGION)))
.overrideConfiguration(clientConf)
// disable chunkedEncoding because of bos not supported
// use virtual hosted-style access
@ -321,29 +344,26 @@ public class S3Storage extends BlobStorage {
}
}
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(String remotePath) throws UserException {
FileSystem fileSystem = getFileSystem(remotePath);
try {
return fileSystem.listLocatedStatus(new org.apache.hadoop.fs.Path(remotePath));
} catch (IOException e) {
throw new UserException("Failed to list located status for path: " + remotePath, e);
}
}
@Override
public Status list(String remotePath, List<RemoteFile> result) {
return list(remotePath, result, true);
}
// broker file pattern glob is too complex, so we use hadoop directly
@Override
public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
try {
checkS3(caseInsensitiveProperties);
Configuration conf = new Configuration();
String s3AK = caseInsensitiveProperties.get(S3_AK).toString();
String s3Sk = caseInsensitiveProperties.get(S3_SK).toString();
String s3Endpoint = caseInsensitiveProperties.get(S3_ENDPOINT).toString();
System.setProperty("com.amazonaws.services.s3.enableV4", "true");
conf.set("fs.s3a.access.key", s3AK);
conf.set("fs.s3a.secret.key", s3Sk);
conf.set("fs.s3a.endpoint", s3Endpoint);
conf.set("fs.s3.impl.disable.cache", "true");
conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
// introducing in hadoop aws 2.8.0
conf.set("fs.s3a.path.style.access", forceHostedStyle ? "false" : "true");
conf.set("fs.s3a.attempts.maximum", "2");
FileSystem s3AFileSystem = FileSystem.get(new URI(remotePath), conf);
FileSystem s3AFileSystem = getFileSystem(remotePath);
org.apache.hadoop.fs.Path pathPattern = new org.apache.hadoop.fs.Path(remotePath);
FileStatus[] files = s3AFileSystem.globStatus(pathPattern);
if (files == null) {

View File

@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.proc.BaseProcResult;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* HMS resource
* <p>
* Syntax:
* CREATE RESOURCE "hive"
* PROPERTIES
* (
* "type" = "hms",
* "hive.metastore.uris" = "thrift://172.21.0.44:7004"
* );
*/
public class HMSResource extends Resource {
// required
public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
@SerializedName(value = "properties")
private Map<String, String> properties;
public HMSResource(String name) {
super(name, ResourceType.HMS);
properties = Maps.newHashMap();
}
@Override
public void modifyProperties(Map<String, String> properties) throws DdlException {
for (Map.Entry<String, String> kv : properties.entrySet()) {
replaceIfEffectiveValue(this.properties, kv.getKey(), kv.getValue());
}
}
@Override
protected void setProperties(Map<String, String> properties) throws DdlException {
List<String> requiredFields = Collections.singletonList(HIVE_METASTORE_URIS);
for (String field : requiredFields) {
if (!properties.containsKey(field)) {
throw new DdlException("Missing [" + field + "] in properties.");
}
}
this.properties = properties;
}
@Override
public Map<String, String> getCopiedProperties() {
return Maps.newHashMap(properties);
}
@Override
protected void getProcNodeData(BaseProcResult result) {
String lowerCaseType = type.name().toLowerCase();
for (Map.Entry<String, String> entry : properties.entrySet()) {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
}
}
}

View File

@ -0,0 +1,123 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.thrift.THdfsConf;
import org.apache.doris.thrift.THdfsParams;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* HDFS resource
* <p>
* Syntax:
* CREATE RESOURCE "remote_hdfs"
* PROPERTIES
* (
* "type" = "hdfs",
* "fs.defaultFS" = "hdfs://10.220.147.151:8020",
* "hadoop.username" = "root"
* );
*/
public class HdfsResource extends Resource {
public static final String HADOOP_FS_PREFIX = "dfs.";
public static String HADOOP_FS_NAME = "fs.defaultFS";
// simple or kerberos
public static String HADOOP_USER_NAME = "hadoop.username";
public static String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
public static List<String> REQUIRED_FIELDS = Collections.singletonList(HADOOP_FS_NAME);
@SerializedName(value = "properties")
private Map<String, String> properties;
public HdfsResource(String name) {
super(name, Resource.ResourceType.HDFS);
properties = Maps.newHashMap();
}
@Override
public void modifyProperties(Map<String, String> properties) throws DdlException {
for (Map.Entry<String, String> kv : properties.entrySet()) {
replaceIfEffectiveValue(this.properties, kv.getKey(), kv.getValue());
}
}
@Override
protected void setProperties(Map<String, String> properties) throws DdlException {
for (String field : REQUIRED_FIELDS) {
if (!properties.containsKey(field)) {
throw new DdlException("Missing [" + field + "] in properties.");
}
}
// `dfs.client.read.shortcircuit` and `dfs.domain.socket.path` should be both set to enable short circuit read.
// We should disable short circuit read if they are not both set because it will cause performance down.
if (!properties.containsKey(HADOOP_SHORT_CIRCUIT) || !properties.containsKey(HADOOP_SOCKET_PATH)) {
properties.put(HADOOP_SHORT_CIRCUIT, "false");
}
this.properties = properties;
}
@Override
public Map<String, String> getCopiedProperties() {
return Maps.newHashMap(properties);
}
@Override
protected void getProcNodeData(BaseProcResult result) {
String lowerCaseType = type.name().toLowerCase();
for (Map.Entry<String, String> entry : properties.entrySet()) {
result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
}
}
// Will be removed after BE unified storage params
public static THdfsParams generateHdfsParam(Map<String, String> properties) {
THdfsParams tHdfsParams = new THdfsParams();
tHdfsParams.setHdfsConf(new ArrayList<>());
for (Map.Entry<String, String> property : properties.entrySet()) {
if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) {
tHdfsParams.setFsName(property.getValue());
} else if (property.getKey().equalsIgnoreCase(HADOOP_USER_NAME)) {
tHdfsParams.setUser(property.getValue());
} else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_PRINCIPAL)) {
tHdfsParams.setHdfsKerberosPrincipal(property.getValue());
} else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_KEYTAB)) {
tHdfsParams.setHdfsKerberosKeytab(property.getValue());
} else {
THdfsConf hdfsConf = new THdfsConf();
hdfsConf.setKey(property.getKey());
hdfsConf.setValue(property.getValue());
tHdfsParams.hdfs_conf.add(hdfsConf);
}
}
return tHdfsParams;
}
}
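
A minimal sketch, assuming a hypothetical caller class `HdfsParamExample`, of how `HdfsResource.generateHdfsParam` above can turn resource-style HDFS properties into the `THdfsParams` struct sent to BE; all values are placeholders.

```java
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.thrift.THdfsParams;

import com.google.common.collect.Maps;

import java.util.Map;

public class HdfsParamExample {
    public static void main(String[] args) {
        // Resource-style HDFS properties, keyed by the constants defined in HdfsResource.
        Map<String, String> props = Maps.newHashMap();
        props.put(HdfsResource.HADOOP_FS_NAME, "hdfs://nameservice1:8020"); // placeholder fs.defaultFS
        props.put(HdfsResource.HADOOP_USER_NAME, "root");                   // placeholder hadoop.username
        props.put("dfs.nameservices", "nameservice1");                      // forwarded as a plain THdfsConf entry

        // fs.defaultFS and hadoop.username map to dedicated THdfsParams fields;
        // every other property is passed through in hdfs_conf.
        THdfsParams params = HdfsResource.generateHdfsParam(props);
        System.out.println(params);
    }
}
```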

View File

@ -31,9 +31,11 @@ import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.backup.BlobStorage;
import org.apache.doris.backup.S3Storage;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.UserException;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExprOpcode;
@ -42,12 +44,8 @@ import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@ -67,7 +65,6 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
@ -93,7 +90,6 @@ import java.util.stream.Collectors;
public class HiveMetaStoreClientHelper {
private static final Logger LOG = LogManager.getLogger(HiveMetaStoreClientHelper.class);
public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
public static final String HIVE_METASTORE_TYPE = "hive.metastore.type";
public static final String DLF_TYPE = "dlf";
public static final String COMMENT = "comment";
@ -178,88 +174,27 @@ public class HiveMetaStoreClientHelper {
public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate,
List<TBrokerFileStatus> fileStatuses, Table remoteHiveTbl, StorageBackend.StorageType type)
throws DdlException {
boolean onS3 = type.equals(StorageBackend.StorageType.S3);
Map<String, String> properties = hiveTable.getHiveProperties();
Configuration configuration = getConfiguration(properties, onS3);
boolean isSecurityEnabled = isSecurityEnabled(properties);
List<RemoteIterator<LocatedFileStatus>> remoteIterators;
if (remoteHiveTbl.getPartitionKeys().size() > 0) {
String metaStoreUris = hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS);
// hive partitioned table, get file iterator from table partition sd info
List<Partition> hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl, hivePartitionPredicate);
remoteIterators = getRemoteIterator(hivePartitions, configuration, isSecurityEnabled, properties, onS3);
} else {
// hive non-partitioned table, get file iterator from table sd info
remoteIterators = getRemoteIterator(remoteHiveTbl, configuration, isSecurityEnabled, properties, onS3);
}
return getAllFileStatus(fileStatuses, remoteIterators, configuration, isSecurityEnabled, properties, onS3);
}
// create Configuration for the given properties
private static Configuration getConfiguration(Map<String, String> properties, boolean onS3) {
Configuration configuration = new HdfsConfiguration();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
configuration.set(entry.getKey(), entry.getValue());
}
}
if (onS3) {
setS3Configuration(configuration, properties);
}
return configuration;
}
// return true if it is kerberos
private static boolean isSecurityEnabled(Map<String, String> properties) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION) && entry.getValue()
.equals(AuthType.KERBEROS.getDesc())) {
return true;
}
}
return false;
}
// Get remote iterators for given partitions
private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions,
Configuration configuration, boolean isSecurityEnabled, Map<String, String> properties, boolean onS3)
throws DdlException {
List<RemoteIterator<LocatedFileStatus>> allIterators = new ArrayList<>();
for (Partition p : partitions) {
String location = normalizeS3LikeSchema(p.getSd().getLocation());
Path path = new Path(location);
allIterators.addAll(getRemoteIterator(path, configuration, properties, isSecurityEnabled));
}
return allIterators;
}
// Get remote iterators for given table
private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Configuration configuration,
boolean isSecurityEnabled, Map<String, String> properties, boolean onS3) throws DdlException {
String location = normalizeS3LikeSchema(table.getSd().getLocation());
Path path = new Path(location);
return getRemoteIterator(path, configuration, properties, isSecurityEnabled);
}
// Get remote iterators for given Path
private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(org.apache.hadoop.fs.Path path,
Configuration conf, Map<String, String> properties, boolean isSecurityEnabled) throws DdlException {
List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
BlobStorage storage = BlobStorage.create("HiveMetaStore", type, hiveTable.getHiveProperties());
List<RemoteIterator<LocatedFileStatus>> remoteIterators = new ArrayList<>();
try {
if (isSecurityEnabled) {
UserGroupInformation.setConfiguration(conf);
// login user from keytab
UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
if (remoteHiveTbl.getPartitionKeys().size() > 0) {
String metaStoreUris = hiveTable.getHiveProperties().get(HMSResource.HIVE_METASTORE_URIS);
// hive partitioned table, get file iterator from table partition sd info
List<Partition> hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl,
hivePartitionPredicate);
for (Partition p : hivePartitions) {
String location = normalizeS3LikeSchema(p.getSd().getLocation());
remoteIterators.add(storage.listLocatedStatus(location));
}
} else {
// hive non-partitioned table, get file iterator from table sd info
String location = normalizeS3LikeSchema(remoteHiveTbl.getSd().getLocation());
remoteIterators.add(storage.listLocatedStatus(location));
}
FileSystem fileSystem = path.getFileSystem(conf);
iterators.add(fileSystem.listLocatedStatus(path));
} catch (IOException e) {
LOG.warn("Get HDFS file remote iterator failed. {}", e.getMessage());
throw new DdlException("Get HDFS file remote iterator failed. Error: " + e.getMessage());
return getAllFileStatus(fileStatuses, remoteIterators, storage);
} catch (UserException e) {
throw new DdlException(e.getMessage(), e);
}
return iterators;
}
public static String normalizeS3LikeSchema(String location) {
@ -274,8 +209,8 @@ public class HiveMetaStoreClientHelper {
}
private static String getAllFileStatus(List<TBrokerFileStatus> fileStatuses,
List<RemoteIterator<LocatedFileStatus>> remoteIterators, Configuration configuration,
boolean isSecurityEnabled, Map<String, String> properties, boolean onS3) throws DdlException {
List<RemoteIterator<LocatedFileStatus>> remoteIterators, BlobStorage storage) throws UserException {
boolean onS3 = storage instanceof S3Storage;
String hdfsUrl = "";
Queue<RemoteIterator<LocatedFileStatus>> queue = Queues.newArrayDeque(remoteIterators);
while (queue.peek() != null) {
@ -285,8 +220,7 @@ public class HiveMetaStoreClientHelper {
LocatedFileStatus fileStatus = iterator.next();
if (fileStatus.isDirectory()) {
// recursive visit the directory to get the file path.
queue.addAll(
getRemoteIterator(fileStatus.getPath(), configuration, properties, isSecurityEnabled));
queue.add(storage.listLocatedStatus(fileStatus.getPath().toString()));
continue;
}
TBrokerFileStatus brokerFileStatus = new TBrokerFileStatus();
@ -346,23 +280,8 @@ public class HiveMetaStoreClientHelper {
return hivePartitions;
}
private static void setS3Configuration(Configuration configuration, Map<String, String> properties) {
if (properties.containsKey(HiveTable.S3_AK)) {
configuration.set("fs.s3a.access.key", properties.get(HiveTable.S3_AK));
}
if (properties.containsKey(HiveTable.S3_SK)) {
configuration.set("fs.s3a.secret.key", properties.get(HiveTable.S3_SK));
}
if (properties.containsKey(HiveTable.S3_ENDPOINT)) {
configuration.set("fs.s3a.endpoint", properties.get(HiveTable.S3_ENDPOINT));
}
configuration.set("fs.s3.impl.disable.cache", "true");
configuration.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
configuration.set("fs.s3a.attempts.maximum", "2");
}
public static Table getTable(HiveTable hiveTable) throws DdlException {
IMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS));
IMetaStoreClient client = getClient(hiveTable.getHiveProperties().get(HMSResource.HIVE_METASTORE_URIS));
Table table;
try {
table = client.getTable(hiveTable.getHiveDb(), hiveTable.getHiveTable());
@ -912,18 +831,18 @@ public class HiveMetaStoreClientHelper {
// See: https://help.aliyun.com/document_detail/31837.html
// And add "-internal" to access oss within vpc
// TODO: find to way to access oss on public?
res.put(HiveTable.AWS_REGION, "oss-" + region);
res.put(HiveTable.S3_ENDPOINT, "http://oss-" + region + "-internal.aliyuncs.com");
res.put(S3Resource.S3_REGION, "oss-" + region);
res.put(S3Resource.S3_ENDPOINT, "http://oss-" + region + "-internal.aliyuncs.com");
}
// 2. ak and sk
String ak = hiveConf.get("dlf.catalog.accessKeyId");
String sk = hiveConf.get("dlf.catalog.accessKeySecret");
if (!Strings.isNullOrEmpty(ak)) {
res.put(HiveTable.S3_AK, ak);
res.put(S3Resource.S3_ACCESS_KEY, ak);
}
if (!Strings.isNullOrEmpty(sk)) {
res.put(HiveTable.S3_SK, sk);
res.put(S3Resource.S3_SECRET_KEY, sk);
}
if (LOG.isDebugEnabled()) {
LOG.debug("get properties for oss in hive-site.xml for catalog {}: {}", catalogName, res);

View File

@ -19,7 +19,6 @@ package org.apache.doris.catalog;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
@ -51,17 +50,6 @@ public class HiveTable extends Table {
public static final String HIVE_DB = "database";
public static final String HIVE_TABLE = "table";
public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
public static final String HIVE_HDFS_PREFIX = "dfs.";
public static final String S3_FS_PREFIX = "fs.s3";
public static final String S3_PROPERTIES_PREFIX = "AWS";
public static final String S3_AK = "AWS_ACCESS_KEY";
public static final String S3_SK = "AWS_SECRET_KEY";
public static final String S3_ENDPOINT = "AWS_ENDPOINT";
public static final String AWS_REGION = "AWS_REGION";
public static final String AWS_MAX_CONN_SIZE = "AWS_MAX_CONN_SIZE";
public static final String AWS_REQUEST_TIMEOUT_MS = "AWS_REQUEST_TIMEOUT_MS";
public static final String AWS_CONN_TIMEOUT_MS = "AWS_CONN_TIMEOUT_MS";
public HiveTable() {
super(TableType.HIVE);
@ -109,56 +97,56 @@ public class HiveTable extends Table {
// check hive properties
// hive.metastore.uris
String hiveMetaStoreUris = copiedProps.get(HIVE_METASTORE_URIS);
String hiveMetaStoreUris = copiedProps.get(HMSResource.HIVE_METASTORE_URIS);
if (Strings.isNullOrEmpty(hiveMetaStoreUris)) {
throw new DdlException(String.format(PROPERTY_MISSING_MSG, HIVE_METASTORE_URIS, HIVE_METASTORE_URIS));
throw new DdlException(String.format(
PROPERTY_MISSING_MSG, HMSResource.HIVE_METASTORE_URIS, HMSResource.HIVE_METASTORE_URIS));
}
copiedProps.remove(HIVE_METASTORE_URIS);
hiveProperties.put(HIVE_METASTORE_URIS, hiveMetaStoreUris);
copiedProps.remove(HMSResource.HIVE_METASTORE_URIS);
hiveProperties.put(HMSResource.HIVE_METASTORE_URIS, hiveMetaStoreUris);
// check auth type
String authType = copiedProps.get(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION);
String authType = copiedProps.get(HdfsResource.HADOOP_SECURITY_AUTHENTICATION);
if (Strings.isNullOrEmpty(authType)) {
authType = AuthType.SIMPLE.getDesc();
}
if (!AuthType.isSupportedAuthType(authType)) {
throw new DdlException(String.format(PROPERTY_ERROR_MSG,
BrokerUtil.HADOOP_SECURITY_AUTHENTICATION, authType));
HdfsResource.HADOOP_SECURITY_AUTHENTICATION, authType));
}
copiedProps.remove(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION);
hiveProperties.put(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION, authType);
copiedProps.remove(HdfsResource.HADOOP_SECURITY_AUTHENTICATION);
hiveProperties.put(HdfsResource.HADOOP_SECURITY_AUTHENTICATION, authType);
if (AuthType.KERBEROS.getDesc().equals(authType)) {
// check principal
String principal = copiedProps.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL);
String principal = copiedProps.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL);
if (Strings.isNullOrEmpty(principal)) {
throw new DdlException(String.format(PROPERTY_MISSING_MSG,
BrokerUtil.HADOOP_KERBEROS_PRINCIPAL, BrokerUtil.HADOOP_KERBEROS_PRINCIPAL));
HdfsResource.HADOOP_KERBEROS_PRINCIPAL, HdfsResource.HADOOP_KERBEROS_PRINCIPAL));
}
hiveProperties.put(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL, principal);
copiedProps.remove(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL);
hiveProperties.put(HdfsResource.HADOOP_KERBEROS_PRINCIPAL, principal);
copiedProps.remove(HdfsResource.HADOOP_KERBEROS_PRINCIPAL);
// check keytab
String keytabPath = copiedProps.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB);
String keytabPath = copiedProps.get(HdfsResource.HADOOP_KERBEROS_KEYTAB);
if (Strings.isNullOrEmpty(keytabPath)) {
throw new DdlException(String.format(PROPERTY_MISSING_MSG,
BrokerUtil.HADOOP_KERBEROS_KEYTAB, BrokerUtil.HADOOP_KERBEROS_KEYTAB));
}
if (!Strings.isNullOrEmpty(keytabPath)) {
hiveProperties.put(BrokerUtil.HADOOP_KERBEROS_KEYTAB, keytabPath);
copiedProps.remove(BrokerUtil.HADOOP_KERBEROS_KEYTAB);
HdfsResource.HADOOP_KERBEROS_KEYTAB, HdfsResource.HADOOP_KERBEROS_KEYTAB));
} else {
hiveProperties.put(HdfsResource.HADOOP_KERBEROS_KEYTAB, keytabPath);
copiedProps.remove(HdfsResource.HADOOP_KERBEROS_KEYTAB);
}
}
String hdfsUserName = copiedProps.get(BrokerUtil.HADOOP_USER_NAME);
String hdfsUserName = copiedProps.get(HdfsResource.HADOOP_USER_NAME);
if (!Strings.isNullOrEmpty(hdfsUserName)) {
hiveProperties.put(BrokerUtil.HADOOP_USER_NAME, hdfsUserName);
copiedProps.remove(BrokerUtil.HADOOP_USER_NAME);
hiveProperties.put(HdfsResource.HADOOP_USER_NAME, hdfsUserName);
copiedProps.remove(HdfsResource.HADOOP_USER_NAME);
}
if (!copiedProps.isEmpty()) {
Iterator<Map.Entry<String, String>> iter = copiedProps.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<String, String> entry = iter.next();
String key = entry.getKey();
if (key.startsWith(HIVE_HDFS_PREFIX) || key.startsWith(S3_PROPERTIES_PREFIX)) {
if (key.startsWith(HdfsResource.HADOOP_FS_PREFIX) || key.startsWith(S3Resource.S3_PROPERTIES_PREFIX)) {
hiveProperties.put(key, entry.getValue());
iter.remove();
}
@ -166,7 +154,7 @@ public class HiveTable extends Table {
}
if (!copiedProps.isEmpty()) {
throw new DdlException("Unknown table properties: " + copiedProps.toString());
throw new DdlException("Unknown table properties: " + copiedProps);
}
}

View File

@ -45,7 +45,9 @@ public abstract class Resource implements Writable {
SPARK,
ODBC_CATALOG,
S3,
JDBC;
JDBC,
HDFS,
HMS;
public static ResourceType fromString(String resourceType) {
for (ResourceType type : ResourceType.values()) {
@ -98,6 +100,12 @@ public abstract class Resource implements Writable {
case JDBC:
resource = new JdbcResource(name);
break;
case HDFS:
resource = new HdfsResource(name);
break;
case HMS:
resource = new HMSResource(name);
break;
default:
throw new DdlException("Unknown resource type: " + type);
}
@ -125,7 +133,7 @@ public abstract class Resource implements Writable {
* @param properties
* @throws AnalysisException
*/
public abstract void checkProperties(Map<String, String> properties) throws AnalysisException;
public void checkProperties(Map<String, String> properties) throws AnalysisException { }
protected void replaceIfEffectiveValue(Map<String, String> properties, String key, String value) {
if (!Strings.isNullOrEmpty(value)) {
@ -141,6 +149,8 @@ public abstract class Resource implements Writable {
public abstract Map<String, String> getCopiedProperties();
public void dropResource() throws DdlException { }
/**
* Fill BaseProcResult with different properties in child resources
* ResourceMgr.RESOURCE_PROC_NODE_TITLE_NAMES format:

View File

@ -70,11 +70,8 @@ public class ResourceMgr implements Writable {
}
public void createResource(CreateResourceStmt stmt) throws DdlException {
if (stmt.getResourceType() != ResourceType.SPARK
&& stmt.getResourceType() != ResourceType.ODBC_CATALOG
&& stmt.getResourceType() != ResourceType.S3
&& stmt.getResourceType() != ResourceType.JDBC) {
throw new DdlException("Only support SPARK, ODBC_CATALOG ,JDBC, and REMOTE_STORAGE resource.");
if (stmt.getResourceType() == ResourceType.UNKNOWN) {
throw new DdlException("Only support SPARK, ODBC_CATALOG ,JDBC, S3_COOLDOWN, S3, HDFS and HMS resource.");
}
Resource resource = Resource.fromStmt(stmt);
if (createResource(resource, stmt.isIfNotExists())) {
@ -110,12 +107,7 @@ public class ResourceMgr implements Writable {
}
Resource resource = nameToResource.get(resourceName);
if (resource.getType().equals(ResourceType.S3)
&& !((S3Resource) resource).getCopiedUsedByPolicySet().isEmpty()) {
LOG.warn("S3 resource used by policy {}, can't drop it",
((S3Resource) resource).getCopiedUsedByPolicySet());
throw new DdlException("S3 resource used by policy, can't drop it.");
}
resource.dropResource();
// Check whether the resource is in use before deleting it, except spark resource
StoragePolicy checkedStoragePolicy = StoragePolicy.ofCheck(null);
@ -136,7 +128,6 @@ public class ResourceMgr implements Writable {
String name = resource.getName();
if (nameToResource.remove(name) == null) {
LOG.info("resource " + name + " does not exists.");
return;
}
}

View File

@ -32,7 +32,6 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -41,40 +40,55 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* S3 resource for olap table
* S3 resource
*
* Syntax:
* CREATE RESOURCE "remote_s3"
* PROPERTIES
* (
* "type" = "s3",
* "s3_endpoint" = "bj",
* "s3_region" = "bj",
* "s3_root_path" = "/path/to/root",
* "s3_access_key" = "bbb",
* "s3_secret_key" = "aaaa",
* "s3_max_connections" = "50",
* "s3_request_timeout_ms" = "3000",
* "s3_connection_timeout_ms" = "1000"
* "AWS_ENDPOINT" = "bj",
* "AWS_REGION" = "bj",
* "AWS_ROOT_PATH" = "/path/to/root",
* "AWS_ACCESS_KEY" = "bbb",
* "AWS_SECRET_KEY" = "aaaa",
* "AWS_MAX_CONNECTION" = "50",
* "AWS_REQUEST_TIMEOUT_MS" = "3000",
* "AWS_CONNECTION_TIMEOUT_MS" = "1000"
* );
*/
public class S3Resource extends Resource {
public enum ReferenceType {
TVF, // table valued function
LOAD,
EXPORT,
REPOSITORY,
OUTFILE,
TABLE,
POLICY
}
private static final Logger LOG = LogManager.getLogger(S3Resource.class);
public static final String S3_PROPERTIES_PREFIX = "AWS";
public static final String S3_FS_PREFIX = "fs.s3";
// required
public static final String S3_ENDPOINT = "s3_endpoint";
public static final String S3_REGION = "s3_region";
public static final String S3_ROOT_PATH = "s3_root_path";
public static final String S3_ACCESS_KEY = "s3_access_key";
public static final String S3_SECRET_KEY = "s3_secret_key";
public static final String S3_BUCKET = "s3_bucket";
public static final String S3_ENDPOINT = "AWS_ENDPOINT";
public static final String S3_REGION = "AWS_REGION";
public static final String S3_ACCESS_KEY = "AWS_ACCESS_KEY";
public static final String S3_SECRET_KEY = "AWS_SECRET_KEY";
public static final List<String> REQUIRED_FIELDS =
Arrays.asList(S3_ENDPOINT, S3_REGION, S3_ACCESS_KEY, S3_SECRET_KEY);
// required by storage policy
public static final String S3_ROOT_PATH = "AWS_ROOT_PATH";
public static final String S3_BUCKET = "AWS_BUCKET";
// optional
public static final String S3_MAX_CONNECTIONS = "s3_max_connections";
public static final String S3_REQUEST_TIMEOUT_MS = "s3_request_timeout_ms";
public static final String S3_CONNECTION_TIMEOUT_MS = "s3_connection_timeout_ms";
public static final String USE_PATH_STYLE = "use_path_style";
public static final String S3_MAX_CONNECTIONS = "AWS_MAX_CONNECTIONS";
public static final String S3_REQUEST_TIMEOUT_MS = "AWS_REQUEST_TIMEOUT_MS";
public static final String S3_CONNECTION_TIMEOUT_MS = "AWS_CONNECTION_TIMEOUT_MS";
public static final String DEFAULT_S3_MAX_CONNECTIONS = "50";
public static final String DEFAULT_S3_REQUEST_TIMEOUT_MS = "3000";
public static final String DEFAULT_S3_CONNECTION_TIMEOUT_MS = "1000";
@ -82,27 +96,36 @@ public class S3Resource extends Resource {
@SerializedName(value = "properties")
private Map<String, String> properties;
@SerializedName(value = "usedByPolicySet")
private Set<String> usedByPolicySet;
@SerializedName(value = "referenceSet")
private Map<String, ReferenceType> references;
public boolean policyAddToSet(final String policeName) {
boolean flag = this.usedByPolicySet.add(policeName);
if (flag) {
public boolean addReference(String referenceName, ReferenceType type) throws AnalysisException {
if (type == ReferenceType.POLICY) {
if (!properties.containsKey(S3_ROOT_PATH)) {
throw new AnalysisException(String.format("Missing [%s] in '%s' resource", S3_ROOT_PATH, name));
}
if (!properties.containsKey(S3_BUCKET)) {
throw new AnalysisException(String.format("Missing [%s] in '%s' resource", S3_BUCKET, name));
}
}
if (references.put(referenceName, type) == null) {
// log set
Env.getCurrentEnv().getEditLog().logAlterResource(this);
LOG.info("{} policy add s3 resource, current set: {}", policeName, usedByPolicySet);
LOG.info("Reference(type={}, name={}) is added to s3 resource, current set: {}",
type, referenceName, references);
return true;
}
return flag;
return false;
}
public S3Resource(String name) {
this(name, Maps.newHashMap(), Sets.newHashSet());
this(name, Maps.newHashMap(), Maps.newHashMap());
}
public S3Resource(String name, Map<String, String> properties, Set<String> policySet) {
public S3Resource(String name, Map<String, String> properties, Map<String, ReferenceType> policySet) {
super(name, ResourceType.S3);
this.properties = properties;
this.usedByPolicySet = policySet;
this.references = policySet;
}
public String getProperty(String propertyKey) {
@ -117,10 +140,8 @@ public class S3Resource extends Resource {
// required
checkRequiredProperty(S3_ENDPOINT);
checkRequiredProperty(S3_REGION);
checkRequiredProperty(S3_ROOT_PATH);
checkRequiredProperty(S3_ACCESS_KEY);
checkRequiredProperty(S3_SECRET_KEY);
checkRequiredProperty(S3_BUCKET);
// optional
checkOptionalProperty(S3_MAX_CONNECTIONS, DEFAULT_S3_MAX_CONNECTIONS);
checkOptionalProperty(S3_REQUEST_TIMEOUT_MS, DEFAULT_S3_REQUEST_TIMEOUT_MS);
@ -141,48 +162,31 @@ public class S3Resource extends Resource {
@Override
public void modifyProperties(Map<String, String> properties) throws DdlException {
// can't change, because remote fs use it info to find data.
List<String> cantChangeProperties = Arrays.asList(S3_ENDPOINT, S3_REGION, S3_ROOT_PATH, S3_BUCKET);
Optional<String> any = cantChangeProperties.stream().filter(properties::containsKey).findAny();
if (any.isPresent()) {
throw new DdlException("current not support modify property : " + any.get());
if (references.containsValue(ReferenceType.POLICY)) {
// can't change, because remote fs use it info to find data.
List<String> cantChangeProperties = Arrays.asList(S3_ENDPOINT, S3_REGION, S3_ROOT_PATH, S3_BUCKET);
Optional<String> any = cantChangeProperties.stream().filter(properties::containsKey).findAny();
if (any.isPresent()) {
throw new DdlException("current not support modify property : " + any.get());
}
}
// modify properties
replaceIfEffectiveValue(this.properties, S3_ACCESS_KEY, properties.get(S3_ACCESS_KEY));
replaceIfEffectiveValue(this.properties, S3_SECRET_KEY, properties.get(S3_SECRET_KEY));
replaceIfEffectiveValue(this.properties, S3_MAX_CONNECTIONS, properties.get(S3_MAX_CONNECTIONS));
replaceIfEffectiveValue(this.properties, S3_REQUEST_TIMEOUT_MS, properties.get(S3_REQUEST_TIMEOUT_MS));
replaceIfEffectiveValue(this.properties, S3_CONNECTION_TIMEOUT_MS, properties.get(S3_CONNECTION_TIMEOUT_MS));
for (Map.Entry<String, String> kv : properties.entrySet()) {
replaceIfEffectiveValue(this.properties, kv.getKey(), kv.getValue());
}
notifyUpdate();
}
@Override
public void checkProperties(Map<String, String> properties) throws AnalysisException {
// check properties
Map<String, String> copiedProperties = Maps.newHashMap(properties);
copiedProperties.remove(S3_ENDPOINT);
copiedProperties.remove(S3_REGION);
copiedProperties.remove(S3_ROOT_PATH);
copiedProperties.remove(S3_ACCESS_KEY);
copiedProperties.remove(S3_SECRET_KEY);
copiedProperties.remove(S3_BUCKET);
copiedProperties.remove(S3_MAX_CONNECTIONS);
copiedProperties.remove(S3_REQUEST_TIMEOUT_MS);
copiedProperties.remove(S3_CONNECTION_TIMEOUT_MS);
if (!copiedProperties.isEmpty()) {
throw new AnalysisException("Unknown S3 resource properties: " + copiedProperties);
}
public Map<String, String> getCopiedProperties() {
return Maps.newHashMap(properties);
}
@Override
public Map<String, String> getCopiedProperties() {
Map<String, String> copiedProperties = Maps.newHashMap(properties);
return copiedProperties;
}
public Set<String> getCopiedUsedByPolicySet() {
return Sets.newHashSet(usedByPolicySet);
public void dropResource() throws DdlException {
if (references.containsValue(ReferenceType.POLICY)) {
throw new DdlException("S3 resource used by policy, can't drop it.");
}
}
@Override
@ -200,43 +204,50 @@ public class S3Resource extends Resource {
}
private void notifyUpdate() {
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
AgentBatchTask batchTask = new AgentBatchTask();
if (references.containsValue(ReferenceType.POLICY)) {
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
AgentBatchTask batchTask = new AgentBatchTask();
Map<String, String> copiedProperties = getCopiedProperties();
Map<String, String> copiedProperties = getCopiedProperties();
for (Long beId : systemInfoService.getBackendIds(true)) {
this.usedByPolicySet.forEach(
policy -> {
List<Policy> policiesByType = Env.getCurrentEnv().getPolicyMgr()
.getCopiedPoliciesByType(PolicyTypeEnum.STORAGE);
Optional<Policy> findPolicy = policiesByType.stream()
.filter(p -> p.getType() == PolicyTypeEnum.STORAGE && policy.equals(p.getPolicyName()))
.findAny();
LOG.info("find policy in {} ", policiesByType);
if (!findPolicy.isPresent()) {
return;
for (Long beId : systemInfoService.getBackendIds(true)) {
this.references.forEach(
(policy, type) -> {
if (type == ReferenceType.POLICY) {
List<Policy> policiesByType = Env.getCurrentEnv().getPolicyMgr()
.getCopiedPoliciesByType(PolicyTypeEnum.STORAGE);
Optional<Policy> findPolicy = policiesByType.stream()
.filter(p -> p.getType() == PolicyTypeEnum.STORAGE
&& policy.equals(p.getPolicyName()))
.findAny();
LOG.info("find policy in {} ", policiesByType);
if (!findPolicy.isPresent()) {
return;
}
// add policy's coolDown ttl、coolDown data、policy name to map
Map<String, String> tmpMap = Maps.newHashMap(copiedProperties);
StoragePolicy used = (StoragePolicy) findPolicy.get();
tmpMap.put(StoragePolicy.COOLDOWN_DATETIME,
String.valueOf(used.getCooldownTimestampMs()));
final String[] cooldownTtl = {"-1"};
Optional.ofNullable(used.getCooldownTtl())
.ifPresent(date -> cooldownTtl[0] = used.getCooldownTtl());
tmpMap.put(StoragePolicy.COOLDOWN_TTL, cooldownTtl[0]);
tmpMap.put(StoragePolicy.MD5_CHECKSUM, used.getMd5Checksum());
NotifyUpdateStoragePolicyTask modifyS3ResourcePropertiesTask =
new NotifyUpdateStoragePolicyTask(beId, used.getPolicyName(), tmpMap);
LOG.info("notify be: {}, policy name: {}, "
+ "properties: {} to modify S3 resource batch task.",
beId, used.getPolicyName(), tmpMap);
batchTask.addTask(modifyS3ResourcePropertiesTask);
}
}
// add policy's coolDown ttl、coolDown data、policy name to map
Map<String, String> tmpMap = Maps.newHashMap(copiedProperties);
StoragePolicy used = (StoragePolicy) findPolicy.get();
tmpMap.put(StoragePolicy.COOLDOWN_DATETIME, String.valueOf(used.getCooldownTimestampMs()));
final String[] cooldownTtl = {"-1"};
Optional.ofNullable(used.getCooldownTtl())
.ifPresent(date -> cooldownTtl[0] = used.getCooldownTtl());
tmpMap.put(StoragePolicy.COOLDOWN_TTL, cooldownTtl[0]);
tmpMap.put(StoragePolicy.MD5_CHECKSUM, used.getMd5Checksum());
NotifyUpdateStoragePolicyTask modifyS3ResourcePropertiesTask =
new NotifyUpdateStoragePolicyTask(beId, used.getPolicyName(), tmpMap);
LOG.info("notify be: {}, policy name: {}, properties: {} to modify S3 resource batch task.",
beId, used.getPolicyName(), tmpMap);
batchTask.addTask(modifyS3ResourcePropertiesTask);
}
);
);
}
AgentTaskExecutor.submit(batchTask);
}
AgentTaskExecutor.submit(batchTask);
}
}

View File

@ -18,11 +18,9 @@
package org.apache.doris.common.util;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.BlobStorage;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.S3Storage;
import org.apache.doris.backup.Status;
import org.apache.doris.catalog.AuthType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
@ -52,73 +50,27 @@ import org.apache.doris.thrift.TBrokerPWriteRequest;
import org.apache.doris.thrift.TBrokerReadResponse;
import org.apache.doris.thrift.TBrokerRenamePathRequest;
import org.apache.doris.thrift.TBrokerVersion;
import org.apache.doris.thrift.THdfsConf;
import org.apache.doris.thrift.THdfsParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class BrokerUtil {
private static final Logger LOG = LogManager.getLogger(BrokerUtil.class);
private static final int READ_BUFFER_SIZE_B = 1024 * 1024;
public static String HADOOP_FS_NAME = "fs.defaultFS";
// simple or kerberos
public static String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
public static String HADOOP_USER_NAME = "hadoop.username";
public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
public static THdfsParams generateHdfsParam(Map<String, String> properties) {
THdfsParams tHdfsParams = new THdfsParams();
tHdfsParams.setHdfsConf(new ArrayList<>());
for (Map.Entry<String, String> property : properties.entrySet()) {
if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) {
tHdfsParams.setFsName(property.getValue());
} else if (property.getKey().equalsIgnoreCase(HADOOP_USER_NAME)) {
tHdfsParams.setUser(property.getValue());
} else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_PRINCIPAL)) {
tHdfsParams.setHdfsKerberosPrincipal(property.getValue());
} else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_KEYTAB)) {
tHdfsParams.setHdfsKerberosKeytab(property.getValue());
} else {
THdfsConf hdfsConf = new THdfsConf();
hdfsConf.setKey(property.getKey());
hdfsConf.setValue(property.getValue());
tHdfsParams.hdfs_conf.add(hdfsConf);
}
}
// `dfs.client.read.shortcircuit` and `dfs.domain.socket.path` must both be set to enable short-circuit read.
// If only one of them is set, we disable short-circuit read, because a partial configuration degrades performance.
if (!properties.containsKey(HADOOP_SHORT_CIRCUIT) || !properties.containsKey(HADOOP_SOCKET_PATH)) {
tHdfsParams.addToHdfsConf(new THdfsConf(HADOOP_SHORT_CIRCUIT, "false"));
}
return tHdfsParams;
}
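generateHdfsParam is the helper that this PR relocates from BrokerUtil into HdfsResource (see the updated call sites further down). A minimal usage sketch, assuming the relocated method keeps the signature shown above; the endpoint, user and socket-path values are placeholders:

import java.util.HashMap;
import java.util.Map;

import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.thrift.THdfsParams;

class HdfsParamSketch {
    static THdfsParams exampleParams() {
        Map<String, String> props = new HashMap<>();
        props.put("fs.defaultFS", "hdfs://nameservice1");    // HADOOP_FS_NAME
        props.put("hadoop.username", "doris");               // HADOOP_USER_NAME
        // set both keys, otherwise short-circuit read is disabled as noted above
        props.put("dfs.client.read.shortcircuit", "true");
        props.put("dfs.domain.socket.path", "/var/run/hdfs-sockets/dn");
        return HdfsResource.generateHdfsParam(props);
    }
}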
/**
* Parse the file statuses under the given path, excluding directories
@ -129,102 +81,23 @@ public class BrokerUtil {
*/
public static void parseFile(String path, BrokerDesc brokerDesc, List<TBrokerFileStatus> fileStatuses)
throws UserException {
if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) {
TNetworkAddress address = getAddress(brokerDesc);
TPaloBrokerService.Client client = borrowClient(address);
boolean failed = true;
try {
TBrokerListPathRequest request = new TBrokerListPathRequest(
TBrokerVersion.VERSION_ONE, path, false, brokerDesc.getProperties());
TBrokerListResponse tBrokerListResponse = null;
try {
tBrokerListResponse = client.listPath(request);
} catch (TException e) {
reopenClient(client);
tBrokerListResponse = client.listPath(request);
}
if (tBrokerListResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
throw new UserException("Broker list path failed. path=" + path
+ ",broker=" + address + ",msg=" + tBrokerListResponse.getOpStatus().getMessage());
}
failed = false;
for (TBrokerFileStatus tBrokerFileStatus : tBrokerListResponse.getFiles()) {
if (tBrokerFileStatus.isDir) {
continue;
}
fileStatuses.add(tBrokerFileStatus);
}
} catch (TException e) {
LOG.warn("Broker list path exception, path={}, address={}", path, address, e);
throw new UserException("Broker list path exception. path=" + path + ", broker=" + address);
} finally {
returnClient(client, address, failed);
List<RemoteFile> rfiles = new ArrayList<>();
try {
BlobStorage storage = BlobStorage.create(
brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties());
Status st = storage.list(path, rfiles, false);
if (!st.ok()) {
throw new UserException(brokerDesc.getName() + " list path failed. path=" + path
+ ",msg=" + st.getErrMsg());
}
} else if (brokerDesc.getStorageType() == StorageBackend.StorageType.S3) {
S3Storage s3 = new S3Storage(brokerDesc.getProperties());
List<RemoteFile> rfiles = new ArrayList<>();
try {
Status st = s3.list(path, rfiles, false);
if (!st.ok()) {
throw new UserException("S3 list path failed. path=" + path
+ ",msg=" + st.getErrMsg());
}
} catch (Exception e) {
LOG.warn("s3 list path exception, path={}", path, e);
throw new UserException("s3 list path exception. path=" + path + ", err: " + e.getMessage());
}
for (RemoteFile r : rfiles) {
if (r.isFile()) {
fileStatuses.add(new TBrokerFileStatus(r.getName(), !r.isFile(), r.getSize(), r.isFile()));
}
}
} else if (brokerDesc.getStorageType() == StorageBackend.StorageType.HDFS) {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
if (!brokerDesc.getProperties().containsKey(HADOOP_FS_NAME)
|| !brokerDesc.getProperties().containsKey(HADOOP_USER_NAME)) {
throw new UserException(String.format(
"The properties of hdfs is invalid. %s and %s are needed", HADOOP_FS_NAME,
HADOOP_USER_NAME));
}
String fsName = brokerDesc.getProperties().get(HADOOP_FS_NAME);
String userName = brokerDesc.getProperties().get(HADOOP_USER_NAME);
Configuration conf = new HdfsConfiguration();
boolean isSecurityEnabled = false;
for (Map.Entry<String, String> propEntry : brokerDesc.getProperties().entrySet()) {
conf.set(propEntry.getKey(), propEntry.getValue());
if (propEntry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
&& propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) {
isSecurityEnabled = true;
}
}
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
try {
if (isSecurityEnabled) {
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(
brokerDesc.getProperties().get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
brokerDesc.getProperties().get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
}
FileSystem fs = FileSystem.get(new URI(fsName), conf, userName);
FileStatus[] statusList = fs.globStatus(new Path(path));
if (statusList == null) {
throw new UserException("failed to get files from path: " + path);
}
for (FileStatus status : statusList) {
if (status.isFile()) {
fileStatuses.add(new TBrokerFileStatus(status.getPath().toUri().getPath(),
status.isDirectory(), status.getLen(), status.isFile()));
}
}
} catch (IOException | InterruptedException | URISyntaxException e) {
LOG.warn("hdfs check error: ", e);
throw new UserException(e.getMessage());
}
} finally {
Thread.currentThread().setContextClassLoader(classLoader);
} catch (Exception e) {
LOG.warn("{} list path exception, path={}", brokerDesc.getName(), path, e);
throw new UserException(brokerDesc.getName() + " list path exception. path="
+ path + ", err: " + e.getMessage());
}
for (RemoteFile r : rfiles) {
if (r.isFile()) {
fileStatuses.add(new TBrokerFileStatus(r.getName(), !r.isFile(), r.getSize(), r.isFile()));
}
}
}
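With the rewrite above, broker, S3 and HDFS listing all flow through the same BlobStorage interface instead of three separate branches. A minimal sketch of that flow on its own (the storage name, type and properties are whatever the caller's BrokerDesc carries):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.BlobStorage;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;

class ListPathSketch {
    static List<RemoteFile> listFiles(String name, StorageBackend.StorageType type,
                                      Map<String, String> props, String path) throws UserException {
        // create() picks the concrete implementation (e.g. S3Storage or HdfsStorage)
        BlobStorage storage = BlobStorage.create(name, type, props);
        List<RemoteFile> files = new ArrayList<>();
        Status st = storage.list(path, files, false);   // same flags as in parseFile above
        if (!st.ok()) {
            throw new UserException(name + " list path failed. path=" + path + ",msg=" + st.getErrMsg());
        }
        return files;
    }
}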

View File

@ -17,10 +17,10 @@
package org.apache.doris.datasource;
import org.apache.doris.catalog.HiveTable;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.S3Resource;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Maps;
@ -44,52 +44,55 @@ public class CatalogProperty implements Writable {
return properties.getOrDefault(key, defaultVal);
}
// todo: remove and use HdfsResource
public Map<String, String> getDfsProperties() {
Map<String, String> dfsProperties = Maps.newHashMap();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(HiveTable.HIVE_HDFS_PREFIX)
|| entry.getKey().equals(BrokerUtil.HADOOP_USER_NAME)) {
if (entry.getKey().startsWith(HdfsResource.HADOOP_FS_PREFIX)
|| entry.getKey().equals(HdfsResource.HADOOP_USER_NAME)) {
// todo: still missing properties like hadoop.xxx
dfsProperties.put(entry.getKey(), entry.getValue());
}
}
return dfsProperties;
}
// todo: remove and use S3Resource
public Map<String, String> getS3Properties() {
Map<String, String> s3Properties = Maps.newHashMap();
if (properties.containsKey(HiveTable.S3_AK)) {
s3Properties.put("fs.s3a.access.key", properties.get(HiveTable.S3_AK));
s3Properties.put(HiveTable.S3_AK, properties.get(HiveTable.S3_AK));
if (properties.containsKey(S3Resource.S3_ACCESS_KEY)) {
s3Properties.put("fs.s3a.access.key", properties.get(S3Resource.S3_ACCESS_KEY));
s3Properties.put(S3Resource.S3_ACCESS_KEY, properties.get(S3Resource.S3_ACCESS_KEY));
}
if (properties.containsKey(HiveTable.S3_SK)) {
s3Properties.put("fs.s3a.secret.key", properties.get(HiveTable.S3_SK));
s3Properties.put(HiveTable.S3_SK, properties.get(HiveTable.S3_SK));
if (properties.containsKey(S3Resource.S3_SECRET_KEY)) {
s3Properties.put("fs.s3a.secret.key", properties.get(S3Resource.S3_SECRET_KEY));
s3Properties.put(S3Resource.S3_SECRET_KEY, properties.get(S3Resource.S3_SECRET_KEY));
}
if (properties.containsKey(HiveTable.S3_ENDPOINT)) {
s3Properties.put("fs.s3a.endpoint", properties.get(HiveTable.S3_ENDPOINT));
s3Properties.put(HiveTable.S3_ENDPOINT, properties.get(HiveTable.S3_ENDPOINT));
if (properties.containsKey(S3Resource.S3_ENDPOINT)) {
s3Properties.put("fs.s3a.endpoint", properties.get(S3Resource.S3_ENDPOINT));
s3Properties.put(S3Resource.S3_ENDPOINT, properties.get(S3Resource.S3_ENDPOINT));
}
if (properties.containsKey(HiveTable.AWS_REGION)) {
s3Properties.put("fs.s3a.endpoint.region", properties.get(HiveTable.AWS_REGION));
s3Properties.put(HiveTable.AWS_REGION, properties.get(HiveTable.AWS_REGION));
if (properties.containsKey(S3Resource.S3_REGION)) {
s3Properties.put("fs.s3a.endpoint.region", properties.get(S3Resource.S3_REGION));
s3Properties.put(S3Resource.S3_REGION, properties.get(S3Resource.S3_REGION));
}
if (properties.containsKey(HiveTable.AWS_MAX_CONN_SIZE)) {
s3Properties.put("fs.s3a.connection.maximum", properties.get(HiveTable.AWS_MAX_CONN_SIZE));
s3Properties.put(HiveTable.AWS_MAX_CONN_SIZE, properties.get(HiveTable.AWS_MAX_CONN_SIZE));
if (properties.containsKey(S3Resource.S3_MAX_CONNECTIONS)) {
s3Properties.put("fs.s3a.connection.maximum", properties.get(S3Resource.S3_MAX_CONNECTIONS));
s3Properties.put(S3Resource.S3_MAX_CONNECTIONS, properties.get(S3Resource.S3_MAX_CONNECTIONS));
}
if (properties.containsKey(HiveTable.AWS_REQUEST_TIMEOUT_MS)) {
s3Properties.put("fs.s3a.connection.request.timeout", properties.get(HiveTable.AWS_REQUEST_TIMEOUT_MS));
s3Properties.put(HiveTable.AWS_REQUEST_TIMEOUT_MS, properties.get(HiveTable.AWS_REQUEST_TIMEOUT_MS));
if (properties.containsKey(S3Resource.S3_REQUEST_TIMEOUT_MS)) {
s3Properties.put("fs.s3a.connection.request.timeout", properties.get(S3Resource.S3_REQUEST_TIMEOUT_MS));
s3Properties.put(S3Resource.S3_REQUEST_TIMEOUT_MS, properties.get(S3Resource.S3_REQUEST_TIMEOUT_MS));
}
if (properties.containsKey(HiveTable.AWS_CONN_TIMEOUT_MS)) {
s3Properties.put("fs.s3a.connection.timeout", properties.get(HiveTable.AWS_CONN_TIMEOUT_MS));
s3Properties.put(HiveTable.AWS_CONN_TIMEOUT_MS, properties.get(HiveTable.AWS_CONN_TIMEOUT_MS));
if (properties.containsKey(S3Resource.S3_CONNECTION_TIMEOUT_MS)) {
s3Properties.put("fs.s3a.connection.timeout", properties.get(S3Resource.S3_CONNECTION_TIMEOUT_MS));
s3Properties.put(S3Resource.S3_CONNECTION_TIMEOUT_MS, properties.get(S3Resource.S3_CONNECTION_TIMEOUT_MS));
}
s3Properties.put("fs.s3.impl.disable.cache", "true");
s3Properties.put("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
s3Properties.put("fs.s3a.attempts.maximum", "2");
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().startsWith(HiveTable.S3_FS_PREFIX)) {
if (entry.getKey().startsWith(S3Resource.S3_FS_PREFIX)) {
s3Properties.put(entry.getKey(), entry.getValue());
}
}
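The fs.s3a.* entries produced above are meant to be handed to a Hadoop Configuration when an S3A file system is opened. A sketch of that hand-off, assuming a caller that already holds a CatalogProperty (this is not the exact Doris call site):

import java.net.URI;
import java.util.Map;

import org.apache.doris.datasource.CatalogProperty;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

class S3ConfSketch {
    static FileSystem openS3a(CatalogProperty catalogProperty, String bucketUri) throws Exception {
        Configuration conf = new Configuration();
        for (Map.Entry<String, String> e : catalogProperty.getS3Properties().entrySet()) {
            conf.set(e.getKey(), e.getValue());   // fs.s3a.access.key, fs.s3a.endpoint, ...
        }
        return FileSystem.get(URI.create(bucketUri), conf);   // e.g. "s3a://my-bucket/"
    }
}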

View File

@ -74,8 +74,8 @@ import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.HiveTable;
import org.apache.doris.catalog.IcebergTable;
import org.apache.doris.catalog.Index;
@ -2306,8 +2306,8 @@ public class InternalCatalog implements CatalogIf<Database> {
hiveTable.setComment(stmt.getComment());
// check whether the hive table exists in the hive database
HiveConf hiveConf = new HiveConf();
hiveConf.set(HiveMetaStoreClientHelper.HIVE_METASTORE_URIS,
hiveTable.getHiveProperties().get(HiveMetaStoreClientHelper.HIVE_METASTORE_URIS));
hiveConf.set(HMSResource.HIVE_METASTORE_URIS,
hiveTable.getHiveProperties().get(HMSResource.HIVE_METASTORE_URIS));
PooledHiveMetaStoreClient client = new PooledHiveMetaStoreClient(hiveConf, 1);
if (!client.tableExists(hiveTable.getHiveDb(), hiveTable.getHiveTable())) {
throw new DdlException(String.format("Table [%s] does not exist in Hive.", hiveTable.getHiveDbTable()));
@ -2329,7 +2329,7 @@ public class InternalCatalog implements CatalogIf<Database> {
HudiUtils.validateCreateTable(hudiTable);
// check whether the hudi table exists in the hive database
HiveConf hiveConf = new HiveConf();
hiveConf.set(HiveMetaStoreClientHelper.HIVE_METASTORE_URIS,
hiveConf.set(HMSResource.HIVE_METASTORE_URIS,
hudiTable.getTableProperties().get(HudiProperty.HUDI_HIVE_METASTORE_URIS));
PooledHiveMetaStoreClient client = new PooledHiveMetaStoreClient(hiveConf, 1);
if (!client.tableExists(hudiTable.getHmsDatabaseName(), hudiTable.getHmsTableName())) {

View File

@ -23,7 +23,9 @@ import org.apache.doris.alter.SchemaChangeJobV2;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.OdbcCatalogResource;
@ -133,7 +135,9 @@ public class GsonUtils {
.registerSubtype(SparkResource.class, SparkResource.class.getSimpleName())
.registerSubtype(OdbcCatalogResource.class, OdbcCatalogResource.class.getSimpleName())
.registerSubtype(S3Resource.class, S3Resource.class.getSimpleName())
.registerSubtype(JdbcResource.class, JdbcResource.class.getSimpleName());
.registerSubtype(JdbcResource.class, JdbcResource.class.getSimpleName())
.registerSubtype(HdfsResource.class, HdfsResource.class.getSimpleName())
.registerSubtype(HMSResource.class, HMSResource.class.getSimpleName());
// runtime adapter for class "AlterJobV2"
private static RuntimeTypeAdapterFactory<AlterJobV2> alterJobV2TypeAdapterFactory = RuntimeTypeAdapterFactory
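The two new registerSubtype calls matter because GsonUtils (de)serializes Resource subclasses polymorphically through a RuntimeTypeAdapterFactory; a subclass that is not registered cannot be written out or read back by the base-type adapter. A small standalone sketch of the mechanism, using toy classes rather than the real Resource hierarchy (the discriminator field name "clazz" and the import location of RuntimeTypeAdapterFactory are assumptions):

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
// RuntimeTypeAdapterFactory: Doris bundles its own copy alongside GsonUtils;
// the gson-extras version behaves the same way.

class PolymorphicGsonSketch {
    abstract static class Shape {}
    static class Circle extends Shape { double radius = 1.0; }
    static class Square extends Shape { double side = 2.0; }

    public static void main(String[] args) {
        RuntimeTypeAdapterFactory<Shape> factory = RuntimeTypeAdapterFactory
                .of(Shape.class, "clazz")   // "clazz" is an assumed discriminator field name
                .registerSubtype(Circle.class, Circle.class.getSimpleName())
                .registerSubtype(Square.class, Square.class.getSimpleName());
        Gson gson = new GsonBuilder().registerTypeAdapterFactory(factory).create();
        String json = gson.toJson(new Circle(), Shape.class);   // {"clazz":"Circle","radius":1.0}
        Shape back = gson.fromJson(json, Shape.class);          // deserialized as a Circle
        System.out.println(json + " -> " + back.getClass().getSimpleName());
    }
}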

View File

@ -31,6 +31,7 @@ import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
@ -590,7 +591,7 @@ public class BrokerScanNode extends LoadScanNode {
rangeDesc.setHeaderType(headerType);
// set hdfs params for hdfs file type.
if (brokerDesc.getFileType() == TFileType.FILE_HDFS) {
THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(brokerDesc.getProperties());
THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(brokerDesc.getProperties());
rangeDesc.setHdfsParams(tHdfsParams);
}
return rangeDesc;

View File

@ -22,6 +22,7 @@ import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.HiveTable;
import org.apache.doris.common.UserException;
@ -174,7 +175,7 @@ public class HiveScanNode extends BrokerScanNode {
if (!isLoad()) {
output.append(prefix).append("TABLE: ").append(hiveTable.getName()).append("\n");
output.append(prefix).append("PATH: ")
.append(hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS)).append("\n");
.append(hiveTable.getHiveProperties().get(HMSResource.HIVE_METASTORE_URIS)).append("\n");
}
return output.toString();
}

View File

@ -24,6 +24,7 @@ import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
@ -311,7 +312,7 @@ public class HudiScanNode extends BrokerScanNode {
// set hdfs params for hdfs file type.
switch (brokerDesc.getFileType()) {
case FILE_HDFS:
THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(brokerDesc.getProperties());
THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(brokerDesc.getProperties());
rangeDesc.setHdfsParams(tHdfsParams);
break;
default:

View File

@ -24,13 +24,13 @@ import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.load.BrokerFileGroup;
@ -105,7 +105,7 @@ public class LoadScanProvider implements FileScanProviderIf {
params.setStrictMode(fileGroupInfo.isStrictMode());
params.setProperties(fileGroupInfo.getBrokerDesc().getProperties());
if (fileGroupInfo.getBrokerDesc().getFileType() == TFileType.FILE_HDFS) {
THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(fileGroupInfo.getBrokerDesc().getProperties());
THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(fileGroupInfo.getBrokerDesc().getProperties());
params.setHdfsParams(tHdfsParams);
}
TFileAttributes fileAttributes = new TFileAttributes();

View File

@ -17,6 +17,7 @@
package org.apache.doris.planner.external;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
@ -89,7 +90,7 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
// s3://buckets
fsName = fullPath.replace(filePath, "");
}
THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(locationProperties);
THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
tHdfsParams.setFsName(fsName);
context.params.setHdfsParams(tHdfsParams);
} else if (locationType == TFileType.FILE_S3) {
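A quick worked example of the fsName derivation above (paths are illustrative):

String fullPath = "hdfs://nameservice1/user/doris/t1/part-0.parquet";
String filePath = "/user/doris/t1/part-0.parquet";
String fsName = fullPath.replace(filePath, "");   // -> "hdfs://nameservice1"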

View File

@ -197,7 +197,8 @@ public class StoragePolicy extends Policy {
}
Resource r = checkIsS3ResourceAndExist(this.storageResource);
if (!((S3Resource) r).policyAddToSet(super.getPolicyName()) && !ifNotExists) {
if (!((S3Resource) r).addReference(super.getPolicyName(), S3Resource.ReferenceType.POLICY)
&& !ifNotExists) {
throw new AnalysisException("this policy has been added to s3 resource once, policy has been created.");
}
this.md5Checksum = calcPropertiesMd5();
@ -210,7 +211,7 @@ public class StoragePolicy extends Policy {
.orElseThrow(() -> new AnalysisException("storage resource doesn't exist: " + storageResource));
if (resource.getType() != Resource.ResourceType.S3) {
throw new AnalysisException("current storage policy just support resource type S3");
throw new AnalysisException("current storage policy just support resource type S3_COOLDOWN");
}
return resource;
}
@ -318,8 +319,8 @@ public class StoragePolicy extends Policy {
// if the md5 checksum differs from the previous value, the BE updates its storage policy.
private String calcPropertiesMd5() {
List<String> calcKey = Arrays.asList(COOLDOWN_DATETIME, COOLDOWN_TTL, S3Resource.S3_MAX_CONNECTIONS,
S3Resource.S3_REQUEST_TIMEOUT_MS, S3Resource.S3_CONNECTION_TIMEOUT_MS, S3Resource.S3_ACCESS_KEY,
S3Resource.S3_SECRET_KEY);
S3Resource.S3_REQUEST_TIMEOUT_MS, S3Resource.S3_CONNECTION_TIMEOUT_MS,
S3Resource.S3_ACCESS_KEY, S3Resource.S3_SECRET_KEY);
Map<String, String> copiedStoragePolicyProperties = Env.getCurrentEnv().getResourceMgr()
.getResource(this.storageResource).getCopiedProperties();
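calcPropertiesMd5 folds the listed cooldown and S3 connection/credential properties into a single checksum so backends can detect when a policy's effective configuration has changed. A plausible shape of such a computation, shown only as an illustration (the key constants are the ones named above; the join-then-md5Hex step is an assumption, not the actual implementation):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.commons.codec.digest.DigestUtils;

class PolicyMd5Sketch {
    static String md5Of(Map<String, String> props, List<String> calcKey) {
        String joined = calcKey.stream()
                .map(k -> k + "=" + props.getOrDefault(k, ""))
                .collect(Collectors.joining(";"));
        return DigestUtils.md5Hex(joined);   // changes whenever any tracked property changes
    }
}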

View File

@ -1170,10 +1170,10 @@ public class FrontendServiceImpl implements FrontendService.Iface {
s3Info.setBucket(storagePolicyProperties.get(S3Resource.S3_BUCKET));
s3Info.setS3MaxConn(
Integer.parseInt(storagePolicyProperties.get(S3Resource.S3_MAX_CONNECTIONS)));
s3Info.setS3RequestTimeoutMs(
Integer.parseInt(storagePolicyProperties.get(S3Resource.S3_REQUEST_TIMEOUT_MS)));
s3Info.setS3ConnTimeoutMs(
Integer.parseInt(storagePolicyProperties.get(S3Resource.S3_CONNECTION_TIMEOUT_MS)));
s3Info.setS3RequestTimeoutMs(Integer.parseInt(
storagePolicyProperties.get(S3Resource.S3_REQUEST_TIMEOUT_MS)));
s3Info.setS3ConnTimeoutMs(Integer.parseInt(
storagePolicyProperties.get(S3Resource.S3_CONNECTION_TIMEOUT_MS)));
});
rEntry.setS3StorageParam(s3Info);

View File

@ -21,6 +21,7 @@ import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
@ -279,7 +280,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
fileScanRangeParams.setProperties(locationProperties);
fileScanRangeParams.setFileAttributes(getFileAttributes());
if (getTFileType() == TFileType.FILE_HDFS) {
THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(locationProperties);
THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
String fsName = getLocationProperties().get(HdfsTableValuedFunction.HADOOP_FS_NAME);
tHdfsParams.setFsName(fsName);
fileScanRangeParams.setHdfsParams(tHdfsParams);

View File

@ -105,7 +105,7 @@ public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction {
@Override
public BrokerDesc getBrokerDesc() {
return new BrokerDesc("S3TvfBroker", StorageType.HDFS, locationProperties);
return new BrokerDesc("HdfsTvfBroker", StorageType.HDFS, locationProperties);
}
// =========== implement abstract methods of TableValuedFunctionIf =================

View File

@ -48,9 +48,11 @@ public class NotifyUpdateStoragePolicyTask extends AgentTask {
ret.cooldown_ttl = Long.parseLong(properties.get(StoragePolicy.COOLDOWN_TTL));
ret.s3_storage_param = new TS3StorageParam();
ret.s3_storage_param.s3_max_conn = Integer.parseInt(
properties.getOrDefault(S3Resource.S3_MAX_CONNECTIONS, S3Resource.DEFAULT_S3_MAX_CONNECTIONS));
properties.getOrDefault(S3Resource.S3_MAX_CONNECTIONS,
S3Resource.DEFAULT_S3_MAX_CONNECTIONS));
ret.s3_storage_param.s3_request_timeout_ms = Integer.parseInt(
properties.getOrDefault(S3Resource.S3_REQUEST_TIMEOUT_MS, S3Resource.DEFAULT_S3_REQUEST_TIMEOUT_MS));
properties.getOrDefault(S3Resource.S3_REQUEST_TIMEOUT_MS,
S3Resource.DEFAULT_S3_REQUEST_TIMEOUT_MS));
ret.s3_storage_param.s3_conn_timeout_ms = Integer.parseInt(
properties.getOrDefault(S3Resource.S3_CONNECTION_TIMEOUT_MS,
S3Resource.DEFAULT_S3_CONNECTION_TIMEOUT_MS));

View File

@ -135,20 +135,20 @@ public class AlterTest {
// s3 resource
createRemoteStorageResource(
"create resource \"remote_s3\"\n" + "properties\n" + "(\n" + " \"type\" = \"s3\", \n"
+ " \"s3_endpoint\" = \"bj\",\n" + " \"s3_region\" = \"bj\",\n"
+ " \"s3_root_path\" = \"/path/to/root\",\n" + " \"s3_access_key\" = \"bbb\",\n"
+ " \"s3_secret_key\" = \"aaaa\",\n" + " \"s3_max_connections\" = \"50\",\n"
+ " \"s3_request_timeout_ms\" = \"3000\",\n" + " \"s3_connection_timeout_ms\" = \"1000\",\n"
+ " \"s3_bucket\" = \"test-bucket\"\n"
+ " \"AWS_ENDPOINT\" = \"bj\",\n" + " \"AWS_REGION\" = \"bj\",\n"
+ " \"AWS_ROOT_PATH\" = \"/path/to/root\",\n" + " \"AWS_ACCESS_KEY\" = \"bbb\",\n"
+ " \"AWS_SECRET_KEY\" = \"aaaa\",\n" + " \"AWS_MAX_CONNECTIONS\" = \"50\",\n"
+ " \"AWS_REQUEST_TIMEOUT_MS\" = \"3000\",\n" + " \"AWS_CONNECTION_TIMEOUT_MS\" = \"1000\",\n"
+ " \"AWS_BUCKET\" = \"test-bucket\"\n"
+ ");");
createRemoteStorageResource(
"create resource \"remote_s3_1\"\n" + "properties\n" + "(\n" + " \"type\" = \"s3\", \n"
+ " \"s3_endpoint\" = \"bj\",\n" + " \"s3_region\" = \"bj\",\n"
+ " \"s3_root_path\" = \"/path/to/root\",\n" + " \"s3_access_key\" = \"bbb\",\n"
+ " \"s3_secret_key\" = \"aaaa\",\n" + " \"s3_max_connections\" = \"50\",\n"
+ " \"s3_request_timeout_ms\" = \"3000\",\n" + " \"s3_connection_timeout_ms\" = \"1000\",\n"
+ " \"s3_bucket\" = \"test-bucket\"\n"
+ " \"AWS_ENDPOINT\" = \"bj\",\n" + " \"AWS_REGION\" = \"bj\",\n"
+ " \"AWS_ROOT_PATH\" = \"/path/to/root\",\n" + " \"AWS_ACCESS_KEY\" = \"bbb\",\n"
+ " \"AWS_SECRET_KEY\" = \"aaaa\",\n" + " \"AWS_MAX_CONNECTIONS\" = \"50\",\n"
+ " \"AWS_REQUEST_TIMEOUT_MS\" = \"3000\",\n" + " \"AWS_CONNECTION_TIMEOUT_MS\" = \"1000\",\n"
+ " \"AWS_BUCKET\" = \"test-bucket\"\n"
+ ");");
createRemoteStoragePolicy(

View File

@ -17,6 +17,8 @@
package org.apache.doris.backup;
import org.apache.doris.catalog.S3Resource;
import org.apache.commons.codec.digest.DigestUtils;
import org.junit.Assert;
import org.junit.Before;
@ -56,7 +58,7 @@ public class S3StorageTest {
properties.put("AWS_ACCESS_KEY", System.getenv().getOrDefault("AWS_AK", ""));
properties.put("AWS_SECRET_KEY", System.getenv().getOrDefault("AWS_SK", ""));
properties.put("AWS_ENDPOINT", "http://s3.bj.bcebos.com");
properties.put(S3Storage.USE_PATH_STYLE, "false");
properties.put(S3Resource.USE_PATH_STYLE, "false");
properties.put("AWS_REGION", "bj");
storage = new S3Storage(properties);

View File

@ -88,12 +88,12 @@ public class ResourceMgrTest {
s3ConnTimeoutMs = "1000";
s3Properties = new HashMap<>();
s3Properties.put("type", s3ResType);
s3Properties.put("s3_endpoint", s3Endpoint);
s3Properties.put("s3_region", s3Region);
s3Properties.put("s3_root_path", s3RootPath);
s3Properties.put("s3_access_key", s3AccessKey);
s3Properties.put("s3_secret_key", s3SecretKey);
s3Properties.put("s3_bucket", "test-bucket");
s3Properties.put("AWS_ENDPOINT", s3Endpoint);
s3Properties.put("AWS_REGION", s3Region);
s3Properties.put("AWS_ROOT_PATH", s3RootPath);
s3Properties.put("AWS_ACCESS_KEY", s3AccessKey);
s3Properties.put("AWS_SECRET_KEY", s3SecretKey);
s3Properties.put("AWS_BUCKET", "test-bucket");
analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
}
@ -152,11 +152,11 @@ public class ResourceMgrTest {
// alter
s3Region = "sh";
Map<String, String> copiedS3Properties = Maps.newHashMap(s3Properties);
copiedS3Properties.put("s3_region", s3Region);
copiedS3Properties.put("AWS_REGION", s3Region);
copiedS3Properties.remove("type");
// currently does not support modifying s3 properties
// mgr.alterResource(alterResourceStmt);
// Assert.assertEquals(s3Region, ((S3Resource) mgr.getResource(s3ResName)).getProperty("s3_region"));
// Assert.assertEquals(s3Region, ((S3Resource) mgr.getResource(s3ResName)).getProperty("AWS_REGION"));
// drop
dropStmt = new DropResourceStmt(false, s3ResName);

View File

@ -75,12 +75,12 @@ public class S3ResourceTest {
s3Bucket = "test-bucket";
s3Properties = new HashMap<>();
s3Properties.put("type", type);
s3Properties.put("s3_endpoint", s3Endpoint);
s3Properties.put("s3_region", s3Region);
s3Properties.put("s3_root_path", s3RootPath);
s3Properties.put("s3_access_key", s3AccessKey);
s3Properties.put("s3_secret_key", s3SecretKey);
s3Properties.put("s3_bucket", s3Bucket);
s3Properties.put("AWS_ENDPOINT", s3Endpoint);
s3Properties.put("AWS_REGION", s3Region);
s3Properties.put("AWS_ROOT_PATH", s3RootPath);
s3Properties.put("AWS_ACCESS_KEY", s3AccessKey);
s3Properties.put("AWS_SECRET_KEY", s3SecretKey);
s3Properties.put("AWS_BUCKET", s3Bucket);
analyzer = AccessTestUtil.fetchAdminAnalyzer(true);
}
@ -102,33 +102,33 @@ public class S3ResourceTest {
S3Resource s3Resource = (S3Resource) Resource.fromStmt(stmt);
Assert.assertEquals(name, s3Resource.getName());
Assert.assertEquals(type, s3Resource.getType().name().toLowerCase());
Assert.assertEquals(s3Endpoint, s3Resource.getProperty("s3_endpoint"));
Assert.assertEquals(s3Region, s3Resource.getProperty("s3_region"));
Assert.assertEquals(s3RootPath, s3Resource.getProperty("s3_root_path"));
Assert.assertEquals(s3AccessKey, s3Resource.getProperty("s3_access_key"));
Assert.assertEquals(s3SecretKey, s3Resource.getProperty("s3_secret_key"));
Assert.assertEquals(s3MaxConnections, s3Resource.getProperty("s3_max_connections"));
Assert.assertEquals(s3ReqTimeoutMs, s3Resource.getProperty("s3_request_timeout_ms"));
Assert.assertEquals(s3ConnTimeoutMs, s3Resource.getProperty("s3_connection_timeout_ms"));
Assert.assertEquals(s3Endpoint, s3Resource.getProperty("AWS_ENDPOINT"));
Assert.assertEquals(s3Region, s3Resource.getProperty("AWS_REGION"));
Assert.assertEquals(s3RootPath, s3Resource.getProperty("AWS_ROOT_PATH"));
Assert.assertEquals(s3AccessKey, s3Resource.getProperty("AWS_ACCESS_KEY"));
Assert.assertEquals(s3SecretKey, s3Resource.getProperty("AWS_SECRET_KEY"));
Assert.assertEquals(s3MaxConnections, s3Resource.getProperty("AWS_MAX_CONNECTIONS"));
Assert.assertEquals(s3ReqTimeoutMs, s3Resource.getProperty("AWS_REQUEST_TIMEOUT_MS"));
Assert.assertEquals(s3ConnTimeoutMs, s3Resource.getProperty("AWS_CONNECTION_TIMEOUT_MS"));
// with no default settings
s3Properties.put("s3_max_connections", "100");
s3Properties.put("s3_request_timeout_ms", "2000");
s3Properties.put("s3_connection_timeout_ms", "2000");
s3Properties.put("AWS_MAX_CONNECTIONS", "100");
s3Properties.put("AWS_REQUEST_TIMEOUT_MS", "2000");
s3Properties.put("AWS_CONNECTION_TIMEOUT_MS", "2000");
stmt = new CreateResourceStmt(true, false, name, s3Properties);
stmt.analyze(analyzer);
s3Resource = (S3Resource) Resource.fromStmt(stmt);
Assert.assertEquals(name, s3Resource.getName());
Assert.assertEquals(type, s3Resource.getType().name().toLowerCase());
Assert.assertEquals(s3Endpoint, s3Resource.getProperty("s3_endpoint"));
Assert.assertEquals(s3Region, s3Resource.getProperty("s3_region"));
Assert.assertEquals(s3RootPath, s3Resource.getProperty("s3_root_path"));
Assert.assertEquals(s3AccessKey, s3Resource.getProperty("s3_access_key"));
Assert.assertEquals(s3SecretKey, s3Resource.getProperty("s3_secret_key"));
Assert.assertEquals("100", s3Resource.getProperty("s3_max_connections"));
Assert.assertEquals("2000", s3Resource.getProperty("s3_request_timeout_ms"));
Assert.assertEquals("2000", s3Resource.getProperty("s3_connection_timeout_ms"));
Assert.assertEquals(s3Endpoint, s3Resource.getProperty("AWS_ENDPOINT"));
Assert.assertEquals(s3Region, s3Resource.getProperty("AWS_REGION"));
Assert.assertEquals(s3RootPath, s3Resource.getProperty("AWS_ROOT_PATH"));
Assert.assertEquals(s3AccessKey, s3Resource.getProperty("AWS_ACCESS_KEY"));
Assert.assertEquals(s3SecretKey, s3Resource.getProperty("AWS_SECRET_KEY"));
Assert.assertEquals("100", s3Resource.getProperty("AWS_MAX_CONNECTIONS"));
Assert.assertEquals("2000", s3Resource.getProperty("AWS_REQUEST_TIMEOUT_MS"));
Assert.assertEquals("2000", s3Resource.getProperty("AWS_CONNECTION_TIMEOUT_MS"));
}
@Test(expected = DdlException.class)
@ -141,7 +141,7 @@ public class S3ResourceTest {
result = true;
}
};
s3Properties.remove("s3_root_path");
s3Properties.remove("AWS_ENDPOINT");
CreateResourceStmt stmt = new CreateResourceStmt(true, false, name, s3Properties);
stmt.analyze(analyzer);
Resource.fromStmt(stmt);
@ -161,12 +161,12 @@ public class S3ResourceTest {
s3Resource1.write(s3Dos);
Map<String, String> properties = new HashMap<>();
properties.put("s3_endpoint", "aaa");
properties.put("s3_region", "bbb");
properties.put("s3_root_path", "/path/to/root");
properties.put("s3_access_key", "xxx");
properties.put("s3_secret_key", "yyy");
properties.put("s3_bucket", "test-bucket");
properties.put("AWS_ENDPOINT", "aaa");
properties.put("AWS_REGION", "bbb");
properties.put("AWS_ROOT_PATH", "/path/to/root");
properties.put("AWS_ACCESS_KEY", "xxx");
properties.put("AWS_SECRET_KEY", "yyy");
properties.put("AWS_BUCKET", "test-bucket");
S3Resource s3Resource2 = new S3Resource("s3_2");
s3Resource2.setProperties(properties);
s3Resource2.write(s3Dos);
@ -182,14 +182,14 @@ public class S3ResourceTest {
Assert.assertEquals("s3_1", rS3Resource1.getName());
Assert.assertEquals("s3_2", rS3Resource2.getName());
Assert.assertEquals(rS3Resource2.getProperty("s3_endpoint"), "aaa");
Assert.assertEquals(rS3Resource2.getProperty("s3_region"), "bbb");
Assert.assertEquals(rS3Resource2.getProperty("s3_root_path"), "/path/to/root");
Assert.assertEquals(rS3Resource2.getProperty("s3_access_key"), "xxx");
Assert.assertEquals(rS3Resource2.getProperty("s3_secret_key"), "yyy");
Assert.assertEquals(rS3Resource2.getProperty("s3_max_connections"), "50");
Assert.assertEquals(rS3Resource2.getProperty("s3_request_timeout_ms"), "3000");
Assert.assertEquals(rS3Resource2.getProperty("s3_connection_timeout_ms"), "1000");
Assert.assertEquals(rS3Resource2.getProperty("AWS_ENDPOINT"), "aaa");
Assert.assertEquals(rS3Resource2.getProperty("AWS_REGION"), "bbb");
Assert.assertEquals(rS3Resource2.getProperty("AWS_ROOT_PATH"), "/path/to/root");
Assert.assertEquals(rS3Resource2.getProperty("AWS_ACCESS_KEY"), "xxx");
Assert.assertEquals(rS3Resource2.getProperty("AWS_SECRET_KEY"), "yyy");
Assert.assertEquals(rS3Resource2.getProperty("AWS_MAX_CONNECTIONS"), "50");
Assert.assertEquals(rS3Resource2.getProperty("AWS_REQUEST_TIMEOUT_MS"), "3000");
Assert.assertEquals(rS3Resource2.getProperty("AWS_CONNECTION_TIMEOUT_MS"), "1000");
// 3. delete
s3Dis.close();