[improvement](mtmv) Support union rewrite when the materialized view is not enough to provide all the data for the query (#33800)
When the materialized view is not enough to provide all the data for the query, if the materialized view is incrementally updated by partition, we can union the materialized view and the origin query to respond to the query. This depends on https://github.com/apache/doris/pull/33362 such as materialized view def is as following: > CREATE MATERIALIZED VIEW mv_10086 > BUILD IMMEDIATE REFRESH AUTO ON MANUAL > partition by(l_shipdate) > DISTRIBUTED BY RANDOM BUCKETS 2 > PROPERTIES ('replication_num' = '1') > AS > select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total > from lineitem > left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate > group by > l_shipdate, > o_orderdate, > l_partkey, > l_suppkey; the materialized view data is as following: +------------+-------------+-----------+-----------+-----------+ | l_shipdate | o_orderdate | l_partkey | l_suppkey | sum_total | +------------+-------------+-----------+-----------+-----------+ | 2023-10-18 | 2023-10-18 | 2 | 3 | 109.20 | | 2023-10-17 | 2023-10-17 | 2 | 3 | 99.50 | | 2023-10-19 | 2023-10-19 | 2 | 3 | 99.50 | +------------+-------------+-----------+-----------+-----------+ when we insert data to partition `2023-10-17`, if we run query as following ``` select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total from lineitem left join orders on lineitem.l_orderkey = orders.o_orderkey and l_shipdate = o_orderdate group by l_shipdate, o_orderdate, l_partkey, l_suppkey; ``` query rewrite by materialized view will fail with message `Check partition query used validation fail` if we turn on the switch `SET enable_materialized_view_union_rewrite = true;` default true we run the query above again, it will succeed and will use union all materialized view and origin query to response the query correctly. 
the plan is as following: ``` | Explain String(Nereids Planner) | +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS: | | l_shipdate[#52] | | o_orderdate[#53] | | l_partkey[#54] | | l_suppkey[#55] | | sum_total[#56] | | PARTITION: UNPARTITIONED | | | | HAS_COLO_PLAN_NODE: false | | | | VRESULT SINK | | MYSQL_PROTOCAL | | | | 11:VEXCHANGE | | offset: 0 | | distribute expr lists: | | | | PLAN FRAGMENT 1 | | | | PARTITION: HASH_PARTITIONED: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 11 | | UNPARTITIONED | | | | 10:VUNION(756) | | | | | |----9:VAGGREGATE (merge finalize)(753) | | | | output: sum(partial_sum(o_totalprice)[#46])[#51] | | | | group by: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | cardinality=2 | | | | distribute expr lists: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | | | | 8:VEXCHANGE | | | offset: 0 | | | distribute expr lists: l_shipdate[#42] | | | | | 1:VEXCHANGE | | offset: 0 | | distribute expr lists: | | | | PLAN FRAGMENT 2 | | | | PARTITION: HASH_PARTITIONED: o_orderkey[#21], o_orderdate[#25] | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 08 | | HASH_PARTITIONED: l_shipdate[#42], o_orderdate[#43], l_partkey[#44], l_suppkey[#45] | | | | 7:VAGGREGATE (update serialize)(747) | | | STREAMING | | | output: partial_sum(o_totalprice[#41])[#46] | | | group by: l_shipdate[#37], o_orderdate[#38], l_partkey[#39], l_suppkey[#40] | | | cardinality=2 | | | distribute expr lists: l_shipdate[#37] | | | | | 6:VHASH JOIN(741) | | | join op: RIGHT OUTER JOIN(PARTITIONED)[] | | | equal join conjunct: (o_orderkey[#21] = l_orderkey[#5]) | | | equal join conjunct: (o_orderdate[#25] = l_shipdate[#15]) | | | 
runtime filters: RF000[min_max] <- l_orderkey[#5](2/2/2048), RF001[bloom] <- l_orderkey[#5](2/2/2048), RF002[min_max] <- l_shipdate[#15](1/1/2048), RF003[bloom] <- l_shipdate[#15](1/1/2048) | | | cardinality=2 | | | vec output tuple id: 4 | | | output tuple id: 4 | | | vIntermediate tuple ids: 3 | | | hash output slot ids: 6 7 24 25 15 | | | final projections: l_shipdate[#36], o_orderdate[#32], l_partkey[#34], l_suppkey[#35], o_totalprice[#31] | | | final project output tuple id: 4 | | | distribute expr lists: o_orderkey[#21], o_orderdate[#25] | | | distribute expr lists: l_orderkey[#5], l_shipdate[#15] | | | | | |----3:VEXCHANGE | | | offset: 0 | | | distribute expr lists: l_orderkey[#5] | | | | | 5:VEXCHANGE | | offset: 0 | | distribute expr lists: | | | | PLAN FRAGMENT 3 | | | | PARTITION: RANDOM | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 05 | | HASH_PARTITIONED: o_orderkey[#21], o_orderdate[#25] | | | | 4:VOlapScanNode(722) | | TABLE: union_db.orders(orders), PREAGGREGATION: ON | | runtime filters: RF000[min_max] -> o_orderkey[#21], RF001[bloom] -> o_orderkey[#21], RF002[min_max] -> o_orderdate[#25], RF003[bloom] -> o_orderdate[#25] | | partitions=3/3 (p_20231017,p_20231018,p_20231019), tablets=9/9, tabletList=161188,161190,161192 ... 
| | cardinality=3, avgRowSize=0.0, numNodes=1 | | pushAggOp=NONE | | | | PLAN FRAGMENT 4 | | | | PARTITION: HASH_PARTITIONED: l_orderkey[#5] | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 03 | | HASH_PARTITIONED: l_orderkey[#5], l_shipdate[#15] | | | | 2:VOlapScanNode(729) | | TABLE: union_db.lineitem(lineitem), PREAGGREGATION: ON | | PREDICATES: (l_shipdate[#15] >= '2023-10-17') AND (l_shipdate[#15] < '2023-10-18') | | partitions=1/3 (p_20231017), tablets=3/3, tabletList=161223,161225,161227 | | cardinality=2, avgRowSize=0.0, numNodes=1 | | pushAggOp=NONE | | | | PLAN FRAGMENT 5 | | | | PARTITION: RANDOM | | | | HAS_COLO_PLAN_NODE: false | | | | STREAM DATA SINK | | EXCHANGE ID: 01 | | RANDOM | | | | 0:VOlapScanNode(718) | | TABLE: union_db.mv_10086(mv_10086), PREAGGREGATION: ON | | partitions=2/3 (p_20231018_20231019,p_20231019_20231020), tablets=4/4, tabletList=161251,161253,161265 ... | | cardinality=2, avgRowSize=0.0, numNodes=1 | | pushAggOp=NONE | | | | MaterializedView | | MaterializedViewRewriteSuccessAndChose: | | Names: mv_10086 | | MaterializedViewRewriteSuccessButNotChose: | | | | MaterializedViewRewriteFail: | +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ ```
This commit is contained in:
@ -17,27 +17,25 @@
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.NereidsPlanner;
|
||||
import org.apache.doris.nereids.StatementContext;
|
||||
import org.apache.doris.nereids.analyzer.UnboundResultSink;
|
||||
import org.apache.doris.nereids.analyzer.UnboundTableSink;
|
||||
import org.apache.doris.nereids.jobs.executor.Rewriter;
|
||||
import org.apache.doris.nereids.parser.NereidsParser;
|
||||
import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
import org.apache.doris.nereids.rules.RuleType;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
|
||||
import org.apache.doris.nereids.rules.rewrite.EliminateSort;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalTableSink;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.OriginStatement;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.stream.Collectors;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
/**
|
||||
* The cache for materialized view cache
|
||||
@ -70,27 +68,25 @@ public class MTMVCache {
|
||||
if (mvSqlStatementContext.getConnectContext().getStatementContext() == null) {
|
||||
mvSqlStatementContext.getConnectContext().setStatementContext(mvSqlStatementContext);
|
||||
}
|
||||
unboundMvPlan = unboundMvPlan.accept(new DefaultPlanVisitor<LogicalPlan, Void>() {
|
||||
// convert to table sink to eliminate sort under table sink, because sort under result sink can not be
|
||||
// eliminated
|
||||
@Override
|
||||
public LogicalPlan visitUnboundResultSink(UnboundResultSink<? extends Plan> unboundResultSink,
|
||||
Void context) {
|
||||
return new UnboundTableSink<>(mtmv.getFullQualifiers(),
|
||||
mtmv.getBaseSchema().stream().map(Column::getName).collect(Collectors.toList()),
|
||||
Lists.newArrayList(),
|
||||
mtmv.getPartitions().stream().map(Partition::getName).collect(Collectors.toList()),
|
||||
unboundResultSink.child());
|
||||
}
|
||||
}, null);
|
||||
// Can not convert to table sink, because use the same column from different table when self join
|
||||
// the out slot is wrong
|
||||
Plan originPlan = planner.plan(unboundMvPlan, PhysicalProperties.ANY, ExplainLevel.REWRITTEN_PLAN);
|
||||
// eliminate logicalTableSink because sink operator is useless in query rewrite by materialized view
|
||||
Plan mvPlan = planner.getCascadesContext().getRewritePlan().accept(new DefaultPlanRewriter<Object>() {
|
||||
// Eliminate result sink because sink operator is useless in query rewrite by materialized view
|
||||
// and the top sort can also be removed
|
||||
Plan mvPlan = originPlan.accept(new DefaultPlanRewriter<Object>() {
|
||||
@Override
|
||||
public Plan visitLogicalTableSink(LogicalTableSink<? extends Plan> logicalTableSink, Object context) {
|
||||
return logicalTableSink.child().accept(this, context);
|
||||
public Plan visitLogicalResultSink(LogicalResultSink<? extends Plan> logicalResultSink, Object context) {
|
||||
return logicalResultSink.child().accept(this, context);
|
||||
}
|
||||
}, null);
|
||||
// Optimize by rules to remove top sort
|
||||
CascadesContext parentCascadesContext = CascadesContext.initContext(mvSqlStatementContext, mvPlan,
|
||||
PhysicalProperties.ANY);
|
||||
mvPlan = MaterializedViewUtils.rewriteByRules(parentCascadesContext, childContext -> {
|
||||
Rewriter.getCteChildrenRewriter(childContext,
|
||||
ImmutableList.of(Rewriter.custom(RuleType.ELIMINATE_SORT, EliminateSort::new))).execute();
|
||||
return childContext.getRewritePlan();
|
||||
}, mvPlan, originPlan);
|
||||
return new MTMVCache(mvPlan, originPlan);
|
||||
}
|
||||
}
|
||||
|
||||
@ -83,8 +83,9 @@ public class CommonSubExpressionOpt extends PlanPostProcessor {
|
||||
// 'case slot whenClause2 END'
|
||||
// This is illegal.
|
||||
Expression rewritten = expr.accept(ExpressionReplacer.INSTANCE, aliasMap);
|
||||
Alias alias = new Alias(rewritten);
|
||||
aliasMap.put(expr, alias);
|
||||
// if rewritten is already alias, use it directly, because in materialized view rewriting
|
||||
// Should keep out slot immutably after rewritten successfully
|
||||
aliasMap.put(expr, rewritten instanceof Alias ? (Alias) rewritten : new Alias(rewritten));
|
||||
}
|
||||
});
|
||||
layer.addAll(aliasMap.values());
|
||||
|
||||
@ -17,27 +17,33 @@
|
||||
|
||||
package org.apache.doris.nereids.rules.exploration.mv;
|
||||
|
||||
import org.apache.doris.analysis.PartitionKeyDesc;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.PartitionItem;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.mtmv.BaseTableInfo;
|
||||
import org.apache.doris.mtmv.MTMVPartitionInfo;
|
||||
import org.apache.doris.mtmv.MTMVRewriteUtil;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.jobs.executor.Rewriter;
|
||||
import org.apache.doris.nereids.properties.LogicalProperties;
|
||||
import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.StructInfo.InvalidPartitionRemover;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.StructInfo.QueryScanPartitionsCollector;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.mapping.RelationMapping;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
|
||||
import org.apache.doris.nereids.trees.expressions.Alias;
|
||||
import org.apache.doris.nereids.trees.expressions.ExprId;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.Not;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.scalar.NonNullable;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.scalar.Nullable;
|
||||
@ -46,20 +52,26 @@ import org.apache.doris.nereids.trees.expressions.literal.Literal;
|
||||
import org.apache.doris.nereids.trees.plans.JoinType;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
|
||||
import org.apache.doris.nereids.util.ExpressionUtils;
|
||||
import org.apache.doris.nereids.util.TypeUtils;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMultimap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.BitSet;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
@ -129,20 +141,22 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
|
||||
*/
|
||||
protected List<StructInfo> getValidQueryStructInfos(Plan queryPlan, CascadesContext cascadesContext,
|
||||
BitSet materializedViewTableSet) {
|
||||
return MaterializedViewUtils.extractStructInfo(queryPlan, cascadesContext, materializedViewTableSet)
|
||||
.stream()
|
||||
.filter(queryStructInfo -> {
|
||||
boolean valid = checkPattern(queryStructInfo);
|
||||
if (!valid) {
|
||||
cascadesContext.getMaterializationContexts().forEach(ctx ->
|
||||
ctx.recordFailReason(queryStructInfo, "Query struct info is invalid",
|
||||
() -> String.format("query table bitmap is %s, plan is %s",
|
||||
queryStructInfo.getTableBitSet(), queryPlan.treeString())
|
||||
));
|
||||
}
|
||||
return valid;
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
List<StructInfo> validStructInfos = new ArrayList<>();
|
||||
List<StructInfo> uncheckedStructInfos = MaterializedViewUtils.extractStructInfo(queryPlan, cascadesContext,
|
||||
materializedViewTableSet);
|
||||
uncheckedStructInfos.forEach(queryStructInfo -> {
|
||||
boolean valid = checkPattern(queryStructInfo);
|
||||
if (!valid) {
|
||||
cascadesContext.getMaterializationContexts().forEach(ctx ->
|
||||
ctx.recordFailReason(queryStructInfo, "Query struct info is invalid",
|
||||
() -> String.format("query table bitmap is %s, plan is %s",
|
||||
queryStructInfo.getTableBitSet(), queryPlan.treeString())
|
||||
));
|
||||
} else {
|
||||
validStructInfos.add(queryStructInfo);
|
||||
}
|
||||
});
|
||||
return validStructInfos;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -200,13 +214,13 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
|
||||
}
|
||||
Plan rewrittenPlan;
|
||||
Plan mvScan = materializationContext.getMvScanPlan();
|
||||
Plan topPlan = queryStructInfo.getTopPlan();
|
||||
Plan queryPlan = queryStructInfo.getTopPlan();
|
||||
if (compensatePredicates.isAlwaysTrue()) {
|
||||
rewrittenPlan = mvScan;
|
||||
} else {
|
||||
// Try to rewrite compensate predicates by using mv scan
|
||||
List<Expression> rewriteCompensatePredicates = rewriteExpression(compensatePredicates.toList(),
|
||||
topPlan, materializationContext.getMvExprToMvScanExprMapping(),
|
||||
queryPlan, materializationContext.getMvExprToMvScanExprMapping(),
|
||||
viewToQuerySlotMapping, true, queryStructInfo.getTableBitSet());
|
||||
if (rewriteCompensatePredicates.isEmpty()) {
|
||||
materializationContext.recordFailReason(queryStructInfo,
|
||||
@ -225,65 +239,125 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
|
||||
if (rewrittenPlan == null) {
|
||||
continue;
|
||||
}
|
||||
final Plan finalRewrittenPlan = rewriteByRules(cascadesContext, rewrittenPlan, topPlan);
|
||||
if (!isOutputValid(topPlan, finalRewrittenPlan)) {
|
||||
materializationContext.recordFailReason(queryStructInfo,
|
||||
"RewrittenPlan output logical properties is different with target group",
|
||||
() -> String.format("planOutput logical"
|
||||
+ " properties = %s,\n groupOutput logical properties = %s",
|
||||
finalRewrittenPlan.getLogicalProperties(), topPlan.getLogicalProperties()));
|
||||
continue;
|
||||
}
|
||||
rewrittenPlan = MaterializedViewUtils.rewriteByRules(cascadesContext,
|
||||
childContext -> {
|
||||
Rewriter.getWholeTreeRewriter(childContext).execute();
|
||||
return childContext.getRewritePlan();
|
||||
}, rewrittenPlan, queryPlan);
|
||||
// check the partitions used by rewritten plan is valid or not
|
||||
Set<Long> invalidPartitionsQueryUsed =
|
||||
calcInvalidPartitions(finalRewrittenPlan, materializationContext, cascadesContext);
|
||||
if (!invalidPartitionsQueryUsed.isEmpty()) {
|
||||
Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> invalidPartitionsQueryUsed =
|
||||
calcUsedInvalidMvPartitions(rewrittenPlan, materializationContext, cascadesContext);
|
||||
// All partition used by query is valid
|
||||
if (!invalidPartitionsQueryUsed.isEmpty() && !cascadesContext.getConnectContext().getSessionVariable()
|
||||
.isEnableMaterializedViewUnionRewrite()) {
|
||||
materializationContext.recordFailReason(queryStructInfo,
|
||||
"Check partition query used validation fail",
|
||||
() -> String.format("the partition used by query is invalid by materialized view,"
|
||||
+ "invalid partition info query used is %s",
|
||||
materializationContext.getMTMV().getPartitions().stream()
|
||||
.filter(partition ->
|
||||
invalidPartitionsQueryUsed.contains(partition.getId()))
|
||||
invalidPartitionsQueryUsed.values().stream()
|
||||
.map(Partition::getName)
|
||||
.collect(Collectors.toSet())));
|
||||
continue;
|
||||
}
|
||||
boolean partitionValid = invalidPartitionsQueryUsed.isEmpty();
|
||||
if (checkCanUnionRewrite(invalidPartitionsQueryUsed, queryPlan, cascadesContext)) {
|
||||
// construct filter on originalPlan
|
||||
Map<TableIf, Set<Expression>> filterOnOriginPlan;
|
||||
try {
|
||||
filterOnOriginPlan = Predicates.constructFilterByPartitions(invalidPartitionsQueryUsed,
|
||||
queryToViewSlotMapping);
|
||||
if (filterOnOriginPlan.isEmpty()) {
|
||||
materializationContext.recordFailReason(queryStructInfo,
|
||||
"construct invalid partition filter on query fail",
|
||||
() -> String.format("the invalid partitions used by query is %s, query plan is %s",
|
||||
invalidPartitionsQueryUsed.values().stream().map(Partition::getName)
|
||||
.collect(Collectors.toSet()),
|
||||
queryStructInfo.getOriginalPlan().treeString()));
|
||||
continue;
|
||||
}
|
||||
} catch (org.apache.doris.common.AnalysisException e) {
|
||||
materializationContext.recordFailReason(queryStructInfo,
|
||||
"construct invalid partition filter on query analysis fail",
|
||||
() -> String.format("the invalid partitions used by query is %s, query plan is %s",
|
||||
invalidPartitionsQueryUsed.values().stream().map(Partition::getName)
|
||||
.collect(Collectors.toSet()),
|
||||
queryStructInfo.getOriginalPlan().treeString()));
|
||||
continue;
|
||||
}
|
||||
// For rewrittenPlan which contains materialized view should remove invalid partition ids
|
||||
List<Plan> children = Lists.newArrayList(
|
||||
rewrittenPlan.accept(new InvalidPartitionRemover(), Pair.of(materializationContext.getMTMV(),
|
||||
invalidPartitionsQueryUsed.values().stream()
|
||||
.map(Partition::getId).collect(Collectors.toSet()))),
|
||||
StructInfo.addFilterOnTableScan(queryPlan, filterOnOriginPlan, cascadesContext));
|
||||
// Union query materialized view and source table
|
||||
rewrittenPlan = new LogicalUnion(Qualifier.ALL,
|
||||
queryPlan.getOutput().stream().map(NamedExpression.class::cast).collect(Collectors.toList()),
|
||||
children.stream()
|
||||
.map(plan -> plan.getOutput().stream()
|
||||
.map(slot -> (SlotReference) slot.toSlot()).collect(Collectors.toList()))
|
||||
.collect(Collectors.toList()),
|
||||
ImmutableList.of(),
|
||||
false,
|
||||
children);
|
||||
partitionValid = true;
|
||||
}
|
||||
if (!partitionValid) {
|
||||
materializationContext.recordFailReason(queryStructInfo,
|
||||
"materialized view partition is invalid union fail",
|
||||
() -> String.format("invalidPartitionsQueryUsed = %s,\n query plan = %s",
|
||||
invalidPartitionsQueryUsed, queryPlan.treeString()));
|
||||
continue;
|
||||
}
|
||||
rewrittenPlan = normalizeExpressions(rewrittenPlan, queryPlan);
|
||||
if (!isOutputValid(queryPlan, rewrittenPlan)) {
|
||||
LogicalProperties logicalProperties = rewrittenPlan.getLogicalProperties();
|
||||
materializationContext.recordFailReason(queryStructInfo,
|
||||
"RewrittenPlan output logical properties is different with target group",
|
||||
() -> String.format("planOutput logical"
|
||||
+ " properties = %s,\n groupOutput logical properties = %s",
|
||||
logicalProperties, queryPlan.getLogicalProperties()));
|
||||
continue;
|
||||
}
|
||||
recordIfRewritten(queryStructInfo.getOriginalPlan(), materializationContext);
|
||||
rewriteResults.add(finalRewrittenPlan);
|
||||
rewriteResults.add(rewrittenPlan);
|
||||
}
|
||||
return rewriteResults;
|
||||
}
|
||||
|
||||
/**
|
||||
* Rewrite by rules and try to make output is the same after optimize by rules
|
||||
*/
|
||||
protected Plan rewriteByRules(CascadesContext cascadesContext, Plan rewrittenPlan, Plan originPlan) {
|
||||
List<Slot> originOutputs = originPlan.getOutput();
|
||||
if (originOutputs.size() != rewrittenPlan.getOutput().size()) {
|
||||
return null;
|
||||
private boolean checkCanUnionRewrite(Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition>
|
||||
invalidPartitionsQueryUsed, Plan queryPlan, CascadesContext cascadesContext) {
|
||||
if (invalidPartitionsQueryUsed.isEmpty()
|
||||
|| !cascadesContext.getConnectContext().getSessionVariable().isEnableMaterializedViewUnionRewrite()) {
|
||||
return false;
|
||||
}
|
||||
Map<Slot, ExprId> originSlotToRewrittenExprId = Maps.newLinkedHashMap();
|
||||
for (int i = 0; i < originOutputs.size(); i++) {
|
||||
originSlotToRewrittenExprId.put(originOutputs.get(i), rewrittenPlan.getOutput().get(i).getExprId());
|
||||
}
|
||||
// run rbo job on mv rewritten plan
|
||||
CascadesContext rewrittenPlanContext = CascadesContext.initContext(
|
||||
cascadesContext.getStatementContext(), rewrittenPlan,
|
||||
cascadesContext.getCurrentJobContext().getRequiredProperties());
|
||||
Rewriter.getWholeTreeRewriter(rewrittenPlanContext).execute();
|
||||
rewrittenPlan = rewrittenPlanContext.getRewritePlan();
|
||||
|
||||
// for get right nullable after rewritten, we need this map
|
||||
Map<ExprId, Slot> exprIdToNewRewrittenSlot = Maps.newLinkedHashMap();
|
||||
for (Slot slot : rewrittenPlan.getOutput()) {
|
||||
exprIdToNewRewrittenSlot.put(slot.getExprId(), slot);
|
||||
// if mv can not offer valid partition data for query, bail out union rewrite
|
||||
Map<Long, Set<PartitionItem>> mvRelatedTablePartitionMap = new LinkedHashMap<>();
|
||||
invalidPartitionsQueryUsed.keySet().forEach(invalidPartition ->
|
||||
mvRelatedTablePartitionMap.put(invalidPartition.key().getRelatedTableInfo().getTableId(),
|
||||
new HashSet<>()));
|
||||
queryPlan.accept(new QueryScanPartitionsCollector(), mvRelatedTablePartitionMap);
|
||||
Set<PartitionKeyDesc> partitionKeyDescSetQueryUsed = mvRelatedTablePartitionMap.values().stream()
|
||||
.flatMap(Collection::stream)
|
||||
.map(PartitionItem::toPartitionKeyDesc)
|
||||
.collect(Collectors.toSet());
|
||||
Set<PartitionKeyDesc> mvInvalidPartitionKeyDescSet = new HashSet<>();
|
||||
for (Map.Entry<Pair<MTMVPartitionInfo, PartitionInfo>, Collection<Partition>> entry :
|
||||
invalidPartitionsQueryUsed.asMap().entrySet()) {
|
||||
entry.getValue().forEach(invalidPartition -> mvInvalidPartitionKeyDescSet.add(
|
||||
entry.getKey().value().getItem(invalidPartition.getId()).toPartitionKeyDesc()));
|
||||
}
|
||||
return !mvInvalidPartitionKeyDescSet.containsAll(partitionKeyDescSetQueryUsed);
|
||||
}
|
||||
|
||||
// Normalize expression such as nullable property and output slot id
|
||||
protected Plan normalizeExpressions(Plan rewrittenPlan, Plan originPlan) {
|
||||
// normalize nullable
|
||||
ImmutableList<NamedExpression> convertNullable = originOutputs.stream()
|
||||
.map(s -> normalizeExpression(s, exprIdToNewRewrittenSlot.get(originSlotToRewrittenExprId.get(s))))
|
||||
.collect(ImmutableList.toImmutableList());
|
||||
return new LogicalProject<>(convertNullable, rewrittenPlan);
|
||||
List<NamedExpression> normalizeProjects = new ArrayList<>();
|
||||
for (int i = 0; i < originPlan.getOutput().size(); i++) {
|
||||
normalizeProjects.add(normalizeExpression(originPlan.getOutput().get(i), rewrittenPlan.getOutput().get(i)));
|
||||
}
|
||||
return new LogicalProject<>(normalizeProjects, rewrittenPlan);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -303,35 +377,47 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
|
||||
* catalog relation.
|
||||
* Maybe only just some partitions is valid in materialized view, so we should check if the mv can
|
||||
* offer the partitions which query used or not.
|
||||
*
|
||||
* @return the invalid partition name set
|
||||
*/
|
||||
protected Set<Long> calcInvalidPartitions(Plan rewrittenPlan, MaterializationContext materializationContext,
|
||||
protected Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> calcUsedInvalidMvPartitions(
|
||||
Plan rewrittenPlan,
|
||||
MaterializationContext materializationContext,
|
||||
CascadesContext cascadesContext) {
|
||||
// check partition is valid or not
|
||||
MTMV mtmv = materializationContext.getMTMV();
|
||||
PartitionInfo mvPartitionInfo = mtmv.getPartitionInfo();
|
||||
if (PartitionType.UNPARTITIONED.equals(mvPartitionInfo.getType())) {
|
||||
// if not partition, if rewrite success, it means mv is available
|
||||
return ImmutableSet.of();
|
||||
return ImmutableMultimap.of();
|
||||
}
|
||||
// check mv related table partition is valid or not
|
||||
MTMVPartitionInfo mvCustomPartitionInfo = mtmv.getMvPartitionInfo();
|
||||
BaseTableInfo relatedPartitionTable = mvCustomPartitionInfo.getRelatedTableInfo();
|
||||
if (relatedPartitionTable == null) {
|
||||
return ImmutableSet.of();
|
||||
return ImmutableMultimap.of();
|
||||
}
|
||||
// get mv valid partitions
|
||||
Set<Long> mvDataValidPartitionIdSet = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv,
|
||||
cascadesContext.getConnectContext(), System.currentTimeMillis()).stream()
|
||||
.map(Partition::getId)
|
||||
.collect(Collectors.toSet());
|
||||
Set<Long> queryUsedPartitionIdSet = rewrittenPlan.collectToList(node -> node instanceof LogicalOlapScan
|
||||
// get partitions query used
|
||||
Set<Long> mvPartitionSetQueryUsed = rewrittenPlan.collectToList(node -> node instanceof LogicalOlapScan
|
||||
&& Objects.equals(((CatalogRelation) node).getTable().getName(), mtmv.getName()))
|
||||
.stream()
|
||||
.map(node -> ((LogicalOlapScan) node).getSelectedPartitionIds())
|
||||
.flatMap(Collection::stream)
|
||||
.collect(Collectors.toSet());
|
||||
queryUsedPartitionIdSet.removeAll(mvDataValidPartitionIdSet);
|
||||
return queryUsedPartitionIdSet;
|
||||
// get invalid partition ids
|
||||
Set<Long> invalidMvPartitionIdSet = new HashSet<>(mvPartitionSetQueryUsed);
|
||||
invalidMvPartitionIdSet.removeAll(mvDataValidPartitionIdSet);
|
||||
ImmutableMultimap.Builder<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> invalidPartitionMapBuilder =
|
||||
ImmutableMultimap.builder();
|
||||
Pair<MTMVPartitionInfo, PartitionInfo> partitionInfo = Pair.of(mvCustomPartitionInfo, mvPartitionInfo);
|
||||
invalidMvPartitionIdSet.forEach(invalidPartitionId ->
|
||||
invalidPartitionMapBuilder.put(partitionInfo, mtmv.getPartition(invalidPartitionId)));
|
||||
return invalidPartitionMapBuilder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -27,6 +27,7 @@ import org.apache.doris.mtmv.MTMVRelatedTableIf;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.memo.Group;
|
||||
import org.apache.doris.nereids.memo.StructInfoMap;
|
||||
import org.apache.doris.nereids.trees.expressions.ExprId;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
@ -49,15 +50,17 @@ import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
|
||||
|
||||
import com.google.common.collect.HashMultimap;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Multimap;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.BitSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@ -147,13 +150,19 @@ public class MaterializedViewUtils {
|
||||
StructInfoMap structInfoMap = ownerGroup.getstructInfoMap();
|
||||
structInfoMap.refresh(ownerGroup);
|
||||
Set<BitSet> queryTableSets = structInfoMap.getTableMaps();
|
||||
ImmutableList.Builder<StructInfo> structInfosBuilder = ImmutableList.builder();
|
||||
if (!queryTableSets.isEmpty()) {
|
||||
return queryTableSets.stream()
|
||||
// Just construct the struct info which mv table set contains all the query table set
|
||||
.filter(queryTableSet -> materializedViewTableSet.isEmpty()
|
||||
|| StructInfo.containsAll(materializedViewTableSet, queryTableSet))
|
||||
.map(tableMap -> structInfoMap.getStructInfo(tableMap, tableMap, ownerGroup, plan))
|
||||
.collect(Collectors.toList());
|
||||
for (BitSet queryTableSet : queryTableSets) {
|
||||
if (!materializedViewTableSet.isEmpty()
|
||||
&& !StructInfo.containsAll(materializedViewTableSet, queryTableSet)) {
|
||||
continue;
|
||||
}
|
||||
StructInfo structInfo = structInfoMap.getStructInfo(queryTableSet, queryTableSet, ownerGroup, plan);
|
||||
if (structInfo != null) {
|
||||
structInfosBuilder.add(structInfo);
|
||||
}
|
||||
}
|
||||
return structInfosBuilder.build();
|
||||
}
|
||||
}
|
||||
// if plan doesn't belong to any group, construct it directly
|
||||
@ -172,8 +181,8 @@ public class MaterializedViewUtils {
|
||||
materializedView,
|
||||
ImmutableList.of(materializedView.getQualifiedDbName()),
|
||||
// this must be empty, or it will be used to sample
|
||||
Lists.newArrayList(),
|
||||
Lists.newArrayList(),
|
||||
ImmutableList.of(),
|
||||
ImmutableList.of(),
|
||||
Optional.empty());
|
||||
mvScan = mvScan.withMaterializedIndexSelected(PreAggStatus.on(), materializedView.getBaseIndexId());
|
||||
List<NamedExpression> mvProjects = mvScan.getOutput().stream().map(NamedExpression.class::cast)
|
||||
@ -181,6 +190,43 @@ public class MaterializedViewUtils {
|
||||
return new LogicalProject<Plan>(mvProjects, mvScan);
|
||||
}
|
||||
|
||||
/**
|
||||
* Optimize by rules, this support optimize by custom rules by define different rewriter according to different
|
||||
* rules
|
||||
*/
|
||||
public static Plan rewriteByRules(
|
||||
CascadesContext cascadesContext,
|
||||
Function<CascadesContext, Plan> planRewriter,
|
||||
Plan rewrittenPlan, Plan originPlan) {
|
||||
List<Slot> originOutputs = originPlan.getOutput();
|
||||
if (originOutputs.size() != rewrittenPlan.getOutput().size()) {
|
||||
return null;
|
||||
}
|
||||
// After RBO, slot order may change, so need originSlotToRewrittenExprId which record
|
||||
// origin plan slot order
|
||||
List<ExprId> originalRewrittenPlanExprIds =
|
||||
rewrittenPlan.getOutput().stream().map(Slot::getExprId).collect(Collectors.toList());
|
||||
// run rbo job on mv rewritten plan
|
||||
CascadesContext rewrittenPlanContext = CascadesContext.initContext(
|
||||
cascadesContext.getStatementContext(), rewrittenPlan,
|
||||
cascadesContext.getCurrentJobContext().getRequiredProperties());
|
||||
rewrittenPlan = planRewriter.apply(rewrittenPlanContext);
|
||||
Map<ExprId, Slot> exprIdToNewRewrittenSlot = Maps.newLinkedHashMap();
|
||||
for (Slot slot : rewrittenPlan.getOutput()) {
|
||||
exprIdToNewRewrittenSlot.put(slot.getExprId(), slot);
|
||||
}
|
||||
List<ExprId> rewrittenPlanExprIds = rewrittenPlan.getOutput().stream()
|
||||
.map(Slot::getExprId).collect(Collectors.toList());
|
||||
// If project order doesn't change, return rewrittenPlan directly
|
||||
if (originalRewrittenPlanExprIds.equals(rewrittenPlanExprIds)) {
|
||||
return rewrittenPlan;
|
||||
}
|
||||
// If project order change, return rewrittenPlan with reordered projects
|
||||
return new LogicalProject<>(originalRewrittenPlanExprIds.stream()
|
||||
.map(exprId -> (NamedExpression) exprIdToNewRewrittenSlot.get(exprId)).collect(Collectors.toList()),
|
||||
rewrittenPlan);
|
||||
}
|
||||
|
||||
private static final class TableQueryOperatorChecker extends DefaultPlanVisitor<Boolean, Void> {
|
||||
public static final TableQueryOperatorChecker INSTANCE = new TableQueryOperatorChecker();
|
||||
|
||||
|
||||
@ -17,8 +17,15 @@
|
||||
|
||||
package org.apache.doris.nereids.rules.exploration.mv;
|
||||
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.mtmv.MTMVPartitionInfo;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.mapping.EquivalenceClassSetMapping;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.mapping.Mapping.MappedSlot;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
|
||||
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
|
||||
import org.apache.doris.nereids.rules.expression.ExpressionOptimization;
|
||||
@ -27,13 +34,17 @@ import org.apache.doris.nereids.trees.expressions.EqualTo;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
|
||||
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
|
||||
import org.apache.doris.nereids.util.ExpressionUtils;
|
||||
import org.apache.doris.nereids.util.Utils;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@ -220,6 +231,47 @@ public class Predicates {
|
||||
return Utils.toSqlString("Predicates", "pulledUpPredicates", pulledUpPredicates);
|
||||
}
|
||||
|
||||
/** Construct filter by partition
|
||||
* @param partitions this is the partition which filter should be constructed from
|
||||
* @param queryToViewSlotMapping construct filter on slot, the slot belong the slotmapping
|
||||
* */
|
||||
public static Map<TableIf, Set<Expression>> constructFilterByPartitions(
|
||||
Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> partitions,
|
||||
SlotMapping queryToViewSlotMapping) throws AnalysisException {
|
||||
Map<TableIf, Set<Expression>> constructedFilterMap = new HashMap<>();
|
||||
for (Map.Entry<Pair<MTMVPartitionInfo, PartitionInfo>, Collection<Partition>> entry :
|
||||
partitions.asMap().entrySet()) {
|
||||
// Get the base table partition column mv related
|
||||
String relatedCol = entry.getKey().key().getRelatedCol();
|
||||
TableIf relatedTableInfo = entry.getKey().key().getRelatedTable();
|
||||
// Find the query slot which mv partition col mapped to
|
||||
Optional<MappedSlot> partitionSlotQueryUsed = queryToViewSlotMapping.getRelationSlotMap()
|
||||
.keySet()
|
||||
.stream()
|
||||
.filter(mappedSlot -> mappedSlot.getSlot().isColumnFromTable()
|
||||
&& mappedSlot.getSlot().getName().equals(relatedCol)
|
||||
&& mappedSlot.getBelongedRelation() != null
|
||||
&& mappedSlot.getBelongedRelation().getTable().getId() == relatedTableInfo.getId())
|
||||
.findFirst();
|
||||
if (!partitionSlotQueryUsed.isPresent()) {
|
||||
return ImmutableMap.of();
|
||||
}
|
||||
// Constructed filter which should add on the query base table,
|
||||
// after supported data roll up this method should keep logic consistency to partition mapping
|
||||
Set<Expression> partitionExpressions = UpdateMvByPartitionCommand.constructPredicates(
|
||||
// get mv partition items
|
||||
entry.getValue().stream()
|
||||
.map(partition -> entry.getKey().value().getItem(partition.getId()))
|
||||
.collect(Collectors.toSet()),
|
||||
partitionSlotQueryUsed.get().getSlot());
|
||||
// Put partition expressions on query base table
|
||||
constructedFilterMap.computeIfPresent(relatedTableInfo,
|
||||
(key, existExpressions) -> Sets.union(existExpressions, partitionExpressions));
|
||||
constructedFilterMap.computeIfAbsent(relatedTableInfo, key -> partitionExpressions);
|
||||
}
|
||||
return constructedFilterMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* The split different representation for predicate expression, such as equal, range and residual predicate.
|
||||
*/
|
||||
|
||||
@ -17,13 +17,21 @@
|
||||
|
||||
package org.apache.doris.nereids.rules.exploration.mv;
|
||||
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.PartitionItem;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.jobs.executor.Rewriter;
|
||||
import org.apache.doris.nereids.jobs.joinorder.hypergraph.HyperGraph;
|
||||
import org.apache.doris.nereids.jobs.joinorder.hypergraph.edge.JoinEdge;
|
||||
import org.apache.doris.nereids.jobs.joinorder.hypergraph.node.StructInfoNode;
|
||||
import org.apache.doris.nereids.memo.Group;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate;
|
||||
import org.apache.doris.nereids.trees.copier.DeepCopierContext;
|
||||
import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier;
|
||||
import org.apache.doris.nereids.trees.expressions.EqualTo;
|
||||
import org.apache.doris.nereids.trees.expressions.ExprId;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
@ -38,11 +46,16 @@ import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.Filter;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.Join;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.Project;
|
||||
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAdder;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.ExpressionLineageReplacer;
|
||||
import org.apache.doris.nereids.util.ExpressionUtils;
|
||||
@ -584,4 +597,61 @@ public class StructInfo {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Rewrites the materialized view scan so it no longer reads invalid (stale) partitions;
 * the valid remainder can then be union-ed with data read directly from the base tables.
 * Context pair: key = the materialized view, value = the ids of the invalid partitions
 * that must be removed from the scan.
 */
public static class InvalidPartitionRemover extends DefaultPlanRewriter<Pair<MTMV, Set<Long>>> {
    // materialized view scan is always LogicalOlapScan, so just handle LogicalOlapScan
    @Override
    public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, Pair<MTMV, Set<Long>> context) {
        // NOTE(review): matches the mv by table *name* only — assumes the name is unique in
        // this rewrite scope; comparing table ids would be stricter. TODO confirm.
        if (olapScan.getTable().getName().equals(context.key().getName())) {
            List<Long> selectedPartitionIds = olapScan.getSelectedPartitionIds();
            // keep only the partitions that are not marked invalid
            return olapScan.withSelectedPartitionIds(selectedPartitionIds.stream()
                    .filter(partitionId -> !context.value().contains(partitionId))
                    .collect(Collectors.toList()));
        }
        return olapScan;
    }
}
|
||||
|
||||
/**Collect partitions which scan used according to given table */
|
||||
public static class QueryScanPartitionsCollector extends DefaultPlanVisitor<Plan, Map<Long, Set<PartitionItem>>> {
|
||||
@Override
|
||||
public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation,
|
||||
Map<Long, Set<PartitionItem>> context) {
|
||||
TableIf table = catalogRelation.getTable();
|
||||
if (!context.containsKey(table.getId())) {
|
||||
return catalogRelation;
|
||||
}
|
||||
// Only support check olap partition currently
|
||||
if (catalogRelation instanceof LogicalOlapScan) {
|
||||
LogicalOlapScan logicalOlapScan = (LogicalOlapScan) catalogRelation;
|
||||
PartitionInfo partitionInfo = logicalOlapScan.getTable().getPartitionInfo();
|
||||
logicalOlapScan.getSelectedPartitionIds().stream()
|
||||
.map(partitionInfo::getItem)
|
||||
.forEach(partitionItem -> context.computeIfPresent(table.getId(), (key, oldValue) -> {
|
||||
oldValue.add(partitionItem);
|
||||
return oldValue;
|
||||
}));
|
||||
}
|
||||
return catalogRelation;
|
||||
}
|
||||
}
|
||||
|
||||
/**Add filter on table scan according to table filter map */
|
||||
public static Plan addFilterOnTableScan(Plan queryPlan, Map<TableIf, Set<Expression>> filterOnOriginPlan,
|
||||
CascadesContext parentCascadesContext) {
|
||||
// Firstly, construct filter form invalid partition, this filter should be added on origin plan
|
||||
Plan queryPlanWithUnionFilter = queryPlan.accept(new PredicateAdder(), filterOnOriginPlan);
|
||||
// Deep copy the plan to avoid the plan output is the same with the later union output, this may cause
|
||||
// exec by mistake
|
||||
queryPlanWithUnionFilter = new LogicalPlanDeepCopier().deepCopy(
|
||||
(LogicalPlan) queryPlanWithUnionFilter, new DeepCopierContext());
|
||||
// rbo rewrite after adding filter on origin plan
|
||||
return MaterializedViewUtils.rewriteByRules(parentCascadesContext, context -> {
|
||||
Rewriter.getWholeTreeRewriter(context).execute();
|
||||
return context.getRewritePlan();
|
||||
}, queryPlanWithUnionFilter, queryPlan);
|
||||
}
|
||||
}
|
||||
|
||||
@ -37,7 +37,7 @@ import java.util.List;
|
||||
public class EliminateSort extends DefaultPlanRewriter<Boolean> implements CustomRewriter {
|
||||
@Override
|
||||
public Plan rewriteRoot(Plan plan, JobContext jobContext) {
|
||||
Boolean eliminateSort = false;
|
||||
Boolean eliminateSort = true;
|
||||
return plan.accept(this, eliminateSort);
|
||||
}
|
||||
|
||||
@ -76,7 +76,7 @@ public class EliminateSort extends DefaultPlanRewriter<Boolean> implements Custo
|
||||
// eliminate sort
|
||||
return visit(logicalSink, true);
|
||||
}
|
||||
return skipEliminateSort(logicalSink, eliminateSort);
|
||||
return skipEliminateSort(logicalSink, false);
|
||||
}
|
||||
|
||||
private Plan skipEliminateSort(Plan plan, Boolean eliminateSort) {
|
||||
|
||||
@ -41,6 +41,7 @@ import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.Sink;
|
||||
import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTableCommand;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
|
||||
@ -120,18 +121,27 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static Set<Expression> constructPredicates(Set<PartitionItem> partitions, String colName) {
|
||||
Set<Expression> predicates = new HashSet<>();
|
||||
UnboundSlot slot = new UnboundSlot(colName);
|
||||
return constructPredicates(partitions, slot);
|
||||
}
|
||||
|
||||
/**
|
||||
* construct predicates for partition items, the min key is the min key of range items.
|
||||
* For list partition or less than partition items, the min key is null.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static Set<Expression> constructPredicates(Set<PartitionItem> partitions, Slot colSlot) {
|
||||
Set<Expression> predicates = new HashSet<>();
|
||||
if (partitions.isEmpty()) {
|
||||
return Sets.newHashSet(BooleanLiteral.TRUE);
|
||||
}
|
||||
if (partitions.iterator().next() instanceof ListPartitionItem) {
|
||||
for (PartitionItem item : partitions) {
|
||||
predicates.add(convertListPartitionToIn(item, slot));
|
||||
predicates.add(convertListPartitionToIn(item, colSlot));
|
||||
}
|
||||
} else {
|
||||
for (PartitionItem item : partitions) {
|
||||
predicates.add(convertRangePartitionToCompare(item, slot));
|
||||
predicates.add(convertRangePartitionToCompare(item, colSlot));
|
||||
}
|
||||
}
|
||||
return predicates;
|
||||
@ -186,7 +196,10 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
|
||||
return predicate;
|
||||
}
|
||||
|
||||
static class PredicateAdder extends DefaultPlanRewriter<Map<TableIf, Set<Expression>>> {
|
||||
/**
|
||||
* Add predicates on base table when mv can partition update
|
||||
*/
|
||||
public static class PredicateAdder extends DefaultPlanRewriter<Map<TableIf, Set<Expression>>> {
|
||||
@Override
|
||||
public Plan visitUnboundRelation(UnboundRelation unboundRelation, Map<TableIf, Set<Expression>> predicates) {
|
||||
List<String> tableQualifier = RelationUtil.getQualifierName(ConnectContext.get(),
|
||||
@ -198,5 +211,16 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
|
||||
}
|
||||
return unboundRelation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation,
|
||||
Map<TableIf, Set<Expression>> predicates) {
|
||||
TableIf table = catalogRelation.getTable();
|
||||
if (predicates.containsKey(table)) {
|
||||
return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.get(table))),
|
||||
catalogRelation);
|
||||
}
|
||||
return catalogRelation;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -45,12 +45,12 @@ import java.util.stream.Collectors;
|
||||
* Get from rewrite plan and can also get from plan struct info, if from plan struct info it depends on
|
||||
* the nodes from graph.
|
||||
*/
|
||||
public class ExpressionLineageReplacer extends DefaultPlanVisitor<Void, ExpressionReplaceContext> {
|
||||
public class ExpressionLineageReplacer extends DefaultPlanVisitor<Expression, ExpressionReplaceContext> {
|
||||
|
||||
public static final ExpressionLineageReplacer INSTANCE = new ExpressionLineageReplacer();
|
||||
|
||||
@Override
|
||||
public Void visit(Plan plan, ExpressionReplaceContext context) {
|
||||
public Expression visit(Plan plan, ExpressionReplaceContext context) {
|
||||
List<? extends Expression> expressions = plan.getExpressions();
|
||||
Map<ExprId, Expression> targetExpressionMap = context.getExprIdExpressionMap();
|
||||
// Filter the namedExpression used by target and collect the namedExpression
|
||||
@ -62,7 +62,7 @@ public class ExpressionLineageReplacer extends DefaultPlanVisitor<Void, Expressi
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void visitGroupPlan(GroupPlan groupPlan, ExpressionReplaceContext context) {
|
||||
public Expression visitGroupPlan(GroupPlan groupPlan, ExpressionReplaceContext context) {
|
||||
Group group = groupPlan.getGroup();
|
||||
if (group == null) {
|
||||
return visit(groupPlan, context);
|
||||
|
||||
@ -514,6 +514,9 @@ public class SessionVariable implements Serializable, Writable {
|
||||
public static final String MATERIALIZED_VIEW_REWRITE_SUCCESS_CANDIDATE_NUM
|
||||
= "materialized_view_rewrite_success_candidate_num";
|
||||
|
||||
public static final String ENABLE_MATERIALIZED_VIEW_UNION_REWRITE
|
||||
= "enable_materialized_view_union_rewrite";
|
||||
|
||||
public static final String CREATE_TABLE_PARTITION_MAX_NUM
|
||||
= "create_table_partition_max_num";
|
||||
|
||||
@ -1619,6 +1622,13 @@ public class SessionVariable implements Serializable, Writable {
|
||||
"The max candidate num which participate in CBO when using asynchronous materialized views"})
|
||||
public int materializedViewRewriteSuccessCandidateNum = 3;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_MATERIALIZED_VIEW_UNION_REWRITE, needForward = true,
|
||||
description = {"当物化视图不足以提供查询的全部数据时,是否允许基表和物化视图 union 来响应查询",
|
||||
"When the materialized view is not enough to provide all the data for the query, "
|
||||
+ "whether to allow the union of the base table and the materialized view to "
|
||||
+ "respond to the query"})
|
||||
public boolean enableMaterializedViewUnionRewrite = false;
|
||||
|
||||
@VariableMgr.VarAttr(name = CREATE_TABLE_PARTITION_MAX_NUM, needForward = true,
|
||||
description = {"建表时创建分区的最大数量",
|
||||
"The maximum number of partitions created during table creation"})
|
||||
@ -3628,6 +3638,10 @@ public class SessionVariable implements Serializable, Writable {
|
||||
return materializedViewRewriteSuccessCandidateNum;
|
||||
}
|
||||
|
||||
/**
 * Whether to allow answering a query with a UNION of the base table and the
 * materialized view when the mv alone can not provide all the data.
 */
public boolean isEnableMaterializedViewUnionRewrite() {
    return enableMaterializedViewUnionRewrite;
}
|
||||
|
||||
/** The maximum number of partitions that may be created during table creation. */
public int getCreateTablePartitionMaxNum() {
    return createTablePartitionMaxNum;
}
|
||||
|
||||
Reference in New Issue
Block a user