[fix](multi-catalog) fix old s3 properties check (#18430)
Fix the old-style S3 properties check. Follow-up fix for #18005 (comment).
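Note: the diff below routes the legacy AWS_* style S3 properties through the same conversion and validation path as the newer s3.* style keys. As a point of reference, here is a minimal sketch of that key mapping, assuming the key literals exercised by the test cases added below; the real conversion is S3Properties.convertToStdProperties and covers more keys than shown.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only, not the Doris implementation.
public class LegacyS3KeyMappingSketch {
    // Legacy -> standard key names, as used in PropertyConverterTest below.
    private static final Map<String, String> LEGACY_TO_STD = Map.of(
            "AWS_ENDPOINT", "s3.endpoint",
            "AWS_REGION", "s3.region",
            "AWS_ACCESS_KEY", "s3.access_key",
            "AWS_SECRET_KEY", "s3.secret_key");

    public static Map<String, String> toStdProperties(Map<String, String> props) {
        Map<String, String> converted = new HashMap<>(props);
        LEGACY_TO_STD.forEach((legacy, std) -> {
            // only fill the standard key if it is not already set explicitly
            if (props.containsKey(legacy) && !props.containsKey(std)) {
                converted.put(std, props.get(legacy));
            }
        });
        return converted;
    }
}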
@@ -205,6 +205,9 @@ public class Repository implements Writable {
    // create repository dir and repo info file
    public Status initRepository() {
        if (FeConstants.runningUnitTest) {
            return Status.OK;
        }
        String repoInfoFilePath = assembleRepoInfoFilePath();
        // check if the repo is already exist in remote
        List<RemoteFile> remoteFiles = Lists.newArrayList();
@@ -97,11 +97,6 @@ public class S3Storage extends BlobStorage {
    public void setProperties(Map<String, String> properties) {
        super.setProperties(properties);
        caseInsensitiveProperties.putAll(properties);
        if (!caseInsensitiveProperties.containsKey(S3Properties.ENDPOINT)) {
            // try to get new properties from old version
            // compatible with old version
            S3Properties.convertToStdProperties(caseInsensitiveProperties);
        }
        try {
            S3Properties.requiredS3Properties(caseInsensitiveProperties);
        } catch (DdlException e) {
@@ -136,7 +131,6 @@ public class S3Storage extends BlobStorage {
    @Override
    public FileSystem getFileSystem(String remotePath) throws UserException {
        if (dfsFileSystem == null) {
            S3Properties.requiredS3Properties(caseInsensitiveProperties);
            Configuration conf = new Configuration();
            System.setProperty("com.amazonaws.services.s3.enableV4", "true");
            PropertyConverter.convertToHadoopFSProperties(caseInsensitiveProperties).forEach(conf::set);
@@ -151,7 +145,6 @@ public class S3Storage extends BlobStorage {

    private S3Client getClient(String bucket) throws UserException {
        if (client == null) {
            S3Properties.requiredS3Properties(caseInsensitiveProperties);
            URI tmpEndpoint = URI.create(caseInsensitiveProperties.get(S3Properties.ENDPOINT));
            StaticCredentialsProvider scp;
            if (!caseInsensitiveProperties.containsKey(S3Properties.SESSION_TOKEN)) {
@@ -86,9 +86,6 @@ public class S3Resource extends Resource {
    @Override
    protected void setProperties(Map<String, String> properties) throws DdlException {
        Preconditions.checkState(properties != null);

        // compatible with old version
        S3Properties.convertToStdProperties(properties);
        // check properties
        S3Properties.requiredS3PingProperties(properties);
        // default need check resource conf valid, so need fix ut and regression case
@@ -164,7 +161,7 @@ public class S3Resource extends Resource {
                throw new DdlException("current not support modify property : " + any.get());
            }
        }
        // compatible with old version
        // compatible with old version, Need convert if modified properties map uses old properties.
        S3Properties.convertToStdProperties(properties);
        boolean needCheck = isNeedCheck(properties);
        LOG.debug("s3 info need check validity : {}", needCheck);
@@ -129,7 +129,9 @@ public class PropertyConverter {
        properties.put(S3Properties.REGION, credential.getRegion());
        properties.put(S3Properties.ACCESS_KEY, credential.getAccessKey());
        properties.put(S3Properties.SECRET_KEY, credential.getSecretKey());
        properties.put(S3Properties.SESSION_TOKEN, credential.getSessionToken());
        if (properties.containsKey(S3Properties.Env.TOKEN)) {
            properties.put(S3Properties.SESSION_TOKEN, credential.getSessionToken());
        }
        if (properties.containsKey(S3Properties.Env.MAX_CONNECTIONS)) {
            properties.put(S3Properties.MAX_CONNECTIONS, properties.get(S3Properties.Env.MAX_CONNECTIONS));
        }
@@ -158,6 +160,7 @@ public class PropertyConverter {
            s3Properties.put(Constants.SOCKET_TIMEOUT, properties.get(S3Properties.CONNECTION_TIMEOUT_MS));
        }
        setS3FsAccess(s3Properties, properties, credential);
        s3Properties.putAll(properties);
        return s3Properties;
    }
@@ -315,7 +318,6 @@ public class PropertyConverter {
        String endpoint = props.get(GlueProperties.ENDPOINT);
        props.put(AWSGlueConfig.AWS_GLUE_ENDPOINT, endpoint);
        String region = S3Properties.getRegionOfEndpoint(endpoint);
        props.put(GlueProperties.REGION, region);
        props.put(AWSGlueConfig.AWS_REGION, region);
        if (credential.isWhole()) {
            props.put(AWSGlueConfig.AWS_GLUE_ACCESS_KEY, credential.getAccessKey());
@@ -357,7 +359,7 @@ public class PropertyConverter {
        // "s3.secret_key" = "yy"
        // )
        String endpoint = props.get(GlueProperties.ENDPOINT);
        String region = props.getOrDefault(GlueProperties.REGION, S3Properties.getRegionOfEndpoint(endpoint));
        String region = S3Properties.getRegionOfEndpoint(endpoint);
        if (!Strings.isNullOrEmpty(region)) {
            props.put(S3Properties.REGION, region);
            String s3Endpoint = "s3." + region + ".amazonaws.com";
@@ -39,4 +39,14 @@ public class GlueProperties extends BaseProperties {
    public static CloudCredential getCredential(Map<String, String> props) {
        return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
    }

    public static CloudCredential getCompatibleCredential(Map<String, String> props) {
        // Compatible with older versions.
        CloudCredential credential = getCloudCredential(props, AWSGlueConfig.AWS_GLUE_ACCESS_KEY,
                AWSGlueConfig.AWS_GLUE_SECRET_KEY, AWSGlueConfig.AWS_GLUE_SESSION_TOKEN);
        if (!credential.isWhole()) {
            credential = BaseProperties.getCompatibleCredential(props);
        }
        return credential;
    }
}
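For reference, the fallback order encoded by getCompatibleCredential above is: prefer the Glue-specific aws.glue.* keys (the AWSGlueConfig constants), and only fall back to the generic legacy credential keys when those are incomplete. A small illustrative sketch with plain strings standing in for the Doris CloudCredential type; the aws.glue.* literals match the Glue catalog test below, while the AWS_* fallback keys are an assumption about what BaseProperties.getCompatibleCredential reads.

import java.util.Map;

// Illustrative sketch only, not the Doris implementation.
public class GlueCredentialFallbackSketch {
    public static String[] resolveAkSk(Map<String, String> props) {
        String ak = props.get("aws.glue.access-key");  // AWSGlueConfig.AWS_GLUE_ACCESS_KEY
        String sk = props.get("aws.glue.secret-key");  // AWSGlueConfig.AWS_GLUE_SECRET_KEY
        if (ak == null || sk == null) {
            // assumed legacy fallback keys, mirroring the BaseProperties fallback
            ak = props.get("AWS_ACCESS_KEY");
            sk = props.get("AWS_SECRET_KEY");
        }
        return new String[] {ak, sk};
    }
}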
@@ -97,7 +97,7 @@ public class S3Properties extends BaseProperties {
    public static final String DEFAULT_MAX_CONNECTIONS = "50";
    public static final String DEFAULT_REQUEST_TIMEOUT_MS = "3000";
    public static final String DEFAULT_CONNECTION_TIMEOUT_MS = "1000";
    public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT, REGION, ACCESS_KEY, SECRET_KEY);
    public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT, ACCESS_KEY, SECRET_KEY);
    public static final List<String> FS_KEYS = Arrays.asList(ENDPOINT, REGION, ACCESS_KEY, SECRET_KEY, TOKEN,
            ROOT_PATH, BUCKET, MAX_CONNECTIONS, REQUEST_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);
    }
@@ -155,8 +155,17 @@ public class S3Properties extends BaseProperties {
    }

    public static void requiredS3Properties(Map<String, String> properties) throws DdlException {
        for (String field : S3Properties.REQUIRED_FIELDS) {
            checkRequiredProperty(properties, field);
        // Try to convert env properties to uniform properties
        // compatible with old version
        S3Properties.convertToStdProperties(properties);
        if (properties.containsKey(S3Properties.Env.ENDPOINT)) {
            for (String field : S3Properties.Env.REQUIRED_FIELDS) {
                checkRequiredProperty(properties, field);
            }
        } else {
            for (String field : S3Properties.REQUIRED_FIELDS) {
                checkRequiredProperty(properties, field);
            }
        }
    }
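The hunk above is the core of the fix: requiredS3Properties now converts legacy keys first and then validates whichever key family is actually present, instead of always demanding the standard s3.* names. A minimal sketch of that flow, with plain string literals standing in for the S3Properties / S3Properties.Env constants and a generic exception in place of DdlException; the exact members of Env.REQUIRED_FIELDS are an assumption here.

import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Illustrative sketch only, not the Doris implementation.
public class S3PropertyCheckSketch {
    private static final List<String> STD_REQUIRED =
            Arrays.asList("s3.endpoint", "s3.access_key", "s3.secret_key");
    private static final List<String> LEGACY_REQUIRED =
            Arrays.asList("AWS_ENDPOINT", "AWS_ACCESS_KEY", "AWS_SECRET_KEY");

    public static void requiredS3Properties(Map<String, String> properties) {
        // In Doris this first calls S3Properties.convertToStdProperties(properties)
        // (see the mapping sketch near the top), then validates whichever key family is present.
        List<String> required = properties.containsKey("AWS_ENDPOINT") ? LEGACY_REQUIRED : STD_REQUIRED;
        for (String field : required) {
            if (!properties.containsKey(field)) {
                throw new IllegalArgumentException("Missing required S3 property: " + field);
            }
        }
    }
}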
@@ -315,6 +315,9 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio

    @Override
    public List<Column> getTableColumns() throws AnalysisException {
        if (FeConstants.runningUnitTest) {
            return Lists.newArrayList();
        }
        if (!csvSchema.isEmpty()) {
            return csvSchema;
        }
@@ -18,8 +18,11 @@
package org.apache.doris.tablefunction;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.backup.BlobStorage;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3URI;
import org.apache.doris.datasource.credentials.CloudCredentialWithEndpoint;
@@ -27,6 +30,7 @@ import org.apache.doris.datasource.property.PropertyConverter;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.thrift.TFileType;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;

import java.util.HashMap;
@@ -83,7 +87,17 @@ public class S3TableValuedFunction extends ExternalFileTableValuedFunction {
        locationProperties = S3Properties.credentialToMap(credential);
        String usePathStyle = tvfParams.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false");
        locationProperties.put(PropertyConverter.USE_PATH_STYLE, usePathStyle);
        parseFile();
        if (FeConstants.runningUnitTest) {
            // Just check
            BlobStorage.create(null, StorageBackend.StorageType.S3, locationProperties);
        } else {
            parseFile();
        }
    }

    @VisibleForTesting
    public static Map<String, String> getParams(Map<String, String> params) throws AnalysisException {
        return getValidParams(params);
    }

    private static Map<String, String> getValidParams(Map<String, String> params) throws AnalysisException {
@@ -0,0 +1,360 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.property;

import org.apache.doris.analysis.CreateCatalogStmt;
import org.apache.doris.analysis.CreateRepositoryStmt;
import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.TableValuedFunctionRef;
import org.apache.doris.backup.Repository;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Resource;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.tablefunction.S3TableValuedFunction;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.utframe.TestWithFeService;

import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.Map;

public class PropertyConverterTest extends TestWithFeService {

    @Override
    protected void runBeforeAll() throws Exception {
        createDorisCluster();
        createDatabase("mock_db");
        useDatabase("mock_db");
        createTable("create table mock_tbl1 \n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
                + "properties('replication_num' = '1');");
    }
    @Test
    public void testOutFileS3PropertiesConverter() throws Exception {
        String query = "select * from mock_tbl1 \n"
                + "into outfile 's3://bucket/mock_dir'\n"
                + "format as csv\n"
                + "properties(\n"
                + " 'AWS_ENDPOINT' = 'http://127.0.0.1:9000',\n"
                + " 'AWS_ACCESS_KEY' = 'akk',\n"
                + " 'AWS_SECRET_KEY'='akk',\n"
                + " 'AWS_REGION' = 'mock',\n"
                + " 'use_path_style' = 'true'\n"
                + ");";
        QueryStmt analyzedOutStmt = createStmt(query);
        Assertions.assertTrue(analyzedOutStmt.hasOutFileClause());

        OutFileClause outFileClause = analyzedOutStmt.getOutFileClause();
        boolean isOutFileClauseAnalyzed = Deencapsulation.getField(outFileClause, "isAnalyzed");
        Assertions.assertTrue(isOutFileClauseAnalyzed);

        Assertions.assertEquals(outFileClause.getFileFormatType(), TFileFormatType.FORMAT_CSV_PLAIN);

        String queryNew = "select * from mock_tbl1 \n"
                + "into outfile 's3://bucket/mock_dir'\n"
                + "format as csv\n"
                + "properties(\n"
                + " 's3.endpoint' = 'http://127.0.0.1:9000',\n"
                + " 's3.access_key' = 'akk',\n"
                + " 's3.secret_key'='akk',\n"
                + " 'use_path_style' = 'true'\n"
                + ");";
        QueryStmt analyzedOutStmtNew = createStmt(queryNew);
        Assertions.assertTrue(analyzedOutStmtNew.hasOutFileClause());

        OutFileClause outFileClauseNew = analyzedOutStmtNew.getOutFileClause();
        boolean isNewAnalyzed = Deencapsulation.getField(outFileClauseNew, "isAnalyzed");
        Assertions.assertTrue(isNewAnalyzed);
    }

    @Test
    public void testS3SourcePropertiesConverter() throws Exception {
        String queryOld = "CREATE RESOURCE 'remote_s3'\n"
                + "PROPERTIES\n"
                + "(\n"
                + " 'type' = 's3',\n"
                + " 'AWS_ENDPOINT' = 's3.us-east-1.amazonaws.com',\n"
                + " 'AWS_REGION' = 'us-east-1',\n"
                + " 'AWS_ACCESS_KEY' = 'akk',\n"
                + " 'AWS_SECRET_KEY' = 'skk',\n"
                + " 'AWS_ROOT_PATH' = '/',\n"
                + " 'AWS_BUCKET' = 'bucket',\n"
                + " 's3_validity_check' = 'false'"
                + ");";
        CreateResourceStmt analyzedResourceStmt = createStmt(queryOld);
        Assertions.assertEquals(analyzedResourceStmt.getProperties().size(), 8);
        Resource resource = Resource.fromStmt(analyzedResourceStmt);
        // will add converted properties
        Assertions.assertEquals(resource.getCopiedProperties().size(), 20);

        String queryNew = "CREATE RESOURCE 'remote_new_s3'\n"
                + "PROPERTIES\n"
                + "(\n"
                + " 'type' = 's3',\n"
                + " 's3.endpoint' = 'http://s3.us-east-1.amazonaws.com',\n"
                + " 's3.region' = 'us-east-1',\n"
                + " 's3.access_key' = 'akk',\n"
                + " 's3.secret_key' = 'skk',\n"
                + " 's3.root.path' = '/',\n"
                + " 's3.bucket' = 'bucket',\n"
                + " 's3_validity_check' = 'false'"
                + ");";
        CreateResourceStmt analyzedResourceStmtNew = createStmt(queryNew);
        Assertions.assertEquals(analyzedResourceStmtNew.getProperties().size(), 8);
        Resource newResource = Resource.fromStmt(analyzedResourceStmtNew);
        // will add converted properties
        Assertions.assertEquals(newResource.getCopiedProperties().size(), 14);
    }
    @Test
    public void testS3RepositoryPropertiesConverter() throws Exception {
        FeConstants.runningUnitTest = true;
        String s3Repo = "CREATE REPOSITORY `s3_repo`\n"
                + "WITH S3\n"
                + "ON LOCATION 's3://s3-repo'\n"
                + "PROPERTIES\n"
                + "(\n"
                + " 'AWS_ENDPOINT' = 'http://s3.us-east-1.amazonaws.com',\n"
                + " 'AWS_ACCESS_KEY' = 'akk',\n"
                + " 'AWS_SECRET_KEY'='skk',\n"
                + " 'AWS_REGION' = 'us-east-1'\n"
                + ");";
        CreateRepositoryStmt analyzedStmt = createStmt(s3Repo);
        Assertions.assertEquals(analyzedStmt.getProperties().size(), 4);
        Repository repository = getRepository(analyzedStmt, "s3_repo");
        Assertions.assertEquals(repository.getStorage().getProperties().size(), 5);

        String s3RepoNew = "CREATE REPOSITORY `s3_repo_new`\n"
                + "WITH S3\n"
                + "ON LOCATION 's3://s3-repo'\n"
                + "PROPERTIES\n"
                + "(\n"
                + " 's3.endpoint' = 'http://s3.us-east-1.amazonaws.com',\n"
                + " 's3.access_key' = 'akk',\n"
                + " 's3.secret_key' = 'skk'\n"
                + ");";
        CreateRepositoryStmt analyzedStmtNew = createStmt(s3RepoNew);
        Assertions.assertEquals(analyzedStmtNew.getProperties().size(), 3);
        Repository repositoryNew = getRepository(analyzedStmtNew, "s3_repo_new");
        Assertions.assertEquals(repositoryNew.getStorage().getProperties().size(), 4);
    }

    private static Repository getRepository(CreateRepositoryStmt analyzedStmt, String name) throws DdlException {
        Env.getCurrentEnv().getBackupHandler().createRepository(analyzedStmt);
        return Env.getCurrentEnv().getBackupHandler().getRepoMgr().getRepo(name);
    }

    @Test
    public void testBosBrokerRepositoryPropertiesConverter() throws Exception {
        FeConstants.runningUnitTest = true;
        String bosBroker = "CREATE REPOSITORY `bos_broker_repo`\n"
                + "WITH BROKER `bos_broker`\n"
                + "ON LOCATION 'bos://backup'\n"
                + "PROPERTIES\n"
                + "(\n"
                + " 'bos_endpoint' = 'http://gz.bcebos.com',\n"
                + " 'bos_accesskey' = 'akk',\n"
                + " 'bos_secret_accesskey'='skk'\n"
                + ");";
        CreateRepositoryStmt analyzedStmt = createStmt(bosBroker);
        analyzedStmt.getProperties();
        Assertions.assertEquals(analyzedStmt.getProperties().size(), 3);

        List<Pair<String, Integer>> brokers = ImmutableList.of(Pair.of("127.0.0.1", 9999));
        Env.getCurrentEnv().getBrokerMgr().addBrokers("bos_broker", brokers);

        Repository repositoryNew = getRepository(analyzedStmt, "bos_broker_repo");
        Assertions.assertEquals(repositoryNew.getStorage().getProperties().size(), 4);
    }
    @Test
    public void testS3TVFPropertiesConverter() throws Exception {
        FeConstants.runningUnitTest = true;
        String queryOld = "select * from s3(\n"
                + " 'uri' = 'http://s3.us-east-1.amazonaws.com/test.parquet',\n"
                + " 'access_key' = 'akk',\n"
                + " 'secret_key' = 'skk',\n"
                + " 'region' = 'us-east-1',\n"
                + " 'format' = 'parquet',\n"
                + " 'use_path_style' = 'true'\n"
                + ") limit 10;";
        SelectStmt analyzedStmt = createStmt(queryOld);
        Assertions.assertEquals(analyzedStmt.getTableRefs().size(), 1);
        TableValuedFunctionRef oldFuncTable = (TableValuedFunctionRef) analyzedStmt.getTableRefs().get(0);
        S3TableValuedFunction s3Tvf = (S3TableValuedFunction) oldFuncTable.getTableFunction();
        Assertions.assertEquals(s3Tvf.getBrokerDesc().getProperties().size(), 9);

        String queryNew = "select * from s3(\n"
                + " 'uri' = 'http://s3.us-east-1.amazonaws.com/test.parquet',\n"
                + " 's3.access_key' = 'akk',\n"
                + " 's3.secret_key' = 'skk',\n"
                + " 'format' = 'parquet',\n"
                + " 'use_path_style' = 'true'\n"
                + ") limit 10;";
        SelectStmt analyzedStmtNew = createStmt(queryNew);
        Assertions.assertEquals(analyzedStmtNew.getTableRefs().size(), 1);
        TableValuedFunctionRef newFuncTable = (TableValuedFunctionRef) analyzedStmt.getTableRefs().get(0);
        S3TableValuedFunction newS3Tvf = (S3TableValuedFunction) newFuncTable.getTableFunction();
        Assertions.assertEquals(newS3Tvf.getBrokerDesc().getProperties().size(), 9);
    }

    @Test
    public void testAWSOldCatalogPropertiesConverter() throws Exception {
        String queryOld = "create catalog hms_s3_old properties (\n"
                + " 'type'='hms',\n"
                + " 'hive.metastore.uris' = 'thrift://172.21.0.44:7004',\n"
                + " 'AWS_ENDPOINT' = 's3.us-east-1.amazonaws.com',\n"
                + " 'AWS_REGION' = 'us-east-1',\n"
                + " 'AWS_ACCESS_KEY' = 'akk',\n"
                + " 'AWS_SECRET_KEY' = 'skk'\n"
                + ");";
        CreateCatalogStmt analyzedStmt = createStmt(queryOld);
        HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, "hms_s3_old");
        Map<String, String> properties = catalog.getCatalogProperty().getProperties();
        Assertions.assertEquals(properties.size(), 6);

        Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
        Assertions.assertEquals(hdProps.size(), 19);
    }

    @Test
    public void testS3CatalogPropertiesConverter() throws Exception {
        String query = "create catalog hms_s3 properties (\n"
                + " 'type'='hms',\n"
                + " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
                + " 's3.endpoint' = 's3.us-east-1.amazonaws.com',\n"
                + " 's3.access_key' = 'akk',\n"
                + " 's3.secret_key' = 'skk'\n"
                + ");";
        CreateCatalogStmt analyzedStmt = createStmt(query);
        HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, "hms_s3");
        Map<String, String> properties = catalog.getCatalogProperty().getProperties();
        Assertions.assertEquals(properties.size(), 9);

        Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
        Assertions.assertEquals(hdProps.size(), 18);
    }
    @Test
    public void testGlueCatalogPropertiesConverter() throws Exception {
        String queryOld = "create catalog hms_glue_old properties (\n"
                + " 'type'='hms',\n"
                + " 'hive.metastore.type'='glue',\n"
                + " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
                + " 'aws.glue.endpoint' = 'glue.us-east-1.amazonaws.com',\n"
                + " 'aws.glue.access-key' = 'akk',\n"
                + " 'aws.glue.secret-key' = 'skk',\n"
                + " 'aws.region' = 'us-east-1'\n"
                + ");";
        CreateCatalogStmt analyzedStmt = createStmt(queryOld);
        HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, "hms_glue_old");
        Map<String, String> properties = catalog.getCatalogProperty().getProperties();
        Assertions.assertEquals(properties.size(), 18);

        Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
        Assertions.assertEquals(hdProps.size(), 27);

        String query = "create catalog hms_glue properties (\n"
                + " 'type'='hms',\n"
                + " 'hive.metastore.type'='glue',\n"
                + " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
                + " 'glue.endpoint' = 'glue.us-east-1.amazonaws.com',\n"
                + " 'glue.access_key' = 'akk',\n"
                + " 'glue.secret_key' = 'skk'\n"
                + ");";
        CreateCatalogStmt analyzedStmtNew = createStmt(query);
        HMSExternalCatalog catalogNew = createAndGetCatalog(analyzedStmtNew, "hms_glue");
        Map<String, String> propertiesNew = catalogNew.getCatalogProperty().getProperties();
        Assertions.assertEquals(propertiesNew.size(), 18);

        Map<String, String> hdPropsNew = catalogNew.getCatalogProperty().getHadoopProperties();
        Assertions.assertEquals(hdPropsNew.size(), 27);
    }

    @Test
    public void testOBSCatalogPropertiesConverter() throws Exception {
        String query = "create catalog hms_obs properties (\n"
                + " 'type'='hms',\n"
                + " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
                + " 'obs.endpoint' = 'obs.cn-north-4.myhuaweicloud.com',\n"
                + " 'obs.access_key' = 'akk',\n"
                + " 'obs.secret_key' = 'skk'\n"
                + ");";
        CreateCatalogStmt analyzedStmt = createStmt(query);
        HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, "hms_obs");
        Map<String, String> properties = catalog.getCatalogProperty().getProperties();
        Assertions.assertEquals(properties.size(), 9);

        Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
        Assertions.assertEquals(hdProps.size(), 13);
    }

    @Test
    public void testCOSCatalogPropertiesConverter() throws Exception {
        String query = "create catalog hms_cos properties (\n"
                + " 'type'='hms',\n"
                + " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
                + " 'cos.endpoint' = 'cos.ap-beijing.myqcloud.com',\n"
                + " 'cos.access_key' = 'akk',\n"
                + " 'cos.secret_key' = 'skk',\n"
                + " 'enable.self.splitter'='true'\n"
                + ");";
        CreateCatalogStmt analyzedStmt = createStmt(query);
        HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, "hms_cos");
        Map<String, String> properties = catalog.getCatalogProperty().getProperties();
        Assertions.assertEquals(properties.size(), 10);

        Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
        Assertions.assertEquals(hdProps.size(), 22);
    }

    @Test
    public void testOSSCatalogPropertiesConverter() throws Exception {
        String query = "create catalog hms_oss properties (\n"
                + " 'type'='hms',\n"
                + " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
                + " 'oss.endpoint' = 'oss.oss-cn-beijing.aliyuncs.com',\n"
                + " 'oss.access_key' = 'akk',\n"
                + " 'oss.secret_key' = 'skk'\n"
                + ");";
        CreateCatalogStmt analyzedStmt = createStmt(query);
        HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, "hms_oss");
        Map<String, String> properties = catalog.getCatalogProperty().getProperties();
        Assertions.assertEquals(properties.size(), 9);

        Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
        Assertions.assertEquals(hdProps.size(), 21);
    }

    private static HMSExternalCatalog createAndGetCatalog(CreateCatalogStmt analyzedStmt, String name)
            throws UserException {
        Env.getCurrentEnv().getCatalogMgr().createCatalog(analyzedStmt);
        return (HMSExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(name);
    }
}
@@ -177,6 +177,11 @@ public abstract class TestWithFeService {
        return statementContext;
    }

    protected <T extends StatementBase> T createStmt(String showSql)
            throws Exception {
        return (T) parseAndAnalyzeStmt(showSql, connectContext);
    }

    protected CascadesContext createCascadesContext(String sql) {
        StatementContext statementCtx = createStatementCtx(sql);
        return MemoTestUtils.createCascadesContext(statementCtx, sql);