[opt](mtmv) Materialized view partition track supports date_trunc and optimize the fail reason (#35562) (#36947)
cherry pick from master #35562 commitId: 43d0f191
This commit is contained in:
@ -21,8 +21,6 @@ import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.ImmutableSortedSet;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
import java.util.List;
|
||||
@ -38,10 +36,6 @@ public class MTMVPartitionInfo {
|
||||
SELF_MANAGE
|
||||
}
|
||||
|
||||
public static final ImmutableSet<String> MTMV_PARTITION_FUNCTIONS = new ImmutableSortedSet.Builder<String>(
|
||||
String.CASE_INSENSITIVE_ORDER).add("date_trunc")
|
||||
.build();
|
||||
|
||||
@SerializedName("pt")
|
||||
private MTMVPartitionType partitionType;
|
||||
@SerializedName("rt")
|
||||
|
||||
@ -104,9 +104,9 @@ public class AsyncMaterializationContext extends MaterializationContext {
|
||||
return Optional.empty();
|
||||
}
|
||||
RelationId relationId = null;
|
||||
List<LogicalOlapScan> logicalOlapScan = this.getScanPlan().collectFirst(LogicalOlapScan.class::isInstance);
|
||||
if (!logicalOlapScan.isEmpty()) {
|
||||
relationId = logicalOlapScan.get(0).getRelationId();
|
||||
Optional<LogicalOlapScan> logicalOlapScan = this.getScanPlan().collectFirst(LogicalOlapScan.class::isInstance);
|
||||
if (logicalOlapScan.isPresent()) {
|
||||
relationId = logicalOlapScan.get().getRelationId();
|
||||
}
|
||||
return Optional.of(Pair.of(relationId, normalizeStatisticsColumnExpression(mtmvCache.getStatistics())));
|
||||
}
|
||||
|
||||
@ -27,12 +27,18 @@ import org.apache.doris.mtmv.MTMVRelatedTableIf;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.memo.Group;
|
||||
import org.apache.doris.nereids.memo.StructInfoMap;
|
||||
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
|
||||
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
|
||||
import org.apache.doris.nereids.trees.expressions.Alias;
|
||||
import org.apache.doris.nereids.trees.expressions.ExprId;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.expressions.WindowExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.scalar.DateTrunc;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.Literal;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
|
||||
import org.apache.doris.nereids.trees.plans.JoinType;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.PreAggStatus;
|
||||
@ -48,10 +54,12 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
|
||||
import org.apache.doris.nereids.util.ExpressionUtils;
|
||||
|
||||
import com.google.common.collect.HashMultimap;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMultimap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Multimap;
|
||||
|
||||
@ -77,9 +85,11 @@ public class MaterializedViewUtils {
|
||||
* @param materializedViewPlan this should be rewritten or analyzed plan, should not be physical plan.
|
||||
* @param column ref column name.
|
||||
*/
|
||||
public static Optional<RelatedTableInfo> getRelatedTableInfo(String column, Plan materializedViewPlan) {
|
||||
public static RelatedTableInfo getRelatedTableInfo(String column, String timeUnit,
|
||||
Plan materializedViewPlan, CascadesContext cascadesContext) {
|
||||
|
||||
List<Slot> outputExpressions = materializedViewPlan.getOutput();
|
||||
Slot columnExpr = null;
|
||||
NamedExpression columnExpr = null;
|
||||
// get column slot
|
||||
for (Slot outputSlot : outputExpressions) {
|
||||
if (outputSlot.getName().equalsIgnoreCase(column)) {
|
||||
@ -88,14 +98,12 @@ public class MaterializedViewUtils {
|
||||
}
|
||||
}
|
||||
if (columnExpr == null) {
|
||||
return Optional.empty();
|
||||
return RelatedTableInfo.failWith("partition column can not find from sql select column");
|
||||
}
|
||||
if (!(columnExpr instanceof SlotReference)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
SlotReference columnSlot = (SlotReference) columnExpr;
|
||||
if (!columnSlot.isColumnFromTable()) {
|
||||
return Optional.empty();
|
||||
if (timeUnit != null) {
|
||||
Expression dateTrunc = new DateTrunc(columnExpr, new VarcharLiteral(timeUnit));
|
||||
columnExpr = new Alias(dateTrunc);
|
||||
materializedViewPlan = new LogicalProject<>(ImmutableList.of(columnExpr), materializedViewPlan);
|
||||
}
|
||||
// Collect table relation map which is used to identify self join
|
||||
List<CatalogRelation> catalogRelationObjs =
|
||||
@ -107,19 +115,20 @@ public class MaterializedViewUtils {
|
||||
}
|
||||
// Check sql pattern
|
||||
IncrementCheckerContext checkContext =
|
||||
new IncrementCheckerContext(columnSlot, tableCatalogRelationMultimapBuilder.build());
|
||||
new IncrementCheckerContext(columnExpr, tableCatalogRelationMultimapBuilder.build(), cascadesContext);
|
||||
materializedViewPlan.accept(MaterializedViewIncrementChecker.INSTANCE, checkContext);
|
||||
Multimap<TableIf, Column> partitionRelatedTableAndColumnMap =
|
||||
checkContext.getPartitionRelatedTableAndColumnMap();
|
||||
if (partitionRelatedTableAndColumnMap.isEmpty()) {
|
||||
return Optional.empty();
|
||||
return RelatedTableInfo.failWith(String.format("can't not find valid partition track column, because %s",
|
||||
String.join(",", checkContext.getFailReasons())));
|
||||
}
|
||||
// TODO support to return only one related table info, support multi later
|
||||
for (Map.Entry<TableIf, Column> entry : partitionRelatedTableAndColumnMap.entries()) {
|
||||
return Optional.of(new RelatedTableInfo(new BaseTableInfo(entry.getKey()), true,
|
||||
entry.getValue().getName()));
|
||||
return RelatedTableInfo.successWith(new BaseTableInfo(entry.getKey()), true,
|
||||
entry.getValue().getName(), checkContext.getPartitionExpression().orElseGet(() -> null));
|
||||
}
|
||||
return Optional.empty();
|
||||
return RelatedTableInfo.failWith("can't not find valid partition track column finally");
|
||||
}
|
||||
|
||||
/**
|
||||
@ -290,10 +299,63 @@ public class MaterializedViewUtils {
|
||||
DefaultPlanVisitor<Void, IncrementCheckerContext> {
|
||||
|
||||
public static final MaterializedViewIncrementChecker INSTANCE = new MaterializedViewIncrementChecker();
|
||||
public static final Set<Class<? extends Expression>> SUPPORT_EXPRESSION_TYPES =
|
||||
ImmutableSet.of(DateTrunc.class, SlotReference.class, Literal.class);
|
||||
|
||||
@Override
|
||||
public Void visitLogicalProject(LogicalProject<? extends Plan> project, IncrementCheckerContext context) {
|
||||
return visit(project, context);
|
||||
NamedExpression mvPartitionColumn = context.getMvPartitionColumn();
|
||||
List<Slot> output = project.getOutput();
|
||||
if (context.getMvPartitionColumn().isColumnFromTable()) {
|
||||
return visit(project, context);
|
||||
}
|
||||
for (Slot projectSlot : output) {
|
||||
if (!projectSlot.equals(mvPartitionColumn.toSlot())) {
|
||||
continue;
|
||||
}
|
||||
if (projectSlot.isColumnFromTable()) {
|
||||
context.setMvPartitionColumn(projectSlot);
|
||||
} else {
|
||||
// should be only use date_trunc
|
||||
Expression shuttledExpression =
|
||||
ExpressionUtils.shuttleExpressionWithLineage(projectSlot, project, new BitSet());
|
||||
// merge date_trunc
|
||||
shuttledExpression = new ExpressionNormalization().rewrite(shuttledExpression,
|
||||
new ExpressionRewriteContext(context.getCascadesContext()));
|
||||
|
||||
List<Expression> expressions = shuttledExpression.collectToList(Expression.class::isInstance);
|
||||
for (Expression expression : expressions) {
|
||||
if (SUPPORT_EXPRESSION_TYPES.stream().noneMatch(
|
||||
supportExpression -> supportExpression.isAssignableFrom(expression.getClass()))) {
|
||||
context.addFailReason(
|
||||
String.format("partition column use invalid implicit expression, invalid "
|
||||
+ "expression is %s", expression));
|
||||
return null;
|
||||
}
|
||||
}
|
||||
List<DateTrunc> dataTruncExpressions =
|
||||
shuttledExpression.collectToList(DateTrunc.class::isInstance);
|
||||
if (dataTruncExpressions.size() != 1) {
|
||||
// mv time unit level is little then query
|
||||
context.addFailReason("partition column time unit level should be "
|
||||
+ "greater than sql select column");
|
||||
return null;
|
||||
}
|
||||
Optional<Slot> columnExpr =
|
||||
shuttledExpression.getArgument(0).collectFirst(Slot.class::isInstance);
|
||||
if (!columnExpr.isPresent() || !columnExpr.get().isColumnFromTable()) {
|
||||
context.addFailReason(String.format("partition reference column should be direct column "
|
||||
+ "rather then expression except date_trunc, columnExpr is %s", columnExpr));
|
||||
return null;
|
||||
}
|
||||
context.setPartitionExpression(shuttledExpression);
|
||||
context.setMvPartitionColumn(columnExpr.get());
|
||||
}
|
||||
return visit(project, context);
|
||||
}
|
||||
context.addFailReason(String.format("partition reference column should be direct column "
|
||||
+ "rather then expression except date_trunc, current project is %s", project));
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -305,6 +367,7 @@ public class MaterializedViewUtils {
|
||||
public Void visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join,
|
||||
IncrementCheckerContext context) {
|
||||
if (join.isMarkJoin()) {
|
||||
context.addFailReason("partition track doesn't support mark join");
|
||||
return null;
|
||||
}
|
||||
Plan left = join.child(0);
|
||||
@ -313,7 +376,11 @@ public class MaterializedViewUtils {
|
||||
&& slot.isColumnFromTable())
|
||||
.map(slot -> ((SlotReference) slot).getColumn().get())
|
||||
.collect(Collectors.toSet());
|
||||
boolean useLeft = leftColumnSet.contains(context.getMvPartitionColumn().getColumn().get());
|
||||
SlotReference contextPartitionColumn = getContextPartitionColumn(context);
|
||||
if (contextPartitionColumn == null) {
|
||||
return null;
|
||||
}
|
||||
boolean useLeft = leftColumnSet.contains(contextPartitionColumn.getColumn().get());
|
||||
JoinType joinType = join.getJoinType();
|
||||
if (joinType.isInnerJoin() || joinType.isCrossJoin()) {
|
||||
return visit(join, context);
|
||||
@ -326,12 +393,16 @@ public class MaterializedViewUtils {
|
||||
|| joinType.isRightSemiJoin()) && !useLeft) {
|
||||
return visit(join.right(), context);
|
||||
}
|
||||
context.addFailReason(String.format("partition column is in un supported join null generate side, "
|
||||
+ "current join type is %s", joinType));
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void visitLogicalRelation(LogicalRelation relation, IncrementCheckerContext context) {
|
||||
if (!(relation instanceof LogicalCatalogRelation)) {
|
||||
context.addFailReason(String.format("relation should be LogicalCatalogRelation, "
|
||||
+ "but now is %s", relation.getClass().getSimpleName()));
|
||||
return null;
|
||||
}
|
||||
LogicalCatalogRelation logicalCatalogRelation = (LogicalCatalogRelation) relation;
|
||||
@ -339,25 +410,41 @@ public class MaterializedViewUtils {
|
||||
// if self join, self join can not partition track now, remove the partition column correspondingly
|
||||
if (context.getRelationByTable(table).size() > 1) {
|
||||
context.getPartitionRelatedTableAndColumnMap().removeAll(table);
|
||||
context.addFailReason(String.format("self join doesn't support partition update, "
|
||||
+ "self join table name is %s", table.getName()));
|
||||
return null;
|
||||
}
|
||||
// TODO: 2024/1/31 support only one partition referenced column, support multi later
|
||||
if (!context.getPartitionRelatedTableAndColumnMap().isEmpty()) {
|
||||
context.addFailReason(String.format("partition track already has an related base table column,"
|
||||
+ "track info is %s", context.getPartitionRelatedTableAndColumnMap()));
|
||||
return null;
|
||||
}
|
||||
if (!(table instanceof MTMVRelatedTableIf)) {
|
||||
context.addFailReason(String.format("relation base table is not MTMVRelatedTableIf, the table is %s",
|
||||
table.getName()));
|
||||
return null;
|
||||
}
|
||||
MTMVRelatedTableIf relatedTable = (MTMVRelatedTableIf) table;
|
||||
PartitionType type = relatedTable.getPartitionType();
|
||||
if (PartitionType.UNPARTITIONED.equals(type)) {
|
||||
context.addFailReason(String.format("related base table is not partition table, the table is %s",
|
||||
table.getName()));
|
||||
return null;
|
||||
}
|
||||
Set<Column> partitionColumnSet = new HashSet<>(relatedTable.getPartitionColumns());
|
||||
Column mvReferenceColumn = context.getMvPartitionColumn().getColumn().get();
|
||||
SlotReference contextPartitionColumn = getContextPartitionColumn(context);
|
||||
if (contextPartitionColumn == null) {
|
||||
return null;
|
||||
}
|
||||
Column mvReferenceColumn = contextPartitionColumn.getColumn().get();
|
||||
if (partitionColumnSet.contains(mvReferenceColumn)
|
||||
&& (!mvReferenceColumn.isAllowNull() || relatedTable.isPartitionColumnAllowNull())) {
|
||||
context.addTableColumn(table, mvReferenceColumn);
|
||||
} else {
|
||||
context.addFailReason(String.format("related base table partition column doesn't contain the mv"
|
||||
+ " partition or partition nullable check fail, the mvReferenceColumn is %s",
|
||||
mvReferenceColumn));
|
||||
}
|
||||
return visit(relation, context);
|
||||
}
|
||||
@ -367,6 +454,7 @@ public class MaterializedViewUtils {
|
||||
IncrementCheckerContext context) {
|
||||
Set<Expression> groupByExprSet = new HashSet<>(aggregate.getGroupByExpressions());
|
||||
if (groupByExprSet.isEmpty()) {
|
||||
context.addFailReason("group by sets is empty, doesn't contain the target partition");
|
||||
return null;
|
||||
}
|
||||
Set<Column> originalGroupbyExprSet = new HashSet<>();
|
||||
@ -375,7 +463,12 @@ public class MaterializedViewUtils {
|
||||
originalGroupbyExprSet.add(((SlotReference) groupExpr).getColumn().get());
|
||||
}
|
||||
});
|
||||
if (!originalGroupbyExprSet.contains(context.getMvPartitionColumn().getColumn().get())) {
|
||||
SlotReference contextPartitionColumn = getContextPartitionColumn(context);
|
||||
if (contextPartitionColumn == null) {
|
||||
return null;
|
||||
}
|
||||
if (!originalGroupbyExprSet.contains(contextPartitionColumn.getColumn().get())) {
|
||||
context.addFailReason("group by sets doesn't contain the target partition");
|
||||
return null;
|
||||
}
|
||||
return visit(aggregate, context);
|
||||
@ -389,6 +482,7 @@ public class MaterializedViewUtils {
|
||||
}
|
||||
for (NamedExpression namedExpression : windowExpressions) {
|
||||
if (!checkWindowPartition(namedExpression, context)) {
|
||||
context.addFailReason("window partition sets doesn't contain the target partition");
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -421,29 +515,52 @@ public class MaterializedViewUtils {
|
||||
originalPartitionbyExprSet.add(((SlotReference) groupExpr).getColumn().get());
|
||||
}
|
||||
});
|
||||
if (!originalPartitionbyExprSet.contains(context.getMvPartitionColumn().getColumn().get())) {
|
||||
SlotReference contextPartitionColumn = getContextPartitionColumn(context);
|
||||
if (contextPartitionColumn == null) {
|
||||
return false;
|
||||
}
|
||||
if (!originalPartitionbyExprSet.contains(contextPartitionColumn.getColumn().get())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private SlotReference getContextPartitionColumn(IncrementCheckerContext context) {
|
||||
if (!context.getMvPartitionColumn().isColumnFromTable()) {
|
||||
context.addFailReason(String.format("context partition column should be slot from column, "
|
||||
+ "context column is %s",
|
||||
context.getMvPartitionColumn()));
|
||||
return null;
|
||||
}
|
||||
return (SlotReference) context.getMvPartitionColumn();
|
||||
}
|
||||
}
|
||||
|
||||
private static final class IncrementCheckerContext {
|
||||
private final SlotReference mvPartitionColumn;
|
||||
private NamedExpression mvPartitionColumn;
|
||||
private Optional<Expression> partitionExpression = Optional.empty();
|
||||
private final Multimap<TableIdentifier, CatalogRelation> tableAndCatalogRelationMap;
|
||||
private final Multimap<TableIf, Column> partitionRelatedTableAndColumnMap = HashMultimap.create();
|
||||
private final Set<String> failReasons = new HashSet<>();
|
||||
private final CascadesContext cascadesContext;
|
||||
|
||||
public IncrementCheckerContext(SlotReference mvPartitionColumn,
|
||||
Multimap<TableIdentifier, CatalogRelation> tableAndCatalogRelationMap) {
|
||||
public IncrementCheckerContext(NamedExpression mvPartitionColumn,
|
||||
Multimap<TableIdentifier, CatalogRelation> tableAndCatalogRelationMap,
|
||||
CascadesContext cascadesContext) {
|
||||
this.mvPartitionColumn = mvPartitionColumn;
|
||||
this.tableAndCatalogRelationMap = tableAndCatalogRelationMap;
|
||||
this.cascadesContext = cascadesContext;
|
||||
}
|
||||
|
||||
public SlotReference getMvPartitionColumn() {
|
||||
public NamedExpression getMvPartitionColumn() {
|
||||
return mvPartitionColumn;
|
||||
}
|
||||
|
||||
public void setMvPartitionColumn(NamedExpression mvPartitionColumn) {
|
||||
this.mvPartitionColumn = mvPartitionColumn;
|
||||
}
|
||||
|
||||
public void addTableColumn(TableIf relatedTable, Column partitionColumn) {
|
||||
partitionRelatedTableAndColumnMap.put(relatedTable, partitionColumn);
|
||||
}
|
||||
@ -459,20 +576,56 @@ public class MaterializedViewUtils {
|
||||
public void addTableAndRelation(TableIf tableIf, CatalogRelation relation) {
|
||||
tableAndCatalogRelationMap.put(new TableIdentifier(tableIf), relation);
|
||||
}
|
||||
|
||||
public Set<String> getFailReasons() {
|
||||
return failReasons;
|
||||
}
|
||||
|
||||
public void addFailReason(String failReason) {
|
||||
this.failReasons.add(failReason);
|
||||
}
|
||||
|
||||
public CascadesContext getCascadesContext() {
|
||||
return cascadesContext;
|
||||
}
|
||||
|
||||
public Optional<Expression> getPartitionExpression() {
|
||||
return partitionExpression;
|
||||
}
|
||||
|
||||
public void setPartitionExpression(Expression partitionExpression) {
|
||||
this.partitionExpression = Optional.ofNullable(partitionExpression);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The related table info that mv relate
|
||||
*/
|
||||
public static final class RelatedTableInfo {
|
||||
private BaseTableInfo tableInfo;
|
||||
private boolean pctPossible;
|
||||
private String column;
|
||||
private final BaseTableInfo tableInfo;
|
||||
private final boolean pctPossible;
|
||||
private final String column;
|
||||
private final Set<String> failReasons = new HashSet<>();
|
||||
// This records the partition expression if exist
|
||||
private Optional<Expression> partitionExpression;
|
||||
|
||||
public RelatedTableInfo(BaseTableInfo tableInfo, boolean pctPossible, String column) {
|
||||
public RelatedTableInfo(BaseTableInfo tableInfo, boolean pctPossible, String column, String failReason,
|
||||
Expression partitionExpression) {
|
||||
this.tableInfo = tableInfo;
|
||||
this.pctPossible = pctPossible;
|
||||
this.column = column;
|
||||
this.failReasons.add(failReason);
|
||||
this.partitionExpression = Optional.ofNullable(partitionExpression);
|
||||
}
|
||||
|
||||
public static RelatedTableInfo failWith(String failReason) {
|
||||
return new RelatedTableInfo(null, false, null, failReason,
|
||||
null);
|
||||
}
|
||||
|
||||
public static RelatedTableInfo successWith(BaseTableInfo tableInfo, boolean pctPossible, String column,
|
||||
Expression partitionExpression) {
|
||||
return new RelatedTableInfo(tableInfo, pctPossible, column, "", partitionExpression);
|
||||
}
|
||||
|
||||
public BaseTableInfo getTableInfo() {
|
||||
@ -486,5 +639,17 @@ public class MaterializedViewUtils {
|
||||
public String getColumn() {
|
||||
return column;
|
||||
}
|
||||
|
||||
public void addFailReason(String failReason) {
|
||||
this.failReasons.add(failReason);
|
||||
}
|
||||
|
||||
public String getFailReason() {
|
||||
return String.join(",", failReasons);
|
||||
}
|
||||
|
||||
public Optional<Expression> getPartitionExpression() {
|
||||
return partitionExpression;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -24,6 +24,8 @@ import org.apache.doris.nereids.trees.expressions.functions.Function;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.agg.RollUpTrait;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
@ -63,7 +65,8 @@ public abstract class AggFunctionRollUpHandler {
|
||||
protected static List<Expression> extractArguments(Expression functionWithAny, Function actualFunction) {
|
||||
Set<Object> exprSetToRemove = functionWithAny.collectToSet(expr -> !(expr instanceof Any));
|
||||
return actualFunction.collectFirst(expr ->
|
||||
exprSetToRemove.stream().noneMatch(exprToRemove -> exprToRemove.equals(expr)));
|
||||
exprSetToRemove.stream().noneMatch(exprToRemove -> exprToRemove.equals(expr)))
|
||||
.map(expr -> ImmutableList.of((Expression) expr)).orElse(ImmutableList.of());
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -23,6 +23,7 @@ import org.apache.doris.nereids.rules.expression.rules.DigitalMaskingConvert;
|
||||
import org.apache.doris.nereids.rules.expression.rules.FoldConstantRule;
|
||||
import org.apache.doris.nereids.rules.expression.rules.InPredicateDedup;
|
||||
import org.apache.doris.nereids.rules.expression.rules.InPredicateToEqualToRule;
|
||||
import org.apache.doris.nereids.rules.expression.rules.MergeDateTrunc;
|
||||
import org.apache.doris.nereids.rules.expression.rules.NormalizeBinaryPredicatesRule;
|
||||
import org.apache.doris.nereids.rules.expression.rules.SimplifyArithmeticComparisonRule;
|
||||
import org.apache.doris.nereids.rules.expression.rules.SimplifyArithmeticRule;
|
||||
@ -53,6 +54,7 @@ public class ExpressionNormalization extends ExpressionRewrite {
|
||||
DigitalMaskingConvert.INSTANCE,
|
||||
SimplifyArithmeticComparisonRule.INSTANCE,
|
||||
ConvertAggStateCast.INSTANCE,
|
||||
MergeDateTrunc.INSTANCE,
|
||||
CheckCast.INSTANCE
|
||||
)
|
||||
);
|
||||
|
||||
@ -0,0 +1,78 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.rules.expression.rules;
|
||||
|
||||
import org.apache.doris.nereids.rules.expression.ExpressionPatternMatcher;
|
||||
import org.apache.doris.nereids.rules.expression.ExpressionPatternRuleFactory;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.scalar.DateTrunc;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.Interval.TimeUnit;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.Literal;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Rewrite rule to convert
|
||||
* For example:
|
||||
* date_trunc(date_trunc(data_slot, 'hour'), 'day') -> date_trunc(data_slot, 'day')
|
||||
*/
|
||||
public class MergeDateTrunc implements ExpressionPatternRuleFactory {
|
||||
|
||||
public static MergeDateTrunc INSTANCE = new MergeDateTrunc();
|
||||
public static ImmutableSet<TimeUnit> UN_SUPPORT_TIME_UNIT = ImmutableSet.of(TimeUnit.WEEK, TimeUnit.QUARTER);
|
||||
|
||||
@Override
|
||||
public List<ExpressionPatternMatcher<? extends Expression>> buildRules() {
|
||||
return ImmutableList.of(
|
||||
matchesTopType(DateTrunc.class)
|
||||
.when(dateTrunc -> dateTrunc.getArgument(0) instanceof DateTrunc)
|
||||
.then(MergeDateTrunc::rewrite)
|
||||
);
|
||||
}
|
||||
|
||||
private static Expression rewrite(DateTrunc dateTrunc) {
|
||||
|
||||
Expression parentTimeUnitArgument = dateTrunc.getArgument(1);
|
||||
if (!(parentTimeUnitArgument instanceof Literal)) {
|
||||
return dateTrunc;
|
||||
}
|
||||
Optional<TimeUnit> parentTimeUnit = TimeUnit.of(((Literal) parentTimeUnitArgument).getStringValue());
|
||||
DateTrunc childDateTrunc = (DateTrunc) dateTrunc.getArgument(0);
|
||||
Expression childTimeUnitArgument = childDateTrunc.getArgument(1);
|
||||
if (!(childTimeUnitArgument instanceof Literal)) {
|
||||
return dateTrunc;
|
||||
}
|
||||
Optional<TimeUnit> childTimeUnit = TimeUnit.of(((Literal) childTimeUnitArgument).getStringValue());
|
||||
if (!parentTimeUnit.isPresent() || !childTimeUnit.isPresent()) {
|
||||
return dateTrunc;
|
||||
}
|
||||
if (UN_SUPPORT_TIME_UNIT.contains(parentTimeUnit.get())
|
||||
|| UN_SUPPORT_TIME_UNIT.contains(childTimeUnit.get())) {
|
||||
return dateTrunc;
|
||||
}
|
||||
if (parentTimeUnit.get().getLevel() < childTimeUnit.get().getLevel()) {
|
||||
return dateTrunc;
|
||||
}
|
||||
return new DateTrunc(childDateTrunc.getArgument(0), new VarcharLiteral(parentTimeUnit.get().toString()));
|
||||
}
|
||||
}
|
||||
@ -271,7 +271,7 @@ public interface TreeNode<NODE_TYPE extends TreeNode<NODE_TYPE>> {
|
||||
/**
|
||||
* Collect the nodes that satisfied the predicate firstly.
|
||||
*/
|
||||
default <T> List<T> collectFirst(Predicate<TreeNode<NODE_TYPE>> predicate) {
|
||||
default <T> Optional<T> collectFirst(Predicate<TreeNode<NODE_TYPE>> predicate) {
|
||||
List<TreeNode<NODE_TYPE>> result = new ArrayList<>();
|
||||
foreach(node -> {
|
||||
if (result.isEmpty() && predicate.test(node)) {
|
||||
@ -279,7 +279,7 @@ public interface TreeNode<NODE_TYPE extends TreeNode<NODE_TYPE>> {
|
||||
}
|
||||
return !result.isEmpty();
|
||||
});
|
||||
return (List<T>) ImmutableList.copyOf(result);
|
||||
return result.isEmpty() ? Optional.empty() : Optional.of((T) result.get(0));
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -25,6 +25,9 @@ import org.apache.doris.nereids.types.DataType;
|
||||
import org.apache.doris.nereids.types.DateType;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.commons.lang3.EnumUtils;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Interval for timestamp calculation.
|
||||
@ -61,30 +64,46 @@ public class Interval extends Expression implements LeafExpression, AlwaysNotNul
|
||||
* Supported time unit.
|
||||
*/
|
||||
public enum TimeUnit {
|
||||
YEAR("YEAR", false),
|
||||
MONTH("MONTH", false),
|
||||
WEEK("WEEK", false),
|
||||
DAY("DAY", false),
|
||||
HOUR("HOUR", true),
|
||||
MINUTE("MINUTE", true),
|
||||
SECOND("SECOND", true);
|
||||
YEAR("YEAR", false, 800),
|
||||
MONTH("MONTH", false, 700),
|
||||
QUARTER("QUARTER", false, 600),
|
||||
WEEK("WEEK", false, 500),
|
||||
DAY("DAY", false, 400),
|
||||
HOUR("HOUR", true, 300),
|
||||
MINUTE("MINUTE", true, 200),
|
||||
SECOND("SECOND", true, 100);
|
||||
|
||||
private final String description;
|
||||
|
||||
private final boolean isDateTimeUnit;
|
||||
/**
|
||||
* Time unit level, second level is low, year level is high
|
||||
*/
|
||||
private final int level;
|
||||
|
||||
TimeUnit(String description, boolean isDateTimeUnit) {
|
||||
TimeUnit(String description, boolean isDateTimeUnit, int level) {
|
||||
this.description = description;
|
||||
this.isDateTimeUnit = isDateTimeUnit;
|
||||
this.level = level;
|
||||
}
|
||||
|
||||
public boolean isDateTimeUnit() {
|
||||
return isDateTimeUnit;
|
||||
}
|
||||
|
||||
public int getLevel() {
|
||||
return level;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return description;
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct time unit by name
|
||||
*/
|
||||
public static Optional<TimeUnit> of(String name) {
|
||||
return Optional.ofNullable(EnumUtils.getEnumIgnoreCase(TimeUnit.class, name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -40,7 +40,10 @@ import org.apache.doris.nereids.exceptions.AnalysisException;
|
||||
import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils.RelatedTableInfo;
|
||||
import org.apache.doris.nereids.trees.expressions.Cast;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.scalar.DateTrunc;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.Literal;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
|
||||
@ -51,7 +54,6 @@ import org.apache.doris.qe.SessionVariable;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@ -59,6 +61,7 @@ import java.util.stream.Collectors;
|
||||
* MTMVPartitionDefinition
|
||||
*/
|
||||
public class MTMVPartitionDefinition {
|
||||
public static final String PARTITION_BY_FUNCTION_NAME = "date_trunc";
|
||||
private MTMVPartitionType partitionType;
|
||||
private String partitionCol;
|
||||
private Expression functionCallExpression;
|
||||
@ -78,25 +81,39 @@ public class MTMVPartitionDefinition {
|
||||
return mtmvPartitionInfo;
|
||||
}
|
||||
String partitionColName;
|
||||
String timeUnit;
|
||||
if (this.partitionType == MTMVPartitionType.EXPR) {
|
||||
Expr expr;
|
||||
if (functionCallExpression instanceof UnboundFunction) {
|
||||
UnboundFunction function = (UnboundFunction) functionCallExpression;
|
||||
expr = new FunctionCallExpr(function.getName(),
|
||||
new FunctionParams(convertToLegacyArguments(function.children())));
|
||||
String functionName = ((UnboundFunction) functionCallExpression).getName();
|
||||
if (functionCallExpression instanceof UnboundFunction
|
||||
&& functionName.equalsIgnoreCase(PARTITION_BY_FUNCTION_NAME)) {
|
||||
partitionColName = functionCallExpression.getArgument(0) instanceof UnboundSlot
|
||||
? ((UnboundSlot) functionCallExpression.getArgument(0)).getName() : null;
|
||||
timeUnit = functionCallExpression.getArguments().get(1).isLiteral()
|
||||
? ((Literal) functionCallExpression.getArgument(1)).getStringValue() : null;
|
||||
} else {
|
||||
throw new AnalysisException(
|
||||
"unsupported auto partition expr " + functionCallExpression.toString());
|
||||
}
|
||||
partitionColName = getColNameFromExpr(expr);
|
||||
mtmvPartitionInfo.setExpr(expr);
|
||||
} else {
|
||||
partitionColName = this.partitionCol;
|
||||
timeUnit = null;
|
||||
}
|
||||
mtmvPartitionInfo.setPartitionCol(partitionColName);
|
||||
RelatedTableInfo relatedTableInfo = getRelatedTableInfo(planner, ctx, logicalQuery, partitionColName);
|
||||
RelatedTableInfo relatedTableInfo = getRelatedTableInfo(planner, ctx, logicalQuery, partitionColName, timeUnit);
|
||||
mtmvPartitionInfo.setRelatedCol(relatedTableInfo.getColumn());
|
||||
mtmvPartitionInfo.setRelatedTable(relatedTableInfo.getTableInfo());
|
||||
if (relatedTableInfo.getPartitionExpression().isPresent()) {
|
||||
// Set mv partition expr by relatedTableInfo, this is used for partition rollup and so on
|
||||
if (relatedTableInfo.getPartitionExpression().get().getExpressionName()
|
||||
.equalsIgnoreCase(PARTITION_BY_FUNCTION_NAME)) {
|
||||
DateTrunc dateTrunc = (DateTrunc) relatedTableInfo.getPartitionExpression().get();
|
||||
// todo use new expression?
|
||||
mtmvPartitionInfo.setExpr(new FunctionCallExpr(dateTrunc.getName(),
|
||||
new FunctionParams(convertToLegacyArguments(dateTrunc.children()))));
|
||||
mtmvPartitionInfo.setPartitionType(MTMVPartitionType.EXPR);
|
||||
this.partitionType = MTMVPartitionType.EXPR;
|
||||
}
|
||||
}
|
||||
if (this.partitionType == MTMVPartitionType.EXPR) {
|
||||
try {
|
||||
MTMVPartitionExprFactory.getExprService(mtmvPartitionInfo.getExpr()).analyze(mtmvPartitionInfo);
|
||||
@ -107,38 +124,10 @@ public class MTMVPartitionDefinition {
|
||||
return mtmvPartitionInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* getColNameFromExpr
|
||||
*
|
||||
* @param expr expr
|
||||
* @return String
|
||||
*/
|
||||
public static String getColNameFromExpr(Expr expr) {
|
||||
if (!(expr instanceof FunctionCallExpr)) {
|
||||
throw new AnalysisException(
|
||||
"auto create partition only support function call expr is: "
|
||||
+ MTMVPartitionInfo.MTMV_PARTITION_FUNCTIONS);
|
||||
}
|
||||
FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr;
|
||||
List<Expr> paramsExpr = functionCallExpr.getParams().exprs();
|
||||
String name = functionCallExpr.getFnName().getFunction();
|
||||
if (MTMVPartitionInfo.MTMV_PARTITION_FUNCTIONS.contains(name)) {
|
||||
for (Expr param : paramsExpr) {
|
||||
if (param instanceof SlotRef) {
|
||||
return ((SlotRef) param).getColumnName();
|
||||
}
|
||||
}
|
||||
throw new AnalysisException("can not find colName");
|
||||
} else {
|
||||
throw new AnalysisException(
|
||||
"auto create partition only support function call expr is: "
|
||||
+ MTMVPartitionInfo.MTMV_PARTITION_FUNCTIONS);
|
||||
}
|
||||
}
|
||||
|
||||
private RelatedTableInfo getRelatedTableInfo(NereidsPlanner planner, ConnectContext ctx, LogicalPlan
|
||||
logicalQuery,
|
||||
String partitionColName) {
|
||||
String partitionColName,
|
||||
String timeUnit) {
|
||||
CascadesContext cascadesContext = planner.getCascadesContext();
|
||||
SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable();
|
||||
Set<String> tempDisableRules = sessionVariable.getDisableNereidsRuleNames();
|
||||
@ -149,12 +138,13 @@ public class MTMVPartitionDefinition {
|
||||
try {
|
||||
Plan mvRewrittenPlan =
|
||||
planner.plan(logicalQuery, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
|
||||
Optional<RelatedTableInfo> relatedTableInfo = MaterializedViewUtils
|
||||
.getRelatedTableInfo(partitionColName, mvRewrittenPlan);
|
||||
if (!relatedTableInfo.isPresent() || !relatedTableInfo.get().isPctPossible()) {
|
||||
throw new AnalysisException("Unable to find a suitable base table for partitioning");
|
||||
RelatedTableInfo relatedTableInfo = MaterializedViewUtils
|
||||
.getRelatedTableInfo(partitionColName, timeUnit, mvRewrittenPlan, cascadesContext);
|
||||
if (!relatedTableInfo.isPctPossible()) {
|
||||
throw new AnalysisException(String.format("Unable to find a suitable base table for partitioning,"
|
||||
+ " the fail reason is %s", relatedTableInfo.getFailReason()));
|
||||
}
|
||||
MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.get().getTableInfo());
|
||||
MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.getTableInfo());
|
||||
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
|
||||
try {
|
||||
partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames());
|
||||
@ -162,14 +152,14 @@ public class MTMVPartitionDefinition {
|
||||
throw new AnalysisException(e.getMessage(), e);
|
||||
}
|
||||
|
||||
if (!partitionColumnNames.contains(relatedTableInfo.get().getColumn())) {
|
||||
throw new AnalysisException("error related column: " + relatedTableInfo.get().getColumn());
|
||||
if (!partitionColumnNames.contains(relatedTableInfo.getColumn())) {
|
||||
throw new AnalysisException("error related column: " + relatedTableInfo.getColumn());
|
||||
}
|
||||
if (!(mtmvBaseRealtedTable instanceof HMSExternalTable)
|
||||
&& partitionColumnNames.size() != 1) {
|
||||
throw new AnalysisException("only hms table support multi column partition.");
|
||||
}
|
||||
return relatedTableInfo.get();
|
||||
return relatedTableInfo;
|
||||
} finally {
|
||||
// after operate, roll back the disable rules
|
||||
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
|
||||
@ -178,15 +168,20 @@ public class MTMVPartitionDefinition {
|
||||
}
|
||||
|
||||
private static List<Expr> convertToLegacyArguments(List<Expression> children) {
|
||||
return children.stream().map(child -> {
|
||||
if (child instanceof UnboundSlot) {
|
||||
return new SlotRef(null, ((UnboundSlot) child).getName());
|
||||
} else if (child instanceof Literal) {
|
||||
return new StringLiteral(((Literal) child).getStringValue());
|
||||
} else {
|
||||
throw new AnalysisException("unsupported argument " + child.toString());
|
||||
}
|
||||
}).collect(Collectors.toList());
|
||||
return children.stream().map(MTMVPartitionDefinition::convertToLegacyRecursion).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private static Expr convertToLegacyRecursion(Expression expression) {
|
||||
if (expression instanceof Slot) {
|
||||
return new SlotRef(null, ((Slot) expression).getName());
|
||||
} else if (expression instanceof Literal) {
|
||||
return new StringLiteral(((Literal) expression).getStringValue());
|
||||
} else if (expression instanceof Cast) {
|
||||
// mv partition roll up only need the slot in cast
|
||||
return convertToLegacyRecursion(((Cast) expression).child());
|
||||
} else {
|
||||
throw new AnalysisException("unsupported argument " + expression.toString());
|
||||
}
|
||||
}
|
||||
|
||||
public MTMVPartitionType getPartitionType() {
|
||||
|
||||
Reference in New Issue
Block a user