[feature](mtmv) pick some mtmv pr from master (#37651)

cherry-pick from master
pr: #36318
commitId: c1999479

pr: #36111
commitId: 35ebef62

pr: #36175
commitId: 4c8e66b4

pr: #36414
commitId: 5e009b5a

pr: #36770
commitId: 19e2126c

pr: #36567
commitId: 3da83514
This commit is contained in:
seawinde
2024-07-12 10:35:54 +08:00
committed by GitHub
parent 6214d6421f
commit ffa9e49bc7
34 changed files with 1892 additions and 234 deletions

View File

@ -169,6 +169,9 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_ENABLE_DUPLICATE_WITHOUT_KEYS_BY_DEFAULT =
"enable_duplicate_without_keys_by_default";
public static final String PROPERTIES_GRACE_PERIOD = "grace_period";
public static final String PROPERTIES_ENABLE_NONDETERMINISTIC_FUNCTION =
"enable_nondeterministic_function";
public static final String PROPERTIES_EXCLUDED_TRIGGER_TABLES = "excluded_trigger_tables";
public static final String PROPERTIES_REFRESH_PARTITION_NUM = "refresh_partition_num";
public static final String PROPERTIES_WORKLOAD_GROUP = "workload_group";

View File

@ -30,14 +30,15 @@ import java.util.Optional;
import java.util.Set;
public class MTMVPropertyUtil {
public static final Set<String> mvPropertyKeys = Sets.newHashSet(
public static final Set<String> MV_PROPERTY_KEYS = Sets.newHashSet(
PropertyAnalyzer.PROPERTIES_GRACE_PERIOD,
PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES,
PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM,
PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP,
PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT,
PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT,
PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT
PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT,
PropertyAnalyzer.PROPERTIES_ENABLE_NONDETERMINISTIC_FUNCTION
);
public static void analyzeProperty(String key, String value) {
@ -63,6 +64,8 @@ public class MTMVPropertyUtil {
case PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT:
analyzePartitionSyncLimit(value);
break;
case PropertyAnalyzer.PROPERTIES_ENABLE_NONDETERMINISTIC_FUNCTION:
break;
default:
throw new AnalysisException("illegal key:" + key);

View File

@ -19,6 +19,7 @@ package org.apache.doris.nereids.rules.exploration.mv;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.jobs.executor.Rewriter;
import org.apache.doris.nereids.rules.analysis.NormalizeRepeat;
import org.apache.doris.nereids.rules.exploration.mv.AbstractMaterializedViewAggregateRule.AggregateExpressionRewriteContext.ExpressionRewriteMode;
import org.apache.doris.nereids.rules.exploration.mv.StructInfo.PlanCheckContext;
@ -26,9 +27,11 @@ import org.apache.doris.nereids.rules.exploration.mv.StructInfo.PlanSplitContext
import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
import org.apache.doris.nereids.rules.exploration.mv.rollup.AggFunctionRollUpHandler;
import org.apache.doris.nereids.rules.exploration.mv.rollup.BothCombinatorRollupHandler;
import org.apache.doris.nereids.rules.exploration.mv.rollup.ContainDistinctFunctionRollupHandler;
import org.apache.doris.nereids.rules.exploration.mv.rollup.DirectRollupHandler;
import org.apache.doris.nereids.rules.exploration.mv.rollup.MappingRollupHandler;
import org.apache.doris.nereids.rules.exploration.mv.rollup.SingleCombinatorRollupHandler;
import org.apache.doris.nereids.rules.rewrite.EliminateGroupByKey;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
@ -53,6 +56,7 @@ import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -71,7 +75,8 @@ public abstract class AbstractMaterializedViewAggregateRule extends AbstractMate
ImmutableList.of(DirectRollupHandler.INSTANCE,
MappingRollupHandler.INSTANCE,
SingleCombinatorRollupHandler.INSTANCE,
BothCombinatorRollupHandler.INSTANCE);
BothCombinatorRollupHandler.INSTANCE,
ContainDistinctFunctionRollupHandler.INSTANCE);
protected static final AggregateExpressionRewriter AGGREGATE_EXPRESSION_REWRITER =
new AggregateExpressionRewriter();
@ -82,7 +87,8 @@ public abstract class AbstractMaterializedViewAggregateRule extends AbstractMate
StructInfo viewStructInfo,
SlotMapping viewToQuerySlotMapping,
Plan tempRewritedPlan,
MaterializationContext materializationContext) {
MaterializationContext materializationContext,
CascadesContext cascadesContext) {
// get view and query aggregate and top plan correspondingly
Pair<Plan, LogicalAggregate<Plan>> viewTopPlanAndAggPair = splitToTopPlanAndAggregate(viewStructInfo);
if (viewTopPlanAndAggPair == null) {
@ -107,26 +113,31 @@ public abstract class AbstractMaterializedViewAggregateRule extends AbstractMate
boolean queryContainsGroupSets = queryAggregate.getSourceRepeat().isPresent();
// If group by expression between query and view is equals, try to rewrite expression directly
if (!queryContainsGroupSets && isGroupByEquals(queryTopPlanAndAggPair, viewTopPlanAndAggPair,
viewToQuerySlotMapping, queryStructInfo, viewStructInfo, materializationContext)) {
viewToQuerySlotMapping, queryStructInfo, viewStructInfo, materializationContext,
cascadesContext)) {
List<Expression> rewrittenQueryExpressions = rewriteExpression(queryTopPlan.getOutput(),
queryTopPlan,
materializationContext.getShuttledExprToScanExprMapping(),
viewToQuerySlotMapping,
true,
queryStructInfo.getTableBitSet());
boolean isRewrittenQueryExpressionValid = true;
if (!rewrittenQueryExpressions.isEmpty()) {
List<NamedExpression> projects = new ArrayList<>();
for (Expression expression : rewrittenQueryExpressions) {
if (expression.containsType(AggregateFunction.class)) {
// record the reason and then try to roll up aggregate function
materializationContext.recordFailReason(queryStructInfo,
"rewritten expression contains aggregate functions when group equals aggregate rewrite",
() -> String.format("aggregate functions = %s\n", rewrittenQueryExpressions));
return null;
isRewrittenQueryExpressionValid = false;
}
projects.add(expression instanceof NamedExpression
? (NamedExpression) expression : new Alias(expression));
}
return new LogicalProject<>(projects, tempRewritedPlan);
if (isRewrittenQueryExpressionValid) {
return new LogicalProject<>(projects, tempRewritedPlan);
}
}
// if fails, record the reason and then try to roll up aggregate function
materializationContext.recordFailReason(queryStructInfo,
@ -314,7 +325,8 @@ public abstract class AbstractMaterializedViewAggregateRule extends AbstractMate
SlotMapping viewToQuerySlotMapping,
StructInfo queryStructInfo,
StructInfo viewStructInfo,
MaterializationContext materializationContext) {
MaterializationContext materializationContext,
CascadesContext cascadesContext) {
Plan queryTopPlan = queryTopPlanAndAggPair.key();
Plan viewTopPlan = viewTopPlanAndAggPair.key();
LogicalAggregate<Plan> queryAggregate = queryTopPlanAndAggPair.value();
@ -325,11 +337,64 @@ public abstract class AbstractMaterializedViewAggregateRule extends AbstractMate
queryAggregate.getGroupByExpressions(), queryTopPlan, queryStructInfo.getTableBitSet())) {
queryGroupShuttledExpression.add(queryExpression);
}
// try to eliminate group by dimension by function dependency if group by expression is not in query
Map<Expression, Expression> viewShuttledExpressionQueryBasedToGroupByExpressionMap = new HashMap<>();
Map<Expression, Expression> groupByExpressionToViewShuttledExpressionQueryBasedMap = new HashMap<>();
List<Expression> viewGroupByExpressions = viewAggregate.getGroupByExpressions();
List<? extends Expression> viewGroupByShuttledExpressions = ExpressionUtils.shuttleExpressionWithLineage(
viewGroupByExpressions, viewTopPlan, viewStructInfo.getTableBitSet());
for (int index = 0; index < viewGroupByExpressions.size(); index++) {
Expression viewExpression = viewGroupByExpressions.get(index);
Expression viewGroupExpressionQueryBased = ExpressionUtils.replace(
viewGroupByShuttledExpressions.get(index),
viewToQuerySlotMapping.toSlotReferenceMap());
viewShuttledExpressionQueryBasedToGroupByExpressionMap.put(viewGroupExpressionQueryBased,
viewExpression);
groupByExpressionToViewShuttledExpressionQueryBasedMap.put(viewExpression,
viewGroupExpressionQueryBased
);
}
if (queryGroupShuttledExpression.equals(viewShuttledExpressionQueryBasedToGroupByExpressionMap.values())) {
// return true, if equals directly
return true;
}
List<NamedExpression> projects = new ArrayList<>();
for (Expression expression : queryGroupShuttledExpression) {
if (!viewShuttledExpressionQueryBasedToGroupByExpressionMap.containsKey(expression)) {
// query group expression is not in view group by expression
return false;
}
Expression chosenExpression = viewShuttledExpressionQueryBasedToGroupByExpressionMap.get(expression);
projects.add(chosenExpression instanceof NamedExpression
? (NamedExpression) chosenExpression : new Alias(chosenExpression));
}
LogicalProject<LogicalAggregate<Plan>> project = new LogicalProject<>(projects, viewAggregate);
// try to eliminate group by expression which is not in query group by expression
Plan rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
childContext -> {
Rewriter.getCteChildrenRewriter(childContext,
ImmutableList.of(Rewriter.topDown(new EliminateGroupByKey()))).execute();
return childContext.getRewritePlan();
}, project, project);
Optional<LogicalAggregate<Plan>> aggreagateOptional =
rewrittenPlan.collectFirst(LogicalAggregate.class::isInstance);
if (!aggreagateOptional.isPresent()) {
return false;
}
List<Expression> viewEliminatedGroupByExpressions = aggreagateOptional.get().getGroupByExpressions();
if (viewEliminatedGroupByExpressions.size() != queryGroupShuttledExpression.size()) {
return false;
}
Set<Expression> viewGroupShuttledExpressionQueryBased = new HashSet<>();
for (Expression viewExpression : ExpressionUtils.shuttleExpressionWithLineage(
viewAggregate.getGroupByExpressions(), viewTopPlan, viewStructInfo.getTableBitSet())) {
for (Expression viewExpression : aggreagateOptional.get().getGroupByExpressions()) {
if (!groupByExpressionToViewShuttledExpressionQueryBasedMap.containsKey(viewExpression)) {
return false;
}
viewGroupShuttledExpressionQueryBased.add(
ExpressionUtils.replace(viewExpression, viewToQuerySlotMapping.toSlotReferenceMap()));
groupByExpressionToViewShuttledExpressionQueryBasedMap.get(viewExpression));
}
return queryGroupShuttledExpression.equals(viewGroupShuttledExpressionQueryBased);
}
@ -356,11 +421,12 @@ public abstract class AbstractMaterializedViewAggregateRule extends AbstractMate
expressionEntry.getValue());
for (AggFunctionRollUpHandler rollUpHandler : ROLL_UP_HANDLERS) {
if (!rollUpHandler.canRollup(queryAggregateFunction, queryAggregateFunctionShuttled,
mvExprToMvScanExprQueryBasedPair)) {
mvExprToMvScanExprQueryBasedPair, mvExprToMvScanExprQueryBased)) {
continue;
}
Function rollupFunction = rollUpHandler.doRollup(queryAggregateFunction,
queryAggregateFunctionShuttled, mvExprToMvScanExprQueryBasedPair);
queryAggregateFunctionShuttled, mvExprToMvScanExprQueryBasedPair,
mvExprToMvScanExprQueryBased);
if (rollupFunction != null) {
return rollupFunction;
}
@ -544,7 +610,7 @@ public abstract class AbstractMaterializedViewAggregateRule extends AbstractMate
/**
* AggregateExpressionRewriteContext
*/
protected static class AggregateExpressionRewriteContext {
public static class AggregateExpressionRewriteContext {
private boolean valid = true;
private final ExpressionRewriteMode expressionRewriteMode;
private final Map<Expression, Expression> mvExprToMvScanExprQueryBasedMapping;
@ -587,7 +653,7 @@ public abstract class AbstractMaterializedViewAggregateRule extends AbstractMate
/**
* The expression rewrite mode, which decide how the expression in query is rewritten by mv
*/
protected enum ExpressionRewriteMode {
public enum ExpressionRewriteMode {
/**
* Try to use the expression in mv directly, and doesn't handle aggregate function
*/

View File

@ -41,7 +41,8 @@ public abstract class AbstractMaterializedViewJoinRule extends AbstractMateriali
StructInfo viewStructInfo,
SlotMapping targetToSourceMapping,
Plan tempRewritedPlan,
MaterializationContext materializationContext) {
MaterializationContext materializationContext,
CascadesContext cascadesContext) {
// Rewrite top projects, represent the query projects by view
List<Expression> expressionsRewritten = rewriteExpression(
queryStructInfo.getExpressions(),

View File

@ -200,7 +200,9 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
}
if (queryToViewSlotMapping == null) {
materializationContext.recordFailReason(queryStructInfo,
"Query to view slot mapping is null", () -> "");
"Query to view slot mapping is null", () ->
String.format("queryToViewTableMapping relation mapping is %s",
queryToViewTableMapping));
continue;
}
SlotMapping viewToQuerySlotMapping = queryToViewSlotMapping.inverse();
@ -250,7 +252,7 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
}
// Rewrite query by view
rewrittenPlan = rewriteQueryByView(matchMode, queryStructInfo, viewStructInfo, viewToQuerySlotMapping,
rewrittenPlan, materializationContext);
rewrittenPlan, materializationContext, cascadesContext);
rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
childContext -> {
Rewriter.getWholeTreeRewriter(childContext).execute();
@ -274,6 +276,11 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
}
if (invalidPartitions == null) {
// if mv can not offer any partition for query, query rewrite bail out to avoid cycle run
materializationContext.recordFailReason(queryStructInfo,
"mv can not offer any partition for query",
() -> String.format("mv partition info %s",
((AsyncMaterializationContext) materializationContext).getMtmv()
.getMvPartitionInfo()));
return rewriteResults;
}
boolean partitionNeedUnion = needUnionRewrite(invalidPartitions, cascadesContext);
@ -281,16 +288,26 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
invalidPartitions;
if (partitionNeedUnion) {
MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv();
Plan originPlanWithFilter = StructInfo.addFilterOnTableScan(queryPlan, invalidPartitions.value(),
mtmv.getMvPartitionInfo().getPartitionCol(), cascadesContext);
if (finalInvalidPartitions.value().isEmpty() || originPlanWithFilter == null) {
Pair<Plan, Boolean> planAndNeedAddFilterPair =
StructInfo.addFilterOnTableScan(queryPlan, invalidPartitions.value(),
mtmv.getMvPartitionInfo().getRelatedCol(), cascadesContext);
if (planAndNeedAddFilterPair == null) {
materializationContext.recordFailReason(queryStructInfo,
"Add filter to base table fail when union rewrite",
() -> String.format("invalidPartitions are %s, queryPlan is %s, partition column is %s",
invalidPartitions, queryPlan.treeString(),
mtmv.getMvPartitionInfo().getPartitionCol()));
continue;
}
if (finalInvalidPartitions.value().isEmpty() || !planAndNeedAddFilterPair.value()) {
// if invalid base table filter is empty or doesn't need to add filter on base table,
// only need remove mv invalid partition
rewrittenPlan = rewrittenPlan.accept(new PartitionRemover(), invalidPartitions.key());
} else {
// For rewrittenPlan which contains materialized view should remove invalid partition ids
List<Plan> children = Lists.newArrayList(
rewrittenPlan.accept(new PartitionRemover(), invalidPartitions.key()),
originPlanWithFilter);
planAndNeedAddFilterPair.key());
// Union query materialized view and source table
rewrittenPlan = new LogicalUnion(Qualifier.ALL,
queryPlan.getOutput().stream().map(NamedExpression.class::cast)
@ -452,6 +469,7 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
// If related base table create partitions or mv is created with ttl, need base table union
Sets.difference(queryUsedBaseTablePartitionNameSet, mvValidBaseTablePartitionNameSet)
.copyInto(baseTableNeedUnionPartitionNameSet);
// Construct result map
Map<BaseTableInfo, Set<String>> mvPartitionNeedRemoveNameMap = new HashMap<>();
if (!mvNeedRemovePartitionNameSet.isEmpty()) {
mvPartitionNeedRemoveNameMap.put(new BaseTableInfo(mtmv), mvNeedRemovePartitionNameSet);
@ -467,7 +485,8 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
* Rewrite query by view, for aggregate or join rewriting should be different inherit class implementation
*/
protected Plan rewriteQueryByView(MatchMode matchMode, StructInfo queryStructInfo, StructInfo viewStructInfo,
SlotMapping viewToQuerySlotMapping, Plan tempRewritedPlan, MaterializationContext materializationContext) {
SlotMapping viewToQuerySlotMapping, Plan tempRewritedPlan, MaterializationContext materializationContext,
CascadesContext cascadesContext) {
return tempRewritedPlan;
}

View File

@ -105,7 +105,8 @@ public class MaterializedViewAggregateOnNoneAggregateRule extends AbstractMateri
@Override
protected Plan rewriteQueryByView(MatchMode matchMode, StructInfo queryStructInfo, StructInfo viewStructInfo,
SlotMapping viewToQuerySlotMapping, Plan tempRewritedPlan, MaterializationContext materializationContext) {
SlotMapping viewToQuerySlotMapping, Plan tempRewritedPlan, MaterializationContext materializationContext,
CascadesContext cascadesContext) {
// check the expression used in group by and group out expression in query
Pair<Plan, LogicalAggregate<Plan>> queryTopPlanAndAggPair = splitToTopPlanAndAggregate(queryStructInfo);
if (queryTopPlanAndAggPair == null) {

View File

@ -42,7 +42,8 @@ public abstract class MaterializedViewScanRule extends AbstractMaterializedViewR
StructInfo viewStructInfo,
SlotMapping targetToSourceMapping,
Plan tempRewritedPlan,
MaterializationContext materializationContext) {
MaterializationContext materializationContext,
CascadesContext cascadesContext) {
// Rewrite top projects, represent the query projects by view
List<Expression> expressionsRewritten = rewriteExpression(
queryStructInfo.getExpressions(),

View File

@ -54,6 +54,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
import org.apache.doris.nereids.trees.plans.visitor.NondeterministicFunctionCollector;
import org.apache.doris.nereids.util.ExpressionUtils;
import com.google.common.collect.HashMultimap;
@ -62,7 +63,9 @@ import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashSet;
@ -257,6 +260,17 @@ public class MaterializedViewUtils {
rewrittenPlan);
}
/**
* Extract nondeterministic functions from the plan. If a function is in whiteExpressionSet,
* it is considered a deterministic function and will not be included
* in the returned expression list
*/
public static List<Expression> extractNondeterministicFunction(Plan plan) {
List<Expression> nondeterministicFunctions = new ArrayList<>();
plan.accept(NondeterministicFunctionCollector.INSTANCE, nondeterministicFunctions);
return nondeterministicFunctions;
}
private static final class TableQueryOperatorChecker extends DefaultPlanVisitor<Boolean, Void> {
public static final TableQueryOperatorChecker INSTANCE = new TableQueryOperatorChecker();
@ -301,58 +315,12 @@ public class MaterializedViewUtils {
@Override
public Void visitLogicalProject(LogicalProject<? extends Plan> project, IncrementCheckerContext context) {
NamedExpression mvPartitionColumn = context.getMvPartitionColumn();
List<Slot> output = project.getOutput();
if (context.getMvPartitionColumn().isColumnFromTable()) {
return visit(project, context);
boolean isValid = checkPartition(output, project, context);
if (!isValid) {
return null;
}
for (Slot projectSlot : output) {
if (!projectSlot.equals(mvPartitionColumn.toSlot())) {
continue;
}
if (projectSlot.isColumnFromTable()) {
context.setMvPartitionColumn(projectSlot);
} else {
// should be only use date_trunc
Expression shuttledExpression =
ExpressionUtils.shuttleExpressionWithLineage(projectSlot, project, new BitSet());
// merge date_trunc
shuttledExpression = new ExpressionNormalization().rewrite(shuttledExpression,
new ExpressionRewriteContext(context.getCascadesContext()));
List<Expression> expressions = shuttledExpression.collectToList(Expression.class::isInstance);
for (Expression expression : expressions) {
if (SUPPORT_EXPRESSION_TYPES.stream().noneMatch(
supportExpression -> supportExpression.isAssignableFrom(expression.getClass()))) {
context.addFailReason(
String.format("partition column use invalid implicit expression, invalid "
+ "expression is %s", expression));
return null;
}
}
List<DateTrunc> dataTruncExpressions =
shuttledExpression.collectToList(DateTrunc.class::isInstance);
if (dataTruncExpressions.size() != 1) {
// mv time unit level is less than the query's
context.addFailReason("partition column time unit level should be "
+ "greater than sql select column");
return null;
}
Optional<Slot> columnExpr =
shuttledExpression.getArgument(0).collectFirst(Slot.class::isInstance);
if (!columnExpr.isPresent() || !columnExpr.get().isColumnFromTable()) {
context.addFailReason(String.format("partition reference column should be direct column "
+ "rather then expression except date_trunc, columnExpr is %s", columnExpr));
return null;
}
context.setPartitionExpression(shuttledExpression);
context.setMvPartitionColumn(columnExpr.get());
}
return visit(project, context);
}
context.addFailReason(String.format("partition reference column should be direct column "
+ "rather then expression except date_trunc, current project is %s", project));
return null;
return visit(project, context);
}
@Override
@ -454,18 +422,8 @@ public class MaterializedViewUtils {
context.addFailReason("group by sets is empty, doesn't contain the target partition");
return null;
}
Set<Column> originalGroupbyExprSet = new HashSet<>();
groupByExprSet.forEach(groupExpr -> {
if (groupExpr instanceof SlotReference && groupExpr.isColumnFromTable()) {
originalGroupbyExprSet.add(((SlotReference) groupExpr).getColumn().get());
}
});
SlotReference contextPartitionColumn = getContextPartitionColumn(context);
if (contextPartitionColumn == null) {
return null;
}
if (!originalGroupbyExprSet.contains(contextPartitionColumn.getColumn().get())) {
context.addFailReason("group by sets doesn't contain the target partition");
boolean isValid = checkPartition(groupByExprSet, aggregate, context);
if (!isValid) {
return null;
}
return visit(aggregate, context);
@ -497,6 +455,8 @@ public class MaterializedViewUtils {
|| plan instanceof LogicalWindow) {
return super.visit(plan, context);
}
context.addFailReason(String.format("Unsupported plan operate in track partition, "
+ "the invalid plan node is %s", plan.getClass().getSimpleName()));
return null;
}
@ -532,6 +492,99 @@ public class MaterializedViewUtils {
}
return (SlotReference) context.getMvPartitionColumn();
}
/**
* Given a partition named expression and expressionsToCheck, check the partition is valid
* example 1:
* partition expression is date_trunc(date_alias#25, 'hour') AS `date_trunc(date_alias, 'hour')`#30
* expressionsToCheck is date_trunc(date_alias, 'hour')#30
* expressionsToCheck is the slot referring to the partition expression, but both are expressions
* example 2:
* partition expression is L_SHIPDATE#10
* expressionsToCheck is L_SHIPDATE#10
* both of them are slot
* example 3:
* partition expression is date_trunc(L_SHIPDATE#10, 'hour')#30
* expressionsToCheck is L_SHIPDATE#10
* all above should check successfully
* */
private static boolean checkPartition(Collection<? extends Expression> expressionsToCheck, Plan plan,
IncrementCheckerContext context) {
NamedExpression partitionColumn = context.getMvPartitionColumn();
for (Expression projectSlot : expressionsToCheck) {
if (projectSlot.isColumnFromTable() && projectSlot.equals(partitionColumn.toSlot())) {
continue;
}
// check the expression which use partition column
Expression expressionToCheck =
ExpressionUtils.shuttleExpressionWithLineage(projectSlot, plan, new BitSet());
// merge date_trunc
expressionToCheck = new ExpressionNormalization().rewrite(expressionToCheck,
new ExpressionRewriteContext(context.getCascadesContext()));
Expression partitionExpression = context.getPartitionExpression().isPresent()
? context.getPartitionExpression().get() :
ExpressionUtils.shuttleExpressionWithLineage(partitionColumn, plan, new BitSet());
// merge date_trunc
partitionExpression = new ExpressionNormalization().rewrite(partitionExpression,
new ExpressionRewriteContext(context.getCascadesContext()));
Set<SlotReference> expressionToCheckColumns =
expressionToCheck.collectToSet(SlotReference.class::isInstance);
Set<SlotReference> partitionColumns =
partitionExpression.collectToSet(SlotReference.class::isInstance);
if (Sets.intersection(expressionToCheckColumns, partitionColumns).isEmpty()
|| expressionToCheckColumns.isEmpty() || partitionColumns.isEmpty()) {
// this expression doesn't use partition column
continue;
}
if (expressionToCheckColumns.size() > 1 || partitionColumns.size() > 1) {
context.addFailReason(
String.format("partition expression use more than one slot reference, invalid "
+ "expressionToCheckColumns is %s, partitionColumnDateColumns is %s",
expressionToCheckColumns, partitionColumns));
return false;
}
List<Expression> expressions = expressionToCheck.collectToList(Expression.class::isInstance);
for (Expression expression : expressions) {
if (SUPPORT_EXPRESSION_TYPES.stream().noneMatch(
supportExpression -> supportExpression.isAssignableFrom(expression.getClass()))) {
context.addFailReason(
String.format("column to check use invalid implicit expression, invalid "
+ "expression is %s", expression));
return false;
}
}
List<Expression> partitionExpressions = partitionExpression.collectToList(
Expression.class::isInstance);
for (Expression expression : partitionExpressions) {
if (SUPPORT_EXPRESSION_TYPES.stream().noneMatch(
supportExpression -> supportExpression.isAssignableFrom(expression.getClass()))) {
context.addFailReason(
String.format("partition column use invalid implicit expression, invalid "
+ "expression is %s", expression));
return false;
}
}
List<DateTrunc> expressionToCheckDataTruncList =
expressionToCheck.collectToList(DateTrunc.class::isInstance);
List<DateTrunc> partitionColumnDateTrucList =
partitionExpression.collectToList(DateTrunc.class::isInstance);
if (expressionToCheckDataTruncList.size() > 1 || partitionColumnDateTrucList.size() > 1) {
// mv time unit level is less than the query's
context.addFailReason("partition column time unit level should be "
+ "greater than sql select column");
return false;
}
if (!partitionColumn.isColumnFromTable()) {
context.setMvPartitionColumn(partitionColumns.iterator().next());
}
if (!context.getPartitionExpression().isPresent()) {
context.setPartitionExpression(partitionExpression);
}
}
return true;
}
}
private static final class IncrementCheckerContext {

View File

@ -728,26 +728,32 @@ public class StructInfo {
/**
* Add filter on table scan according to table filter map
*
* @return Pair(Plan, Boolean): the key is the plan with the filter added, the value indicates
* whether the filter needs to be added.
* Returns null if adding the filter fails.
*/
public static Plan addFilterOnTableScan(Plan queryPlan, Map<BaseTableInfo, Set<String>> partitionOnOriginPlan,
String partitionColumn,
CascadesContext parentCascadesContext) {
public static Pair<Plan, Boolean> addFilterOnTableScan(Plan queryPlan, Map<BaseTableInfo,
Set<String>> partitionOnOriginPlan, String partitionColumn, CascadesContext parentCascadesContext) {
// Firstly, construct filter form invalid partition, this filter should be added on origin plan
PredicateAddContext predicateAddContext = new PredicateAddContext(partitionOnOriginPlan, partitionColumn);
Plan queryPlanWithUnionFilter = queryPlan.accept(new PredicateAdder(),
predicateAddContext);
if (!predicateAddContext.isAddSuccess()) {
if (!predicateAddContext.isHandleSuccess()) {
return null;
}
if (!predicateAddContext.isNeedAddFilter()) {
return Pair.of(queryPlan, false);
}
// Deep copy the plan to prevent its output from being identical to the later union output,
// which could cause incorrect execution
queryPlanWithUnionFilter = new LogicalPlanDeepCopier().deepCopy(
(LogicalPlan) queryPlanWithUnionFilter, new DeepCopierContext());
// rbo rewrite after adding filter on origin plan
return MaterializedViewUtils.rewriteByRules(parentCascadesContext, context -> {
return Pair.of(MaterializedViewUtils.rewriteByRules(parentCascadesContext, context -> {
Rewriter.getWholeTreeRewriter(context).execute();
return context.getRewritePlan();
}, queryPlanWithUnionFilter, queryPlan);
}, queryPlanWithUnionFilter, queryPlan), true);
}
/**

View File

@ -146,6 +146,11 @@ public class RelationMapping extends Mapping {
return new TableIdentifier(tableIf);
}
@Override
public String toString() {
return "RelationMapping { mappedRelationMap=" + mappedRelationMap + '}';
}
@Override
public boolean equals(Object o) {
if (this == o) {

View File

@ -27,6 +27,7 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.RollUpTrait;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
@ -39,7 +40,8 @@ public abstract class AggFunctionRollUpHandler {
*/
public boolean canRollup(AggregateFunction queryAggregateFunction,
Expression queryAggregateFunctionShuttled,
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair) {
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair,
Map<Expression, Expression> mvExprToMvScanExprQueryBasedMap) {
Expression viewExpression = mvExprToMvScanExprQueryBasedPair.key();
if (!(viewExpression instanceof RollUpTrait) || !((RollUpTrait) viewExpression).canRollUp()) {
return false;
@ -54,7 +56,8 @@ public abstract class AggFunctionRollUpHandler {
public abstract Function doRollup(
AggregateFunction queryAggregateFunction,
Expression queryAggregateFunctionShuttled,
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair);
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair,
Map<Expression, Expression> mvExprToMvScanExprQueryBasedMap);
/**
* Extract the function arguments by functionWithAny pattern

View File

@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunctio
import org.apache.doris.nereids.trees.expressions.functions.agg.RollUpTrait;
import org.apache.doris.nereids.trees.expressions.functions.combinator.Combinator;
import java.util.Map;
import java.util.Objects;
/**
@ -38,10 +39,11 @@ public class BothCombinatorRollupHandler extends AggFunctionRollUpHandler {
@Override
public boolean canRollup(AggregateFunction queryAggregateFunction,
Expression queryAggregateFunctionShuttled,
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair) {
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair,
Map<Expression, Expression> mvExprToMvScanExprQueryBasedMap) {
Expression viewFunction = mvExprToMvScanExprQueryBasedPair.key();
if (!super.canRollup(queryAggregateFunction, queryAggregateFunctionShuttled,
mvExprToMvScanExprQueryBasedPair)) {
mvExprToMvScanExprQueryBasedPair, mvExprToMvScanExprQueryBasedMap)) {
return false;
}
if (queryAggregateFunction instanceof Combinator && viewFunction instanceof Combinator) {
@ -57,7 +59,8 @@ public class BothCombinatorRollupHandler extends AggFunctionRollUpHandler {
@Override
public Function doRollup(AggregateFunction queryAggregateFunction,
Expression queryAggregateFunctionShuttled,
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair) {
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair,
Map<Expression, Expression> mvExprToMvScanExprQueryBasedMap) {
Expression rollupParam = mvExprToMvScanExprQueryBasedPair.value();
return ((RollUpTrait) queryAggregateFunction).constructRollUp(rollupParam);
}

View File

@ -0,0 +1,133 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.exploration.mv.rollup;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.trees.expressions.Any;
import org.apache.doris.nereids.trees.expressions.BinaryArithmetic;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.functions.Function;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.agg.Avg;
import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
import org.apache.doris.nereids.trees.expressions.functions.agg.Max;
import org.apache.doris.nereids.trees.expressions.functions.agg.Min;
import org.apache.doris.nereids.trees.expressions.functions.agg.Sum;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
import com.google.common.collect.ImmutableSet;
import java.util.Map;
import java.util.Set;
/**
* Try to roll up function which contains distinct, if the param in function is in
* materialized view group by dimension.
* For example
* materialized view def is select empid, deptno, count(salary) from distinctQuery group by empid, deptno;
* query is select deptno, count(distinct empid) from distinctQuery group by deptno;
* should rewrite successfully, count(distinct empid) should use the group by empid dimension in query.
*/
public class ContainDistinctFunctionRollupHandler extends AggFunctionRollUpHandler {

    public static final ContainDistinctFunctionRollupHandler INSTANCE = new ContainDistinctFunctionRollupHandler();

    // Whitelist of aggregate patterns this handler can roll up; Any.INSTANCE stands
    // for an arbitrary single argument. Declared final so the shared reference
    // cannot be reassigned (it was previously a mutable public static field).
    public static final Set<AggregateFunction> SUPPORTED_AGGREGATE_FUNCTION_SET = ImmutableSet.of(
            new Max(true, Any.INSTANCE), new Min(true, Any.INSTANCE),
            new Max(false, Any.INSTANCE), new Min(false, Any.INSTANCE),
            new Count(true, Any.INSTANCE), new Sum(true, Any.INSTANCE),
            new Avg(true, Any.INSTANCE));

    /**
     * Decide whether the query aggregate function can be rolled up on the
     * materialized view: the shuttled expression must contain at most one
     * supported, single-argument aggregate function, and every slot it
     * references must be a query-based mv expression
     * (a key of {@code mvExprToMvScanExprQueryBased}).
     */
    @Override
    public boolean canRollup(AggregateFunction queryAggregateFunction,
            Expression queryAggregateFunctionShuttled,
            Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair,
            Map<Expression, Expression> mvExprToMvScanExprQueryBased) {
        Set<AggregateFunction> queryAggregateFunctions =
                queryAggregateFunctionShuttled.collectToSet(AggregateFunction.class::isInstance);
        if (queryAggregateFunctions.size() > 1) {
            return false;
        }
        for (AggregateFunction aggregateFunction : queryAggregateFunctions) {
            // Any.equals pattern-matches the candidate against the Any placeholder
            if (SUPPORTED_AGGREGATE_FUNCTION_SET.stream()
                    .noneMatch(supportFunction -> Any.equals(supportFunction, aggregateFunction))) {
                return false;
            }
            // only single-argument aggregate functions are supported
            if (aggregateFunction.getArguments().size() > 1) {
                return false;
            }
        }
        Set<Expression> mvExpressionsQueryBased = mvExprToMvScanExprQueryBased.keySet();
        Set<Slot> aggregateFunctionParamSlots = queryAggregateFunctionShuttled.collectToSet(Slot.class::isInstance);
        if (aggregateFunctionParamSlots.stream().anyMatch(slot -> !mvExpressionsQueryBased.contains(slot))) {
            return false;
        }
        return true;
    }

    /**
     * Rewrite the aggregate function argument, replacing every slot with its
     * corresponding mv scan expression. Only literals, binary arithmetic and
     * slots are permitted inside the argument; any other node aborts the roll up.
     *
     * @return the rewritten function, or null when the argument cannot be rewritten
     */
    @Override
    public Function doRollup(AggregateFunction queryAggregateFunction,
            Expression queryAggregateFunctionShuttled, Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair,
            Map<Expression, Expression> mvExprToMvScanExprQueryBasedMap) {
        Expression argument = queryAggregateFunction.children().get(0);
        RollupResult<Boolean> rollupResult = RollupResult.of(true);
        Expression rewrittenArgument = argument.accept(new DefaultExpressionRewriter<RollupResult<Boolean>>() {
            @Override
            public Expression visitSlot(Slot slot, RollupResult<Boolean> context) {
                if (!mvExprToMvScanExprQueryBasedMap.containsKey(slot)) {
                    // slot is not a query-based mv expression; roll up is impossible
                    context.param = false;
                    return slot;
                }
                return mvExprToMvScanExprQueryBasedMap.get(slot);
            }

            @Override
            public Expression visit(Expression expr, RollupResult<Boolean> context) {
                if (!context.param) {
                    // rewrite already failed; short-circuit without descending further
                    return expr;
                }
                if (expr instanceof Literal || expr instanceof BinaryArithmetic || expr instanceof Slot) {
                    return super.visit(expr, context);
                }
                // unsupported node kind inside the argument, abort the roll up
                context.param = false;
                return expr;
            }
        }, rollupResult);
        if (!rollupResult.param) {
            return null;
        }
        return (Function) queryAggregateFunction.withChildren(rewrittenArgument);
    }

    /** Simple mutable holder carrying the success flag through the rewriter. */
    private static class RollupResult<T> {
        public T param;

        private RollupResult(T param) {
            this.param = param;
        }

        public static <T> RollupResult<T> of(T param) {
            return new RollupResult<>(param);
        }

        public T getParam() {
            return param;
        }
    }
}

View File

@ -24,6 +24,8 @@ import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunctio
import org.apache.doris.nereids.trees.expressions.functions.agg.RollUpTrait;
import org.apache.doris.nereids.trees.expressions.functions.combinator.Combinator;
import java.util.Map;
/**
* Roll up directly, for example,
* query is select c1, sum(c2) from t1 group by c1
@ -38,10 +40,11 @@ public class DirectRollupHandler extends AggFunctionRollUpHandler {
public boolean canRollup(
AggregateFunction queryAggregateFunction,
Expression queryAggregateFunctionShuttled,
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair) {
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair,
Map<Expression, Expression> mvExprToMvScanExprQueryBasedMap) {
Expression viewExpression = mvExprToMvScanExprQueryBasedPair.key();
if (!super.canRollup(queryAggregateFunction, queryAggregateFunctionShuttled,
mvExprToMvScanExprQueryBasedPair)) {
mvExprToMvScanExprQueryBasedPair, mvExprToMvScanExprQueryBasedMap)) {
return false;
}
return queryAggregateFunctionShuttled.equals(viewExpression)
@ -53,7 +56,8 @@ public class DirectRollupHandler extends AggFunctionRollUpHandler {
@Override
public Function doRollup(AggregateFunction queryAggregateFunction,
Expression queryAggregateFunctionShuttled,
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair) {
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair,
Map<Expression, Expression> mvExprToMvScanExprQueryBasedMap) {
Expression rollupParam = mvExprToMvScanExprQueryBasedPair.value();
if (rollupParam == null) {
return null;

View File

@ -137,12 +137,13 @@ public class MappingRollupHandler extends AggFunctionRollUpHandler {
@Override
public boolean canRollup(AggregateFunction queryAggregateFunction,
Expression queryAggregateFunctionShuttled,
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair) {
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair,
Map<Expression, Expression> mvExprToMvScanExprQueryBasedMap) {
// handle complex functions roll up by mapping and combinator expression
// eg: query is count(distinct param), mv sql is bitmap_union(to_bitmap(param))
Expression viewExpression = mvExprToMvScanExprQueryBasedPair.key();
if (!super.canRollup(queryAggregateFunction, queryAggregateFunctionShuttled,
mvExprToMvScanExprQueryBasedPair)) {
mvExprToMvScanExprQueryBasedPair, mvExprToMvScanExprQueryBasedMap)) {
return false;
}
Function viewFunction = (Function) viewExpression;
@ -174,7 +175,8 @@ public class MappingRollupHandler extends AggFunctionRollUpHandler {
@Override
public Function doRollup(AggregateFunction queryAggregateFunction,
Expression queryAggregateFunctionShuttled,
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair) {
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair,
Map<Expression, Expression> mvExprToMvScanExprQueryBasedMap) {
Expression rollupParam = mvExprToMvScanExprQueryBasedPair.value();
return ((RollUpTrait) queryAggregateFunction).constructRollUp(rollupParam);
}

View File

@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.expressions.functions.combinator.Combinato
import org.apache.doris.nereids.trees.expressions.functions.combinator.StateCombinator;
import org.apache.doris.nereids.trees.expressions.functions.combinator.UnionCombinator;
import java.util.Map;
import java.util.Objects;
/**
@ -44,10 +45,11 @@ public class SingleCombinatorRollupHandler extends AggFunctionRollUpHandler {
@Override
public boolean canRollup(AggregateFunction queryAggregateFunction,
Expression queryAggregateFunctionShuttled,
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair) {
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair,
Map<Expression, Expression> mvExprToMvScanExprQueryBasedMap) {
Expression viewFunction = mvExprToMvScanExprQueryBasedPair.key();
if (!super.canRollup(queryAggregateFunction, queryAggregateFunctionShuttled,
mvExprToMvScanExprQueryBasedPair)) {
mvExprToMvScanExprQueryBasedPair, mvExprToMvScanExprQueryBasedMap)) {
return false;
}
if (!(queryAggregateFunction instanceof Combinator)
@ -62,7 +64,8 @@ public class SingleCombinatorRollupHandler extends AggFunctionRollUpHandler {
@Override
public Function doRollup(AggregateFunction queryAggregateFunction,
Expression queryAggregateFunctionShuttled,
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair) {
Pair<Expression, Expression> mvExprToMvScanExprQueryBasedPair,
Map<Expression, Expression> mvExprToMvScanExprQueryBasedMap) {
FunctionRegistry functionRegistry = Env.getCurrentEnv().getFunctionRegistry();
String combinatorName = queryAggregateFunction.getName() + AggCombinerFunctionBuilder.MERGE_SUFFIX;
Expression rollupParam = mvExprToMvScanExprQueryBasedPair.value();

View File

@ -26,6 +26,7 @@ import org.apache.doris.nereids.types.DataType;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Optional;
/**
* ExpressionTrait.
@ -76,4 +77,25 @@ public interface ExpressionTrait extends TreeNode<Expression> {
default boolean foldable() {
return true;
}
/**
* Identify the expression is deterministic or not
*/
default boolean isDeterministic() {
boolean isDeterministic = true;
List<Expression> children = this.children();
if (children.isEmpty()) {
return isDeterministic;
}
for (Expression child : children) {
Optional<ExpressionTrait> nonDeterministic =
child.collectFirst(expressionTreeNode -> expressionTreeNode instanceof ExpressionTrait
&& !((ExpressionTrait) expressionTreeNode).isDeterministic());
if (nonDeterministic.isPresent()) {
isDeterministic = false;
break;
}
}
return isDeterministic;
}
}

View File

@ -19,9 +19,16 @@ package org.apache.doris.nereids.trees.expressions.functions;
/**
 * Marker trait for nondeterministic functions.
 *
 * <p>
 * e.g. 'rand()', 'random()'.
 *
 */
public interface Nondeterministic extends ExpressionTrait {
    /**
     * Reports this function as nondeterministic by default. Implementations such
     * as UnixTimestamp may override this: unix_timestamp with arguments is
     * deterministic, while the zero-argument form is not.
     */
    default boolean isDeterministic() {
        return false;
    }
}

View File

@ -20,7 +20,6 @@ package org.apache.doris.nereids.trees.expressions.functions.scalar;
import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature;
import org.apache.doris.nereids.trees.expressions.functions.Nondeterministic;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.DateTimeType;
@ -40,8 +39,7 @@ import java.util.List;
/**
* ScalarFunction 'unix_timestamp'. This class is generated by GenerateFunction.
*/
public class UnixTimestamp extends ScalarFunction
implements ExplicitlyCastableSignature, Nondeterministic {
public class UnixTimestamp extends ScalarFunction implements ExplicitlyCastableSignature {
// we got changes when computeSignature
private static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
@ -142,4 +140,9 @@ public class UnixTimestamp extends ScalarFunction
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitUnixTimestamp(this, context);
}
    @Override
    public boolean isDeterministic() {
        // unix_timestamp() with no argument evaluates to the current time, so it is
        // nondeterministic; with an argument the result depends only on the input.
        return !this.children.isEmpty();
    }
}

View File

@ -29,6 +29,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
@ -49,7 +50,6 @@ import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTable
import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
@ -227,7 +227,6 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
unboundRelation.getNameParts());
TableIf table = RelationUtil.getTable(tableQualifier, Env.getCurrentEnv());
if (predicates.getPredicates().containsKey(table)) {
predicates.setAddSuccess(true);
return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.getPredicates().get(table))),
unboundRelation);
}
@ -262,48 +261,55 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
if (predicates.isEmpty()) {
return catalogRelation;
}
TableIf table = catalogRelation.getTable();
if (predicates.getPredicates() != null) {
TableIf table = catalogRelation.getTable();
if (predicates.getPredicates().containsKey(table)) {
predicates.setAddSuccess(true);
return new LogicalFilter<>(
ImmutableSet.of(ExpressionUtils.or(predicates.getPredicates().get(table))),
catalogRelation);
}
}
if (predicates.getPartition() != null && predicates.getPartitionName() != null) {
if (!(catalogRelation instanceof LogicalOlapScan)) {
if (!(table instanceof MTMVRelatedTableIf)) {
return catalogRelation;
}
for (Map.Entry<BaseTableInfo, Set<String>> filterTableEntry : predicates.getPartition().entrySet()) {
LogicalOlapScan olapScan = (LogicalOlapScan) catalogRelation;
OlapTable targetTable = olapScan.getTable();
if (!Objects.equals(new BaseTableInfo(targetTable), filterTableEntry.getKey())) {
if (!Objects.equals(new BaseTableInfo(table), filterTableEntry.getKey())) {
continue;
}
Slot partitionSlot = null;
for (Slot slot : olapScan.getOutput()) {
for (Slot slot : catalogRelation.getOutput()) {
if (((SlotReference) slot).getName().equals(predicates.getPartitionName())) {
partitionSlot = slot;
break;
}
}
if (partitionSlot == null) {
predicates.setHandleSuccess(false);
return catalogRelation;
}
// if partition has no data, doesn't add filter
Set<PartitionItem> partitionHasDataItems = new HashSet<>();
MTMVRelatedTableIf targetTable = (MTMVRelatedTableIf) table;
for (String partitionName : filterTableEntry.getValue()) {
Partition partition = targetTable.getPartition(partitionName);
if (!targetTable.selectNonEmptyPartitionIds(Lists.newArrayList(partition.getId())).isEmpty()) {
// Add filter only when partition has filter
partitionHasDataItems.add(targetTable.getPartitionInfo().getItem(partition.getId()));
if (!(targetTable instanceof OlapTable)) {
// check partition is have data or not, only support olap table
break;
}
if (!((OlapTable) targetTable).selectNonEmptyPartitionIds(
Lists.newArrayList(partition.getId())).isEmpty()) {
// Add filter only when partition has data
partitionHasDataItems.add(
((OlapTable) targetTable).getPartitionInfo().getItem(partition.getId()));
}
}
if (partitionHasDataItems.isEmpty()) {
predicates.setNeedAddFilter(false);
}
if (!partitionHasDataItems.isEmpty()) {
Set<Expression> partitionExpressions =
constructPredicates(partitionHasDataItems, partitionSlot);
predicates.setAddSuccess(true);
return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(partitionExpressions)),
catalogRelation);
}
@ -322,7 +328,9 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
private final Map<TableIf, Set<Expression>> predicates;
private final Map<BaseTableInfo, Set<String>> partition;
private final String partitionName;
private boolean addSuccess = false;
private boolean handleSuccess = true;
// when add filter by partition, if partition has no data, doesn't need to add filter. should be false
private boolean needAddFilter = true;
public PredicateAddContext(Map<TableIf, Set<Expression>> predicates) {
this(predicates, null, null);
@ -356,12 +364,20 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
return predicates == null && partition == null;
}
public boolean isAddSuccess() {
return addSuccess;
public boolean isHandleSuccess() {
return handleSuccess;
}
public void setAddSuccess(boolean addSuccess) {
this.addSuccess = addSuccess;
public void setHandleSuccess(boolean handleSuccess) {
this.handleSuccess = handleSuccess;
}
public boolean isNeedAddFilter() {
return needAddFilter;
}
public void setNeedAddFilter(boolean needAddFilter) {
this.needAddFilter = needAddFilter;
}
}
}

View File

@ -54,7 +54,6 @@ import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
@ -63,7 +62,6 @@ import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
import org.apache.doris.nereids.trees.plans.visitor.NondeterministicFunctionCollector;
import org.apache.doris.nereids.types.AggStateType;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.NullType;
@ -80,7 +78,6 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -160,7 +157,7 @@ public class CreateMTMVInfo {
throw new AnalysisException(message);
}
analyzeProperties();
analyzeQuery(ctx);
analyzeQuery(ctx, this.mvProperties);
// analyze column
final boolean finalEnableMergeOnWrite = false;
Set<String> keysSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
@ -191,7 +188,7 @@ public class CreateMTMVInfo {
if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) {
throw new AnalysisException("Not support dynamic partition properties on async materialized view");
}
for (String key : MTMVPropertyUtil.mvPropertyKeys) {
for (String key : MTMVPropertyUtil.MV_PROPERTY_KEYS) {
if (properties.containsKey(key)) {
MTMVPropertyUtil.analyzeProperty(key, properties.get(key));
mvProperties.put(key, properties.get(key));
@ -203,7 +200,7 @@ public class CreateMTMVInfo {
/**
* analyzeQuery
*/
public void analyzeQuery(ConnectContext ctx) {
public void analyzeQuery(ConnectContext ctx, Map<String, String> mvProperties) {
// create table as select
StatementContext statementContext = ctx.getStatementContext();
NereidsPlanner planner = new NereidsPlanner(statementContext);
@ -216,7 +213,7 @@ public class CreateMTMVInfo {
// can not contain VIEW or MTMV
analyzeBaseTables(planner.getAnalyzedPlan());
// can not contain Random function
analyzeExpressions(planner.getAnalyzedPlan());
analyzeExpressions(planner.getAnalyzedPlan(), mvProperties);
// can not contain partition or tablets
boolean containTableQueryOperator = MaterializedViewUtils.containTableQueryOperator(planner.getAnalyzedPlan());
if (containTableQueryOperator) {
@ -342,11 +339,17 @@ public class CreateMTMVInfo {
}
}
private void analyzeExpressions(Plan plan) {
List<TreeNode<Expression>> functionCollectResult = new ArrayList<>();
plan.accept(NondeterministicFunctionCollector.INSTANCE, functionCollectResult);
    /**
     * Reject materialized view definitions that contain nondeterministic functions,
     * unless the mv property 'enable_nondeterministic_function' is set to true.
     */
    private void analyzeExpressions(Plan plan, Map<String, String> mvProperties) {
        // Boolean.parseBoolean(null) is false, so an absent property keeps the check enabled
        boolean enableNondeterministicFunction = Boolean.parseBoolean(
                mvProperties.get(PropertyAnalyzer.PROPERTIES_ENABLE_NONDETERMINISTIC_FUNCTION));
        if (enableNondeterministicFunction) {
            return;
        }
        List<Expression> functionCollectResult = MaterializedViewUtils.extractNondeterministicFunction(plan);
        if (!CollectionUtils.isEmpty(functionCollectResult)) {
            // name the offending expressions so the user can fix or opt in via the property
            throw new AnalysisException(String.format(
                    "can not contain invalid expression, the expression is %s",
                    functionCollectResult.stream().map(Expression::toString).collect(Collectors.joining(","))));
        }
    }

View File

@ -17,31 +17,34 @@
package org.apache.doris.nereids.trees.plans.visitor;
import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.functions.Nondeterministic;
import org.apache.doris.nereids.trees.expressions.functions.ExpressionTrait;
import org.apache.doris.nereids.trees.expressions.functions.FunctionTrait;
import org.apache.doris.nereids.trees.plans.Plan;
import java.util.List;
import java.util.Set;
/**
* Collect the nondeterministic expr in plan, these expressions will be put into context
*/
public class NondeterministicFunctionCollector
extends DefaultPlanVisitor<Void, List<TreeNode<Expression>>> {
extends DefaultPlanVisitor<Void, List<Expression>> {
public static final NondeterministicFunctionCollector INSTANCE
= new NondeterministicFunctionCollector();
public static final NondeterministicFunctionCollector INSTANCE = new NondeterministicFunctionCollector();
@Override
public Void visit(Plan plan, List<TreeNode<Expression>> collectedExpressions) {
public Void visit(Plan plan, List<Expression> collectedExpressions) {
List<? extends Expression> expressions = plan.getExpressions();
if (expressions.isEmpty()) {
return super.visit(plan, collectedExpressions);
}
expressions.forEach(expression -> {
collectedExpressions.addAll(expression.collect(Nondeterministic.class::isInstance));
});
for (Expression expression : expressions) {
Set<Expression> nondeterministicFunctions =
expression.collect(expr -> !((ExpressionTrait) expr).isDeterministic()
&& expr instanceof FunctionTrait);
collectedExpressions.addAll(nondeterministicFunctions);
}
return super.visit(plan, collectedExpressions);
}
}