[fix](multi-catalog)fix maxcompute partition filter and session creation (#24911)

add maxcompute partition support
fix maxcompute partition filter
modify maxcompute session create method
This commit is contained in:
slothever
2023-10-17 22:36:10 +08:00
committed by GitHub
parent ce18f1148a
commit 18c2a13e09
13 changed files with 379 additions and 16 deletions

View File

@ -17,6 +17,11 @@
package org.apache.doris.catalog.external;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.InPredicate;
import org.apache.doris.analysis.Predicate;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MapType;
@ -39,9 +44,16 @@ import com.aliyun.odps.type.StructTypeInfo;
import com.aliyun.odps.type.TypeInfo;
import com.aliyun.odps.type.VarcharTypeInfo;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.StringJoiner;
/**
* MaxCompute external table.
@ -49,6 +61,8 @@ import java.util.List;
public class MaxComputeExternalTable extends ExternalTable {
private Table odpsTable;
private Set<String> partitionKeys;
private String partitionSpec;
public MaxComputeExternalTable(long id, String name, String dbName, MaxComputeExternalCatalog catalog) {
super(id, name, catalog, dbName, TableType.MAX_COMPUTE_EXTERNAL_TABLE);
@ -72,9 +86,74 @@ public class MaxComputeExternalTable extends ExternalTable {
result.add(new Column(field.getName(), mcTypeToDorisType(field.getTypeInfo()), true, null,
true, field.getComment(), true, -1));
}
List<com.aliyun.odps.Column> partitionColumns = odpsTable.getSchema().getPartitionColumns();
partitionKeys = new HashSet<>();
for (com.aliyun.odps.Column partColumn : partitionColumns) {
result.add(new Column(partColumn.getName(), mcTypeToDorisType(partColumn.getTypeInfo()), true, null,
true, partColumn.getComment(), true, -1));
partitionKeys.add(partColumn.getName());
}
return result;
}
/**
 * Builds the MaxCompute partition spec for a scan from its filter conjuncts.
 *
 * <p>For a table without partition columns no spec is needed and an empty Optional is
 * returned. For a partitioned table the conjuncts must contain at least one usable
 * partition predicate; the resulting {@code key=value} items are joined with commas,
 * remembered in {@code this.partitionSpec}, and returned.
 *
 * @param conjuncts filter expressions of the current scan
 * @return the comma-separated partition spec, or empty for a non-partitioned table
 * @throws IllegalArgumentException if the table is partitioned and no partition
 *         predicate can be derived from {@code conjuncts}
 */
public Optional<String> getPartitionSpec(List<Expr> conjuncts) {
    if (partitionKeys.isEmpty()) {
        return Optional.empty();
    }
    if (conjuncts == null || conjuncts.isEmpty()) {
        throw new IllegalArgumentException("Max Compute partition table need partition predicate.");
    }
    // Recreate the spec on every call because the conjuncts may differ between scans.
    List<String> partitionConjuncts = parsePartitionConjuncts(conjuncts, partitionKeys);
    if (partitionConjuncts.isEmpty()) {
        // None of the conjuncts is a usable partition predicate. Fail here with a clear
        // message instead of handing an empty, unparsable spec string to the tunnel.
        throw new IllegalArgumentException("Max Compute partition table need partition predicate.");
    }
    this.partitionSpec = String.join(",", partitionConjuncts);
    return Optional.of(this.partitionSpec);
}
/**
 * Extracts MaxCompute partition conjuncts of the form {@code key=value} from filter
 * expressions.
 *
 * <p>Only equality predicates and {@code IN} predicates are considered. {@code NOT IN}
 * and non-equality comparisons cannot be expressed as a partition spec and are ignored,
 * as are predicates that reference no column at all.
 *
 * @param conjuncts filter expressions to inspect
 * @param partitionKeys names of the table's partition columns
 * @return one string per selected partition value, with backticks removed; empty when
 *         no usable partition predicate was found
 */
private static List<String> parsePartitionConjuncts(List<Expr> conjuncts, Set<String> partitionKeys) {
    // Insertion-ordered, so that when several predicates reference the same column the
    // one kept below is chosen deterministically (the last one in collection order).
    Set<Predicate> predicates = Sets.newLinkedHashSet();
    for (Expr conjunct : conjuncts) {
        // Collect the individual predicates nested inside each conjunct.
        conjunct.collect(BinaryPredicate.class, predicates);
        conjunct.collect(InPredicate.class, predicates);
    }

    Map<String, Predicate> slotToConjuncts = new HashMap<>();
    for (Predicate predicate : predicates) {
        if (predicate instanceof BinaryPredicate) {
            if (((BinaryPredicate) predicate).getOp() != BinaryPredicate.Operator.EQ) {
                // max compute only support the EQ operator: pt='pt-value'
                continue;
            }
        } else if (predicate instanceof InPredicate) {
            if (((InPredicate) predicate).isNotIn()) {
                // NOT IN excludes values; expanding it like IN would select exactly
                // the partitions the query asked to leave out.
                continue;
            }
        } else {
            continue;
        }
        List<SlotRef> slotRefs = new ArrayList<>();
        predicate.collect(SlotRef.class, slotRefs);
        if (slotRefs.isEmpty()) {
            // Constant predicate such as 1 = 1: there is no column to key it by.
            continue;
        }
        slotToConjuncts.put(slotRefs.get(0).getColumnName(), predicate);
    }

    List<String> partitionConjuncts = new ArrayList<>();
    for (String partitionKey : partitionKeys) {
        Predicate partitionPredicate = slotToConjuncts.get(partitionKey);
        if (partitionPredicate == null) {
            continue;
        }
        if (partitionPredicate instanceof InPredicate) {
            // Expand pt IN (a, b) into one pt=value item per listed value.
            List<Expr> inList = ((InPredicate) partitionPredicate).getListChildren();
            for (Expr expr : inList) {
                String partitionConjunct = partitionKey + "=" + expr.toSql();
                partitionConjuncts.add(partitionConjunct.replace("`", ""));
            }
        } else {
            String partitionConjunct = partitionPredicate.toSql();
            partitionConjuncts.add(partitionConjunct.replace("`", ""));
        }
    }
    return partitionConjuncts;
}
private Type mcTypeToDorisType(TypeInfo typeInfo) {
OdpsType odpsType = typeInfo.getOdpsType();
switch (odpsType) {
@ -166,6 +245,7 @@ public class MaxComputeExternalTable extends ExternalTable {
tMcTable.setRegion(mcCatalog.getRegion());
tMcTable.setAccessKey(mcCatalog.getAccessKey());
tMcTable.setSecretKey(mcCatalog.getSecretKey());
tMcTable.setPartitionSpec(this.partitionSpec);
tMcTable.setPublicAccess(String.valueOf(mcCatalog.enablePublicAccess()));
// use mc project as dbName
tMcTable.setProject(dbName);

View File

@ -24,6 +24,7 @@ import org.apache.doris.datasource.property.constants.MCProperties;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.tunnel.TableTunnel;
@ -35,6 +36,7 @@ import com.google.gson.annotations.SerializedName;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class MaxComputeExternalCatalog extends ExternalCatalog {
private Odps odps;
@ -93,15 +95,21 @@ public class MaxComputeExternalCatalog extends ExternalCatalog {
odps.setDefaultProject(defaultProject);
}
public long getTotalRows(String project, String table) throws TunnelException {
/**
 * Returns the record count reported by a tunnel download session for a table,
 * optionally restricted to a partition spec.
 *
 * @param project project name passed to the tunnel
 * @param table table name passed to the tunnel
 * @param partitionSpec when present, wrapped in a PartitionSpec and passed to the tunnel;
 *        when absent, the session is requested without a partition
 * @return the value of getRecordCount() on the download session
 * @throws TunnelException declared for the tunnel calls made here
 */
public long getTotalRows(String project, String table, Optional<String> partitionSpec) throws TunnelException {
makeSureInitialized();
TableTunnel tunnel = new TableTunnel(odps);
// Fill the region into the endpoint template.
String tunnelUrl = tunnelUrlTemplate.replace("{}", region);
if (enablePublicAccess) {
// With public access enabled, "-inc" is stripped from the URL.
tunnelUrl = tunnelUrl.replace("-inc", "");
}
TableTunnel.DownloadSession downloadSession;
tunnel.setEndpoint(tunnelUrl);
// NOTE(review): this return precedes the partition-aware branch below and looks like the
// pre-change line retained by this diff view — confirm it is absent from the real file.
return tunnel.createDownloadSession(project, table).getRecordCount();
// NOTE(review): the trailing null is presumably the session id, asking the SDK for a new
// session rather than an existing one — confirm against the ODPS tunnel API.
if (!partitionSpec.isPresent()) {
downloadSession = tunnel.getDownloadSession(project, table, null);
} else {
downloadSession = tunnel.getDownloadSession(project, table, new PartitionSpec(partitionSpec.get()), null);
}
return downloadSession.getRecordCount();
}
public Odps getClient() {

View File

@ -37,6 +37,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class MaxComputeScanNode extends FileQueryScanNode {
@ -96,7 +97,8 @@ public class MaxComputeScanNode extends FileQueryScanNode {
}
try {
List<Pair<Long, Long>> sliceRange = new ArrayList<>();
long totalRows = catalog.getTotalRows(table.getDbName(), table.getName());
Optional<String> partitionSpec = table.getPartitionSpec(conjuncts);
long totalRows = catalog.getTotalRows(table.getDbName(), table.getName(), partitionSpec);
long fileNum = odpsTable.getFileNum();
long start = 0;
long splitSize = (long) Math.ceil((double) totalRows / fileNum);