[fix](multi-catalog)add oss sdk, supported oss properties (#21029)
This commit is contained in:
@ -34,6 +34,9 @@ under the License.
|
||||
<fe_ut_parallel>1</fe_ut_parallel>
|
||||
<antlr4.version>4.9.3</antlr4.version>
|
||||
<awssdk.version>2.17.257</awssdk.version>
|
||||
<huaweiobs.version>3.1.1-hw-45</huaweiobs.version>
|
||||
<tencentcos.version>3.3.5</tencentcos.version>
|
||||
<aliyunoss.version>3.2.2</aliyunoss.version>
|
||||
</properties>
|
||||
<profiles>
|
||||
<profile>
|
||||
@ -399,7 +402,7 @@ under the License.
|
||||
<dependency>
|
||||
<groupId>com.huaweicloud</groupId>
|
||||
<artifactId>hadoop-huaweicloud</artifactId>
|
||||
<version>3.1.1-hw-45</version>
|
||||
<version>${huaweiobs.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
@ -407,6 +410,16 @@ under the License.
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-aliyun</artifactId>
|
||||
<version>${aliyunoss.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-cos</artifactId>
|
||||
<version>${tencentcos.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.aliyun.odps</groupId>
|
||||
<artifactId>odps-sdk-core</artifactId>
|
||||
|
||||
@ -35,6 +35,9 @@ import com.aliyun.datalake.metastore.common.DataLakeConfig;
|
||||
import com.amazonaws.glue.catalog.util.AWSGlueConfig;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem;
|
||||
import org.apache.hadoop.fs.cosn.CosNConfigKeys;
|
||||
import org.apache.hadoop.fs.cosn.CosNFileSystem;
|
||||
import org.apache.hadoop.fs.obs.OBSConstants;
|
||||
import org.apache.hadoop.fs.obs.OBSFileSystem;
|
||||
import org.apache.hadoop.fs.s3a.Constants;
|
||||
@ -241,13 +244,46 @@ public class PropertyConverter {
|
||||
}
|
||||
|
||||
private static Map<String, String> convertToOSSProperties(Map<String, String> props, CloudCredential credential) {
|
||||
// Now we use s3 client to access
|
||||
return convertToS3Properties(S3Properties.prefixToS3(props), credential);
|
||||
Map<String, String> ossProperties = Maps.newHashMap();
|
||||
String endpoint = props.get(OssProperties.ENDPOINT);
|
||||
if (endpoint.startsWith(OssProperties.OSS_PREFIX)) {
|
||||
// may use oss.oss-cn-beijing.aliyuncs.com
|
||||
endpoint = endpoint.replace(OssProperties.OSS_PREFIX, "");
|
||||
}
|
||||
ossProperties.put(org.apache.hadoop.fs.aliyun.oss.Constants.ENDPOINT_KEY, endpoint);
|
||||
ossProperties.put("fs.oss.impl.disable.cache", "true");
|
||||
ossProperties.put("fs.oss.impl", AliyunOSSFileSystem.class.getName());
|
||||
if (credential.isWhole()) {
|
||||
ossProperties.put(org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_ID, credential.getAccessKey());
|
||||
ossProperties.put(org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET, credential.getSecretKey());
|
||||
}
|
||||
if (credential.isTemporary()) {
|
||||
ossProperties.put(org.apache.hadoop.fs.aliyun.oss.Constants.SECURITY_TOKEN, credential.getSessionToken());
|
||||
}
|
||||
for (Map.Entry<String, String> entry : props.entrySet()) {
|
||||
if (entry.getKey().startsWith(OssProperties.OSS_FS_PREFIX)) {
|
||||
ossProperties.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
return ossProperties;
|
||||
}
|
||||
|
||||
private static Map<String, String> convertToCOSProperties(Map<String, String> props, CloudCredential credential) {
|
||||
// Now we use s3 client to access
|
||||
return convertToS3Properties(S3Properties.prefixToS3(props), credential);
|
||||
Map<String, String> cosProperties = Maps.newHashMap();
|
||||
cosProperties.put(CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY, props.get(CosProperties.ENDPOINT));
|
||||
cosProperties.put("fs.cosn.impl.disable.cache", "true");
|
||||
cosProperties.put("fs.cosn.impl", CosNFileSystem.class.getName());
|
||||
if (credential.isWhole()) {
|
||||
cosProperties.put(CosNConfigKeys.COSN_SECRET_ID_KEY, credential.getAccessKey());
|
||||
cosProperties.put(CosNConfigKeys.COSN_SECRET_KEY_KEY, credential.getSecretKey());
|
||||
}
|
||||
// session token is unsupported
|
||||
for (Map.Entry<String, String> entry : props.entrySet()) {
|
||||
if (entry.getKey().startsWith(CosProperties.COS_FS_PREFIX)) {
|
||||
cosProperties.put(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
return cosProperties;
|
||||
}
|
||||
|
||||
private static Map<String, String> convertToMinioProperties(Map<String, String> props, CloudCredential credential) {
|
||||
|
||||
@ -321,30 +321,6 @@ public class PropertyConverterTest extends TestWithFeService {
|
||||
Assertions.assertEquals(hdPropsNew.size(), 29);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOBSCatalogPropertiesConverter() throws Exception {
|
||||
String query = "create catalog hms_obs properties (\n"
|
||||
+ " 'type'='hms',\n"
|
||||
+ " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
|
||||
+ " 'obs.endpoint' = 'obs.cn-north-4.myhuaweicloud.com',\n"
|
||||
+ " 'obs.access_key' = 'akk',\n"
|
||||
+ " 'obs.secret_key' = 'skk'\n"
|
||||
+ ");";
|
||||
CreateCatalogStmt analyzedStmt = createStmt(query);
|
||||
HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, "hms_obs");
|
||||
Map<String, String> properties = catalog.getCatalogProperty().getProperties();
|
||||
Assertions.assertEquals(properties.size(), 11);
|
||||
|
||||
Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
|
||||
Assertions.assertEquals(hdProps.size(), 16);
|
||||
|
||||
Map<String, String> expectedMetaProperties = new HashMap<>();
|
||||
expectedMetaProperties.put("endpoint", "obs.cn-north-4.myhuaweicloud.com");
|
||||
expectedMetaProperties.put("AWS_ENDPOINT", "obs.cn-north-4.myhuaweicloud.com");
|
||||
expectedMetaProperties.putAll(expectedCredential);
|
||||
checkExpectedProperties(ObsProperties.OBS_PREFIX, properties, expectedMetaProperties);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testS3CompatibleCatalogPropertiesConverter() throws Exception {
|
||||
String catalogName0 = "hms_cos";
|
||||
@ -356,7 +332,7 @@ public class PropertyConverterTest extends TestWithFeService {
|
||||
+ " 'cos.secret_key' = 'skk'\n"
|
||||
+ ");";
|
||||
testS3CompatibleCatalogProperties(catalogName0, CosProperties.COS_PREFIX,
|
||||
"cos.ap-beijing.myqcloud.com", query0);
|
||||
"cos.ap-beijing.myqcloud.com", query0, 11, 16);
|
||||
|
||||
String catalogName1 = "hms_oss";
|
||||
String query1 = "create catalog " + catalogName1 + " properties (\n"
|
||||
@ -367,7 +343,7 @@ public class PropertyConverterTest extends TestWithFeService {
|
||||
+ " 'oss.secret_key' = 'skk'\n"
|
||||
+ ");";
|
||||
testS3CompatibleCatalogProperties(catalogName1, OssProperties.OSS_PREFIX,
|
||||
"oss.oss-cn-beijing.aliyuncs.com", query1);
|
||||
"oss.oss-cn-beijing.aliyuncs.com", query1, 11, 16);
|
||||
|
||||
String catalogName2 = "hms_minio";
|
||||
String query2 = "create catalog " + catalogName2 + " properties (\n"
|
||||
@ -378,19 +354,31 @@ public class PropertyConverterTest extends TestWithFeService {
|
||||
+ " 'minio.secret_key' = 'skk'\n"
|
||||
+ ");";
|
||||
testS3CompatibleCatalogProperties(catalogName2, MinioProperties.MINIO_PREFIX,
|
||||
"http://127.0.0.1", query2);
|
||||
"http://127.0.0.1", query2, 11, 20);
|
||||
|
||||
String catalogName3 = "hms_obs";
|
||||
String query3 = "create catalog hms_obs properties (\n"
|
||||
+ " 'type'='hms',\n"
|
||||
+ " 'hive.metastore.uris' = 'thrift://172.21.0.1:7004',\n"
|
||||
+ " 'obs.endpoint' = 'obs.cn-north-4.myhuaweicloud.com',\n"
|
||||
+ " 'obs.access_key' = 'akk',\n"
|
||||
+ " 'obs.secret_key' = 'skk'\n"
|
||||
+ ");";
|
||||
testS3CompatibleCatalogProperties(catalogName3, ObsProperties.OBS_PREFIX,
|
||||
"obs.cn-north-4.myhuaweicloud.com", query3, 11, 16);
|
||||
}
|
||||
|
||||
private void testS3CompatibleCatalogProperties(String catalogName, String prefix,
|
||||
String endpoint, String sql) throws Exception {
|
||||
String endpoint, String sql,
|
||||
int catalogPropsSize, int bePropsSize) throws Exception {
|
||||
Env.getCurrentEnv().getCatalogMgr().dropCatalog(new DropCatalogStmt(true, catalogName));
|
||||
CreateCatalogStmt analyzedStmt = createStmt(sql);
|
||||
HMSExternalCatalog catalog = createAndGetCatalog(analyzedStmt, catalogName);
|
||||
Map<String, String> properties = catalog.getCatalogProperty().getProperties();
|
||||
Assertions.assertEquals(properties.size(), 11);
|
||||
Assertions.assertEquals(properties.size(), catalogPropsSize);
|
||||
|
||||
Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
|
||||
Assertions.assertEquals(hdProps.size(), 20);
|
||||
Assertions.assertEquals(hdProps.size(), bePropsSize);
|
||||
|
||||
Map<String, String> expectedMetaProperties = new HashMap<>();
|
||||
expectedMetaProperties.put("endpoint", endpoint);
|
||||
|
||||
Reference in New Issue
Block a user