[fix](multi-catalog)support the max compute partition prune (#27154)

1. MaxCompute partition prune.
Previously we only supported filtering MaxCompute partitions by '=', which matches just one partition.
To filter multiple partitions and support range operators ('>', '<', '>=', ...), partition pruning is needed (see the sketch below).

2. Add a MaxCompute row count cache and a partitionValues cache.

3. Add MaxCompute regression cases.
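
To make item 1 concrete, here is a minimal, self-contained Java sketch of what pruning enables, using a made-up table with a single partition column pt (the real implementation routes predicates through ListPartitionPrunerV2, as the diff below shows):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class PartitionPruneSketch {
        public static void main(String[] args) {
            // Hypothetical MaxCompute partition specs of the form "pt=<value>".
            List<String> specs = Arrays.asList("pt=20231101", "pt=20231115", "pt=20231201");
            // Before this change only a single EQ predicate (pt = '20231115') could
            // narrow the scan; a range such as pt >= '20231101' AND pt < '20231201'
            // now keeps every matching partition rather than just one.
            List<String> pruned = specs.stream()
                    .filter(s -> {
                        String v = s.substring("pt=".length());
                        return v.compareTo("20231101") >= 0 && v.compareTo("20231201") < 0;
                    })
                    .collect(Collectors.toList());
            System.out.println(pruned); // [pt=20231101, pt=20231115]
        }
    }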
This commit is contained in:
slothever
2023-12-01 22:28:26 +08:00
committed by GitHub
parent f4afcae452
commit 1706699e7e
25 changed files with 684 additions and 153 deletions

View File

@ -26,6 +26,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Type;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.catalog.external.MaxComputeExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
@ -37,6 +38,7 @@ import org.apache.doris.common.proc.ProcService;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.MaxComputeExternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
@ -125,7 +127,8 @@ public class ShowPartitionsStmt extends ShowStmt {
}
DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
-        TableIf table = db.getTableOrMetaException(tblName, Table.TableType.OLAP, TableType.HMS_EXTERNAL_TABLE);
+        TableIf table = db.getTableOrMetaException(tblName, Table.TableType.OLAP,
+                TableType.HMS_EXTERNAL_TABLE, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
if (table instanceof HMSExternalTable) {
if (((HMSExternalTable) table).isView()) {
@ -137,6 +140,13 @@ public class ShowPartitionsStmt extends ShowStmt {
return;
}
if (table instanceof MaxComputeExternalTable) {
if (((MaxComputeExternalTable) table).getOdpsTable().getPartitions().isEmpty()) {
throw new AnalysisException("Table " + tblName + " is not a partitioned table");
}
return;
}
table.readLock();
try {
// build proc path
@ -169,7 +179,8 @@ public class ShowPartitionsStmt extends ShowStmt {
}
// disallow unsupported catalog
-        if (!(catalog.isInternalCatalog() || catalog instanceof HMSExternalCatalog)) {
+        if (!(catalog.isInternalCatalog() || catalog instanceof HMSExternalCatalog
+                || catalog instanceof MaxComputeExternalCatalog)) {
throw new AnalysisException(String.format("Catalog of type '%s' is not allowed in ShowPartitionsStmt",
catalog.getType()));
}

View File

@ -17,25 +17,25 @@
package org.apache.doris.catalog.external;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.Predicate;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.MaxComputeCacheKey;
import org.apache.doris.datasource.MaxComputeExternalCatalog;
import org.apache.doris.datasource.MaxComputeMetadataCache;
import org.apache.doris.planner.external.TablePartitionValues;
import org.apache.doris.thrift.TMCTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.aliyun.odps.OdpsType;
import com.aliyun.odps.Table;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.type.ArrayTypeInfo;
import com.aliyun.odps.type.CharTypeInfo;
import com.aliyun.odps.type.DecimalTypeInfo;
@ -43,17 +43,15 @@ import com.aliyun.odps.type.MapTypeInfo;
import com.aliyun.odps.type.StructTypeInfo;
import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.type.VarcharTypeInfo;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
import java.util.stream.Collectors;
/**
* MaxCompute external table.
@ -61,8 +59,9 @@ import java.util.StringJoiner;
public class MaxComputeExternalTable extends ExternalTable {
private Table odpsTable;
private Set<String> partitionKeys;
-    private String partitionSpec;
+    private List<String> partitionSpecs;
private Map<String, Column> partitionNameToColumns;
private List<Type> partitionTypes;
public MaxComputeExternalTable(long id, String name, String dbName, MaxComputeExternalCatalog catalog) {
super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
@ -73,12 +72,80 @@ public class MaxComputeExternalTable extends ExternalTable {
super.makeSureInitialized();
if (!objectCreated) {
odpsTable = ((MaxComputeExternalCatalog) catalog).getClient().tables().get(name);
initTablePartitions();
objectCreated = true;
}
}
public long getTotalRows() throws TunnelException {
        // used for non-partitioned tables only;
        // a partitioned table is read per partition on the FE, so its total row count is not needed.
makeSureInitialized();
MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMaxComputeMetadataCache(catalog.getId());
MaxComputeExternalCatalog mcCatalog = ((MaxComputeExternalCatalog) catalog);
return metadataCache.getCachedRowCount(dbName, name, null, () -> mcCatalog.getTableTunnel()
.getDownloadSession(dbName, name, null)
.getRecordCount());
}
@Override
public Set<String> getPartitionNames() {
makeSureInitialized();
return partitionNameToColumns.keySet();
}
public List<Column> getPartitionColumns() {
makeSureInitialized();
return new ArrayList<>(partitionNameToColumns.values());
}
public TablePartitionValues getPartitionValues() {
makeSureInitialized();
// Make sure to call it after initSchema() completes
String projectName = odpsTable.getProject();
String tableName = odpsTable.getName();
MaxComputeMetadataCache metadataCache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMaxComputeMetadataCache(catalog.getId());
return metadataCache.getCachedPartitionValues(
new MaxComputeCacheKey(projectName, tableName),
() -> {
TablePartitionValues partitionValues = new TablePartitionValues();
partitionValues.addPartitions(partitionSpecs,
partitionSpecs.stream()
.map(p -> parsePartitionValues(new ArrayList<>(getPartitionNames()), p))
.collect(Collectors.toList()),
partitionTypes);
return partitionValues;
});
}
/**
 * Parse all values from a partition path into a single list.
 * @param partitionColumns the partition column names, e.g. part1, part2, part3
 * @param partitionPath a partition path of the form 'part1=123/part2=abc/part3=1bc'
 * @return all values contained in partitionPath
*/
private static List<String> parsePartitionValues(List<String> partitionColumns, String partitionPath) {
String[] partitionFragments = partitionPath.split("/");
if (partitionFragments.length != partitionColumns.size()) {
throw new RuntimeException("Failed to parse partition values of path: " + partitionPath);
}
List<String> partitionValues = new ArrayList<>(partitionFragments.length);
for (int i = 0; i < partitionFragments.length; i++) {
String prefix = partitionColumns.get(i) + "=";
if (partitionFragments[i].startsWith(prefix)) {
partitionValues.add(partitionFragments[i].substring(prefix.length()));
} else {
partitionValues.add(partitionFragments[i]);
}
}
return partitionValues;
}
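A concrete run of the parser above (inputs hypothetical):

    // parsePartitionValues(Arrays.asList("part1", "part2", "part3"),
    //         "part1=123/part2=abc/part3=1bc")
    // returns ["123", "abc", "1bc"]; a fragment without a "name=" prefix is
    // kept verbatim, and a fragment-count mismatch raises a RuntimeException.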
@Override
public List<Column> initSchema() {
// this method will be called at semantic parsing.
makeSureInitialized();
List<com.aliyun.odps.Column> columns = odpsTable.getSchema().getColumns();
List<Column> result = Lists.newArrayListWithCapacity(columns.size());
@ -86,72 +153,31 @@ public class MaxComputeExternalTable extends ExternalTable {
result.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null,
true, field.getComment(), true, -1));
}
-        List<com.aliyun.odps.Column> partitionColumns = odpsTable.getSchema().getPartitionColumns();
-        partitionKeys = new HashSet<>();
-        for (com.aliyun.odps.Column partColumn : partitionColumns) {
-            result.add(new Column(partColumn.getName(), mcTypeToDorisType(partColumn.getTypeInfo()), true, null,
-                    true, partColumn.getComment(), true, -1));
-            partitionKeys.add(partColumn.getName());
-        }
+        result.addAll(partitionNameToColumns.values());
return result;
}
-    public Optional<String> getPartitionSpec(List<Expr> conjuncts) {
-        if (!partitionKeys.isEmpty()) {
-            if (conjuncts.isEmpty()) {
-                throw new IllegalArgumentException("Max Compute partition table need partition predicate.");
-            }
-            // recreate partitionSpec when conjuncts is changed.
-            List<String> partitionConjuncts = parsePartitionConjuncts(conjuncts, partitionKeys);
-            StringJoiner partitionSpec = new StringJoiner(",");
-            partitionConjuncts.forEach(partitionSpec::add);
-            this.partitionSpec = partitionSpec.toString();
-            return Optional.of(this.partitionSpec);
+    private void initTablePartitions() {
+        List<com.aliyun.odps.Column> partitionColumns = odpsTable.getSchema().getPartitionColumns();
+        if (!partitionColumns.isEmpty()) {
+            partitionSpecs = odpsTable.getPartitions().stream()
+                    .map(e -> e.getPartitionSpec().toString(false, true))
+                    .collect(Collectors.toList());
+        } else {
+            partitionSpecs = ImmutableList.of();
         }
-        return Optional.empty();
-    }
-    private static List<String> parsePartitionConjuncts(List<Expr> conjuncts, Set<String> partitionKeys) {
-        List<String> partitionConjuncts = new ArrayList<>();
-        Set<Predicate> predicates = Sets.newHashSet();
-        for (Expr conjunct : conjuncts) {
-            // collect depart predicate
-            conjunct.collect(BinaryPredicate.class, predicates);
-            conjunct.collect(InPredicate.class, predicates);
+        // sort partition columns to align partitionTypes and partitionName.
+        partitionNameToColumns = new LinkedHashMap<>();
+        for (com.aliyun.odps.Column partColumn : partitionColumns) {
+            Column dorisCol = new Column(partColumn.getName(),
+                    mcTypeToDorisType(partColumn.getTypeInfo()), true, null,
+                    true, partColumn.getComment(), true, -1);
+            partitionNameToColumns.put(dorisCol.getName(), dorisCol);
         }
-        Map<String, Predicate> slotToConjuncts = new HashMap<>();
-        for (Predicate predicate : predicates) {
-            List<SlotRef> slotRefs = new ArrayList<>();
-            if (predicate instanceof BinaryPredicate) {
-                if (((BinaryPredicate) predicate).getOp() != BinaryPredicate.Operator.EQ) {
-                    // max compute only support the EQ operator: pt='pt-value'
-                    continue;
-                }
-                // BinaryPredicate has one left slotRef, and partition value not slotRef
-                predicate.collect(SlotRef.class, slotRefs);
-                slotToConjuncts.put(slotRefs.get(0).getColumnName(), predicate);
-            } else if (predicate instanceof InPredicate) {
-                predicate.collect(SlotRef.class, slotRefs);
-                slotToConjuncts.put(slotRefs.get(0).getColumnName(), predicate);
-            }
-        }
-        for (String partitionKey : partitionKeys) {
-            Predicate partitionPredicate = slotToConjuncts.get(partitionKey);
-            if (partitionPredicate == null) {
-                continue;
-            }
-            if (partitionPredicate instanceof InPredicate) {
-                List<Expr> inList = ((InPredicate) partitionPredicate).getListChildren();
-                for (Expr expr : inList) {
-                    String partitionConjunct = partitionKey + "=" + expr.toSql();
-                    partitionConjuncts.add(partitionConjunct.replace("`", ""));
-                }
-            } else {
-                String partitionConjunct = partitionPredicate.toSql();
-                partitionConjuncts.add(partitionConjunct.replace("`", ""));
-            }
-        }
-        return partitionConjuncts;
+        partitionTypes = partitionNameToColumns.values()
+                .stream()
+                .map(Column::getType)
+                .collect(Collectors.toList());
     }
private Type mcTypeToDorisType(TypeInfo typeInfo) {
@ -241,11 +267,10 @@ public class MaxComputeExternalTable extends ExternalTable {
public TTableDescriptor toThrift() {
List<Column> schema = getFullSchema();
TMCTable tMcTable = new TMCTable();
-        MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) catalog;
+        MaxComputeExternalCatalog mcCatalog = ((MaxComputeExternalCatalog) catalog);
tMcTable.setRegion(mcCatalog.getRegion());
tMcTable.setAccessKey(mcCatalog.getAccessKey());
tMcTable.setSecretKey(mcCatalog.getSecretKey());
-        tMcTable.setPartitionSpec(this.partitionSpec);
tMcTable.setPublicAccess(String.valueOf(mcCatalog.enablePublicAccess()));
// use mc project as dbName
tMcTable.setProject(dbName);
@ -257,6 +282,7 @@ public class MaxComputeExternalTable extends ExternalTable {
}
public Table getOdpsTable() {
makeSureInitialized();
return odpsTable;
}
@ -264,6 +290,5 @@ public class MaxComputeExternalTable extends ExternalTable {
public String getMysqlType() {
return "BASE TABLE";
}
}

View File

@ -54,6 +54,7 @@ public class ExternalMetaCacheMgr {
// all catalogs could share the same fsCache.
private FileSystemCache fsCache;
private final IcebergMetadataCacheMgr icebergMetadataCacheMgr;
private final MaxComputeMetadataCacheMgr maxComputeMetadataCacheMgr;
public ExternalMetaCacheMgr() {
executor = ThreadPoolManager.newDaemonFixedThreadPool(
@ -63,6 +64,7 @@ public class ExternalMetaCacheMgr {
hudiPartitionMgr = HudiPartitionMgr.get(executor);
fsCache = new FileSystemCache(executor);
icebergMetadataCacheMgr = new IcebergMetadataCacheMgr();
maxComputeMetadataCacheMgr = new MaxComputeMetadataCacheMgr();
}
public HiveMetaStoreCache getMetaStoreCache(HMSExternalCatalog catalog) {
@ -99,6 +101,10 @@ public class ExternalMetaCacheMgr {
return icebergMetadataCacheMgr.getIcebergMetadataCache();
}
public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
return maxComputeMetadataCacheMgr.getMaxComputeMetadataCache(catalogId);
}
public FileSystemCache getFsCache() {
return fsCache;
}
@ -112,6 +118,7 @@ public class ExternalMetaCacheMgr {
}
hudiPartitionMgr.removePartitionProcessor(catalogId);
icebergMetadataCacheMgr.removeCache(catalogId);
maxComputeMetadataCacheMgr.removeCache(catalogId);
}
public void invalidateTableCache(long catalogId, String dbName, String tblName) {
@ -126,6 +133,7 @@ public class ExternalMetaCacheMgr {
}
hudiPartitionMgr.cleanTablePartitions(catalogId, dbName, tblName);
icebergMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
maxComputeMetadataCacheMgr.invalidateTableCache(catalogId, dbName, tblName);
LOG.debug("invalid table cache for {}.{} in catalog {}", dbName, tblName, catalogId);
}
@ -141,6 +149,7 @@ public class ExternalMetaCacheMgr {
}
hudiPartitionMgr.cleanDatabasePartitions(catalogId, dbName);
icebergMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
maxComputeMetadataCacheMgr.invalidateDbCache(catalogId, dbName);
LOG.debug("invalid db cache for {} in catalog {}", dbName, catalogId);
}
@ -155,6 +164,7 @@ public class ExternalMetaCacheMgr {
}
hudiPartitionMgr.cleanPartitionProcess(catalogId);
icebergMetadataCacheMgr.invalidateCatalogCache(catalogId);
maxComputeMetadataCacheMgr.invalidateCatalogCache(catalogId);
LOG.debug("invalid catalog cache for {}", catalogId);
}

View File

@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import lombok.Data;
import java.util.Objects;
@Data
public class MaxComputeCacheKey {
private final String dbName;
private final String tblName;
private String partitionSpec; // optional
public MaxComputeCacheKey(String dbName, String tblName) {
this(dbName, tblName, null);
}
public MaxComputeCacheKey(String dbName, String tblName, String partitionSpec) {
this.dbName = dbName;
this.tblName = tblName;
this.partitionSpec = partitionSpec;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (!(obj instanceof MaxComputeCacheKey)) {
return false;
}
boolean partitionEquals = true;
if (partitionSpec != null) {
partitionEquals = partitionSpec.equals(((MaxComputeCacheKey) obj).partitionSpec);
}
return partitionEquals && dbName.equals(((MaxComputeCacheKey) obj).dbName)
&& tblName.equals(((MaxComputeCacheKey) obj).tblName);
}
@Override
public int hashCode() {
return Objects.hash(dbName, tblName);
}
@Override
public String toString() {
return "TablePartitionKey{" + "dbName='" + dbName + '\'' + ", tblName='" + tblName + '\'' + '}';
}
}
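
The equality here is deliberately loose: hashCode() covers only (dbName, tblName), and a key built without a partition spec compares equal to one built with a spec, so a table-level invalidation can evict partition-scoped cache entries. A small illustration (names hypothetical):

    MaxComputeCacheKey tableKey = new MaxComputeCacheKey("my_project", "my_table");
    MaxComputeCacheKey partKey = new MaxComputeCacheKey("my_project", "my_table", "pt=20231201");
    // tableKey.equals(partKey) is true (a null spec matches any spec) and their
    // hash codes agree, so both map to the same bucket. Note the comparison is
    // asymmetric: partKey.equals(tableKey) is false.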

View File

@ -24,22 +24,23 @@ import org.apache.doris.datasource.property.constants.MCProperties;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
-import com.aliyun.odps.PartitionSpec;
+import com.aliyun.odps.Partition;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TunnelException;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.gson.annotations.SerializedName;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
public class MaxComputeExternalCatalog extends ExternalCatalog {
private Odps odps;
private TableTunnel tunnel;
@SerializedName(value = "region")
private String region;
@SerializedName(value = "accessKey")
@ -93,23 +94,17 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
}
odps.setEndpoint(odpsUrl);
odps.setDefaultProject(defaultProject);
-    }
-    public long getTotalRows(String project, String table, Optional<String> partitionSpec) throws TunnelException {
-        makeSureInitialized();
-        TableTunnel tunnel = new TableTunnel(odps);
+        tunnel = new TableTunnel(odps);
         String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
         if (enablePublicAccess) {
             tunnelUrl = tunnelUrl.replace("-inc", "");
         }
-        TableTunnel.DownloadSession downloadSession;
         tunnel.setEndpoint(tunnelUrl);
-        if (!partitionSpec.isPresent()) {
-            downloadSession = tunnel.getDownloadSession(project, table, null);
-        } else {
-            downloadSession = tunnel.getDownloadSession(project, table, new PartitionSpec(partitionSpec.get()), null);
-        }
-        return downloadSession.getRecordCount();
     }
+    public TableTunnel getTableTunnel() {
+        makeSureInitialized();
+        return tunnel;
+    }
public Odps getClient() {
@ -139,6 +134,42 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
}
}
public List<String> listPartitionNames(String dbName, String tbl) {
return listPartitionNames(dbName, tbl, 0, -1);
}
public List<String> listPartitionNames(String dbName, String tbl, long skip, long limit) {
try {
if (getClient().projects().exists(dbName)) {
List<Partition> parts;
if (limit < 0) {
parts = getClient().tables().get(tbl).getPartitions();
} else {
skip = skip < 0 ? 0 : skip;
parts = new ArrayList<>();
Iterator<Partition> it = getClient().tables().get(tbl).getPartitionIterator();
int count = 0;
while (it.hasNext()) {
if (count < skip) {
count++;
it.next();
} else if (parts.size() >= limit) {
break;
} else {
parts.add(it.next());
}
}
}
return parts.stream().map(p -> p.getPartitionSpec().toString(false, true))
.collect(Collectors.toList());
} else {
throw new OdpsException("Max compute project: " + dbName + " not exists.");
}
} catch (OdpsException e) {
throw new RuntimeException(e);
}
}
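
A short usage sketch of the pagination semantics above (project and table names made up):

    // At most 5 partition specs, skipping the first 10; a negative limit
    // returns the full partition list in a single call.
    List<String> page = mcCatalog.listPartitionNames("my_project", "my_table", 10, 5);
    List<String> all = mcCatalog.listPartitionNames("my_project", "my_table");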
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();

View File

@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.common.Config;
import org.apache.doris.planner.external.TablePartitionValues;
import com.aliyun.odps.tunnel.TunnelException;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class MaxComputeMetadataCache {
private final Cache<MaxComputeCacheKey, TablePartitionValues> partitionValuesCache;
private final Cache<MaxComputeCacheKey, Long> tableRowCountCache;
public MaxComputeMetadataCache() {
partitionValuesCache = CacheBuilder.newBuilder().maximumSize(Config.max_hive_partition_cache_num)
.expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
.build();
tableRowCountCache = CacheBuilder.newBuilder().maximumSize(10000)
.expireAfterAccess(Config.external_cache_expire_time_minutes_after_access, TimeUnit.MINUTES)
.build();
}
public Long getCachedRowCount(String dbName, String tblName, String partitionSpec,
Callable<? extends Long> loader) throws TunnelException {
try {
MaxComputeCacheKey tablePartitionKey = new MaxComputeCacheKey(dbName, tblName, partitionSpec);
return tableRowCountCache.get(tablePartitionKey, loader);
} catch (ExecutionException e) {
throw new TunnelException(e.getMessage(), e);
}
}
public TablePartitionValues getCachedPartitionValues(MaxComputeCacheKey tablePartitionKey,
Callable<? extends TablePartitionValues> loader) {
try {
return partitionValuesCache.get(tablePartitionKey, loader);
} catch (ExecutionException e) {
throw new RuntimeException("Fail to load partition values for table:"
+ " '" + tablePartitionKey.getDbName() + "." + tablePartitionKey.getTblName() + "'");
}
}
public void cleanUp() {
partitionValuesCache.invalidateAll();
tableRowCountCache.invalidateAll();
}
public void cleanDatabaseCache(String dbName) {
List<MaxComputeCacheKey> removeCacheList = partitionValuesCache.asMap().keySet()
.stream()
.filter(k -> k.getDbName().equalsIgnoreCase(dbName))
.collect(Collectors.toList());
partitionValuesCache.invalidateAll(removeCacheList);
List<MaxComputeCacheKey> removeCacheRowCountList = tableRowCountCache.asMap().keySet()
.stream()
.filter(k -> k.getDbName().equalsIgnoreCase(dbName))
.collect(Collectors.toList());
tableRowCountCache.invalidateAll(removeCacheRowCountList);
}
public void cleanTableCache(String dbName, String tblName) {
MaxComputeCacheKey cacheKey = new MaxComputeCacheKey(dbName, tblName);
partitionValuesCache.invalidate(cacheKey);
tableRowCountCache.invalidate(cacheKey);
}
}
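
For reference, a minimal sketch of how this Guava-based cache is driven (the names and the stub loader are hypothetical):

    static void demo(MaxComputeMetadataCache cache) throws TunnelException {
        // The first call runs the loader (which would normally open a tunnel
        // download session); later calls are served from the cache until the
        // entry expires or is invalidated.
        Long rows = cache.getCachedRowCount("my_project", "my_table", null, () -> 42L);
        cache.cleanTableCache("my_project", "my_table"); // drops both cached entries
    }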

View File

@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import com.google.common.collect.Maps;
import java.util.Map;
public class MaxComputeMetadataCacheMgr {
private static final Map<Long, MaxComputeMetadataCache> maxComputeMetadataCaches = Maps.newConcurrentMap();
public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
MaxComputeMetadataCache cache = maxComputeMetadataCaches.get(catalogId);
if (cache == null) {
cache = new MaxComputeMetadataCache();
maxComputeMetadataCaches.put(catalogId, cache);
}
return cache;
}
public void removeCache(long catalogId) {
MaxComputeMetadataCache cache = maxComputeMetadataCaches.remove(catalogId);
if (cache != null) {
cache.cleanUp();
}
}
public void invalidateCatalogCache(long catalogId) {
MaxComputeMetadataCache cache = maxComputeMetadataCaches.get(catalogId);
if (cache != null) {
cache.cleanUp();
}
}
public void invalidateDbCache(long catalogId, String dbName) {
MaxComputeMetadataCache cache = maxComputeMetadataCaches.get(catalogId);
if (cache != null) {
cache.cleanDatabaseCache(dbName);
}
}
public void invalidateTableCache(long catalogId, String dbName, String tblName) {
MaxComputeMetadataCache cache = maxComputeMetadataCaches.get(catalogId);
if (cache != null) {
cache.cleanTableCache(dbName, tblName);
}
}
}
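
A design note: the get-or-create in getMaxComputeMetadataCache is check-then-act on a concurrent map, so two threads can briefly race to create a cache. If that matters, computeIfAbsent expresses the same intent atomically; a hedged alternative sketch, not what this commit ships:

    public MaxComputeMetadataCache getMaxComputeMetadataCache(long catalogId) {
        // Atomic get-or-create: at most one cache instance per catalog id.
        return maxComputeMetadataCaches.computeIfAbsent(catalogId, id -> new MaxComputeMetadataCache());
    }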

View File

@ -27,7 +27,6 @@ import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
@ -126,9 +125,15 @@ public class PooledHiveMetaStoreClient {
}
public List<String> listPartitionNames(String dbName, String tblName) {
+        return listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM);
+    }
+    public List<String> listPartitionNames(String dbName, String tblName, long max) {
+        // list all parts when the limit is greater than the short maximum
+        short limited = max <= Short.MAX_VALUE ? (short) max : MAX_LIST_PARTITION_NUM;
         try (CachedClient client = getClient()) {
             try {
-                return client.client.listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM);
+                return client.client.listPartitionNames(dbName, tblName, limited);
} catch (Exception e) {
client.setThrowable(e);
throw e;

View File

@ -365,6 +365,8 @@ public abstract class FileQueryScanNode extends FileScanNode {
PaimonScanNode.setPaimonParams(rangeDesc, (PaimonSplit) fileSplit);
} else if (fileSplit instanceof HudiSplit) {
HudiScanNode.setHudiParams(rangeDesc, (HudiSplit) fileSplit);
} else if (fileSplit instanceof MaxComputeSplit) {
MaxComputeScanNode.setScanParams(rangeDesc, (MaxComputeSplit) fileSplit);
}
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);

View File

@ -18,26 +18,33 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.MaxComputeExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.MaxComputeExternalCatalog;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TMaxComputeFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;
import com.aliyun.odps.Table;
import com.aliyun.odps.tunnel.TunnelException;
import org.apache.hadoop.fs.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class MaxComputeScanNode extends FileQueryScanNode {
@ -56,6 +63,17 @@ public class MaxComputeScanNode extends FileQueryScanNode {
catalog = (MaxComputeExternalCatalog) table.getCatalog();
}
public static void setScanParams(TFileRangeDesc rangeDesc, MaxComputeSplit maxComputeSplit) {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(TableFormatType.MAX_COMPUTE.value());
TMaxComputeFileDesc fileDesc = new TMaxComputeFileDesc();
if (maxComputeSplit.getPartitionSpec().isPresent()) {
fileDesc.setPartitionSpec(maxComputeSplit.getPartitionSpec().get());
}
tableFormatFileDesc.setMaxComputeParams(fileDesc);
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
@Override
protected TFileType getLocationType() throws UserException {
return getLocationType(null);
@ -89,43 +107,92 @@ public class MaxComputeScanNode extends FileQueryScanNode {
@Override
protected List<Split> getSplits() throws UserException {
List<Split> result = new ArrayList<>();
// String splitPath = catalog.getTunnelUrl();
// TODO: use single max compute scan node rather than file scan node
com.aliyun.odps.Table odpsTable = table.getOdpsTable();
if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) {
return result;
}
try {
-            List<Pair<Long, Long>> sliceRange = new ArrayList<>();
-            Optional<String> partitionSpec = table.getPartitionSpec(conjuncts);
-            long totalRows = catalog.getTotalRows(table.getDbName(), table.getName(), partitionSpec);
-            long fileNum = odpsTable.getFileNum();
-            long start = 0;
-            long splitSize = (long) Math.ceil((double) totalRows / fileNum);
-            if (splitSize <= 0 || totalRows < MIN_SPLIT_SIZE) {
-                // use whole split
-                sliceRange.add(Pair.of(start, totalRows));
+            if (!table.getPartitionNames().isEmpty()) {
+                if (conjuncts.isEmpty()) {
+                    throw new IllegalArgumentException("Max Compute partition table need partition predicate.");
+                }
+                List<String> partitionSpecs = getPartitionSpecs();
+                for (String partitionSpec : partitionSpecs) {
+                    addPartitionSplits(result, odpsTable, partitionSpec);
+                }
             } else {
-                for (int i = 0; i < fileNum; i++) {
-                    if (start > totalRows) {
-                        break;
-                    }
-                    sliceRange.add(Pair.of(start, splitSize));
-                    start += splitSize;
-                }
-            }
-            long modificationTime = odpsTable.getLastDataModifiedTime().getTime();
-            if (!sliceRange.isEmpty()) {
-                for (int i = 0; i < sliceRange.size(); i++) {
-                    Pair<Long, Long> range = sliceRange.get(i);
-                    result.add(new FileSplit(new Path("/virtual_slice_" + i), range.first, range.second,
-                            totalRows, modificationTime, null, Collections.emptyList()));
-                }
+                addBatchSplits(result, odpsTable, table.getTotalRows());
             }
         } catch (TunnelException e) {
-            throw new UserException("Max Compute tunnel SDK exception.", e);
+            throw new UserException("Max Compute tunnel SDK exception: " + e.getMessage(), e);
}
return result;
}
private static void addPartitionSplits(List<Split> result, Table odpsTable, String partitionSpec) {
long modificationTime = odpsTable.getLastDataModifiedTime().getTime();
        // use '-1' to read the whole partition, to avoid spending too much time on table.getTotalRows()
Pair<Long, Long> range = Pair.of(0L, -1L);
FileSplit rangeSplit = new FileSplit(new Path("/virtual_slice_part"),
range.first, range.second, -1, modificationTime, null, Collections.emptyList());
result.add(new MaxComputeSplit(partitionSpec, rangeSplit));
}
private static void addBatchSplits(List<Split> result, Table odpsTable, long totalRows) {
List<Pair<Long, Long>> sliceRange = new ArrayList<>();
long fileNum = odpsTable.getFileNum();
long start = 0;
long splitSize = (long) Math.ceil((double) totalRows / fileNum);
if (splitSize <= 0 || totalRows < MIN_SPLIT_SIZE) {
// use whole split
sliceRange.add(Pair.of(start, totalRows));
} else {
for (int i = 0; i < fileNum; i++) {
if (start > totalRows) {
break;
}
sliceRange.add(Pair.of(start, splitSize));
start += splitSize;
}
}
long modificationTime = odpsTable.getLastDataModifiedTime().getTime();
if (!sliceRange.isEmpty()) {
for (int i = 0; i < sliceRange.size(); i++) {
Pair<Long, Long> range = sliceRange.get(i);
FileSplit rangeSplit = new FileSplit(new Path("/virtual_slice_" + i),
range.first, range.second, totalRows, modificationTime, null, Collections.emptyList());
result.add(new MaxComputeSplit(rangeSplit));
}
}
}
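
A worked example of the slicing above, with assumed numbers:

    // Suppose totalRows = 1000 and fileNum = 3 (both hypothetical):
    //   splitSize  = ceil(1000 / 3.0) = 334
    //   sliceRange = (0, 334), (334, 334), (668, 334)
    // The last slice asks for rows past the end of the table, which is harmless:
    // the reader stops at the end of the data. When totalRows < MIN_SPLIT_SIZE,
    // the whole table is read as the single slice (0, totalRows).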
private List<String> getPartitionSpecs() throws AnalysisException {
return getPrunedPartitionSpecs();
}
private List<String> getPrunedPartitionSpecs() throws AnalysisException {
List<String> result = new ArrayList<>();
TablePartitionValues partitionValues = table.getPartitionValues();
// prune partitions by expr
partitionValues.readLock().lock();
try {
Map<Long, PartitionItem> idToPartitionItem = partitionValues.getIdToPartitionItem();
this.totalPartitionNum = idToPartitionItem.size();
ListPartitionPrunerV2 pruner = new ListPartitionPrunerV2(idToPartitionItem,
table.getPartitionColumns(), columnNameToRange,
partitionValues.getUidToPartitionRange(),
partitionValues.getRangeToId(),
partitionValues.getSingleColumnRangeMap(),
false);
Collection<Long> filteredPartitionIds = pruner.prune();
this.readPartitionNum = filteredPartitionIds.size();
// get partitions from cache
Map<Long, String> partitionIdToNameMap = partitionValues.getPartitionIdToNameMap();
filteredPartitionIds.forEach(id -> result.add(partitionIdToNameMap.get(id)));
return result;
} finally {
partitionValues.readLock().unlock();
}
}
}

View File

@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner.external;
import java.util.Optional;
public class MaxComputeSplit extends FileSplit {
private final Optional<String> partitionSpec;
public MaxComputeSplit(FileSplit rangeSplit) {
super(rangeSplit.path, rangeSplit.start, rangeSplit.length, rangeSplit.fileLength,
rangeSplit.hosts, rangeSplit.partitionValues);
this.partitionSpec = Optional.empty();
}
public MaxComputeSplit(String partitionSpec, FileSplit rangeSplit) {
super(rangeSplit.path, rangeSplit.start, rangeSplit.length, rangeSplit.fileLength,
rangeSplit.hosts, rangeSplit.partitionValues);
this.partitionSpec = Optional.of(partitionSpec);
}
public Optional<String> getPartitionSpec() {
return partitionSpec;
}
}

View File

@ -22,6 +22,7 @@ public enum TableFormatType {
ICEBERG("iceberg"),
HUDI("hudi"),
PAIMON("paimon"),
MAX_COMPUTE("max_compute"),
TRANSACTIONAL_HIVE("transactional_hive");
private final String tableFormatType;

View File

@ -218,11 +218,16 @@ public class TablePartitionValues {
@Data
public static class TablePartitionKey {
-        private String dbName;
-        private String tblName;
+        private final String dbName;
+        private final String tblName;
// not in key
private List<Type> types;
public TablePartitionKey(String dbName, String tblName) {
this.dbName = dbName;
this.tblName = tblName;
}
public TablePartitionKey(String dbName, String tblName, List<Type> types) {
this.dbName = dbName;
this.tblName = tblName;

View File

@ -96,11 +96,11 @@ public class HudiCachedPartitionProcessor extends HudiPartitionProcessor {
if (Long.parseLong(timestamp) == lastTimestamp) {
return getPartitionValues(table, tableMetaClient);
}
-            List<String> partitionNames = getPartitionNamesBeforeOrEquals(timeline, timestamp);
-            List<String> partitionColumnsList = Arrays.asList(partitionColumns.get());
+            List<String> partitionNameAndValues = getPartitionNamesBeforeOrEquals(timeline, timestamp);
+            List<String> partitionNames = Arrays.asList(partitionColumns.get());
             TablePartitionValues partitionValues = new TablePartitionValues();
-            partitionValues.addPartitions(partitionNames,
-                    partitionNames.stream().map(p -> parsePartitionValues(partitionColumnsList, p))
+            partitionValues.addPartitions(partitionNameAndValues,
+                    partitionNameAndValues.stream().map(p -> parsePartitionValues(partitionNames, p))
.collect(Collectors.toList()), table.getPartitionColumnTypes());
return partitionValues;
}

View File

@ -25,6 +25,7 @@ import org.apache.doris.analysis.AdminShowReplicaStatusStmt;
import org.apache.doris.analysis.AdminShowTabletStorageFormatStmt;
import org.apache.doris.analysis.DescribeStmt;
import org.apache.doris.analysis.HelpStmt;
import org.apache.doris.analysis.LimitElement;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.ShowAlterStmt;
import org.apache.doris.analysis.ShowAnalyzeStmt;
@ -179,6 +180,7 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.MaxComputeExternalCatalog;
import org.apache.doris.job.task.AbstractTask;
import org.apache.doris.load.DeleteHandler;
import org.apache.doris.load.ExportJobState;
@ -1660,18 +1662,49 @@ public class ShowExecutor {
List<List<String>> rows = ((PartitionsProcDir) procNodeI).fetchResultByFilter(showStmt.getFilterMap(),
showStmt.getOrderByPairs(), showStmt.getLimitElement()).getRows();
resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
} else if (showStmt.getCatalog() instanceof MaxComputeExternalCatalog) {
handleShowMaxComputeTablePartitions(showStmt);
} else {
handleShowHMSTablePartitions(showStmt);
}
}
private void handleShowMaxComputeTablePartitions(ShowPartitionsStmt showStmt) {
MaxComputeExternalCatalog catalog = (MaxComputeExternalCatalog) (showStmt.getCatalog());
List<List<String>> rows = new ArrayList<>();
String dbName = ClusterNamespace.getNameFromFullName(showStmt.getTableName().getDb());
List<String> partitionNames;
LimitElement limit = showStmt.getLimitElement();
if (limit != null && limit.hasLimit()) {
partitionNames = catalog.listPartitionNames(dbName,
showStmt.getTableName().getTbl(), limit.getOffset(), limit.getLimit());
} else {
partitionNames = catalog.listPartitionNames(dbName, showStmt.getTableName().getTbl());
}
for (String partition : partitionNames) {
List<String> list = new ArrayList<>();
list.add(partition);
rows.add(list);
}
// sort by partition name
rows.sort(Comparator.comparing(x -> x.get(0)));
resultSet = new ShowResultSet(showStmt.getMetaData(), rows);
}
private void handleShowHMSTablePartitions(ShowPartitionsStmt showStmt) {
HMSExternalCatalog catalog = (HMSExternalCatalog) (showStmt.getCatalog());
List<List<String>> rows = new ArrayList<>();
String dbName = ClusterNamespace.getNameFromFullName(showStmt.getTableName().getDb());
-        List<String> partitionNames = catalog.getClient().listPartitionNames(dbName,
-                showStmt.getTableName().getTbl());
+        List<String> partitionNames;
+        LimitElement limit = showStmt.getLimitElement();
+        if (limit != null && limit.hasLimit()) {
+            // only limit is valid on Hive
+            partitionNames = catalog.getClient()
+                    .listPartitionNames(dbName, showStmt.getTableName().getTbl(), limit.getLimit());
+        } else {
+            partitionNames = catalog.getClient().listPartitionNames(dbName, showStmt.getTableName().getTbl());
+        }
for (String partition : partitionNames) {
List<String> list = new ArrayList<>();
list.add(partition);