[fix](multi-catalog) add max compute custom odps and tunnel url (#31390)

add max compute custom odps and tunnel url
This commit is contained in:
slothever
2024-02-29 15:30:29 +08:00
committed by yiguolei
parent b9a87c63f7
commit 0b5b7175d6
11 changed files with 131 additions and 17 deletions

View File

@ -197,6 +197,8 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tde
_region(tdesc.mcTable.region),
_project(tdesc.mcTable.project),
_table(tdesc.mcTable.table),
_odps_url(tdesc.mcTable.odps_url),
_tunnel_url(tdesc.mcTable.tunnel_url),
_access_key(tdesc.mcTable.access_key),
_secret_key(tdesc.mcTable.secret_key),
_public_access(tdesc.mcTable.public_access) {}

View File

@ -238,6 +238,8 @@ public:
// Read-only accessors for the MaxCompute table settings held by this descriptor.
// Each one returns a copy of the corresponding member string.
const std::string region() const { return _region; }
const std::string project() const { return _project; }
const std::string table() const { return _table; }
// ODPS service endpoint URL stored in _odps_url.
const std::string odps_url() const { return _odps_url; }
// Tunnel endpoint URL stored in _tunnel_url.
const std::string tunnel_url() const { return _tunnel_url; }
const std::string access_key() const { return _access_key; }
const std::string secret_key() const { return _secret_key; }
// Public-access setting, kept as a string rather than a bool.
const std::string public_access() const { return _public_access; }
@ -246,6 +248,8 @@ private:
// MaxCompute table settings; the constructor fills them from tdesc.mcTable.
std::string _region;
std::string _project;
std::string _table;
// ODPS service endpoint URL (tdesc.mcTable.odps_url).
std::string _odps_url;
// Tunnel endpoint URL (tdesc.mcTable.tunnel_url).
std::string _tunnel_url;
std::string _access_key;
std::string _secret_key;
// Public-access setting carried as a string (tdesc.mcTable.public_access).
std::string _public_access;

View File

@ -66,6 +66,8 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des
index++;
}
std::map<String, String> params = {{"region", _table_desc->region()},
{"odps_url", _table_desc->odps_url()},
{"tunnel_url", _table_desc->tunnel_url()},
{"access_key", _table_desc->access_key()},
{"secret_key", _table_desc->secret_key()},
{"project", _table_desc->project()},

View File

@ -61,4 +61,29 @@ Pay-as-you-go quota has limited concurrency and usage. For additional resources,
Consistent with Hive Catalog, please refer to the **column type mapping** section in [Hive Catalog](./hive.md).
## User-defined service address
By default, Max Compute Catalog generates a default public network endpoint from the region property.
In addition to default endpoint addresses, Max Compute Catalog also supports custom service addresses in properties.
Use the following properties:
* `mc.odps_endpoint`: Max Compute Endpoint.
* `mc.tunnel_endpoint`: Tunnel Endpoint. Max Compute Catalog uses the Tunnel SDK to obtain data.
For more information about the Max Compute Endpoint and Tunnel Endpoint used in different regions and network connection modes, see [Endpoint](https://www.alibabacloud.com/help/en/maxcompute/user-guide/endpoints).
For example:
```sql
CREATE CATALOG mc PROPERTIES (
"type" = "max_compute",
"mc.region" = "cn-beijing",
"mc.default.project" = "your-project",
"mc.access_key" = "ak",
"mc.secret_key" = "sk",
"mc.odps_endpoint" = "http://service.cn-beijing.maxcompute.aliyun-inc.com/api",
"mc.tunnel_endpoint" = "http://dt.cn-beijing.maxcompute.aliyun-inc.com"
);
```

View File

@ -61,4 +61,30 @@ CREATE CATALOG mc PROPERTIES (
和 Hive Catalog 一致,可参阅 [Hive Catalog](./hive.md) 中 **列类型映射** 一节。
## 自定义服务地址
默认情况下,Max Compute Catalog根据region去默认生成公网的endpoint。
除了默认的endpoint地址外,Max Compute Catalog也支持在属性中自定义服务地址。
使用以下两个属性:
* `mc.odps_endpoint`:Max Compute Endpoint。
* `mc.tunnel_endpoint`: Tunnel Endpoint,Max Compute Catalog使用Tunnel SDK获取数据。
Max Compute Endpoint和Tunnel Endpoint的配置请参见[各地域及不同网络连接方式下的Endpoint](https://help.aliyun.com/zh/maxcompute/user-guide/endpoints)
示例:
```sql
CREATE CATALOG mc PROPERTIES (
"type" = "max_compute",
"mc.region" = "cn-beijing",
"mc.default.project" = "your-project",
"mc.access_key" = "ak",
"mc.secret_key" = "sk",
"mc.odps_endpoint" = "http://service.cn-beijing.maxcompute.aliyun-inc.com/api",
"mc.tunnel_endpoint" = "http://dt.cn-beijing.maxcompute.aliyun-inc.com"
);
```

View File

@ -58,6 +58,8 @@ public class MaxComputeJniScanner extends JniScanner {
// Names of the entries expected in the scanner's parameter map.
private static final String TABLE = "table";
private static final String ACCESS_KEY = "access_key";
private static final String SECRET_KEY = "secret_key";
// Endpoint entries; their values are handed to MaxComputeTableScan as-is
// (no non-null check is applied to them, unlike the access/secret keys).
private static final String ODPS_URL = "odps_url";
private static final String TUNNEL_URL = "tunnel_url";
private static final String START_OFFSET = "start_offset";
private static final String SPLIT_SIZE = "split_size";
// Parsed with Boolean.parseBoolean; treated as "false" when absent.
private static final String PUBLIC_ACCESS = "public_access";
@ -122,7 +124,8 @@ public class MaxComputeJniScanner extends JniScanner {
String accessKey = Objects.requireNonNull(params.get(ACCESS_KEY), "required property '" + ACCESS_KEY + "'.");
String secretKey = Objects.requireNonNull(params.get(SECRET_KEY), "required property '" + SECRET_KEY + "'.");
boolean enablePublicAccess = Boolean.parseBoolean(params.getOrDefault(PUBLIC_ACCESS, "false"));
return new MaxComputeTableScan(region, project, table, accessKey, secretKey, enablePublicAccess);
return new MaxComputeTableScan(params.get(ODPS_URL), params.get(TUNNEL_URL), region, project, table,
accessKey, secretKey, enablePublicAccess);
}
public String tableUniqKey() {

View File

@ -23,6 +23,7 @@ import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
@ -39,20 +40,40 @@ public class MaxComputeTableScan {
private volatile long readRows = 0;
private long totalRows = 0;
public MaxComputeTableScan(String region, String project, String table,
public MaxComputeTableScan(String odpsUrl, String tunnelUrl, String region, String project, String table,
String accessKey, String secretKey, boolean enablePublicAccess) {
this.project = project;
this.table = table;
odps = new Odps(new AliyunAccount(accessKey, secretKey));
String odpsUrl = odpsUrlTemplate.replace("{}", region);
String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
if (enablePublicAccess) {
odpsUrl = odpsUrl.replace("-inc", "");
tunnelUrl = tunnelUrl.replace("-inc", "");
}
odps.setEndpoint(odpsUrl);
setOdpsUrl(odpsUrl, region, enablePublicAccess);
odps.setDefaultProject(this.project);
tunnel = new TableTunnel(odps);
setTunnelUrl(tunnelUrl, region, enablePublicAccess);
}
/**
 * Configures the endpoint of the {@code odps} client.
 *
 * @param defaultOdpsUrl explicit endpoint; used verbatim when it is non-empty
 * @param region region substituted for "{}" in {@code odpsUrlTemplate} when no
 *               explicit endpoint is given
 * @param enablePublicAccess when true, "-inc" is stripped from the
 *                           template-derived endpoint
 */
private void setOdpsUrl(String defaultOdpsUrl, String region, boolean enablePublicAccess) {
    // An explicitly supplied endpoint takes precedence over the template.
    if (StringUtils.isNotEmpty(defaultOdpsUrl)) {
        odps.setEndpoint(defaultOdpsUrl);
        return;
    }
    String derived = odpsUrlTemplate.replace("{}", region);
    odps.setEndpoint(enablePublicAccess ? derived.replace("-inc", "") : derived);
}
/**
 * Configures the endpoint of the {@code tunnel} client.
 *
 * @param defaultTunnelUrl explicit endpoint; used verbatim when it is non-empty
 * @param region region substituted for "{}" in {@code tunnelUrlTemplate} when no
 *               explicit endpoint is given
 * @param enablePublicAccess when true, "-inc" is stripped from the
 *                           template-derived endpoint
 */
private void setTunnelUrl(String defaultTunnelUrl, String region, boolean enablePublicAccess) {
    // An explicitly supplied endpoint takes precedence over the template.
    if (StringUtils.isNotEmpty(defaultTunnelUrl)) {
        tunnel.setEndpoint(defaultTunnelUrl);
        return;
    }
    String derived = tunnelUrlTemplate.replace("{}", region);
    tunnel.setEndpoint(enablePublicAccess ? derived.replace("-inc", "") : derived);
}

View File

@ -35,6 +35,7 @@ import com.aliyun.odps.tunnel.TableTunnel;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Iterator;
@ -55,6 +56,8 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
private boolean enablePublicAccess;
private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun-inc.com/api";
private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun-inc.com";
// Endpoint URLs belong to a single catalog: the constructor reads them from
// that catalog's own properties and the setters fill in a region-based
// default. They must therefore be instance fields; as static fields every
// MaxCompute catalog would share (and overwrite) one pair of endpoints.
private String odpsUrl;
private String tunnelUrl;
private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
MCProperties.REGION,
MCProperties.PROJECT
@ -64,6 +67,8 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
String comment) {
super(catalogId, name, InitCatalogLog.Type.MAX_COMPUTE, comment);
catalogProperty = new CatalogProperty(resource, props);
odpsUrl = props.getOrDefault(MCProperties.ODPS_ENDPOINT, "");
tunnelUrl = props.getOrDefault(MCProperties.TUNNEL_SDK_ENDPOINT, "");
}
@Override
@ -92,16 +97,28 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
Account account = new AliyunAccount(accessKey, secretKey);
this.odps = new Odps(account);
enablePublicAccess = Boolean.parseBoolean(props.getOrDefault(MCProperties.PUBLIC_ACCESS, "false"));
String odpsUrl = odpsUrlTemplate.replace("{}", region);
if (enablePublicAccess) {
odpsUrl = odpsUrl.replace("-inc", "");
}
odps.setEndpoint(odpsUrl);
setOdpsUrl(region);
odps.setDefaultProject(defaultProject);
tunnel = new TableTunnel(odps);
String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
if (enablePublicAccess) {
tunnelUrl = tunnelUrl.replace("-inc", "");
setTunnelUrl(region);
}
/**
 * Applies the ODPS endpoint to the {@code odps} client.
 *
 * If {@code odpsUrl} is still empty, it is first derived from
 * {@code odpsUrlTemplate} by substituting {@code region} for "{}" and, when
 * {@code enablePublicAccess} is set, removing "-inc". The derived value is
 * stored back into {@code odpsUrl}.
 *
 * @param region region name inserted into the endpoint template
 */
private void setOdpsUrl(String region) {
    if (StringUtils.isEmpty(odpsUrl)) {
        String derived = odpsUrlTemplate.replace("{}", region);
        odpsUrl = enablePublicAccess ? derived.replace("-inc", "") : derived;
    }
    odps.setEndpoint(odpsUrl);
}
/**
 * Applies the tunnel endpoint to the {@code tunnel} client.
 *
 * If {@code tunnelUrl} is still empty, it is first derived from
 * {@code tunnelUrlTemplate} by substituting {@code region} for "{}" and, when
 * {@code enablePublicAccess} is set, removing "-inc". The derived value is
 * stored back into {@code tunnelUrl}.
 *
 * @param region region name inserted into the endpoint template
 */
private void setTunnelUrl(String region) {
    if (StringUtils.isEmpty(tunnelUrl)) {
        String derived = tunnelUrlTemplate.replace("{}", region);
        tunnelUrl = enablePublicAccess ? derived.replace("-inc", "") : derived;
    }
    tunnel.setEndpoint(tunnelUrl);
}
@ -215,4 +232,12 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
}
}
}
/**
 * Returns the current value of {@code odpsUrl}: the value of the
 * {@code MCProperties.ODPS_ENDPOINT} property read in the constructor (empty
 * string if unset), or the region-based endpoint once {@code setOdpsUrl} has
 * filled it in.
 */
public String getOdpsUrl() {
return odpsUrl;
}
/**
 * Returns the current value of {@code tunnelUrl}: the value of the
 * {@code MCProperties.TUNNEL_SDK_ENDPOINT} property read in the constructor
 * (empty string if unset), or the region-based endpoint once
 * {@code setTunnelUrl} has filled it in.
 */
public String getTunnelUrl() {
return tunnelUrl;
}
}

View File

@ -269,6 +269,8 @@ public class MaxComputeExternalTable extends ExternalTable {
tMcTable.setRegion(mcCatalog.getRegion());
tMcTable.setAccessKey(mcCatalog.getAccessKey());
tMcTable.setSecretKey(mcCatalog.getSecretKey());
tMcTable.setOdpsUrl(mcCatalog.getOdpsUrl());
tMcTable.setTunnelUrl(mcCatalog.getTunnelUrl());
tMcTable.setPublicAccess(String.valueOf(mcCatalog.enablePublicAccess()));
// use mc project as dbName
tMcTable.setProject(dbName);

View File

@ -31,6 +31,8 @@ public class MCProperties extends BaseProperties {
// Catalog property names for MaxCompute.
public static final String SECRET_KEY = "mc.secret_key";
public static final String SESSION_TOKEN = "mc.session_token";
public static final String PUBLIC_ACCESS = "mc.public_access";
// Optional custom service endpoints; MaxComputeExternalCatalog reads both
// with an empty-string default.
public static final String ODPS_ENDPOINT = "mc.odps_endpoint";
public static final String TUNNEL_SDK_ENDPOINT = "mc.tunnel_endpoint";
public static CloudCredential getCredential(Map<String, String> props) {
return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);

View File

@ -339,6 +339,8 @@ struct TMCTable {
4: optional string access_key
5: optional string secret_key
6: optional string public_access
7: optional string odps_url
8: optional string tunnel_url
}
// "Union" of all table types.