[Enhancement](MaxCompute) Refactor the MaxCompute catalog to use the Storage API. (#40225, #40888, #41386) (#41610)
Backport of #40225, #40888, #41386.
## Proposed changes
Among them, #40225 introduces the new MaxCompute Storage API, #40888 fixes a bug in reading null values that behaved differently between the new and old APIs, and #41386 provides compatibility between the new and old catalog versions.
@@ -727,6 +727,39 @@ public class DateLiteral extends LiteralExpr {
        return new String(dateTimeChars, 0, 19);
    }

    public String getStringValue(Type type) {
        char[] dateTimeChars = new char[26]; // Enough to hold "YYYY-MM-DD HH:MM:SS.mmmmmm"

        // Populate the date part
        fillPaddedValue(dateTimeChars, 0, year, 4);
        dateTimeChars[4] = '-';
        fillPaddedValue(dateTimeChars, 5, month, 2);
        dateTimeChars[7] = '-';
        fillPaddedValue(dateTimeChars, 8, day, 2);

        if (type.isDate() || type.isDateV2()) {
            return new String(dateTimeChars, 0, 10);
        }

        // Populate the time part
        dateTimeChars[10] = ' ';
        fillPaddedValue(dateTimeChars, 11, hour, 2);
        dateTimeChars[13] = ':';
        fillPaddedValue(dateTimeChars, 14, minute, 2);
        dateTimeChars[16] = ':';
        fillPaddedValue(dateTimeChars, 17, second, 2);

        if (type.isDatetimeV2()) {
            int scale = ((ScalarType) type).getScalarScale();
            long scaledMicroseconds = (long) (microsecond / SCALE_FACTORS[scale]);
            dateTimeChars[19] = '.';
            fillPaddedValue(dateTimeChars, 20, (int) scaledMicroseconds, scale);
            return new String(dateTimeChars, 0, 20 + scale);
        }

        return new String(dateTimeChars, 0, 19);
    }

    @Override
    public String getStringValueForArray(FormatOptions options) {
        return options.getNestedStringWrapper() + getStringValue() + options.getNestedStringWrapper();
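For reference, a minimal, self-contained sketch of the fixed-width, zero-padded char-buffer formatting used above. `fillPaddedValue` is not shown in this hunk, so the helper below is an assumed implementation:

    public class PaddedFormatDemo {
        // Hypothetical stand-in for DateLiteral.fillPaddedValue: writes `value`
        // right-aligned with leading zeros into `width` cells starting at `offset`.
        static void fillPaddedValue(char[] buf, int offset, int value, int width) {
            for (int i = offset + width - 1; i >= offset; i--) {
                buf[i] = (char) ('0' + value % 10);
                value /= 10;
            }
        }

        public static void main(String[] args) {
            char[] buf = new char[10];
            fillPaddedValue(buf, 0, 2024, 4);
            buf[4] = '-';
            fillPaddedValue(buf, 5, 7, 2);
            buf[7] = '-';
            fillPaddedValue(buf, 8, 9, 2);
            System.out.println(new String(buf, 0, 10)); // prints 2024-07-09
        }
    }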
@@ -29,103 +29,174 @@ import org.apache.doris.datasource.property.constants.MCProperties;

import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.Partition;
import com.aliyun.odps.Project;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.tunnel.TableTunnel;
import com.google.common.base.Strings;
import com.aliyun.odps.security.SecurityManager;
import com.aliyun.odps.table.configuration.SplitOptions;
import com.aliyun.odps.table.enviroment.Credentials;
import com.aliyun.odps.table.enviroment.EnvironmentSettings;
import com.aliyun.odps.utils.StringUtils;
import com.google.common.collect.ImmutableList;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.log4j.Logger;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MaxComputeExternalCatalog extends ExternalCatalog {
    private static final Logger LOG = Logger.getLogger(MaxComputeExternalCatalog.class);

    // For available endpoints, see: https://help.aliyun.com/zh/maxcompute/user-guide/endpoints
    private static final String endpointTemplate = "http://service.{}.maxcompute.aliyun-inc.com/api";

    private Odps odps;
    private TableTunnel tunnel;
    @SerializedName(value = "region")
    private String region;
    @SerializedName(value = "accessKey")
    private String accessKey;
    @SerializedName(value = "secretKey")
    private String secretKey;
    @SerializedName(value = "publicAccess")
    private boolean enablePublicAccess;
    private static final String odpsUrlTemplate = "http://service.{}.maxcompute.aliyun-inc.com/api";
    private static final String tunnelUrlTemplate = "http://dt.{}.maxcompute.aliyun-inc.com";
    private static String odpsUrl;
    private static String tunnelUrl;
    private String endpoint;
    private String defaultProject;
    private String quota;
    private EnvironmentSettings settings;
    private String catalogOwner;

    private String splitStrategy;
    private SplitOptions splitOptions;
    private long splitRowCount;
    private long splitByteSize;

    private static final Map<String, ZoneId> REGION_ZONE_MAP;
    private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
            MCProperties.REGION,
            MCProperties.PROJECT
            MCProperties.PROJECT,
            MCProperties.ENDPOINT
    );

    static {
        Map<String, ZoneId> map = new HashMap<>();

        map.put("cn-hangzhou", ZoneId.of("Asia/Shanghai"));
        map.put("cn-shanghai", ZoneId.of("Asia/Shanghai"));
        map.put("cn-shanghai-finance-1", ZoneId.of("Asia/Shanghai"));
        map.put("cn-beijing", ZoneId.of("Asia/Shanghai"));
        map.put("cn-north-2-gov-1", ZoneId.of("Asia/Shanghai"));
        map.put("cn-zhangjiakou", ZoneId.of("Asia/Shanghai"));
        map.put("cn-wulanchabu", ZoneId.of("Asia/Shanghai"));
        map.put("cn-shenzhen", ZoneId.of("Asia/Shanghai"));
        map.put("cn-shenzhen-finance-1", ZoneId.of("Asia/Shanghai"));
        map.put("cn-chengdu", ZoneId.of("Asia/Shanghai"));
        map.put("cn-hongkong", ZoneId.of("Asia/Shanghai"));
        map.put("ap-southeast-1", ZoneId.of("Asia/Singapore"));
        map.put("ap-southeast-2", ZoneId.of("Australia/Sydney"));
        map.put("ap-southeast-3", ZoneId.of("Asia/Kuala_Lumpur"));
        map.put("ap-southeast-5", ZoneId.of("Asia/Jakarta"));
        map.put("ap-northeast-1", ZoneId.of("Asia/Tokyo"));
        map.put("eu-central-1", ZoneId.of("Europe/Berlin"));
        map.put("eu-west-1", ZoneId.of("Europe/London"));
        map.put("us-west-1", ZoneId.of("America/Los_Angeles"));
        map.put("us-east-1", ZoneId.of("America/New_York"));
        map.put("me-east-1", ZoneId.of("Asia/Dubai"));

        REGION_ZONE_MAP = Collections.unmodifiableMap(map);
    }

    public MaxComputeExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
            String comment) {
        super(catalogId, name, InitCatalogLog.Type.MAX_COMPUTE, comment);
        catalogProperty = new CatalogProperty(resource, props);
        odpsUrl = props.getOrDefault(MCProperties.ODPS_ENDPOINT, "");
        tunnelUrl = props.getOrDefault(MCProperties.TUNNEL_SDK_ENDPOINT, "");
    }

    // Compatible with catalogs created by previous versions.
    protected void generatorEndpoint() {
        Map<String, String> props = catalogProperty.getProperties();

        if (props.containsKey(MCProperties.ENDPOINT)) {
            // This is the new-version property, so no conversion is required.
            endpoint = props.get(MCProperties.ENDPOINT);
        } else if (props.containsKey(MCProperties.TUNNEL_SDK_ENDPOINT)) {
            // If `mc.tunnel_endpoint` was customized before, its value needs to be
            // converted, because the old implementation used the tunnel API.
            String tunnelEndpoint = props.get(MCProperties.TUNNEL_SDK_ENDPOINT);
            endpoint = tunnelEndpoint.replace("//dt", "//service") + "/api";
        } else if (props.containsKey(MCProperties.ODPS_ENDPOINT)) {
            // If `mc.odps_endpoint` was customized before, its value is equivalent to
            // the new `mc.endpoint`, so it can be used directly.
            endpoint = props.get(MCProperties.ODPS_ENDPOINT);
        } else if (props.containsKey(MCProperties.REGION)) {
            // Copied from the original logic.
            String region = props.get(MCProperties.REGION);
            if (region.startsWith("oss-")) {
                // may use oss-cn-beijing, ensure compatible
                region = region.replace("oss-", "");
            }
            boolean enablePublicAccess = Boolean.parseBoolean(props.getOrDefault(MCProperties.PUBLIC_ACCESS,
                    MCProperties.DEFAULT_PUBLIC_ACCESS));
            endpoint = endpointTemplate.replace("{}", region);
            if (enablePublicAccess) {
                endpoint = endpoint.replace("-inc", "");
            }
        }
        /*
        Since MCProperties.REGION was a required property in previous versions
        and MCProperties.ENDPOINT is required in the current version,
        no `else {}` branch is needed here.
        */
    }
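A quick illustration of the legacy-to-new endpoint conversions performed by generatorEndpoint(); the endpoint strings are the documented templates used in the code above:

    public class EndpointCompatDemo {
        public static void main(String[] args) {
            // Old `mc.tunnel_endpoint`: swap the `dt` host for `service` and append `/api`.
            String tunnel = "http://dt.cn-beijing.maxcompute.aliyun-inc.com";
            System.out.println(tunnel.replace("//dt", "//service") + "/api");
            // -> http://service.cn-beijing.maxcompute.aliyun-inc.com/api

            // Old `mc.region`: substitute into the endpoint template.
            String template = "http://service.{}.maxcompute.aliyun-inc.com/api";
            String endpoint = template.replace("{}", "cn-beijing");
            // Public access drops the `-inc` suffix from the host.
            System.out.println(endpoint.replace("-inc", ""));
            // -> http://service.cn-beijing.maxcompute.aliyun.com/api
        }
    }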
    @Override
    protected void initLocalObjectsImpl() {
        Map<String, String> props = catalogProperty.getProperties();
        String region = props.get(MCProperties.REGION);
        String defaultProject = props.get(MCProperties.PROJECT);
        if (Strings.isNullOrEmpty(region)) {
            throw new IllegalArgumentException("Missing required property '" + MCProperties.REGION + "'.");

        generatorEndpoint();

        defaultProject = props.get(MCProperties.PROJECT);
        quota = props.getOrDefault(MCProperties.QUOTA, MCProperties.DEFAULT_QUOTA);

        splitStrategy = props.getOrDefault(MCProperties.SPLIT_STRATEGY, MCProperties.DEFAULT_SPLIT_STRATEGY);
        if (splitStrategy.equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY)) {
            splitByteSize = Long.parseLong(props.getOrDefault(MCProperties.SPLIT_BYTE_SIZE,
                    MCProperties.DEFAULT_SPLIT_BYTE_SIZE));

            splitOptions = SplitOptions.newBuilder()
                    .SplitByByteSize(splitByteSize)
                    .withCrossPartition(false)
                    .build();
        } else {
            splitRowCount = Long.parseLong(props.getOrDefault(MCProperties.SPLIT_ROW_COUNT,
                    MCProperties.DEFAULT_SPLIT_ROW_COUNT));
            splitOptions = SplitOptions.newBuilder()
                    .SplitByRowOffset()
                    .withCrossPartition(false)
                    .build();
        }
        if (Strings.isNullOrEmpty(defaultProject)) {
            throw new IllegalArgumentException("Missing required property '" + MCProperties.PROJECT + "'.");
        }
        if (region.startsWith("oss-")) {
            // may use oss-cn-beijing, ensure compatible
            region = region.replace("oss-", "");
        }
        this.region = region;

        CloudCredential credential = MCProperties.getCredential(props);
        if (!credential.isWhole()) {
            throw new IllegalArgumentException("Max-Compute credential properties '"
                    + MCProperties.ACCESS_KEY + "' and '" + MCProperties.SECRET_KEY + "' are required.");
        }
        accessKey = credential.getAccessKey();
        secretKey = credential.getSecretKey();

        Account account = new AliyunAccount(accessKey, secretKey);
        this.odps = new Odps(account);
        enablePublicAccess = Boolean.parseBoolean(props.getOrDefault(MCProperties.PUBLIC_ACCESS, "false"));
        setOdpsUrl(region);
        odps.setDefaultProject(defaultProject);
        tunnel = new TableTunnel(odps);
        setTunnelUrl(region);
    }
        odps.setEndpoint(endpoint);
        Credentials credentials = Credentials.newBuilder().withAccount(odps.getAccount())
                .withAppAccount(odps.getAppAccount()).build();

    private void setOdpsUrl(String region) {
        if (StringUtils.isEmpty(odpsUrl)) {
            odpsUrl = odpsUrlTemplate.replace("{}", region);
            if (enablePublicAccess) {
                odpsUrl = odpsUrl.replace("-inc", "");
            }
        }
        odps.setEndpoint(odpsUrl);
    }

    private void setTunnelUrl(String region) {
        if (StringUtils.isEmpty(tunnelUrl)) {
            tunnelUrl = tunnelUrlTemplate.replace("{}", region);
            if (enablePublicAccess) {
                tunnelUrl = tunnelUrl.replace("-inc", "");
            }
        }
        tunnel.setEndpoint(tunnelUrl);
    }

    public TableTunnel getTableTunnel() {
        makeSureInitialized();
        return tunnel;
        settings = EnvironmentSettings.newBuilder()
                .withCredentials(credentials)
                .withServiceEndpoint(odps.getEndpoint())
                .withQuotaName(quota)
                .build();
    }

    public Odps getClient() {
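The split configuration above maps directly onto the Storage API's SplitOptions builder. A sketch of the two strategies, using only the builder calls that appear in this patch:

    import com.aliyun.odps.table.configuration.SplitOptions;

    public class SplitOptionsDemo {
        public static void main(String[] args) {
            // byte_size strategy: splits of a fixed byte size (256 MB here) that never cross partitions.
            SplitOptions byByteSize = SplitOptions.newBuilder()
                    .SplitByByteSize(256 * 1024L * 1024L)
                    .withCrossPartition(false)
                    .build();

            // row_count strategy: split boundaries are decided later by row offsets.
            SplitOptions byRowOffset = SplitOptions.newBuilder()
                    .SplitByRowOffset()
                    .withCrossPartition(false)
                    .build();
        }
    }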
@@ -135,10 +206,24 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {

    protected List<String> listDatabaseNames() {
        List<String> result = new ArrayList<>();
        result.add(defaultProject);

        try {
            // TODO: How to get all privileged projects from MaxCompute as databases?
            // For now we only have permission to show the default project.
            result.add(odps.projects().get(odps.getDefaultProject()).getName());
            result.add(defaultProject);
            if (StringUtils.isNullOrEmpty(catalogOwner)) {
                SecurityManager sm = odps.projects().get().getSecurityManager();
                String whoami = sm.runQuery("whoami", false);

                JsonObject js = JsonParser.parseString(whoami).getAsJsonObject();
                catalogOwner = js.get("DisplayName").getAsString();
            }
            Iterator<Project> iterator = odps.projects().iterator(catalogOwner);
            while (iterator.hasNext()) {
                Project project = iterator.next();
                if (!project.getName().equals(defaultProject)) {
                    result.add(project.getName());
                }
            }
        } catch (OdpsException e) {
            throw new RuntimeException(e);
        }
@@ -149,7 +234,7 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
    public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
        makeSureInitialized();
        try {
            return odps.tables().exists(tblName);
            return getClient().tables().exists(dbName, tblName);
        } catch (OdpsException e) {
            throw new RuntimeException(e);
        }
@@ -164,11 +249,11 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
        if (getClient().projects().exists(dbName)) {
            List<Partition> parts;
            if (limit < 0) {
                parts = getClient().tables().get(tbl).getPartitions();
                parts = getClient().tables().get(dbName, tbl).getPartitions();
            } else {
                skip = skip < 0 ? 0 : skip;
                parts = new ArrayList<>();
                Iterator<Partition> it = getClient().tables().get(tbl).getPartitionIterator();
                Iterator<Partition> it = getClient().tables().get(dbName, tbl).getPartitionIterator();
                int count = 0;
                while (it.hasNext()) {
                    if (count < skip) {
@@ -195,19 +280,10 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
    public List<String> listTableNames(SessionContext ctx, String dbName) {
        makeSureInitialized();
        List<String> result = new ArrayList<>();
        odps.tables().forEach(e -> result.add(e.getName()));
        getClient().tables().iterable(dbName).forEach(e -> result.add(e.getName()));
        return result;
    }

    /**
     * Use region to create the data tunnel url.
     * @return region, required by the JNI scanner.
     */
    public String getRegion() {
        makeSureInitialized();
        return region;
    }

    public String getAccessKey() {
        makeSureInitialized();
        return accessKey;
@@ -218,26 +294,101 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
        return secretKey;
    }

    public boolean enablePublicAccess() {
    public String getEndpoint() {
        makeSureInitialized();
        return enablePublicAccess;
        return endpoint;
    }

    public String getDefaultProject() {
        makeSureInitialized();
        return defaultProject;
    }

    public ZoneId getProjectDateTimeZone() {
        makeSureInitialized();

        String[] endpointSplit = endpoint.split("\\.");
        if (endpointSplit.length >= 2) {
            // http://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api => cn-hangzhou-vpc
            String regionAndSuffix = endpointSplit[1];

            // Remove the `-vpc` and `-intranet` suffixes.
            String region = regionAndSuffix.replace("-vpc", "").replace("-intranet", "");
            if (REGION_ZONE_MAP.containsKey(region)) {
                return REGION_ZONE_MAP.get(region);
            }
            LOG.warn("Unknown region " + region + " for endpoint " + endpoint + ", using systemDefault.");
            return ZoneId.systemDefault();
        }
        LOG.warn("Failed to split endpoint " + endpoint + ", using systemDefault.");
        return ZoneId.systemDefault();
    }
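For example, the timezone lookup above reduces to plain string surgery on the endpoint host; a self-contained sketch of the same parsing:

    import java.time.ZoneId;

    public class RegionZoneDemo {
        public static void main(String[] args) {
            String endpoint = "http://service.cn-hangzhou-vpc.maxcompute.aliyun-inc.com/api";
            // The second dot-separated segment carries the region plus an optional suffix.
            String regionAndSuffix = endpoint.split("\\.")[1]; // cn-hangzhou-vpc
            String region = regionAndSuffix.replace("-vpc", "").replace("-intranet", ""); // cn-hangzhou
            // cn-hangzhou maps to Asia/Shanghai in REGION_ZONE_MAP.
            System.out.println(region + " -> " + ZoneId.of("Asia/Shanghai"));
        }
    }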
    public String getQuota() {
        return quota;
    }

    public SplitOptions getSplitOption() {
        return splitOptions;
    }

    public EnvironmentSettings getSettings() {
        return settings;
    }

    public String getSplitStrategy() {
        return splitStrategy;
    }

    public long getSplitRowCount() {
        return splitRowCount;
    }

    public long getSplitByteSize() {
        return splitByteSize;
    }

    @Override
    public void checkProperties() throws DdlException {
        super.checkProperties();
        Map<String, String> props = catalogProperty.getProperties();
        for (String requiredProperty : REQUIRED_PROPERTIES) {
            if (!catalogProperty.getProperties().containsKey(requiredProperty)) {
            if (!props.containsKey(requiredProperty)) {
                throw new DdlException("Required property '" + requiredProperty + "' is missing");
            }
        }
    }

    public String getOdpsUrl() {
        return odpsUrl;
    }
        try {
            splitStrategy = props.getOrDefault(MCProperties.SPLIT_STRATEGY, MCProperties.DEFAULT_SPLIT_STRATEGY);
            if (splitStrategy.equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY)) {
                splitByteSize = Long.parseLong(props.getOrDefault(MCProperties.SPLIT_BYTE_SIZE,
                        MCProperties.DEFAULT_SPLIT_BYTE_SIZE));

    public String getTunnelUrl() {
        return tunnelUrl;
                if (splitByteSize < 10485760L) {
                    throw new DdlException(MCProperties.SPLIT_BYTE_SIZE + " must be greater than or equal to 10485760");
                }

            } else if (splitStrategy.equals(MCProperties.SPLIT_BY_ROW_COUNT_STRATEGY)) {
                splitRowCount = Long.parseLong(props.getOrDefault(MCProperties.SPLIT_ROW_COUNT,
                        MCProperties.DEFAULT_SPLIT_ROW_COUNT));
                if (splitRowCount <= 0) {
                    throw new DdlException(MCProperties.SPLIT_ROW_COUNT + " must be greater than 0");
                }

            } else {
                throw new DdlException("property " + MCProperties.SPLIT_STRATEGY + " must be "
                        + MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY + " or " + MCProperties.SPLIT_BY_ROW_COUNT_STRATEGY);
            }
        } catch (NumberFormatException e) {
            throw new DdlException("property " + MCProperties.SPLIT_BYTE_SIZE + "/"
                    + MCProperties.SPLIT_ROW_COUNT + " must be an integer");
        }

        CloudCredential credential = MCProperties.getCredential(props);
        if (!credential.isWhole()) {
            throw new DdlException("Max-Compute credential properties '"
                    + MCProperties.ACCESS_KEY + "' and '" + MCProperties.SECRET_KEY + "' are required.");
        }
    }
}
@@ -34,7 +34,6 @@ import org.apache.doris.thrift.TTableType;

import com.aliyun.odps.OdpsType;
import com.aliyun.odps.Table;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.type.ArrayTypeInfo;
import com.aliyun.odps.type.CharTypeInfo;
import com.aliyun.odps.type.DecimalTypeInfo;
@@ -48,6 +47,7 @@ import com.google.common.collect.Maps;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -61,6 +61,8 @@ public class MaxComputeExternalTable extends ExternalTable {
        super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
    }

    private Map<String, com.aliyun.odps.Column> columnNameToOdpsColumn = new HashMap<>();

    @Override
    protected synchronized void makeSureInitialized() {
        super.makeSureInitialized();
@@ -69,27 +71,6 @@ public class MaxComputeExternalTable extends ExternalTable {
        }
    }

    public long getTotalRows() throws TunnelException {
        // Used for non-partitioned tables only; partitioned tables read the whole
        // partition on the FE, so the total row count is unnecessary there.
        makeSureInitialized();
        MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr()
                .getMaxComputeMetadataCache(catalog.getId());
        MaxComputeExternalCatalog mcCatalog = ((MaxComputeExternalCatalog) catalog);
        return metadataCache.getCachedRowCount(dbName, name, null, key -> {
            try {
                return loadRowCount(mcCatalog, key);
            } catch (TunnelException e) {
                throw new RuntimeException(e);
            }
        });
    }

    private long loadRowCount(MaxComputeExternalCatalog catalog, MaxComputeCacheKey key) throws TunnelException {
        return catalog.getTableTunnel()
                .getDownloadSession(key.getDbName(), key.getTblName(), null)
                .getRecordCount();
    }

    public List<Column> getPartitionColumns() {
        makeSureInitialized();
@@ -151,19 +132,34 @@ public class MaxComputeExternalTable extends ExternalTable {
        return partitionValues;
    }

    public Map<String, com.aliyun.odps.Column> getColumnNameToOdpsColumn() {
        return columnNameToOdpsColumn;
    }

    @Override
    public Optional<SchemaCacheValue> initSchema() {
        // This method will be called during semantic parsing.
        makeSureInitialized();
        Table odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(name);
        Table odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(dbName, name);
        List<com.aliyun.odps.Column> columns = odpsTable.getSchema().getColumns();

        for (com.aliyun.odps.Column column : columns) {
            columnNameToOdpsColumn.put(column.getName(), column);
        }

        List<Column> schema = Lists.newArrayListWithCapacity(columns.size());
        for (com.aliyun.odps.Column field : columns) {
            schema.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null,
                    true, field.getComment(), true, -1));
                    field.isNullable(), field.getComment(), true, -1));
        }

        List<com.aliyun.odps.Column> partitionColumns = odpsTable.getSchema().getPartitionColumns();

        for (com.aliyun.odps.Column partitionColumn : partitionColumns) {
            columnNameToOdpsColumn.put(partitionColumn.getName(), partitionColumn);
        }

        List<String> partitionSpecs;
        if (!partitionColumns.isEmpty()) {
            partitionSpecs = odpsTable.getPartitions().stream()
@@ -239,10 +235,12 @@ public class MaxComputeExternalTable extends ExternalTable {
            case DATE: {
                return ScalarType.createDateV2Type();
            }
            case DATETIME:
            case TIMESTAMP: {
            case DATETIME: {
                return ScalarType.createDatetimeV2Type(3);
            }
            case TIMESTAMP_NTZ: {
                return ScalarType.createDatetimeV2Type(6);
            }
            case ARRAY: {
                ArrayTypeInfo arrayType = (ArrayTypeInfo) typeInfo;
                Type innerType = mcTypeToDorisType(arrayType.getElementTypeInfo());
@@ -275,17 +273,22 @@ public class MaxComputeExternalTable extends ExternalTable {

    @Override
    public TTableDescriptor toThrift() {
        // ak sk endpoint project quota
        List<Column> schema = getFullSchema();
        TMCTable tMcTable = new TMCTable();
        MaxComputeExternalCatalog mcCatalog = ((MaxComputeExternalCatalog) catalog);
        tMcTable.setRegion(mcCatalog.getRegion());

        tMcTable.setAccessKey(mcCatalog.getAccessKey());
        tMcTable.setSecretKey(mcCatalog.getSecretKey());
        tMcTable.setOdpsUrl(mcCatalog.getOdpsUrl());
        tMcTable.setTunnelUrl(mcCatalog.getTunnelUrl());
        tMcTable.setPublicAccess(String.valueOf(mcCatalog.enablePublicAccess()));
        tMcTable.setOdpsUrl("deprecated");
        tMcTable.setRegion("deprecated");
        tMcTable.setEndpoint(mcCatalog.getEndpoint());
        // use the MaxCompute project as dbName
        tMcTable.setProject(dbName);
        tMcTable.setQuota(mcCatalog.getQuota());

        tMcTable.setTunnelUrl("deprecated");
        tMcTable.setProject("deprecated");
        tMcTable.setTable(name);
        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.MAX_COMPUTE_TABLE,
                schema.size(), 0, getName(), dbName);
@@ -30,21 +30,11 @@ import java.util.stream.Collectors;

public class MaxComputeMetadataCache {
    private final Cache<MaxComputeCacheKey, TablePartitionValues> partitionValuesCache;
    private final Cache<MaxComputeCacheKey, Long> tableRowCountCache;

    public MaxComputeMetadataCache() {
        partitionValuesCache = Caffeine.newBuilder().maximumSize(Config.max_hive_partition_cache_num)
                .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
                .build();
        tableRowCountCache = Caffeine.newBuilder().maximumSize(10000)
                .expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
                .build();
    }

    public Long getCachedRowCount(String dbName, String tblName, String partitionSpec,
            Function<? super MaxComputeCacheKey, ? extends Long> loader) {
        MaxComputeCacheKey tablePartitionKey = new MaxComputeCacheKey(dbName, tblName, partitionSpec);
        return tableRowCountCache.get(tablePartitionKey, loader);
    }

    public TablePartitionValues getCachedPartitionValues(MaxComputeCacheKey tablePartitionKey,
@@ -54,7 +44,6 @@ public class MaxComputeMetadataCache {

    public void cleanUp() {
        partitionValuesCache.invalidateAll();
        tableRowCountCache.invalidateAll();
    }

    public void cleanDatabaseCache(String dbName) {
@@ -63,17 +52,10 @@ public class MaxComputeMetadataCache {
                .filter(k -> k.getDbName().equalsIgnoreCase(dbName))
                .collect(Collectors.toList());
        partitionValuesCache.invalidateAll(removeCacheList);

        List<MaxComputeCacheKey> removeCacheRowCountList = tableRowCountCache.asMap().keySet()
                .stream()
                .filter(k -> k.getDbName().equalsIgnoreCase(dbName))
                .collect(Collectors.toList());
        tableRowCountCache.invalidateAll(removeCacheRowCountList);
    }

    public void cleanTableCache(String dbName, String tblName) {
        MaxComputeCacheKey cacheKey = new MaxComputeCacheKey(dbName, tblName);
        partitionValuesCache.invalidate(cacheKey);
        tableRowCountCache.invalidate(cacheKey);
    }
}
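The (now removed) row-count cache above follows the standard Caffeine load-through pattern. A minimal, self-contained sketch of that pattern with a plain String key, assuming only the Caffeine library:

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import java.util.concurrent.TimeUnit;

    public class LoadThroughCacheDemo {
        public static void main(String[] args) {
            Cache<String, Long> rowCounts = Caffeine.newBuilder()
                    .maximumSize(10_000)
                    .expireAfterAccess(10, TimeUnit.MINUTES)
                    .build();
            // get() computes and stores the value on a miss; later calls return the cached value.
            long count = rowCounts.get("db.tbl", key -> expensiveCount(key));
            System.out.println(count);
        }

        private static long expensiveCount(String key) {
            return 42L; // stand-in for a tunnel download-session record count
        }
    }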
@@ -17,18 +17,30 @@

package org.apache.doris.datasource.maxcompute.source;

import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.CompoundPredicate;
import org.apache.doris.analysis.CompoundPredicate.Operator;
import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.IsNullPredicate;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileQueryScanNode;
import org.apache.doris.datasource.TableFormatType;
import org.apache.doris.datasource.TablePartitionValues;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.datasource.maxcompute.source.MaxComputeSplit.SplitType;
import org.apache.doris.datasource.property.constants.MCProperties;
import org.apache.doris.nereids.util.DateUtils;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@@ -37,22 +49,39 @@ import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TMaxComputeFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;

import com.aliyun.odps.Table;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.table.TableIdentifier;
import com.aliyun.odps.table.configuration.ArrowOptions;
import com.aliyun.odps.table.configuration.ArrowOptions.TimestampUnit;
import com.aliyun.odps.table.optimizer.predicate.Predicate;
import com.aliyun.odps.table.read.TableBatchReadSession;
import com.aliyun.odps.table.read.TableReadSessionBuilder;
import com.aliyun.odps.table.read.split.InputSplitAssigner;
import com.aliyun.odps.table.read.split.impl.IndexedInputSplit;
import com.google.common.collect.Maps;
import jline.internal.Log;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class MaxComputeScanNode extends FileQueryScanNode {

    private final MaxComputeExternalTable table;
    private static final int MIN_SPLIT_SIZE = 4096;
    private static final LocationPath VIRTUAL_SLICE_PART = new LocationPath("/virtual_slice_part", Maps.newHashMap());
    TableBatchReadSession tableBatchReadSession;

    public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
        this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE, needCheckColumnPriv);
@@ -75,13 +104,303 @@ public class MaxComputeScanNode extends FileQueryScanNode {
        TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
        tableFormatFileDesc.setTableFormatType(TableFormatType.MAX_COMPUTE.value());
        TMaxComputeFileDesc fileDesc = new TMaxComputeFileDesc();
        if (maxComputeSplit.getPartitionSpec().isPresent()) {
            fileDesc.setPartitionSpec(maxComputeSplit.getPartitionSpec().get());
        }
        fileDesc.setPartitionSpec("deprecated");
        fileDesc.setTableBatchReadSession(maxComputeSplit.scanSerialize);
        fileDesc.setSessionId(maxComputeSplit.getSessionId());
        tableFormatFileDesc.setMaxComputeParams(fileDesc);
        rangeDesc.setTableFormatParams(tableFormatFileDesc);
        rangeDesc.setPath("[ " + maxComputeSplit.getStart() + " , " + maxComputeSplit.getLength() + " ]");
        rangeDesc.setStartOffset(maxComputeSplit.getStart());
        rangeDesc.setSize(maxComputeSplit.getLength());
    }

    void createTableBatchReadSession() throws UserException {
        Predicate filterPredicate = convertPredicate();

        List<String> requiredPartitionColumns = new ArrayList<>();
        List<String> orderedRequiredDataColumns = new ArrayList<>();

        Set<String> requiredSlots =
                desc.getSlots().stream().map(e -> e.getColumn().getName()).collect(Collectors.toSet());

        Set<String> partitionColumns =
                table.getPartitionColumns().stream().map(Column::getName).collect(Collectors.toSet());

        for (Column column : table.getColumns()) {
            String columnName = column.getName();
            if (!requiredSlots.contains(columnName)) {
                continue;
            }
            if (partitionColumns.contains(columnName)) {
                requiredPartitionColumns.add(columnName);
            } else {
                orderedRequiredDataColumns.add(columnName);
            }
        }

        MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();

        try {
            TableReadSessionBuilder scanBuilder = new TableReadSessionBuilder();
            tableBatchReadSession =
                    scanBuilder.identifier(TableIdentifier.of(table.getDbName(), table.getName()))
                            .withSettings(mcCatalog.getSettings())
                            .withSplitOptions(mcCatalog.getSplitOption())
                            .requiredPartitionColumns(requiredPartitionColumns)
                            .requiredDataColumns(orderedRequiredDataColumns)
                            .withArrowOptions(
                                    ArrowOptions.newBuilder()
                                            .withDatetimeUnit(TimestampUnit.MILLI)
                                            .withTimestampUnit(TimestampUnit.NANO)
                                            .build()
                            )
                            .withFilterPredicate(filterPredicate)
                            .buildBatchReadSession();
        } catch (java.io.IOException e) {
            throw new RuntimeException(e);
        }
    }

    protected Predicate convertPredicate() {
        if (conjuncts.isEmpty()) {
            return Predicate.NO_PREDICATE;
        }

        List<Predicate> odpsPredicates = new ArrayList<>();
        for (Expr dorisPredicate : conjuncts) {
            try {
                odpsPredicates.add(convertExprToOdpsPredicate(dorisPredicate));
            } catch (AnalysisException e) {
                Log.warn("Failed to convert predicate " + dorisPredicate.toString() + ". Reason: "
                        + e.getMessage());
            }
        }

        if (odpsPredicates.isEmpty()) {
            return Predicate.NO_PREDICATE;
        } else if (odpsPredicates.size() == 1) {
            return odpsPredicates.get(0);
        } else {
            com.aliyun.odps.table.optimizer.predicate.CompoundPredicate
                    filterPredicate = new com.aliyun.odps.table.optimizer.predicate.CompoundPredicate(
                    com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.AND);

            for (Predicate odpsPredicate : odpsPredicates) {
                filterPredicate.addPredicate(odpsPredicate);
            }
            return filterPredicate;
        }
    }
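As a usage sketch, here is how two already-converted ODPS predicates combine under AND, using only the com.aliyun.odps.table.optimizer.predicate calls seen above (the RawPredicate strings are illustrative):

    import com.aliyun.odps.table.optimizer.predicate.CompoundPredicate;
    import com.aliyun.odps.table.optimizer.predicate.Predicate;
    import com.aliyun.odps.table.optimizer.predicate.RawPredicate;

    public class PredicatePushdownDemo {
        public static void main(String[] args) {
            Predicate p1 = new RawPredicate("dt = \"2024-01-01\"");
            Predicate p2 = new RawPredicate("id > 100");

            CompoundPredicate and = new CompoundPredicate(CompoundPredicate.Operator.AND);
            and.addPredicate(p1);
            and.addPredicate(p2);
            // The combined predicate is what gets handed to withFilterPredicate(...).
        }
    }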
    private Predicate convertExprToOdpsPredicate(Expr expr) throws AnalysisException {
        Predicate odpsPredicate = null;
        if (expr instanceof CompoundPredicate) {
            CompoundPredicate compoundPredicate = (CompoundPredicate) expr;

            com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator odpsOp;
            switch (compoundPredicate.getOp()) {
                case AND:
                    odpsOp = com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.AND;
                    break;
                case OR:
                    odpsOp = com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.OR;
                    break;
                case NOT:
                    odpsOp = com.aliyun.odps.table.optimizer.predicate.CompoundPredicate.Operator.NOT;
                    break;
                default:
                    throw new AnalysisException("Unknown operator: " + compoundPredicate.getOp());
            }

            List<Predicate> odpsPredicates = new ArrayList<>();

            odpsPredicates.add(convertExprToOdpsPredicate(expr.getChild(0)));

            if (compoundPredicate.getOp() != Operator.NOT) {
                odpsPredicates.add(convertExprToOdpsPredicate(expr.getChild(1)));
            }
            odpsPredicate = new com.aliyun.odps.table.optimizer.predicate.CompoundPredicate(odpsOp, odpsPredicates);

        } else if (expr instanceof InPredicate) {

            InPredicate inPredicate = (InPredicate) expr;
            com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator odpsOp =
                    inPredicate.isNotIn()
                            ? com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator.NOT_IN
                            : com.aliyun.odps.table.optimizer.predicate.InPredicate.Operator.IN;

            String columnName = convertSlotRefToColumnName(expr.getChild(0));
            com.aliyun.odps.OdpsType odpsType = table.getColumnNameToOdpsColumn().get(columnName).getType();

            StringBuilder stringBuilder = new StringBuilder();

            stringBuilder.append(columnName);
            stringBuilder.append(" ");
            stringBuilder.append(odpsOp.getDescription());
            stringBuilder.append(" (");

            for (int i = 1; i < inPredicate.getChildren().size(); i++) {
                stringBuilder.append(convertLiteralToOdpsValues(odpsType, expr.getChild(i)));
                if (i < inPredicate.getChildren().size() - 1) {
                    stringBuilder.append(", ");
                }
            }
            stringBuilder.append(" )");

            odpsPredicate = new com.aliyun.odps.table.optimizer.predicate.RawPredicate(stringBuilder.toString());

        } else if (expr instanceof BinaryPredicate) {
            BinaryPredicate binaryPredicate = (BinaryPredicate) expr;

            com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator odpsOp;
            switch (binaryPredicate.getOp()) {
                case EQ: {
                    odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.EQUALS;
                    break;
                }
                case NE: {
                    odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.NOT_EQUALS;
                    break;
                }
                case GE: {
                    odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.GREATER_THAN_OR_EQUAL;
                    break;
                }
                case LE: {
                    odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.LESS_THAN_OR_EQUAL;
                    break;
                }
                case LT: {
                    odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.LESS_THAN;
                    break;
                }
                case GT: {
                    odpsOp = com.aliyun.odps.table.optimizer.predicate.BinaryPredicate.Operator.GREATER_THAN;
                    break;
                }
                default: {
                    odpsOp = null;
                    break;
                }
            }

            if (odpsOp != null) {
                String columnName = convertSlotRefToColumnName(expr.getChild(0));
                com.aliyun.odps.OdpsType odpsType = table.getColumnNameToOdpsColumn().get(columnName).getType();
                StringBuilder stringBuilder = new StringBuilder();
                stringBuilder.append(columnName);
                stringBuilder.append(" ");
                stringBuilder.append(odpsOp.getDescription());
                stringBuilder.append(" ");
                stringBuilder.append(convertLiteralToOdpsValues(odpsType, expr.getChild(1)));

                odpsPredicate = new com.aliyun.odps.table.optimizer.predicate.RawPredicate(stringBuilder.toString());
            }
        } else if (expr instanceof IsNullPredicate) {
            IsNullPredicate isNullPredicate = (IsNullPredicate) expr;
            com.aliyun.odps.table.optimizer.predicate.UnaryPredicate.Operator odpsOp =
                    isNullPredicate.isNotNull()
                            ? com.aliyun.odps.table.optimizer.predicate.UnaryPredicate.Operator.NOT_NULL
                            : com.aliyun.odps.table.optimizer.predicate.UnaryPredicate.Operator.IS_NULL;

            odpsPredicate = new com.aliyun.odps.table.optimizer.predicate.UnaryPredicate(odpsOp,
                    new com.aliyun.odps.table.optimizer.predicate.Attribute(
                            convertSlotRefToColumnName(expr.getChild(0))
                    )
            );
        }

        if (odpsPredicate == null) {
            throw new AnalysisException("Do not support converting ["
                    + expr.getExprName() + "] in convertExprToOdpsPredicate.");
        }
        return odpsPredicate;
    }

    private String convertSlotRefToColumnName(Expr expr) throws AnalysisException {
        if (expr instanceof SlotRef) {
            return ((SlotRef) expr).getColumnName();
        } else if (expr instanceof CastExpr) {
            if (expr.getChild(0) instanceof SlotRef) {
                return ((SlotRef) expr.getChild(0)).getColumnName();
            }
        }

        throw new AnalysisException("Do not support converting ["
                + expr.getExprName() + "] in convertSlotRefToColumnName.");
    }

    private String convertLiteralToOdpsValues(OdpsType odpsType, Expr expr) throws AnalysisException {
        if (!(expr instanceof LiteralExpr)) {
            throw new AnalysisException("Do not support converting ["
                    + expr.getExprName() + "] in convertLiteralToOdpsValues.");
        }
        LiteralExpr literalExpr = (LiteralExpr) expr;

        switch (odpsType) {
            case BOOLEAN:
            case TINYINT:
            case SMALLINT:
            case INT:
            case BIGINT:
            case DECIMAL:
            case FLOAT:
            case DOUBLE: {
                return " " + literalExpr.toString() + " ";
            }
            case STRING:
            case CHAR:
            case VARCHAR: {
                return " \"" + literalExpr.toString() + "\" ";
            }
            case DATE: {
                DateLiteral dateLiteral = (DateLiteral) literalExpr;
                ScalarType dstType = ScalarType.createDateV2Type();
                return " \"" + dateLiteral.getStringValue(dstType) + "\" ";
            }
            case DATETIME: {
                DateLiteral dateLiteral = (DateLiteral) literalExpr;
                ScalarType dstType = ScalarType.createDatetimeV2Type(3);

                return " \"" + convertDateTimezone(dateLiteral.getStringValue(dstType),
                        ((MaxComputeExternalCatalog) table.getCatalog()).getProjectDateTimeZone()) + "\" ";
            }
            case TIMESTAMP_NTZ: {
                DateLiteral dateLiteral = (DateLiteral) literalExpr;
                ScalarType dstType = ScalarType.createDatetimeV2Type(6);
                return " \"" + dateLiteral.getStringValue(dstType) + "\" ";
            }
            default: {
                break;
            }
        }
        throw new AnalysisException("Do not support converting odps type [" + odpsType + "] to odps values.");
    }

    public static String convertDateTimezone(String dateTimeStr, ZoneId toZone) {
        if (DateUtils.getTimeZone().equals(toZone)) {
            return dateTimeStr;
        }

        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
        LocalDateTime localDateTime = LocalDateTime.parse(dateTimeStr, formatter);

        ZonedDateTime sourceZonedDateTime = localDateTime.atZone(DateUtils.getTimeZone());
        ZonedDateTime targetZonedDateTime = sourceZonedDateTime.withZoneSameInstant(toZone);

        return targetZonedDateTime.format(formatter);
    }
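A self-contained illustration of the wall-clock shift performed by convertDateTimezone, assuming a session zone of Asia/Shanghai and a project zone of Asia/Tokyo:

    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.ZonedDateTime;
    import java.time.format.DateTimeFormatter;

    public class TimezoneShiftDemo {
        public static void main(String[] args) {
            DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
            LocalDateTime local = LocalDateTime.parse("2024-01-01 12:00:00.000", fmt);

            // Interpret the literal in the session zone, then re-express the same
            // instant in the project zone (Tokyo is one hour ahead of Shanghai).
            ZonedDateTime session = local.atZone(ZoneId.of("Asia/Shanghai"));
            ZonedDateTime project = session.withZoneSameInstant(ZoneId.of("Asia/Tokyo"));
            System.out.println(project.format(fmt)); // 2024-01-01 13:00:00.000
        }
    }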
    @Override
    public TFileFormatType getFileFormatType() {
        return TFileFormatType.FORMAT_JNI;
@@ -109,85 +428,64 @@ public class MaxComputeScanNode extends FileQueryScanNode {
        if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
            return result;
        }
        createTableBatchReadSession();

        try {
            if (!table.getPartitionColumns().isEmpty()) {
                if (conjuncts.isEmpty()) {
                    throw new IllegalArgumentException("Max Compute partition table needs a partition predicate.");
                }
                List<String> partitionSpecs = getPartitionSpecs();
                for (String partitionSpec : partitionSpecs) {
                    addPartitionSplits(result, odpsTable, partitionSpec);
            String scanSessionSerialize = serializeSession(tableBatchReadSession);
            InputSplitAssigner assigner = tableBatchReadSession.getInputSplitAssigner();
            long modificationTime = table.getOdpsTable().getLastDataModifiedTime().getTime();

            MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog();

            if (mcCatalog.getSplitStrategy().equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY)) {

                for (com.aliyun.odps.table.read.split.InputSplit split : assigner.getAllSplits()) {
                    MaxComputeSplit maxComputeSplit =
                            new MaxComputeSplit(new LocationPath("/byte_size", Maps.newHashMap()),
                                    ((IndexedInputSplit) split).getSplitIndex(), -1,
                                    mcCatalog.getSplitByteSize(),
                                    modificationTime, null,
                                    Collections.emptyList());

                    maxComputeSplit.scanSerialize = scanSessionSerialize;
                    maxComputeSplit.splitType = SplitType.BYTE_SIZE;
                    maxComputeSplit.sessionId = split.getSessionId();

                    result.add(maxComputeSplit);
                }
            } else {
                addBatchSplits(result, odpsTable, table.getTotalRows());
            }
        } catch (TunnelException e) {
            throw new UserException("Max Compute tunnel SDK exception: " + e.getMessage(), e);
                long totalRowCount = assigner.getTotalRowCount();

                long recordsPerSplit = mcCatalog.getSplitRowCount();
                for (long offset = 0; offset < totalRowCount; offset += recordsPerSplit) {
                    recordsPerSplit = Math.min(recordsPerSplit, totalRowCount - offset);
                    com.aliyun.odps.table.read.split.InputSplit split =
                            assigner.getSplitByRowOffset(offset, recordsPerSplit);

                    MaxComputeSplit maxComputeSplit =
                            new MaxComputeSplit(new LocationPath("/row_offset", Maps.newHashMap()),
                                    offset, recordsPerSplit, totalRowCount, modificationTime, null,
                                    Collections.emptyList());

                    maxComputeSplit.scanSerialize = scanSessionSerialize;
                    maxComputeSplit.splitType = SplitType.ROW_OFFSET;
                    maxComputeSplit.sessionId = split.getSessionId();

                    result.add(maxComputeSplit);
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return result;
    }
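The row-offset branch above is simple chunking arithmetic; a standalone sketch with a hypothetical total of 10 rows and 4 rows per split:

    public class RowOffsetChunkDemo {
        public static void main(String[] args) {
            long totalRowCount = 10;
            long recordsPerSplit = 4;
            for (long offset = 0; offset < totalRowCount; offset += recordsPerSplit) {
                // The last chunk is clipped so splits never run past the table end.
                recordsPerSplit = Math.min(recordsPerSplit, totalRowCount - offset);
                System.out.println("split: offset=" + offset + " rows=" + recordsPerSplit);
            }
            // Prints splits at offsets 0, 4, 8 covering 4, 4, and 2 rows.
        }
    }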
    private static void addPartitionSplits(List<Split> result, Table odpsTable, String partitionSpec) {
        long modificationTime = odpsTable.getLastDataModifiedTime().getTime();
        // Use '-1' to read the whole partition, to avoid spending too much time calling table.getTotalRows().
        result.add(new MaxComputeSplit(VIRTUAL_SLICE_PART,
                0, -1L, -1, modificationTime, null, Collections.emptyList(), null));
    }

    private static void addBatchSplits(List<Split> result, Table odpsTable, long totalRows) {
        List<Pair<Long, Long>> sliceRange = new ArrayList<>();
        long fileNum = odpsTable.getFileNum();
        long start = 0;
        long splitSize = (long) Math.ceil((double) totalRows / fileNum);
        if (splitSize <= 0 || totalRows < MIN_SPLIT_SIZE) {
            // use a single split for the whole table
            sliceRange.add(Pair.of(start, totalRows));
        } else {
            for (int i = 0; i < fileNum; i++) {
                if (start > totalRows) {
                    break;
                }
                sliceRange.add(Pair.of(start, splitSize));
                start += splitSize;
            }
        }
        long modificationTime = odpsTable.getLastDataModifiedTime().getTime();
        if (!sliceRange.isEmpty()) {
            for (int i = 0; i < sliceRange.size(); i++) {
                Pair<Long, Long> range = sliceRange.get(i);
                result.add(new MaxComputeSplit(new LocationPath("/virtual_slice_" + i, Maps.newHashMap()),
                        range.first, range.second, totalRows, modificationTime, null, Collections.emptyList(), null));
            }
        }
    }

    private List<String> getPartitionSpecs() throws AnalysisException {
        return getPrunedPartitionSpecs();
    }

    private List<String> getPrunedPartitionSpecs() throws AnalysisException {
        List<String> result = new ArrayList<>();
        TablePartitionValues partitionValues = table.getPartitionValues();
        // prune partitions by expr
        partitionValues.readLock().lock();
        try {
            Map<Long, PartitionItem> idToPartitionItem = partitionValues.getIdToPartitionItem();
            this.totalPartitionNum = idToPartitionItem.size();
            ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem,
                    table.getPartitionColumns(), columnNameToRange,
                    partitionValues.getUidToPartitionRange(),
                    partitionValues.getRangeToId(),
                    partitionValues.getSingleColumnRangeMap(),
                    false);
            Collection<Long> filteredPartitionIds = pruner.prune();
            this.selectedPartitionNum = filteredPartitionIds.size();
            // get partitions from cache
            Map<Long, String> partitionIdToNameMap = partitionValues.getPartitionIdToNameMap();
            filteredPartitionIds.forEach(id -> result.add(partitionIdToNameMap.get(id)));
            return result;
        } finally {
            partitionValues.readLock().unlock();
        }
    private static String serializeSession(Serializable object) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
        objectOutputStream.writeObject(object);
        byte[] serializedBytes = byteArrayOutputStream.toByteArray();
        return Base64.getEncoder().encodeToString(serializedBytes);
    }
}
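serializeSession is plain Java serialization plus Base64, so the BE can receive the read session as an opaque string. A self-contained round-trip sketch of the same encoding:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.Base64;

    public class SessionSerializeDemo {
        static String serialize(Serializable object) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            new ObjectOutputStream(bos).writeObject(object);
            return Base64.getEncoder().encodeToString(bos.toByteArray());
        }

        static Object deserialize(String encoded) throws IOException, ClassNotFoundException {
            byte[] bytes = Base64.getDecoder().decode(encoded);
            return new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject();
        }

        public static void main(String[] args) throws Exception {
            String encoded = serialize("a stand-in for the TableBatchReadSession");
            System.out.println(deserialize(encoded)); // prints the original string
        }
    }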
@@ -21,21 +21,27 @@ import org.apache.doris.common.util.LocationPath;
import org.apache.doris.datasource.FileSplit;
import org.apache.doris.thrift.TFileType;

import java.util.List;
import java.util.Optional;
import lombok.Getter;

import java.util.List;

@Getter
public class MaxComputeSplit extends FileSplit {
    private final Optional<String> partitionSpec;
    public String scanSerialize;
    public String sessionId;

    public enum SplitType {
        ROW_OFFSET,
        BYTE_SIZE
    }

    public SplitType splitType;

    public MaxComputeSplit(LocationPath path, long start, long length, long fileLength,
            long modificationTime, String[] hosts, List<String> partitionValues, String partitionSpec) {
            long modificationTime, String[] hosts, List<String> partitionValues) {
        super(path, start, length, fileLength, modificationTime, hosts, partitionValues);
        this.partitionSpec = Optional.ofNullable(partitionSpec);
        // MC always uses the FILE_NET type
        this.locationType = TFileType.FILE_NET;
    }

    public Optional<String> getPartitionSpec() {
        return partitionSpec;
    }
}
@@ -25,15 +25,37 @@ import java.util.Map;
 * properties for aliyun max compute
 */
public class MCProperties extends BaseProperties {

    // To be compatible with previous versions of the catalog.
    public static final String REGION = "mc.region";
    public static final String PROJECT = "mc.default.project";
    public static final String ACCESS_KEY = "mc.access_key";
    public static final String SECRET_KEY = "mc.secret_key";
    public static final String SESSION_TOKEN = "mc.session_token";
    public static final String PUBLIC_ACCESS = "mc.public_access";
    public static final String DEFAULT_PUBLIC_ACCESS = "false";
    public static final String ODPS_ENDPOINT = "mc.odps_endpoint";
    public static final String TUNNEL_SDK_ENDPOINT = "mc.tunnel_endpoint";

    public static final String PROJECT = "mc.default.project";
    public static final String SESSION_TOKEN = "mc.session_token";

    public static final String ACCESS_KEY = "mc.access_key";
    public static final String SECRET_KEY = "mc.secret_key";
    public static final String ENDPOINT = "mc.endpoint";

    public static final String QUOTA = "mc.quota";
    public static final String DEFAULT_QUOTA = "pay-as-you-go";

    public static final String SPLIT_STRATEGY = "mc.split_strategy";
    public static final String SPLIT_BY_BYTE_SIZE_STRATEGY = "byte_size";
    public static final String SPLIT_BY_ROW_COUNT_STRATEGY = "row_count";
    public static final String DEFAULT_SPLIT_STRATEGY = SPLIT_BY_BYTE_SIZE_STRATEGY;

    public static final String SPLIT_BYTE_SIZE = "mc.split_byte_size";
    public static final String DEFAULT_SPLIT_BYTE_SIZE = "268435456"; // 256 * 1024L * 1024L = 256MB
    public static final String SPLIT_ROW_COUNT = "mc.split_row_count";
    public static final String DEFAULT_SPLIT_ROW_COUNT = "1048576"; // 256 * 4096

    public static CloudCredential getCredential(Map<String, String> props) {
        return getCloudCredential(props, ACCESS_KEY, SECRET_KEY, SESSION_TOKEN);
    }
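How the new-style properties are consumed, as a hedged sketch with plain strings standing in for the MCProperties constants:

    import java.util.HashMap;
    import java.util.Map;

    public class McPropertiesDemo {
        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("mc.default.project", "project0");
            props.put("mc.endpoint", "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api");
            props.put("mc.access_key", "ak");
            props.put("mc.secret_key", "sk");

            // Optional settings fall back to defaults, mirroring checkProperties().
            String strategy = props.getOrDefault("mc.split_strategy", "byte_size");
            long splitByteSize = Long.parseLong(props.getOrDefault("mc.split_byte_size", "268435456"));
            System.out.println(strategy + " / " + splitByteSize);
        }
    }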
@@ -402,10 +402,9 @@ public class PropertyConverterTest extends TestWithFeService {
        String queryDlf1 = "create catalog hms_mc properties (\n"
                + "    'type'='max_compute',\n"
                + "    'mc.default.project' = 'project0',\n"
                + "    'mc.region' = 'cn-beijing',\n"
                + "    'mc.access_key' = 'ak',\n"
                + "    'mc.secret_key' = 'sk',\n"
                + "    'mc.public_access' = 'true'\n"
                + "    'mc.endpoint' = 'http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api' \n"
                + ");";
        String catalogName = "hms_mc";
        CreateCatalogStmt analyzedStmt = createStmt(queryDlf1);
@@ -414,10 +413,10 @@ public class PropertyConverterTest extends TestWithFeService {
                .getCatalogMgr().getCatalog(catalogName);
        Map<String, String> properties = catalog.getCatalogProperty().getProperties();
        Assertions.assertEquals(properties.get("type"), "max_compute");
        Assertions.assertEquals(properties.get("mc.region"), "cn-beijing");
        Assertions.assertEquals(properties.get("mc.access_key"), "ak");
        Assertions.assertEquals(properties.get("mc.secret_key"), "sk");
        Assertions.assertEquals(properties.get("mc.public_access"), "true");
        Assertions.assertEquals(properties.get("mc.endpoint"),
                "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api");
        Assertions.assertEquals(properties.get("mc.default.project"), "project0");
    }