[fix](mtmv) Fix partition mv rewrite result wrong (#35236)
this is brought by https://github.com/apache/doris/pull/33800 if mv is a partitioned materialized view, the data will be wrong when using the hit materialized view if the partitions in the related base partition table are deleted, created and so on. this fixes the problem. if **SET enable_materialized_view_union_rewrite=true;** this will use the materialized view and make sure the data is correct if **SET enable_materialized_view_union_rewrite=false;** this will query the base table directly to make sure the data is right
This commit is contained in:
@ -17,13 +17,12 @@
|
||||
|
||||
package org.apache.doris.nereids.rules.exploration.mv;
|
||||
|
||||
import org.apache.doris.analysis.PartitionKeyDesc;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.PartitionItem;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Id;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.mtmv.BaseTableInfo;
|
||||
@ -34,8 +33,7 @@ import org.apache.doris.nereids.jobs.executor.Rewriter;
|
||||
import org.apache.doris.nereids.properties.LogicalProperties;
|
||||
import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.StructInfo.InvalidPartitionRemover;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.StructInfo.QueryScanPartitionsCollector;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.StructInfo.PartitionRemover;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.mapping.RelationMapping;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
|
||||
@ -60,13 +58,13 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
|
||||
import org.apache.doris.nereids.util.ExpressionUtils;
|
||||
import org.apache.doris.nereids.util.TypeUtils;
|
||||
import org.apache.doris.qe.SessionVariable;
|
||||
import org.apache.doris.statistics.Statistics;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMultimap;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -74,6 +72,7 @@ import org.apache.logging.log4j.Logger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.BitSet;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
@ -185,8 +184,8 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
|
||||
"Query to view table mapping is null", () -> "");
|
||||
return rewriteResults;
|
||||
}
|
||||
int materializedViewRelationMappingMaxCount = cascadesContext.getConnectContext().getSessionVariable()
|
||||
.getMaterializedViewRelationMappingMaxCount();
|
||||
SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable();
|
||||
int materializedViewRelationMappingMaxCount = sessionVariable.getMaterializedViewRelationMappingMaxCount();
|
||||
if (queryToViewTableMappings.size() > materializedViewRelationMappingMaxCount) {
|
||||
LOG.warn("queryToViewTableMappings is over limit and be intercepted");
|
||||
queryToViewTableMappings = queryToViewTableMappings.subList(0, materializedViewRelationMappingMaxCount);
|
||||
@ -260,71 +259,60 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
|
||||
if (rewrittenPlan == null) {
|
||||
continue;
|
||||
}
|
||||
// check the partitions used by rewritten plan is valid or not
|
||||
Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> invalidPartitionsQueryUsed =
|
||||
calcUsedInvalidMvPartitions(rewrittenPlan, materializationContext, cascadesContext);
|
||||
// All partition used by query is valid
|
||||
if (!invalidPartitionsQueryUsed.isEmpty() && !cascadesContext.getConnectContext().getSessionVariable()
|
||||
.isEnableMaterializedViewUnionRewrite()) {
|
||||
materializationContext.recordFailReason(queryStructInfo,
|
||||
"Check partition query used validation fail",
|
||||
() -> String.format("the partition used by query is invalid by materialized view,"
|
||||
+ "invalid partition info query used is %s",
|
||||
invalidPartitionsQueryUsed.values().stream()
|
||||
.map(Partition::getName)
|
||||
.collect(Collectors.toSet())));
|
||||
continue;
|
||||
}
|
||||
boolean partitionValid = invalidPartitionsQueryUsed.isEmpty();
|
||||
if (checkCanUnionRewrite(invalidPartitionsQueryUsed, queryPlan, cascadesContext)) {
|
||||
// construct filter on originalPlan
|
||||
Map<TableIf, Set<Expression>> filterOnOriginPlan;
|
||||
Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> invalidPartitions;
|
||||
if (materializationContext instanceof AsyncMaterializationContext) {
|
||||
try {
|
||||
filterOnOriginPlan = Predicates.constructFilterByPartitions(invalidPartitionsQueryUsed,
|
||||
queryToViewSlotMapping);
|
||||
if (filterOnOriginPlan.isEmpty()) {
|
||||
materializationContext.recordFailReason(queryStructInfo,
|
||||
"construct invalid partition filter on query fail",
|
||||
() -> String.format("the invalid partitions used by query is %s, query plan is %s",
|
||||
invalidPartitionsQueryUsed.values().stream().map(Partition::getName)
|
||||
.collect(Collectors.toSet()),
|
||||
queryStructInfo.getOriginalPlan().treeString()));
|
||||
continue;
|
||||
}
|
||||
} catch (org.apache.doris.common.AnalysisException e) {
|
||||
invalidPartitions = calcInvalidPartitions(queryPlan, rewrittenPlan,
|
||||
(AsyncMaterializationContext) materializationContext, cascadesContext);
|
||||
} catch (AnalysisException e) {
|
||||
materializationContext.recordFailReason(queryStructInfo,
|
||||
"construct invalid partition filter on query analysis fail",
|
||||
() -> String.format("the invalid partitions used by query is %s, query plan is %s",
|
||||
invalidPartitionsQueryUsed.values().stream().map(Partition::getName)
|
||||
.collect(Collectors.toSet()),
|
||||
queryStructInfo.getOriginalPlan().treeString()));
|
||||
"Calc invalid partitions fail",
|
||||
() -> String.format("Calc invalid partitions fail, mv partition names are %s",
|
||||
((AsyncMaterializationContext) materializationContext).getMtmv().getPartitions()));
|
||||
LOG.warn("Calc invalid partitions fail", e);
|
||||
continue;
|
||||
}
|
||||
// For rewrittenPlan which contains materialized view should remove invalid partition ids
|
||||
List<Plan> children = Lists.newArrayList(
|
||||
rewrittenPlan.accept(new InvalidPartitionRemover(), Pair.of(
|
||||
materializationContext.getMaterializationQualifier(),
|
||||
invalidPartitionsQueryUsed.values().stream().map(Partition::getId)
|
||||
.collect(Collectors.toSet()))),
|
||||
StructInfo.addFilterOnTableScan(queryPlan, filterOnOriginPlan, cascadesContext));
|
||||
// Union query materialized view and source table
|
||||
rewrittenPlan = new LogicalUnion(Qualifier.ALL,
|
||||
queryPlan.getOutput().stream().map(NamedExpression.class::cast).collect(Collectors.toList()),
|
||||
children.stream()
|
||||
.map(plan -> plan.getOutput().stream()
|
||||
.map(slot -> (SlotReference) slot.toSlot()).collect(Collectors.toList()))
|
||||
.collect(Collectors.toList()),
|
||||
ImmutableList.of(),
|
||||
false,
|
||||
children);
|
||||
partitionValid = true;
|
||||
}
|
||||
if (!partitionValid) {
|
||||
materializationContext.recordFailReason(queryStructInfo,
|
||||
"materialized view partition is invalid union fail",
|
||||
() -> String.format("invalidPartitionsQueryUsed = %s,\n query plan = %s",
|
||||
invalidPartitionsQueryUsed, queryPlan.treeString()));
|
||||
continue;
|
||||
if (invalidPartitions == null) {
|
||||
// if mv can not offer any partition for query, query rewrite bail out to avoid cycle run
|
||||
return rewriteResults;
|
||||
}
|
||||
boolean partitionNeedUnion = needUnionRewrite(invalidPartitions, cascadesContext);
|
||||
final Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> finalInvalidPartitions =
|
||||
invalidPartitions;
|
||||
if (partitionNeedUnion && !sessionVariable.isEnableMaterializedViewUnionRewrite()) {
|
||||
// if use invalid partition but not enable union rewrite
|
||||
materializationContext.recordFailReason(queryStructInfo,
|
||||
"Partition query used is invalid",
|
||||
() -> String.format("the partition used by query is invalid by materialized view,"
|
||||
+ "invalid partition info query used is %s", finalInvalidPartitions));
|
||||
continue;
|
||||
}
|
||||
if (partitionNeedUnion) {
|
||||
MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv();
|
||||
Plan originPlanWithFilter = StructInfo.addFilterOnTableScan(queryPlan, invalidPartitions.value(),
|
||||
mtmv.getMvPartitionInfo().getPartitionCol(), cascadesContext);
|
||||
if (finalInvalidPartitions.value().isEmpty() || originPlanWithFilter == null) {
|
||||
// only need remove mv invalid partition
|
||||
rewrittenPlan = rewrittenPlan.accept(new PartitionRemover(), invalidPartitions.key());
|
||||
} else {
|
||||
// For rewrittenPlan which contains materialized view should remove invalid partition ids
|
||||
List<Plan> children = Lists.newArrayList(
|
||||
rewrittenPlan.accept(new PartitionRemover(), invalidPartitions.key()),
|
||||
originPlanWithFilter);
|
||||
// Union query materialized view and source table
|
||||
rewrittenPlan = new LogicalUnion(Qualifier.ALL,
|
||||
queryPlan.getOutput().stream().map(NamedExpression.class::cast)
|
||||
.collect(Collectors.toList()),
|
||||
children.stream()
|
||||
.map(plan -> plan.getOutput().stream()
|
||||
.map(slot -> (SlotReference) slot.toSlot())
|
||||
.collect(Collectors.toList()))
|
||||
.collect(Collectors.toList()),
|
||||
ImmutableList.of(),
|
||||
false,
|
||||
children);
|
||||
}
|
||||
}
|
||||
}
|
||||
rewrittenPlan = normalizeExpressions(rewrittenPlan, queryPlan);
|
||||
if (!isOutputValid(queryPlan, rewrittenPlan)) {
|
||||
@ -350,29 +338,11 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
|
||||
return rewriteResults;
|
||||
}
|
||||
|
||||
private boolean checkCanUnionRewrite(Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition>
|
||||
invalidPartitionsQueryUsed, Plan queryPlan, CascadesContext cascadesContext) {
|
||||
if (invalidPartitionsQueryUsed.isEmpty()
|
||||
|| !cascadesContext.getConnectContext().getSessionVariable().isEnableMaterializedViewUnionRewrite()) {
|
||||
return false;
|
||||
}
|
||||
// if mv can not offer valid partition data for query, bail out union rewrite
|
||||
Map<Long, Set<PartitionItem>> mvRelatedTablePartitionMap = new LinkedHashMap<>();
|
||||
invalidPartitionsQueryUsed.keySet().forEach(invalidPartition ->
|
||||
mvRelatedTablePartitionMap.put(invalidPartition.key().getRelatedTableInfo().getTableId(),
|
||||
new HashSet<>()));
|
||||
queryPlan.accept(new QueryScanPartitionsCollector(), mvRelatedTablePartitionMap);
|
||||
Set<PartitionKeyDesc> partitionKeyDescSetQueryUsed = mvRelatedTablePartitionMap.values().stream()
|
||||
.flatMap(Collection::stream)
|
||||
.map(PartitionItem::toPartitionKeyDesc)
|
||||
.collect(Collectors.toSet());
|
||||
Set<PartitionKeyDesc> mvInvalidPartitionKeyDescSet = new HashSet<>();
|
||||
for (Map.Entry<Pair<MTMVPartitionInfo, PartitionInfo>, Collection<Partition>> entry :
|
||||
invalidPartitionsQueryUsed.asMap().entrySet()) {
|
||||
entry.getValue().forEach(invalidPartition -> mvInvalidPartitionKeyDescSet.add(
|
||||
entry.getKey().value().getItem(invalidPartition.getId()).toPartitionKeyDesc()));
|
||||
}
|
||||
return !mvInvalidPartitionKeyDescSet.containsAll(partitionKeyDescSetQueryUsed);
|
||||
private boolean needUnionRewrite(
|
||||
Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> invalidPartitions,
|
||||
CascadesContext cascadesContext) {
|
||||
return invalidPartitions != null
|
||||
&& (!invalidPartitions.key().isEmpty() || !invalidPartitions.value().isEmpty());
|
||||
}
|
||||
|
||||
// Normalize expression such as nullable property and output slot id
|
||||
@ -400,52 +370,87 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
|
||||
/**
|
||||
* Partition will be pruned in query then add the pruned partitions to select partitions field of
|
||||
* catalog relation.
|
||||
* Maybe only just some partitions is valid in materialized view, so we should check if the mv can
|
||||
* offer the partitions which query used or not.
|
||||
*
|
||||
* @return the invalid partition name set
|
||||
* Maybe only some partitions is invalid in materialized view, or base table maybe add new partition
|
||||
* So we should calc the invalid partition used in query
|
||||
* @return the key in pair is mvNeedRemovePartitionNameSet, the value in pair is baseTableNeedUnionPartitionNameSet
|
||||
*/
|
||||
protected Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> calcUsedInvalidMvPartitions(
|
||||
Plan rewrittenPlan,
|
||||
MaterializationContext materializationContext,
|
||||
CascadesContext cascadesContext) {
|
||||
if (materializationContext instanceof AsyncMaterializationContext) {
|
||||
// check partition is valid or not
|
||||
MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv();
|
||||
PartitionInfo mvPartitionInfo = mtmv.getPartitionInfo();
|
||||
if (PartitionType.UNPARTITIONED.equals(mvPartitionInfo.getType())) {
|
||||
// if not partition, if rewrite success, it means mv is available
|
||||
return ImmutableMultimap.of();
|
||||
}
|
||||
// check mv related table partition is valid or not
|
||||
MTMVPartitionInfo mvCustomPartitionInfo = mtmv.getMvPartitionInfo();
|
||||
BaseTableInfo relatedPartitionTable = mvCustomPartitionInfo.getRelatedTableInfo();
|
||||
if (relatedPartitionTable == null) {
|
||||
return ImmutableMultimap.of();
|
||||
}
|
||||
// get mv valid partitions
|
||||
Set<Long> mvDataValidPartitionIdSet = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv,
|
||||
cascadesContext.getConnectContext(), System.currentTimeMillis()).stream()
|
||||
.map(Partition::getId)
|
||||
.collect(Collectors.toSet());
|
||||
// get partitions query used
|
||||
Set<Long> mvPartitionSetQueryUsed = rewrittenPlan.collectToList(node -> node instanceof LogicalOlapScan
|
||||
&& Objects.equals(((CatalogRelation) node).getTable().getName(), mtmv.getName()))
|
||||
.stream()
|
||||
.map(node -> ((LogicalOlapScan) node).getSelectedPartitionIds())
|
||||
.flatMap(Collection::stream)
|
||||
.collect(Collectors.toSet());
|
||||
// get invalid partition ids
|
||||
Set<Long> invalidMvPartitionIdSet = new HashSet<>(mvPartitionSetQueryUsed);
|
||||
invalidMvPartitionIdSet.removeAll(mvDataValidPartitionIdSet);
|
||||
ImmutableMultimap.Builder<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> invalidPartitionMapBuilder =
|
||||
ImmutableMultimap.builder();
|
||||
Pair<MTMVPartitionInfo, PartitionInfo> partitionInfo = Pair.of(mvCustomPartitionInfo, mvPartitionInfo);
|
||||
invalidMvPartitionIdSet.forEach(invalidPartitionId ->
|
||||
invalidPartitionMapBuilder.put(partitionInfo, mtmv.getPartition(invalidPartitionId)));
|
||||
return invalidPartitionMapBuilder.build();
|
||||
protected Pair<Map<BaseTableInfo, Set<String>>, Map<BaseTableInfo, Set<String>>> calcInvalidPartitions(
|
||||
Plan queryPlan, Plan rewrittenPlan,
|
||||
AsyncMaterializationContext materializationContext, CascadesContext cascadesContext)
|
||||
throws AnalysisException {
|
||||
Set<String> mvNeedRemovePartitionNameSet = new HashSet<>();
|
||||
Set<String> baseTableNeedUnionPartitionNameSet = new HashSet<>();
|
||||
// check partition is valid or not
|
||||
MTMV mtmv = materializationContext.getMtmv();
|
||||
PartitionInfo mvPartitionInfo = mtmv.getPartitionInfo();
|
||||
if (PartitionType.UNPARTITIONED.equals(mvPartitionInfo.getType())) {
|
||||
// if not partition, if rewrite success, it means mv is available
|
||||
return Pair.of(ImmutableMap.of(), ImmutableMap.of());
|
||||
}
|
||||
return ImmutableMultimap.of();
|
||||
MTMVPartitionInfo mvCustomPartitionInfo = mtmv.getMvPartitionInfo();
|
||||
BaseTableInfo relatedPartitionTable = mvCustomPartitionInfo.getRelatedTableInfo();
|
||||
if (relatedPartitionTable == null) {
|
||||
return Pair.of(ImmutableMap.of(), ImmutableMap.of());
|
||||
}
|
||||
// Collect the mv related base table partitions which query used
|
||||
Map<BaseTableInfo, Set<Partition>> queryUsedBaseTablePartitions = new LinkedHashMap<>();
|
||||
queryUsedBaseTablePartitions.put(relatedPartitionTable, new HashSet<>());
|
||||
queryPlan.accept(new StructInfo.QueryScanPartitionsCollector(), queryUsedBaseTablePartitions);
|
||||
Set<String> queryUsedBaseTablePartitionNameSet = queryUsedBaseTablePartitions.get(relatedPartitionTable)
|
||||
.stream()
|
||||
.map(Partition::getName)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
Collection<Partition> mvValidPartitions = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv,
|
||||
cascadesContext.getConnectContext(), System.currentTimeMillis());
|
||||
Set<String> mvValidPartitionNameSet = new HashSet<>();
|
||||
Set<String> mvValidBaseTablePartitionNameSet = new HashSet<>();
|
||||
Set<String> mvValidHasDataRelatedBaseTableNameSet = new HashSet<>();
|
||||
Pair<Map<String, Set<String>>, Map<String, String>> partitionMapping = mtmv.calculateDoublyPartitionMappings();
|
||||
for (Partition mvValidPartition : mvValidPartitions) {
|
||||
mvValidPartitionNameSet.add(mvValidPartition.getName());
|
||||
Set<String> relatedBaseTablePartitions = partitionMapping.key().get(mvValidPartition.getName());
|
||||
if (relatedBaseTablePartitions != null) {
|
||||
mvValidBaseTablePartitionNameSet.addAll(relatedBaseTablePartitions);
|
||||
}
|
||||
if (!mtmv.selectNonEmptyPartitionIds(ImmutableList.of(mvValidPartition.getId())).isEmpty()) {
|
||||
if (relatedBaseTablePartitions != null) {
|
||||
mvValidHasDataRelatedBaseTableNameSet.addAll(relatedBaseTablePartitions);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (Sets.intersection(mvValidHasDataRelatedBaseTableNameSet, queryUsedBaseTablePartitionNameSet).isEmpty()) {
|
||||
// if mv can not offer any partition for query, query rewrite bail out
|
||||
return null;
|
||||
}
|
||||
// Check when mv partition relates base table partition data change or delete partition
|
||||
Set<String> rewrittenPlanUsePartitionNameSet = new HashSet<>();
|
||||
List<Object> mvOlapScanList = rewrittenPlan.collectToList(node ->
|
||||
node instanceof LogicalOlapScan
|
||||
&& Objects.equals(((CatalogRelation) node).getTable().getName(), mtmv.getName()));
|
||||
for (Object olapScanObj : mvOlapScanList) {
|
||||
LogicalOlapScan olapScan = (LogicalOlapScan) olapScanObj;
|
||||
olapScan.getSelectedPartitionIds().forEach(id ->
|
||||
rewrittenPlanUsePartitionNameSet.add(olapScan.getTable().getPartition(id).getName()));
|
||||
}
|
||||
// If rewritten plan use but not in mv valid partition name set, need remove in mv and base table union
|
||||
Sets.difference(rewrittenPlanUsePartitionNameSet, mvValidPartitionNameSet)
|
||||
.copyInto(mvNeedRemovePartitionNameSet);
|
||||
for (String partitionName : mvNeedRemovePartitionNameSet) {
|
||||
baseTableNeedUnionPartitionNameSet.addAll(partitionMapping.key().get(partitionName));
|
||||
}
|
||||
// If related base table create partitions or mv is created with ttl, need base table union
|
||||
Sets.difference(queryUsedBaseTablePartitionNameSet, mvValidBaseTablePartitionNameSet)
|
||||
.copyInto(baseTableNeedUnionPartitionNameSet);
|
||||
Map<BaseTableInfo, Set<String>> mvPartitionNeedRemoveNameMap = new HashMap<>();
|
||||
if (!mvNeedRemovePartitionNameSet.isEmpty()) {
|
||||
mvPartitionNeedRemoveNameMap.put(new BaseTableInfo(mtmv), mvNeedRemovePartitionNameSet);
|
||||
}
|
||||
Map<BaseTableInfo, Set<String>> baseTablePartitionNeedUnionNameMap = new HashMap<>();
|
||||
if (!baseTableNeedUnionPartitionNameSet.isEmpty()) {
|
||||
baseTablePartitionNeedUnionNameMap.put(relatedPartitionTable, baseTableNeedUnionPartitionNameSet);
|
||||
}
|
||||
return Pair.of(mvPartitionNeedRemoveNameMap, baseTablePartitionNeedUnionNameMap);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -17,15 +17,8 @@
|
||||
|
||||
package org.apache.doris.nereids.rules.exploration.mv;
|
||||
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.mtmv.MTMVPartitionInfo;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.mapping.EquivalenceClassSetMapping;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.mapping.Mapping.MappedSlot;
|
||||
import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping;
|
||||
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
|
||||
import org.apache.doris.nereids.rules.expression.ExpressionOptimization;
|
||||
@ -34,17 +27,13 @@ import org.apache.doris.nereids.trees.expressions.EqualTo;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
|
||||
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
|
||||
import org.apache.doris.nereids.util.ExpressionUtils;
|
||||
import org.apache.doris.nereids.util.Utils;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@ -231,47 +220,6 @@ public class Predicates {
|
||||
return Utils.toSqlString("Predicates", "pulledUpPredicates", pulledUpPredicates);
|
||||
}
|
||||
|
||||
/** Construct filter by partition
|
||||
* @param partitions this is the partition which filter should be constructed from
|
||||
* @param queryToViewSlotMapping construct filter on slot, the slot belong the slotmapping
|
||||
* */
|
||||
public static Map<TableIf, Set<Expression>> constructFilterByPartitions(
|
||||
Multimap<Pair<MTMVPartitionInfo, PartitionInfo>, Partition> partitions,
|
||||
SlotMapping queryToViewSlotMapping) throws AnalysisException {
|
||||
Map<TableIf, Set<Expression>> constructedFilterMap = new HashMap<>();
|
||||
for (Map.Entry<Pair<MTMVPartitionInfo, PartitionInfo>, Collection<Partition>> entry :
|
||||
partitions.asMap().entrySet()) {
|
||||
// Get the base table partition column mv related
|
||||
String relatedCol = entry.getKey().key().getRelatedCol();
|
||||
TableIf relatedTableInfo = entry.getKey().key().getRelatedTable();
|
||||
// Find the query slot which mv partition col mapped to
|
||||
Optional<MappedSlot> partitionSlotQueryUsed = queryToViewSlotMapping.getRelationSlotMap()
|
||||
.keySet()
|
||||
.stream()
|
||||
.filter(mappedSlot -> mappedSlot.getSlot().isColumnFromTable()
|
||||
&& mappedSlot.getSlot().getName().equals(relatedCol)
|
||||
&& mappedSlot.getBelongedRelation() != null
|
||||
&& mappedSlot.getBelongedRelation().getTable().getId() == relatedTableInfo.getId())
|
||||
.findFirst();
|
||||
if (!partitionSlotQueryUsed.isPresent()) {
|
||||
return ImmutableMap.of();
|
||||
}
|
||||
// Constructed filter which should add on the query base table,
|
||||
// after supported data roll up this method should keep logic consistency to partition mapping
|
||||
Set<Expression> partitionExpressions = UpdateMvByPartitionCommand.constructPredicates(
|
||||
// get mv partition items
|
||||
entry.getValue().stream()
|
||||
.map(partition -> entry.getKey().value().getItem(partition.getId()))
|
||||
.collect(Collectors.toSet()),
|
||||
partitionSlotQueryUsed.get().getSlot());
|
||||
// Put partition expressions on query base table
|
||||
constructedFilterMap.computeIfPresent(relatedTableInfo,
|
||||
(key, existExpressions) -> Sets.union(existExpressions, partitionExpressions));
|
||||
constructedFilterMap.computeIfAbsent(relatedTableInfo, key -> partitionExpressions);
|
||||
}
|
||||
return constructedFilterMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* The split different representation for predicate expression, such as equal, range and residual predicate.
|
||||
*/
|
||||
|
||||
@ -17,10 +17,10 @@
|
||||
|
||||
package org.apache.doris.nereids.rules.exploration.mv;
|
||||
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.PartitionItem;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.mtmv.BaseTableInfo;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.jobs.executor.Rewriter;
|
||||
import org.apache.doris.nereids.jobs.joinorder.hypergraph.HyperGraph;
|
||||
@ -45,6 +45,7 @@ import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.Filter;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.Join;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.Project;
|
||||
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAddContext;
|
||||
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAdder;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
|
||||
@ -65,6 +66,7 @@ import com.google.common.collect.Sets;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.BitSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
@ -668,43 +670,49 @@ public class StructInfo {
|
||||
}
|
||||
|
||||
/**
|
||||
* Add predicates on base table when materialized view scan contains invalid partitions
|
||||
* Add or remove partition on base table and mv when materialized view scan contains invalid partitions
|
||||
*/
|
||||
public static class InvalidPartitionRemover extends DefaultPlanRewriter<Pair<List<String>, Set<Long>>> {
|
||||
// materialized view scan is always LogicalOlapScan, so just handle LogicalOlapScan
|
||||
public static class PartitionRemover extends DefaultPlanRewriter<Map<BaseTableInfo, Set<String>>> {
|
||||
@Override
|
||||
public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, Pair<List<String>, Set<Long>> context) {
|
||||
if (olapScan.getTable().getFullQualifiers().equals(context.key())) {
|
||||
List<Long> selectedPartitionIds = olapScan.getSelectedPartitionIds();
|
||||
return olapScan.withSelectedPartitionIds(selectedPartitionIds.stream()
|
||||
.filter(partitionId -> !context.value().contains(partitionId))
|
||||
.collect(Collectors.toList()));
|
||||
public Plan visitLogicalOlapScan(LogicalOlapScan olapScan,
|
||||
Map<BaseTableInfo, Set<String>> context) {
|
||||
// todo Support other partition table
|
||||
BaseTableInfo tableInfo = new BaseTableInfo(olapScan.getTable());
|
||||
if (!context.containsKey(tableInfo)) {
|
||||
return olapScan;
|
||||
}
|
||||
return olapScan;
|
||||
Set<String> targetPartitionNameSet = context.get(tableInfo);
|
||||
List<Long> selectedPartitionIds = new ArrayList<>(olapScan.getSelectedPartitionIds());
|
||||
// need remove partition
|
||||
selectedPartitionIds = selectedPartitionIds.stream()
|
||||
.filter(partitionId -> !targetPartitionNameSet.contains(
|
||||
olapScan.getTable().getPartition(partitionId).getName()))
|
||||
.collect(Collectors.toList());
|
||||
return olapScan.withSelectedPartitionIds(selectedPartitionIds);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Collect partitions which scan used according to given table
|
||||
* Collect partitions on base table
|
||||
*/
|
||||
public static class QueryScanPartitionsCollector extends DefaultPlanVisitor<Plan, Map<Long, Set<PartitionItem>>> {
|
||||
public static class QueryScanPartitionsCollector extends DefaultPlanVisitor<Plan,
|
||||
Map<BaseTableInfo, Set<Partition>>> {
|
||||
@Override
|
||||
public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation,
|
||||
Map<Long, Set<PartitionItem>> context) {
|
||||
Map<BaseTableInfo, Set<Partition>> targetTablePartitionMap) {
|
||||
TableIf table = catalogRelation.getTable();
|
||||
if (!context.containsKey(table.getId())) {
|
||||
BaseTableInfo relatedPartitionTable = new BaseTableInfo(table);
|
||||
if (!targetTablePartitionMap.containsKey(relatedPartitionTable)) {
|
||||
return catalogRelation;
|
||||
}
|
||||
// Only support check olap partition currently
|
||||
// todo Support other type partition table
|
||||
if (catalogRelation instanceof LogicalOlapScan) {
|
||||
LogicalOlapScan logicalOlapScan = (LogicalOlapScan) catalogRelation;
|
||||
PartitionInfo partitionInfo = logicalOlapScan.getTable().getPartitionInfo();
|
||||
logicalOlapScan.getSelectedPartitionIds().stream()
|
||||
.map(partitionInfo::getItem)
|
||||
.forEach(partitionItem -> context.computeIfPresent(table.getId(), (key, oldValue) -> {
|
||||
oldValue.add(partitionItem);
|
||||
return oldValue;
|
||||
}));
|
||||
for (Long partitionId : logicalOlapScan.getSelectedPartitionIds()) {
|
||||
Set<Partition> partitions = targetTablePartitionMap.computeIfAbsent(relatedPartitionTable,
|
||||
key -> new HashSet<>());
|
||||
partitions.add(logicalOlapScan.getTable().getPartition(partitionId));
|
||||
}
|
||||
}
|
||||
return catalogRelation;
|
||||
}
|
||||
@ -713,10 +721,16 @@ public class StructInfo {
|
||||
/**
|
||||
* Add filter on table scan according to table filter map
|
||||
*/
|
||||
public static Plan addFilterOnTableScan(Plan queryPlan, Map<TableIf, Set<Expression>> filterOnOriginPlan,
|
||||
public static Plan addFilterOnTableScan(Plan queryPlan, Map<BaseTableInfo, Set<String>> partitionOnOriginPlan,
|
||||
String partitionColumn,
|
||||
CascadesContext parentCascadesContext) {
|
||||
// Firstly, construct filter form invalid partition, this filter should be added on origin plan
|
||||
Plan queryPlanWithUnionFilter = queryPlan.accept(new PredicateAdder(), filterOnOriginPlan);
|
||||
PredicateAddContext predicateAddContext = new PredicateAddContext(partitionOnOriginPlan, partitionColumn);
|
||||
Plan queryPlanWithUnionFilter = queryPlan.accept(new PredicateAdder(),
|
||||
predicateAddContext);
|
||||
if (!predicateAddContext.isAddSuccess()) {
|
||||
return null;
|
||||
}
|
||||
// Deep copy the plan to avoid the plan output is the same with the later union output, this may cause
|
||||
// exec by mistake
|
||||
queryPlanWithUnionFilter = new LogicalPlanDeepCopier().deepCopy(
|
||||
|
||||
@ -20,12 +20,15 @@ package org.apache.doris.nereids.trees.plans.commands;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.ListPartitionItem;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.PartitionItem;
|
||||
import org.apache.doris.catalog.PartitionKey;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.mtmv.BaseTableInfo;
|
||||
import org.apache.doris.nereids.analyzer.UnboundRelation;
|
||||
import org.apache.doris.nereids.analyzer.UnboundSlot;
|
||||
import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator;
|
||||
@ -36,6 +39,7 @@ import org.apache.doris.nereids.trees.expressions.InPredicate;
|
||||
import org.apache.doris.nereids.trees.expressions.IsNull;
|
||||
import org.apache.doris.nereids.trees.expressions.LessThan;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.Literal;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
|
||||
@ -45,6 +49,7 @@ import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTable
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalCTE;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias;
|
||||
@ -65,6 +70,7 @@ import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
@ -92,7 +98,7 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
|
||||
constructTableWithPredicates(mv, partitionNames, tableWithPartKey);
|
||||
List<String> parts = constructPartsForMv(partitionNames);
|
||||
Plan plan = parser.parseSingle(mv.getQuerySql());
|
||||
plan = plan.accept(new PredicateAdder(), predicates);
|
||||
plan = plan.accept(new PredicateAdder(), new PredicateAddContext(predicates));
|
||||
if (plan instanceof Sink) {
|
||||
plan = plan.child(0);
|
||||
}
|
||||
@ -203,14 +209,15 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
|
||||
/**
|
||||
* Add predicates on base table when mv can partition update, Also support plan that contain cte and view
|
||||
*/
|
||||
public static class PredicateAdder extends DefaultPlanRewriter<Map<TableIf, Set<Expression>>> {
|
||||
public static class PredicateAdder extends DefaultPlanRewriter<PredicateAddContext> {
|
||||
|
||||
// record view and cte name parts, these should be ignored and visit it's actual plan
|
||||
public Set<List<String>> virtualRelationNamePartSet = new HashSet<>();
|
||||
|
||||
@Override
|
||||
public Plan visitUnboundRelation(UnboundRelation unboundRelation, Map<TableIf, Set<Expression>> predicates) {
|
||||
if (predicates.isEmpty()) {
|
||||
public Plan visitUnboundRelation(UnboundRelation unboundRelation, PredicateAddContext predicates) {
|
||||
|
||||
if (predicates.getPredicates() == null || predicates.getPredicates().isEmpty()) {
|
||||
return unboundRelation;
|
||||
}
|
||||
if (virtualRelationNamePartSet.contains(unboundRelation.getNameParts())) {
|
||||
@ -219,15 +226,16 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
|
||||
List<String> tableQualifier = RelationUtil.getQualifierName(ConnectContext.get(),
|
||||
unboundRelation.getNameParts());
|
||||
TableIf table = RelationUtil.getTable(tableQualifier, Env.getCurrentEnv());
|
||||
if (predicates.containsKey(table)) {
|
||||
return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.get(table))),
|
||||
if (predicates.getPredicates().containsKey(table)) {
|
||||
predicates.setAddSuccess(true);
|
||||
return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.getPredicates().get(table))),
|
||||
unboundRelation);
|
||||
}
|
||||
return unboundRelation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plan visitLogicalCTE(LogicalCTE<? extends Plan> cte, Map<TableIf, Set<Expression>> predicates) {
|
||||
public Plan visitLogicalCTE(LogicalCTE<? extends Plan> cte, PredicateAddContext predicates) {
|
||||
if (predicates.isEmpty()) {
|
||||
return cte;
|
||||
}
|
||||
@ -240,7 +248,7 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
|
||||
|
||||
@Override
|
||||
public Plan visitLogicalSubQueryAlias(LogicalSubQueryAlias<? extends Plan> subQueryAlias,
|
||||
Map<TableIf, Set<Expression>> predicates) {
|
||||
PredicateAddContext predicates) {
|
||||
if (predicates.isEmpty()) {
|
||||
return subQueryAlias;
|
||||
}
|
||||
@ -250,16 +258,110 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
|
||||
|
||||
@Override
|
||||
public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation,
|
||||
Map<TableIf, Set<Expression>> predicates) {
|
||||
PredicateAddContext predicates) {
|
||||
if (predicates.isEmpty()) {
|
||||
return catalogRelation;
|
||||
}
|
||||
TableIf table = catalogRelation.getTable();
|
||||
if (predicates.containsKey(table)) {
|
||||
return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.get(table))),
|
||||
catalogRelation);
|
||||
if (predicates.getPredicates() != null) {
|
||||
TableIf table = catalogRelation.getTable();
|
||||
if (predicates.getPredicates().containsKey(table)) {
|
||||
predicates.setAddSuccess(true);
|
||||
return new LogicalFilter<>(
|
||||
ImmutableSet.of(ExpressionUtils.or(predicates.getPredicates().get(table))),
|
||||
catalogRelation);
|
||||
}
|
||||
}
|
||||
if (predicates.getPartition() != null && predicates.getPartitionName() != null) {
|
||||
if (!(catalogRelation instanceof LogicalOlapScan)) {
|
||||
return catalogRelation;
|
||||
}
|
||||
for (Map.Entry<BaseTableInfo, Set<String>> filterTableEntry : predicates.getPartition().entrySet()) {
|
||||
LogicalOlapScan olapScan = (LogicalOlapScan) catalogRelation;
|
||||
OlapTable targetTable = olapScan.getTable();
|
||||
if (!Objects.equals(new BaseTableInfo(targetTable), filterTableEntry.getKey())) {
|
||||
continue;
|
||||
}
|
||||
Slot partitionSlot = null;
|
||||
for (Slot slot : olapScan.getOutput()) {
|
||||
if (((SlotReference) slot).getName().equals(predicates.getPartitionName())) {
|
||||
partitionSlot = slot;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (partitionSlot == null) {
|
||||
return catalogRelation;
|
||||
}
|
||||
// if partition has no data, doesn't add filter
|
||||
Set<PartitionItem> partitionHasDataItems = new HashSet<>();
|
||||
for (String partitionName : filterTableEntry.getValue()) {
|
||||
Partition partition = targetTable.getPartition(partitionName);
|
||||
if (!targetTable.selectNonEmptyPartitionIds(Lists.newArrayList(partition.getId())).isEmpty()) {
|
||||
// Add filter only when partition has filter
|
||||
partitionHasDataItems.add(targetTable.getPartitionInfo().getItem(partition.getId()));
|
||||
}
|
||||
}
|
||||
if (!partitionHasDataItems.isEmpty()) {
|
||||
Set<Expression> partitionExpressions =
|
||||
constructPredicates(partitionHasDataItems, partitionSlot);
|
||||
predicates.setAddSuccess(true);
|
||||
return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(partitionExpressions)),
|
||||
catalogRelation);
|
||||
}
|
||||
}
|
||||
}
|
||||
return catalogRelation;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Predicate context, which support add predicate by expression or by partition name
|
||||
* Add by predicates has high priority
|
||||
*/
|
||||
public static class PredicateAddContext {
|
||||
|
||||
private final Map<TableIf, Set<Expression>> predicates;
|
||||
private final Map<BaseTableInfo, Set<String>> partition;
|
||||
private final String partitionName;
|
||||
private boolean addSuccess = false;
|
||||
|
||||
public PredicateAddContext(Map<TableIf, Set<Expression>> predicates) {
|
||||
this(predicates, null, null);
|
||||
}
|
||||
|
||||
public PredicateAddContext(Map<BaseTableInfo, Set<String>> partition,
|
||||
String partitionName) {
|
||||
this(null, partition, partitionName);
|
||||
}
|
||||
|
||||
public PredicateAddContext(Map<TableIf, Set<Expression>> predicates, Map<BaseTableInfo, Set<String>> partition,
|
||||
String partitionName) {
|
||||
this.predicates = predicates;
|
||||
this.partition = partition;
|
||||
this.partitionName = partitionName;
|
||||
}
|
||||
|
||||
public Map<TableIf, Set<Expression>> getPredicates() {
|
||||
return predicates;
|
||||
}
|
||||
|
||||
public Map<BaseTableInfo, Set<String>> getPartition() {
|
||||
return partition;
|
||||
}
|
||||
|
||||
public String getPartitionName() {
|
||||
return partitionName;
|
||||
}
|
||||
|
||||
public boolean isEmpty() {
|
||||
return predicates == null && partition == null;
|
||||
}
|
||||
|
||||
public boolean isAddSuccess() {
|
||||
return addSuccess;
|
||||
}
|
||||
|
||||
public void setAddSuccess(boolean addSuccess) {
|
||||
this.addSuccess = addSuccess;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1680,7 +1680,7 @@ public class SessionVariable implements Serializable, Writable {
|
||||
"When the materialized view is not enough to provide all the data for the query, "
|
||||
+ "whether to allow the union of the base table and the materialized view to "
|
||||
+ "respond to the query"})
|
||||
public boolean enableMaterializedViewUnionRewrite = false;
|
||||
public boolean enableMaterializedViewUnionRewrite = true;
|
||||
|
||||
@VariableMgr.VarAttr(name = ENABLE_MATERIALIZED_VIEW_NEST_REWRITE, needForward = true,
|
||||
description = {"是否允许嵌套物化视图改写",
|
||||
|
||||
Reference in New Issue
Block a user