[fix](multi-catalog)fix s3 check, complete catalog properties (#20591)
stability and some fixes:
1. fix the S3 availability check
2. add independent MinIO properties
3. add a JobConf cache
4. remove extra S3 properties when converting catalog properties
5. add UT cases to check the converted properties
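Items 2 and 4 come down to a key remap: minio.* catalog properties are rewritten onto the generic s3.* keys (with a placeholder region, since MinIO has no regions), and the FE-only keys are later stripped before the properties are handed to the BE. The standalone sketch below mirrors that mapping; MinioKeyRemapSketch and toS3Keys are illustrative names only, not part of the Doris PropertyConverter API shown in the diff that follows.

import java.util.Map;
import java.util.TreeMap;

public class MinioKeyRemapSketch {
    // Remaps "minio."-prefixed catalog keys onto generic "s3." keys and
    // defaults the region, because MinIO has no region concept.
    static Map<String, String> toS3Keys(Map<String, String> catalogProps) {
        Map<String, String> s3Props = new TreeMap<>();
        for (Map.Entry<String, String> e : catalogProps.entrySet()) {
            if (e.getKey().startsWith("minio.")) {
                s3Props.put("s3." + e.getKey().substring("minio.".length()), e.getValue());
            } else {
                s3Props.put(e.getKey(), e.getValue());
            }
        }
        s3Props.putIfAbsent("s3.region", "us-east-1"); // arbitrary placeholder region
        return s3Props;
    }

    public static void main(String[] args) {
        Map<String, String> props = new TreeMap<>();
        props.put("minio.endpoint", "http://127.0.0.1:9000"); // placeholder endpoint
        props.put("minio.access_key", "akk");
        props.put("minio.secret_key", "skk");
        // Prints {s3.access_key=akk, s3.endpoint=http://127.0.0.1:9000, s3.region=us-east-1, s3.secret_key=skk}
        System.out.println(toS3Keys(props));
    }
}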
@@ -127,7 +127,8 @@ public class S3Resource extends Resource {
propertiesPing.put(S3Properties.Env.SECRET_KEY, credential.getSecretKey());
propertiesPing.put(S3Properties.Env.ENDPOINT, credential.getEndpoint());
propertiesPing.put(S3Properties.Env.REGION, credential.getRegion());
propertiesPing.put(PropertyConverter.USE_PATH_STYLE, "false");
propertiesPing.put(PropertyConverter.USE_PATH_STYLE,
properties.getOrDefault(PropertyConverter.USE_PATH_STYLE, "false"));
properties.putAll(propertiesPing);
S3FileSystem fileSystem = new S3FileSystem(properties);
String testFile = bucket + rootPath + "/test-object-valid.txt";

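The hunk above stops hard-coding use_path_style to "false" for the availability ping and instead honors the user-supplied value, which matters for MinIO and other S3-compatible stores that usually require path-style URLs. Below is a standalone illustration of such a probe using the AWS SDK for Java v2; it is an assumption for illustration only (endpoint, bucket, and keys are placeholders), and the actual check in Doris goes through the S3FileSystem shown above.

import java.net.URI;

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class S3PingSketch {
    // Writes and then stats a small test object, mirroring the
    // "test-object-valid.txt" probe above. All values are placeholders.
    public static void main(String[] args) {
        boolean usePathStyle = true; // MinIO and many S3-compatible stores need path-style URLs
        try (S3Client client = S3Client.builder()
                .endpointOverride(URI.create("http://127.0.0.1:9000"))
                .region(Region.of("us-east-1")) // arbitrary; MinIO ignores it
                .credentialsProvider(StaticCredentialsProvider.create(
                        AwsBasicCredentials.create("akk", "skk")))
                .forcePathStyle(usePathStyle) // available in recent AWS SDK v2 releases
                .build()) {
            String bucket = "doris-bucket";
            String key = "ping/test-object-valid.txt";
            client.putObject(PutObjectRequest.builder().bucket(bucket).key(key).build(),
                    RequestBody.fromString("ping"));
            client.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build());
            System.out.println("endpoint is reachable and writable");
        }
    }
}

A failed put or head here is the kind of error the availability check is meant to surface at resource or catalog creation time rather than at query time.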
@@ -63,6 +63,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -417,7 +418,9 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
if (!Strings.isNullOrEmpty(catalog.getResource())) {
rows.add(Arrays.asList("resource", catalog.getResource()));
}
for (Map.Entry<String, String> elem : catalog.getProperties().entrySet()) {
// use tree map to maintain display order, making it easier to view properties
Map<String, String> sortedMap = new TreeMap<>(catalog.getProperties()).descendingMap();
for (Map.Entry<String, String> elem : sortedMap.entrySet()) {
if (PrintableMap.HIDDEN_KEY.contains(elem.getKey())) {
continue;
}

@@ -108,6 +108,7 @@ public class HiveMetaStoreCache {
private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_";

private HMSExternalCatalog catalog;
private JobConf jobConf;

private Executor executor;

@@ -153,6 +154,8 @@ public class HiveMetaStoreCache {
* generate a filecache and set to fileCacheRef
*/
public void setNewFileCache() {
// init or refresh job conf
setJobConf();
// if the file.meta.cache.ttl-second is equal or greater than 0, the cache expired will be set to that value
int fileMetaCacheTtlSecond = NumberUtils.toInt(
(catalog.getProperties().get(HMSExternalCatalog.FILE_META_CACHE_TTL_SECOND)),
@@ -325,16 +328,6 @@ public class HiveMetaStoreCache {
try {
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
String finalLocation = S3Util.convertToS3IfNecessary(key.location);
JobConf jobConf = getJobConf();
// For Tez engine, it may generate subdirectories for "union" query.
// So there may be files and directories in the table directory at the same time. eg:
// /user/hive/warehouse/region_tmp_union_all2/000000_0
// /user/hive/warehouse/region_tmp_union_all2/1
// /user/hive/warehouse/region_tmp_union_all2/2
// So we need to set this config to support visit dir recursively.
// Otherwise, getSplits() may throw exception: "Not a file xxx"
// https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
FileInputFormat.setInputPaths(jobConf, finalLocation);
try {
FileCacheValue result;
@@ -373,12 +366,21 @@ public class HiveMetaStoreCache {
}
}

private JobConf getJobConf() {
private synchronized void setJobConf() {
Configuration configuration = new HdfsConfiguration();
for (Map.Entry<String, String> entry : catalog.getCatalogProperty().getHadoopProperties().entrySet()) {
configuration.set(entry.getKey(), entry.getValue());
}
return new JobConf(configuration);
jobConf = new JobConf(configuration);
// For Tez engine, it may generate subdirectories for "union" query.
// So there may be files and directories in the table directory at the same time. eg:
// /user/hive/warehouse/region_tmp_union_all2/000000_0
// /user/hive/warehouse/region_tmp_union_all2/1
// /user/hive/warehouse/region_tmp_union_all2/2
// So we need to set this config to support visit dir recursively.
// Otherwise, getSplits() may throw exception: "Not a file xxx"
// https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
}

public HivePartitionValues getPartitionValues(String dbName, String tblName, List<Type> types) {
@@ -671,7 +673,6 @@ public class HiveMetaStoreCache {
public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
boolean isFullAcid) {
List<FileCacheValue> fileCacheValues = Lists.newArrayList();
JobConf jobConf = getJobConf();
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
try {
for (HivePartition partition : partitions) {

@@ -26,6 +26,7 @@ import org.apache.doris.datasource.property.constants.DLFProperties;
import org.apache.doris.datasource.property.constants.GCSProperties;
import org.apache.doris.datasource.property.constants.GlueProperties;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.datasource.property.constants.MinioProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
@@ -80,15 +81,25 @@ public class PropertyConverter {
|| props.containsKey(DataLakeConfig.CATALOG_ENDPOINT)) {
metaProperties = convertToDLFProperties(props, DLFProperties.getCredential(props));
} else if (props.containsKey(S3Properties.Env.ENDPOINT)) {
// checkout env in the end
// if meet AWS_XXX properties, convert to s3 properties
return convertToS3EnvProperties(props, S3Properties.getEnvironmentCredentialWithEndpoint(props), true);
if (!hasS3Properties(props)) {
// checkout env in the end
// if meet AWS_XXX properties, convert to s3 properties
return convertToS3EnvProperties(props, S3Properties.getEnvironmentCredentialWithEndpoint(props), true);
}
}
metaProperties.putAll(props);
metaProperties.putAll(S3ClientBEProperties.getBeFSProperties(props));
return metaProperties;
}

private static boolean hasS3Properties(Map<String, String> props) {
return props.containsKey(ObsProperties.ENDPOINT)
|| props.containsKey(GCSProperties.ENDPOINT)
|| props.containsKey(OssProperties.ENDPOINT)
|| props.containsKey(CosProperties.ENDPOINT)
|| props.containsKey(MinioProperties.ENDPOINT);
}

/**
* Convert properties defined at doris to FE S3 client properties
* Support other cloud client here.
@@ -102,6 +113,8 @@ public class PropertyConverter {
return convertToOSSProperties(props, OssProperties.getCredential(props));
} else if (props.containsKey(CosProperties.ENDPOINT)) {
return convertToCOSProperties(props, CosProperties.getCredential(props));
} else if (props.containsKey(MinioProperties.ENDPOINT)) {
return convertToMinioProperties(props, MinioProperties.getCredential(props));
} else if (props.containsKey(S3Properties.ENDPOINT)) {
return convertToS3Properties(props, S3Properties.getCredential(props));
} else if (props.containsKey(S3Properties.Env.ENDPOINT)) {
@@ -179,6 +192,8 @@ public class PropertyConverter {
}
setS3FsAccess(s3Properties, properties, credential);
s3Properties.putAll(properties);
// remove extra meta properties
S3Properties.FS_KEYS.forEach(s3Properties::remove);
return s3Properties;
}

@@ -235,6 +250,12 @@ public class PropertyConverter {
return convertToS3Properties(S3Properties.prefixToS3(props), credential);
}

private static Map<String, String> convertToMinioProperties(Map<String, String> props, CloudCredential credential) {
// minio does not have region, use an arbitrary one.
props.put(MinioProperties.REGION, "us-east-1");
return convertToS3Properties(S3Properties.prefixToS3(props), credential);
}

private static Map<String, String> convertToDLFProperties(Map<String, String> props, CloudCredential credential) {
getPropertiesFromDLFConf(props);
// if configure DLF properties in catalog properties, use them to override config in hive-site.xml

@@ -19,6 +19,7 @@ package org.apache.doris.datasource.property;

import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.GCSProperties;
import org.apache.doris.datasource.property.constants.MinioProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
@@ -33,7 +34,11 @@ public class S3ClientBEProperties {
* On BE, should use properties like AWS_XXX.
*/
public static Map<String, String> getBeFSProperties(Map<String, String> properties) {
if (properties.containsKey(S3Properties.ENDPOINT)) {
if (properties.containsKey(MinioProperties.ENDPOINT)) {
// minio does not have region, use an arbitrary one.
properties.put(MinioProperties.REGION, "us-east-1");
return getBeAWSPropertiesFromS3(S3Properties.prefixToS3(properties));
} else if (properties.containsKey(S3Properties.ENDPOINT)) {
// s3,oss,cos,obs use this.
return getBeAWSPropertiesFromS3(properties);
} else if (properties.containsKey(ObsProperties.ENDPOINT)

@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.property.constants;

import org.apache.doris.datasource.credentials.CloudCredential;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class MinioProperties extends BaseProperties {

public static final String MINIO_PREFIX = "minio.";
public static final String ENDPOINT = "minio.endpoint";
public static final String REGION = "minio.region";
public static final String ACCESS_KEY = "minio.access_key";
public static final String SECRET_KEY = "minio.secret_key";
public static final String SESSION_TOKEN = "minio.session_token";
public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT, ACCESS_KEY, SECRET_KEY, REGION);

public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
}
}
@@ -57,6 +57,8 @@ public class S3Properties extends BaseProperties {
public static final String VALIDITY_CHECK = "s3_validity_check";
public static final List<String> REQUIRED_FIELDS = Arrays.asList(ENDPOINT, ACCESS_KEY, SECRET_KEY);
public static final List<String> TVF_REQUIRED_FIELDS = Arrays.asList(ACCESS_KEY, SECRET_KEY);
public static final List<String> FS_KEYS = Arrays.asList(ENDPOINT, REGION, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN,
ROOT_PATH, BUCKET, MAX_CONNECTIONS, REQUEST_TIMEOUT_MS, CONNECTION_TIMEOUT_MS);

public static final List<String> AWS_CREDENTIALS_PROVIDERS = Arrays.asList(
DataLakeAWSCredentialsProvider.class.getName(),
@@ -140,6 +142,9 @@ public class S3Properties extends BaseProperties {
} else if (entry.getKey().startsWith(ObsProperties.OBS_PREFIX)) {
String s3Key = entry.getKey().replace(ObsProperties.OBS_PREFIX, S3Properties.S3_PREFIX);
s3Properties.put(s3Key, entry.getValue());
} else if (entry.getKey().startsWith(MinioProperties.MINIO_PREFIX)) {
String s3Key = entry.getKey().replace(MinioProperties.MINIO_PREFIX, S3Properties.S3_PREFIX);
s3Properties.put(s3Key, entry.getValue());
}
}
return s3Properties;

@@ -126,7 +126,7 @@ public class DFSFileSystem extends RemoteFileSystem {
String hadoopUserName = conf.get(HdfsResource.HADOOP_USER_NAME);
if (hadoopUserName == null) {
hadoopUserName = "hadoop";
LOG.warn("hadoop.username is unset, use default user: hadoop");
LOG.debug(HdfsResource.HADOOP_USER_NAME + " is unset, use default user: hadoop");
}
return UserGroupInformation.createRemoteUser(hadoopUserName);
}

@@ -812,7 +812,7 @@ public class SystemInfoService {

public void checkAvailableCapacity() throws DdlException {
if (getAvailableCapacityB() <= 0L) {
throw new DdlException("System has no available disk capacity");
throw new DdlException("System has no available disk capacity or no available BE nodes");
}
}

@@ -166,7 +166,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
public String getFsName() {
TFileType fileType = getTFileType();
if (fileType == TFileType.FILE_HDFS) {
return locationProperties.get(HdfsTableValuedFunction.HADOOP_FS_NAME);
return locationProperties.get(HdfsResource.HADOOP_FS_NAME);
} else if (fileType == TFileType.FILE_S3) {
return locationProperties.get(S3Properties.ENDPOINT);
}
@@ -445,7 +445,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
fileScanRangeParams.setFileAttributes(getFileAttributes());
if (getTFileType() == TFileType.FILE_HDFS) {
THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(locationProperties);
String fsNmae = getLocationProperties().get(HdfsTableValuedFunction.HADOOP_FS_NAME);
String fsNmae = getLocationProperties().get(HdfsResource.HADOOP_FS_NAME);
tHdfsParams.setFsName(fsNmae);
fileScanRangeParams.setHdfsParams(tHdfsParams);
}

@@ -20,6 +20,7 @@ package org.apache.doris.tablefunction;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.ExportStmt;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.URI;
import org.apache.doris.thrift.TFileType;
@@ -41,24 +42,17 @@ public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction {

public static final String NAME = "hdfs";
public static final String HDFS_URI = "uri";
public static String HADOOP_FS_NAME = "fs.defaultFS";
// simple or kerberos
public static String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
public static String HADOOP_USER_NAME = "hadoop.username";
public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";

private static final ImmutableSet<String> LOCATION_PROPERTIES = new ImmutableSet.Builder<String>()
.add(HDFS_URI)
.add(HADOOP_SECURITY_AUTHENTICATION)
.add(HADOOP_FS_NAME)
.add(HADOOP_USER_NAME)
.add(HADOOP_KERBEROS_PRINCIPAL)
.add(HADOOP_KERBEROS_KEYTAB)
.add(HADOOP_SHORT_CIRCUIT)
.add(HADOOP_SOCKET_PATH)
.add(HdfsResource.HADOOP_SECURITY_AUTHENTICATION)
.add(HdfsResource.HADOOP_FS_NAME)
.add(HdfsResource.HADOOP_USER_NAME)
.add(HdfsResource.HADOOP_KERBEROS_PRINCIPAL)
.add(HdfsResource.HADOOP_KERBEROS_KEYTAB)
.add(HdfsResource.HADOOP_SHORT_CIRCUIT)
.add(HdfsResource.HADOOP_SOCKET_PATH)
.build();

private URI hdfsUri;
@@ -72,8 +66,8 @@ public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction {
fileFormatParams.put(key, params.get(key));
} else {
// because HADOOP_FS_NAME contains upper and lower case
if (HADOOP_FS_NAME.equalsIgnoreCase(key)) {
locationProperties.put(HADOOP_FS_NAME, params.get(key));
if (HdfsResource.HADOOP_FS_NAME.equalsIgnoreCase(key)) {
locationProperties.put(HdfsResource.HADOOP_FS_NAME, params.get(key));
} else {
locationProperties.put(key, params.get(key));
}
@@ -85,7 +79,7 @@ public class HdfsTableValuedFunction extends ExternalFileTableValuedFunction {
}
ExportStmt.checkPath(locationProperties.get(HDFS_URI), StorageType.HDFS);
hdfsUri = URI.create(locationProperties.get(HDFS_URI));
filePath = locationProperties.get(HADOOP_FS_NAME) + hdfsUri.getPath();
filePath = locationProperties.get(HdfsResource.HADOOP_FS_NAME) + hdfsUri.getPath();

parseProperties(fileFormatParams);
parseFile();

@@ -20,6 +20,7 @@ package org.apache.doris.datasource.property;
import org.apache.doris.analysis.CreateCatalogStmt;
import org.apache.doris.analysis.CreateRepositoryStmt;
import org.apache.doris.analysis.CreateResourceStmt;
import org.apache.doris.analysis.DropCatalogStmt;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.SelectStmt;
@@ -29,10 +30,17 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Resource;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.property.constants.CosProperties;
import org.apache.doris.datasource.property.constants.MinioProperties;
import org.apache.doris.datasource.property.constants.ObsProperties;
import org.apache.doris.datasource.property.constants.OssProperties;
import org.apache.doris.datasource.property.constants.S3Properties;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.tablefunction.S3TableValuedFunction;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.utframe.TestWithFeService;
@@ -41,11 +49,18 @@ import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class PropertyConverterTest extends TestWithFeService {

private final Set<String> checkSet = new HashSet<>();
private final Map<String, String> expectedCredential = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);

@Override
protected void runBeforeAll() throws Exception {
createDorisCluster();
@@ -53,6 +68,12 @@ public class PropertyConverterTest extends TestWithFeService {
useDatabase("mock_db");
createTable("create table mock_tbl1 \n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
+ "properties('replication_num' = '1');");

List<String> withoutPrefix = ImmutableList.of("endpoint", "access_key", "secret_key");
checkSet.addAll(withoutPrefix);
checkSet.addAll(S3Properties.Env.REQUIRED_FIELDS);
expectedCredential.put("access_key", "akk");
expectedCredential.put("secret_key", "skk");
}

@Test
@@ -130,6 +151,7 @@ public class PropertyConverterTest extends TestWithFeService {
Resource newResource = Resource.fromStmt(analyzedResourceStmtNew);
// will add converted properties
Assertions.assertEquals(newResource.getCopiedProperties().size(), 14);

}

@Test
@@ -272,8 +294,9 @@ public class PropertyConverterTest extends TestWithFeService {
+ " 'aws.glue.secret-key' = 'skk',\n"
+ " 'aws.region' = 'us-east-1'\n"
+ ");";
String catalogName = "hms_glue_old";
CreateCatalogStmt analyzedStmt = createStmt(queryOld);
HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, "hms_glue_old");
HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, catalogName);
Map<String, String> properties = catalog.getCatalogProperty().getProperties();
Assertions.assertEquals(properties.size(), 20);

@@ -288,8 +311,9 @@ public class PropertyConverterTest extends TestWithFeService {
+ " 'glue.access_key' = 'akk',\n"
+ " 'glue.secret_key' = 'skk'\n"
+ ");";
catalogName = "hms_glue";
CreateCatalogStmt analyzedStmtNew = createStmt(query);
HMSExternalCatalog catalogNew = createAndGetCatalog(analyzedStmtNew, "hms_glue");
HMSExternalCatalog catalogNew = createAndGetCatalog(analyzedStmtNew, catalogName);
Map<String, String> propertiesNew = catalogNew.getCatalogProperty().getProperties();
Assertions.assertEquals(propertiesNew.size(), 20);

@@ -313,43 +337,78 @@ public class PropertyConverterTest extends TestWithFeService {

Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(hdProps.size(), 16);

Map<String, String> expectedMetaProperties = new HashMap<>();
expectedMetaProperties.put("endpoint", "obs.cn-north-4.myhuaweicloud.com");
expectedMetaProperties.put("AWS_ENDPOINT", "obs.cn-north-4.myhuaweicloud.com");
expectedMetaProperties.putAll(expectedCredential);
checkExpectedProperties(ObsProperties.OBS_PREFIX, properties, expectedMetaProperties);
}

@Test
public void testCOSCatalogPropertiesConverter() throws Exception {
String query = "create catalog hms_cos properties (\n"
public void testS3CompatibleCatalogPropertiesConverter() throws Exception {
String catalogName0 = "hms_cos";
String query0 = "create catalog " + catalogName0 + " properties (\n"
+ " 'type'='hms',\n"
+ " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
+ " 'cos.endpoint' = 'cos.ap-beijing.myqcloud.com',\n"
+ " 'cos.access_key' = 'akk',\n"
+ " 'cos.secret_key' = 'skk',\n"
+ " 'enable.self.splitter'='true'\n"
+ " 'cos.secret_key' = 'skk'\n"
+ ");";
CreateCatalogStmt analyzedStmt = createStmt(query);
HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, "hms_cos");
Map<String, String> properties = catalog.getCatalogProperty().getProperties();
Assertions.assertEquals(properties.size(), 12);
testS3CompatibleCatalogProperties(catalogName0, CosProperties.COS_PREFIX,
"cos.ap-beijing.myqcloud.com", query0);

Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(hdProps.size(), 24);
String catalogName1 = "hms_oss";
String query1 = "create catalog " + catalogName1 + " properties (\n"
+ " 'type'='hms',\n"
+ " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
+ " 'oss.endpoint' = 'oss.oss-cn-beijing.aliyuncs.com',\n"
+ " 'oss.access_key' = 'akk',\n"
+ " 'oss.secret_key' = 'skk'\n"
+ ");";
testS3CompatibleCatalogProperties(catalogName1, OssProperties.OSS_PREFIX,
"oss.oss-cn-beijing.aliyuncs.com", query1);

String catalogName2 = "hms_minio";
String query2 = "create catalog " + catalogName2 + " properties (\n"
+ " 'type'='hms',\n"
+ " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
+ " 'minio.endpoint' = 'http://127.0.0.1',\n"
+ " 'minio.access_key' = 'akk',\n"
+ " 'minio.secret_key' = 'skk'\n"
+ ");";
testS3CompatibleCatalogProperties(catalogName2, MinioProperties.MINIO_PREFIX,
"http://127.0.0.1", query2);
}

@Test
public void testOSSCatalogPropertiesConverter() throws Exception {
String query = "create catalog hms_oss properties (\n"
+ " 'type'='hms',\n"
+ " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
+ " 'oss.endpoint' = 'oss.oss-cn-beijing.aliyuncs.com',\n"
+ " 'oss.access_key' = 'akk',\n"
+ " 'oss.secret_key' = 'skk'\n"
+ ");";
CreateCatalogStmt analyzedStmt = createStmt(query);
HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, "hms_oss");
private void testS3CompatibleCatalogProperties(String catalogName, String prefix,
String endpoint, String sql) throws Exception {
Env.getCurrentEnv().getCatalogMgr().dropCatalog(new DropCatalogStmt(true, catalogName));
CreateCatalogStmt analyzedStmt = createStmt(sql);
HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, catalogName);
Map<String, String> properties = catalog.getCatalogProperty().getProperties();
Assertions.assertEquals(properties.size(), 11);

Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(hdProps.size(), 23);
Assertions.assertEquals(hdProps.size(), 20);

Map<String, String> expectedMetaProperties = new HashMap<>();
expectedMetaProperties.put("endpoint", endpoint);
expectedMetaProperties.put("AWS_ENDPOINT", endpoint);
expectedMetaProperties.putAll(expectedCredential);
checkExpectedProperties(prefix, properties, expectedMetaProperties);
}

private void checkExpectedProperties(String prefix, Map<String, String> properties,
Map<String, String> expectedProperties) {
properties.forEach((key, value) -> {
if (key.startsWith(prefix)) {
String keyToCheck = key.replace(prefix, "");
if (checkSet.contains(keyToCheck)) {
Assertions.assertEquals(value, expectedProperties.get(keyToCheck));
}
}
});
}

private static HMSExternalCatalog createAndGetCatalog(CreateCatalogStmt analyzedStmt, String name)
@@ -357,4 +416,12 @@
Env.getCurrentEnv().getCatalogMgr().createCatalog(analyzedStmt);
return (HMSExternalCatalog) Env.getCurrentEnv().getCatalogMgr().getCatalog(name);
}

@Test
public void testSerialization() throws Exception {
MetaContext metaContext = new MetaContext();
metaContext.setMetaVersion(FeMetaVersion.VERSION_CURRENT);
metaContext.setThreadLocalInfo();
}
}

@@ -26,8 +26,7 @@ import org.junit.Test;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.file.Files;

public class DropDbInfoTest {
@Test
@@ -39,7 +38,7 @@ public class DropDbInfoTest {
// 1. Write objects to file
File file = new File("./dropDbInfo");
file.createNewFile();
DataOutputStream dos = new DataOutputStream(new FileOutputStream(file));
DataOutputStream dos = new DataOutputStream(Files.newOutputStream(file.toPath()));

DropDbInfo info1 = new DropDbInfo();
info1.write(dos);
@@ -51,7 +50,7 @@ public class DropDbInfoTest {
dos.close();

// 2. Read objects from file
DataInputStream dis = new DataInputStream(new FileInputStream(file));
DataInputStream dis = new DataInputStream(Files.newInputStream(file.toPath()));

DropDbInfo rInfo1 = DropDbInfo.read(dis);
Assert.assertEquals(rInfo1, info1);