diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java index ea057e8bfc..5b54554eac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/AbstractMaterializedViewRule.java @@ -17,13 +17,12 @@ package org.apache.doris.nereids.rules.exploration.mv; -import org.apache.doris.analysis.PartitionKeyDesc; import org.apache.doris.catalog.MTMV; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionInfo; -import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Id; import org.apache.doris.common.Pair; import org.apache.doris.mtmv.BaseTableInfo; @@ -34,8 +33,7 @@ import org.apache.doris.nereids.jobs.executor.Rewriter; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.rules.exploration.ExplorationRuleFactory; import org.apache.doris.nereids.rules.exploration.mv.Predicates.SplitPredicate; -import org.apache.doris.nereids.rules.exploration.mv.StructInfo.InvalidPartitionRemover; -import org.apache.doris.nereids.rules.exploration.mv.StructInfo.QueryScanPartitionsCollector; +import org.apache.doris.nereids.rules.exploration.mv.StructInfo.PartitionRemover; import org.apache.doris.nereids.rules.exploration.mv.mapping.ExpressionMapping; import org.apache.doris.nereids.rules.exploration.mv.mapping.RelationMapping; import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping; @@ -60,13 +58,13 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.TypeUtils; +import org.apache.doris.qe.SessionVariable; import org.apache.doris.statistics.Statistics; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -74,6 +72,7 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.BitSet; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; @@ -185,8 +184,8 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac "Query to view table mapping is null", () -> ""); return rewriteResults; } - int materializedViewRelationMappingMaxCount = cascadesContext.getConnectContext().getSessionVariable() - .getMaterializedViewRelationMappingMaxCount(); + SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable(); + int materializedViewRelationMappingMaxCount = sessionVariable.getMaterializedViewRelationMappingMaxCount(); if (queryToViewTableMappings.size() > materializedViewRelationMappingMaxCount) { LOG.warn("queryToViewTableMappings is over limit and be intercepted"); queryToViewTableMappings = queryToViewTableMappings.subList(0, materializedViewRelationMappingMaxCount); @@ -260,71 +259,60 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac if (rewrittenPlan == null) { continue; } - // check the partitions used by rewritten plan is valid or not - Multimap, Partition> invalidPartitionsQueryUsed = - calcUsedInvalidMvPartitions(rewrittenPlan, materializationContext, cascadesContext); - // All partition used by query is valid - if (!invalidPartitionsQueryUsed.isEmpty() && !cascadesContext.getConnectContext().getSessionVariable() - .isEnableMaterializedViewUnionRewrite()) { - materializationContext.recordFailReason(queryStructInfo, - "Check partition query used validation fail", - () -> String.format("the partition used by query is invalid by materialized view," - + "invalid partition info query used is %s", - invalidPartitionsQueryUsed.values().stream() - .map(Partition::getName) - .collect(Collectors.toSet()))); - continue; - } - boolean partitionValid = invalidPartitionsQueryUsed.isEmpty(); - if (checkCanUnionRewrite(invalidPartitionsQueryUsed, queryPlan, cascadesContext)) { - // construct filter on originalPlan - Map> filterOnOriginPlan; + Pair>, Map>> invalidPartitions; + if (materializationContext instanceof AsyncMaterializationContext) { try { - filterOnOriginPlan = Predicates.constructFilterByPartitions(invalidPartitionsQueryUsed, - queryToViewSlotMapping); - if (filterOnOriginPlan.isEmpty()) { - materializationContext.recordFailReason(queryStructInfo, - "construct invalid partition filter on query fail", - () -> String.format("the invalid partitions used by query is %s, query plan is %s", - invalidPartitionsQueryUsed.values().stream().map(Partition::getName) - .collect(Collectors.toSet()), - queryStructInfo.getOriginalPlan().treeString())); - continue; - } - } catch (org.apache.doris.common.AnalysisException e) { + invalidPartitions = calcInvalidPartitions(queryPlan, rewrittenPlan, + (AsyncMaterializationContext) materializationContext, cascadesContext); + } catch (AnalysisException e) { materializationContext.recordFailReason(queryStructInfo, - "construct invalid partition filter on query analysis fail", - () -> String.format("the invalid partitions used by query is %s, query plan is %s", - invalidPartitionsQueryUsed.values().stream().map(Partition::getName) - .collect(Collectors.toSet()), - queryStructInfo.getOriginalPlan().treeString())); + "Calc invalid partitions fail", + () -> String.format("Calc invalid partitions fail, mv partition names are %s", + ((AsyncMaterializationContext) materializationContext).getMtmv().getPartitions())); + LOG.warn("Calc invalid partitions fail", e); continue; } - // For rewrittenPlan which contains materialized view should remove invalid partition ids - List children = Lists.newArrayList( - rewrittenPlan.accept(new InvalidPartitionRemover(), Pair.of( - materializationContext.getMaterializationQualifier(), - invalidPartitionsQueryUsed.values().stream().map(Partition::getId) - .collect(Collectors.toSet()))), - StructInfo.addFilterOnTableScan(queryPlan, filterOnOriginPlan, cascadesContext)); - // Union query materialized view and source table - rewrittenPlan = new LogicalUnion(Qualifier.ALL, - queryPlan.getOutput().stream().map(NamedExpression.class::cast).collect(Collectors.toList()), - children.stream() - .map(plan -> plan.getOutput().stream() - .map(slot -> (SlotReference) slot.toSlot()).collect(Collectors.toList())) - .collect(Collectors.toList()), - ImmutableList.of(), - false, - children); - partitionValid = true; - } - if (!partitionValid) { - materializationContext.recordFailReason(queryStructInfo, - "materialized view partition is invalid union fail", - () -> String.format("invalidPartitionsQueryUsed = %s,\n query plan = %s", - invalidPartitionsQueryUsed, queryPlan.treeString())); - continue; + if (invalidPartitions == null) { + // if mv can not offer any partition for query, query rewrite bail out to avoid cycle run + return rewriteResults; + } + boolean partitionNeedUnion = needUnionRewrite(invalidPartitions, cascadesContext); + final Pair>, Map>> finalInvalidPartitions = + invalidPartitions; + if (partitionNeedUnion && !sessionVariable.isEnableMaterializedViewUnionRewrite()) { + // if use invalid partition but not enable union rewrite + materializationContext.recordFailReason(queryStructInfo, + "Partition query used is invalid", + () -> String.format("the partition used by query is invalid by materialized view," + + "invalid partition info query used is %s", finalInvalidPartitions)); + continue; + } + if (partitionNeedUnion) { + MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv(); + Plan originPlanWithFilter = StructInfo.addFilterOnTableScan(queryPlan, invalidPartitions.value(), + mtmv.getMvPartitionInfo().getPartitionCol(), cascadesContext); + if (finalInvalidPartitions.value().isEmpty() || originPlanWithFilter == null) { + // only need remove mv invalid partition + rewrittenPlan = rewrittenPlan.accept(new PartitionRemover(), invalidPartitions.key()); + } else { + // For rewrittenPlan which contains materialized view should remove invalid partition ids + List children = Lists.newArrayList( + rewrittenPlan.accept(new PartitionRemover(), invalidPartitions.key()), + originPlanWithFilter); + // Union query materialized view and source table + rewrittenPlan = new LogicalUnion(Qualifier.ALL, + queryPlan.getOutput().stream().map(NamedExpression.class::cast) + .collect(Collectors.toList()), + children.stream() + .map(plan -> plan.getOutput().stream() + .map(slot -> (SlotReference) slot.toSlot()) + .collect(Collectors.toList())) + .collect(Collectors.toList()), + ImmutableList.of(), + false, + children); + } + } } rewrittenPlan = normalizeExpressions(rewrittenPlan, queryPlan); if (!isOutputValid(queryPlan, rewrittenPlan)) { @@ -350,29 +338,11 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac return rewriteResults; } - private boolean checkCanUnionRewrite(Multimap, Partition> - invalidPartitionsQueryUsed, Plan queryPlan, CascadesContext cascadesContext) { - if (invalidPartitionsQueryUsed.isEmpty() - || !cascadesContext.getConnectContext().getSessionVariable().isEnableMaterializedViewUnionRewrite()) { - return false; - } - // if mv can not offer valid partition data for query, bail out union rewrite - Map> mvRelatedTablePartitionMap = new LinkedHashMap<>(); - invalidPartitionsQueryUsed.keySet().forEach(invalidPartition -> - mvRelatedTablePartitionMap.put(invalidPartition.key().getRelatedTableInfo().getTableId(), - new HashSet<>())); - queryPlan.accept(new QueryScanPartitionsCollector(), mvRelatedTablePartitionMap); - Set partitionKeyDescSetQueryUsed = mvRelatedTablePartitionMap.values().stream() - .flatMap(Collection::stream) - .map(PartitionItem::toPartitionKeyDesc) - .collect(Collectors.toSet()); - Set mvInvalidPartitionKeyDescSet = new HashSet<>(); - for (Map.Entry, Collection> entry : - invalidPartitionsQueryUsed.asMap().entrySet()) { - entry.getValue().forEach(invalidPartition -> mvInvalidPartitionKeyDescSet.add( - entry.getKey().value().getItem(invalidPartition.getId()).toPartitionKeyDesc())); - } - return !mvInvalidPartitionKeyDescSet.containsAll(partitionKeyDescSetQueryUsed); + private boolean needUnionRewrite( + Pair>, Map>> invalidPartitions, + CascadesContext cascadesContext) { + return invalidPartitions != null + && (!invalidPartitions.key().isEmpty() || !invalidPartitions.value().isEmpty()); } // Normalize expression such as nullable property and output slot id @@ -400,52 +370,87 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac /** * Partition will be pruned in query then add the pruned partitions to select partitions field of * catalog relation. - * Maybe only just some partitions is valid in materialized view, so we should check if the mv can - * offer the partitions which query used or not. - * - * @return the invalid partition name set + * Maybe only some partitions is invalid in materialized view, or base table maybe add new partition + * So we should calc the invalid partition used in query + * @return the key in pair is mvNeedRemovePartitionNameSet, the value in pair is baseTableNeedUnionPartitionNameSet */ - protected Multimap, Partition> calcUsedInvalidMvPartitions( - Plan rewrittenPlan, - MaterializationContext materializationContext, - CascadesContext cascadesContext) { - if (materializationContext instanceof AsyncMaterializationContext) { - // check partition is valid or not - MTMV mtmv = ((AsyncMaterializationContext) materializationContext).getMtmv(); - PartitionInfo mvPartitionInfo = mtmv.getPartitionInfo(); - if (PartitionType.UNPARTITIONED.equals(mvPartitionInfo.getType())) { - // if not partition, if rewrite success, it means mv is available - return ImmutableMultimap.of(); - } - // check mv related table partition is valid or not - MTMVPartitionInfo mvCustomPartitionInfo = mtmv.getMvPartitionInfo(); - BaseTableInfo relatedPartitionTable = mvCustomPartitionInfo.getRelatedTableInfo(); - if (relatedPartitionTable == null) { - return ImmutableMultimap.of(); - } - // get mv valid partitions - Set mvDataValidPartitionIdSet = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, - cascadesContext.getConnectContext(), System.currentTimeMillis()).stream() - .map(Partition::getId) - .collect(Collectors.toSet()); - // get partitions query used - Set mvPartitionSetQueryUsed = rewrittenPlan.collectToList(node -> node instanceof LogicalOlapScan - && Objects.equals(((CatalogRelation) node).getTable().getName(), mtmv.getName())) - .stream() - .map(node -> ((LogicalOlapScan) node).getSelectedPartitionIds()) - .flatMap(Collection::stream) - .collect(Collectors.toSet()); - // get invalid partition ids - Set invalidMvPartitionIdSet = new HashSet<>(mvPartitionSetQueryUsed); - invalidMvPartitionIdSet.removeAll(mvDataValidPartitionIdSet); - ImmutableMultimap.Builder, Partition> invalidPartitionMapBuilder = - ImmutableMultimap.builder(); - Pair partitionInfo = Pair.of(mvCustomPartitionInfo, mvPartitionInfo); - invalidMvPartitionIdSet.forEach(invalidPartitionId -> - invalidPartitionMapBuilder.put(partitionInfo, mtmv.getPartition(invalidPartitionId))); - return invalidPartitionMapBuilder.build(); + protected Pair>, Map>> calcInvalidPartitions( + Plan queryPlan, Plan rewrittenPlan, + AsyncMaterializationContext materializationContext, CascadesContext cascadesContext) + throws AnalysisException { + Set mvNeedRemovePartitionNameSet = new HashSet<>(); + Set baseTableNeedUnionPartitionNameSet = new HashSet<>(); + // check partition is valid or not + MTMV mtmv = materializationContext.getMtmv(); + PartitionInfo mvPartitionInfo = mtmv.getPartitionInfo(); + if (PartitionType.UNPARTITIONED.equals(mvPartitionInfo.getType())) { + // if not partition, if rewrite success, it means mv is available + return Pair.of(ImmutableMap.of(), ImmutableMap.of()); } - return ImmutableMultimap.of(); + MTMVPartitionInfo mvCustomPartitionInfo = mtmv.getMvPartitionInfo(); + BaseTableInfo relatedPartitionTable = mvCustomPartitionInfo.getRelatedTableInfo(); + if (relatedPartitionTable == null) { + return Pair.of(ImmutableMap.of(), ImmutableMap.of()); + } + // Collect the mv related base table partitions which query used + Map> queryUsedBaseTablePartitions = new LinkedHashMap<>(); + queryUsedBaseTablePartitions.put(relatedPartitionTable, new HashSet<>()); + queryPlan.accept(new StructInfo.QueryScanPartitionsCollector(), queryUsedBaseTablePartitions); + Set queryUsedBaseTablePartitionNameSet = queryUsedBaseTablePartitions.get(relatedPartitionTable) + .stream() + .map(Partition::getName) + .collect(Collectors.toSet()); + + Collection mvValidPartitions = MTMVRewriteUtil.getMTMVCanRewritePartitions(mtmv, + cascadesContext.getConnectContext(), System.currentTimeMillis()); + Set mvValidPartitionNameSet = new HashSet<>(); + Set mvValidBaseTablePartitionNameSet = new HashSet<>(); + Set mvValidHasDataRelatedBaseTableNameSet = new HashSet<>(); + Pair>, Map> partitionMapping = mtmv.calculateDoublyPartitionMappings(); + for (Partition mvValidPartition : mvValidPartitions) { + mvValidPartitionNameSet.add(mvValidPartition.getName()); + Set relatedBaseTablePartitions = partitionMapping.key().get(mvValidPartition.getName()); + if (relatedBaseTablePartitions != null) { + mvValidBaseTablePartitionNameSet.addAll(relatedBaseTablePartitions); + } + if (!mtmv.selectNonEmptyPartitionIds(ImmutableList.of(mvValidPartition.getId())).isEmpty()) { + if (relatedBaseTablePartitions != null) { + mvValidHasDataRelatedBaseTableNameSet.addAll(relatedBaseTablePartitions); + } + } + } + if (Sets.intersection(mvValidHasDataRelatedBaseTableNameSet, queryUsedBaseTablePartitionNameSet).isEmpty()) { + // if mv can not offer any partition for query, query rewrite bail out + return null; + } + // Check when mv partition relates base table partition data change or delete partition + Set rewrittenPlanUsePartitionNameSet = new HashSet<>(); + List mvOlapScanList = rewrittenPlan.collectToList(node -> + node instanceof LogicalOlapScan + && Objects.equals(((CatalogRelation) node).getTable().getName(), mtmv.getName())); + for (Object olapScanObj : mvOlapScanList) { + LogicalOlapScan olapScan = (LogicalOlapScan) olapScanObj; + olapScan.getSelectedPartitionIds().forEach(id -> + rewrittenPlanUsePartitionNameSet.add(olapScan.getTable().getPartition(id).getName())); + } + // If rewritten plan use but not in mv valid partition name set, need remove in mv and base table union + Sets.difference(rewrittenPlanUsePartitionNameSet, mvValidPartitionNameSet) + .copyInto(mvNeedRemovePartitionNameSet); + for (String partitionName : mvNeedRemovePartitionNameSet) { + baseTableNeedUnionPartitionNameSet.addAll(partitionMapping.key().get(partitionName)); + } + // If related base table create partitions or mv is created with ttl, need base table union + Sets.difference(queryUsedBaseTablePartitionNameSet, mvValidBaseTablePartitionNameSet) + .copyInto(baseTableNeedUnionPartitionNameSet); + Map> mvPartitionNeedRemoveNameMap = new HashMap<>(); + if (!mvNeedRemovePartitionNameSet.isEmpty()) { + mvPartitionNeedRemoveNameMap.put(new BaseTableInfo(mtmv), mvNeedRemovePartitionNameSet); + } + Map> baseTablePartitionNeedUnionNameMap = new HashMap<>(); + if (!baseTableNeedUnionPartitionNameSet.isEmpty()) { + baseTablePartitionNeedUnionNameMap.put(relatedPartitionTable, baseTableNeedUnionPartitionNameSet); + } + return Pair.of(mvPartitionNeedRemoveNameMap, baseTablePartitionNeedUnionNameMap); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java index c801e683d6..139230be5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/Predicates.java @@ -17,15 +17,8 @@ package org.apache.doris.nereids.rules.exploration.mv; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.PartitionInfo; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.Pair; -import org.apache.doris.mtmv.MTMVPartitionInfo; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.rules.exploration.mv.mapping.EquivalenceClassSetMapping; -import org.apache.doris.nereids.rules.exploration.mv.mapping.Mapping.MappedSlot; import org.apache.doris.nereids.rules.exploration.mv.mapping.SlotMapping; import org.apache.doris.nereids.rules.expression.ExpressionNormalization; import org.apache.doris.nereids.rules.expression.ExpressionOptimization; @@ -34,17 +27,13 @@ import org.apache.doris.nereids.trees.expressions.EqualTo; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral; -import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand; import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.Utils; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Multimap; import com.google.common.collect.Sets; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -231,47 +220,6 @@ public class Predicates { return Utils.toSqlString("Predicates", "pulledUpPredicates", pulledUpPredicates); } - /** Construct filter by partition - * @param partitions this is the partition which filter should be constructed from - * @param queryToViewSlotMapping construct filter on slot, the slot belong the slotmapping - * */ - public static Map> constructFilterByPartitions( - Multimap, Partition> partitions, - SlotMapping queryToViewSlotMapping) throws AnalysisException { - Map> constructedFilterMap = new HashMap<>(); - for (Map.Entry, Collection> entry : - partitions.asMap().entrySet()) { - // Get the base table partition column mv related - String relatedCol = entry.getKey().key().getRelatedCol(); - TableIf relatedTableInfo = entry.getKey().key().getRelatedTable(); - // Find the query slot which mv partition col mapped to - Optional partitionSlotQueryUsed = queryToViewSlotMapping.getRelationSlotMap() - .keySet() - .stream() - .filter(mappedSlot -> mappedSlot.getSlot().isColumnFromTable() - && mappedSlot.getSlot().getName().equals(relatedCol) - && mappedSlot.getBelongedRelation() != null - && mappedSlot.getBelongedRelation().getTable().getId() == relatedTableInfo.getId()) - .findFirst(); - if (!partitionSlotQueryUsed.isPresent()) { - return ImmutableMap.of(); - } - // Constructed filter which should add on the query base table, - // after supported data roll up this method should keep logic consistency to partition mapping - Set partitionExpressions = UpdateMvByPartitionCommand.constructPredicates( - // get mv partition items - entry.getValue().stream() - .map(partition -> entry.getKey().value().getItem(partition.getId())) - .collect(Collectors.toSet()), - partitionSlotQueryUsed.get().getSlot()); - // Put partition expressions on query base table - constructedFilterMap.computeIfPresent(relatedTableInfo, - (key, existExpressions) -> Sets.union(existExpressions, partitionExpressions)); - constructedFilterMap.computeIfAbsent(relatedTableInfo, key -> partitionExpressions); - } - return constructedFilterMap; - } - /** * The split different representation for predicate expression, such as equal, range and residual predicate. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java index 955431d5ce..a98127866c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/StructInfo.java @@ -17,10 +17,10 @@ package org.apache.doris.nereids.rules.exploration.mv; -import org.apache.doris.catalog.PartitionInfo; -import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Pair; +import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.jobs.executor.Rewriter; import org.apache.doris.nereids.jobs.joinorder.hypergraph.HyperGraph; @@ -45,6 +45,7 @@ import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation; import org.apache.doris.nereids.trees.plans.algebra.Filter; import org.apache.doris.nereids.trees.plans.algebra.Join; import org.apache.doris.nereids.trees.plans.algebra.Project; +import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAddContext; import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand.PredicateAdder; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; @@ -65,6 +66,7 @@ import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.BitSet; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -668,43 +670,49 @@ public class StructInfo { } /** - * Add predicates on base table when materialized view scan contains invalid partitions + * Add or remove partition on base table and mv when materialized view scan contains invalid partitions */ - public static class InvalidPartitionRemover extends DefaultPlanRewriter, Set>> { - // materialized view scan is always LogicalOlapScan, so just handle LogicalOlapScan + public static class PartitionRemover extends DefaultPlanRewriter>> { @Override - public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, Pair, Set> context) { - if (olapScan.getTable().getFullQualifiers().equals(context.key())) { - List selectedPartitionIds = olapScan.getSelectedPartitionIds(); - return olapScan.withSelectedPartitionIds(selectedPartitionIds.stream() - .filter(partitionId -> !context.value().contains(partitionId)) - .collect(Collectors.toList())); + public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, + Map> context) { + // todo Support other partition table + BaseTableInfo tableInfo = new BaseTableInfo(olapScan.getTable()); + if (!context.containsKey(tableInfo)) { + return olapScan; } - return olapScan; + Set targetPartitionNameSet = context.get(tableInfo); + List selectedPartitionIds = new ArrayList<>(olapScan.getSelectedPartitionIds()); + // need remove partition + selectedPartitionIds = selectedPartitionIds.stream() + .filter(partitionId -> !targetPartitionNameSet.contains( + olapScan.getTable().getPartition(partitionId).getName())) + .collect(Collectors.toList()); + return olapScan.withSelectedPartitionIds(selectedPartitionIds); } } /** - * Collect partitions which scan used according to given table + * Collect partitions on base table */ - public static class QueryScanPartitionsCollector extends DefaultPlanVisitor>> { + public static class QueryScanPartitionsCollector extends DefaultPlanVisitor>> { @Override public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation, - Map> context) { + Map> targetTablePartitionMap) { TableIf table = catalogRelation.getTable(); - if (!context.containsKey(table.getId())) { + BaseTableInfo relatedPartitionTable = new BaseTableInfo(table); + if (!targetTablePartitionMap.containsKey(relatedPartitionTable)) { return catalogRelation; } - // Only support check olap partition currently + // todo Support other type partition table if (catalogRelation instanceof LogicalOlapScan) { LogicalOlapScan logicalOlapScan = (LogicalOlapScan) catalogRelation; - PartitionInfo partitionInfo = logicalOlapScan.getTable().getPartitionInfo(); - logicalOlapScan.getSelectedPartitionIds().stream() - .map(partitionInfo::getItem) - .forEach(partitionItem -> context.computeIfPresent(table.getId(), (key, oldValue) -> { - oldValue.add(partitionItem); - return oldValue; - })); + for (Long partitionId : logicalOlapScan.getSelectedPartitionIds()) { + Set partitions = targetTablePartitionMap.computeIfAbsent(relatedPartitionTable, + key -> new HashSet<>()); + partitions.add(logicalOlapScan.getTable().getPartition(partitionId)); + } } return catalogRelation; } @@ -713,10 +721,16 @@ public class StructInfo { /** * Add filter on table scan according to table filter map */ - public static Plan addFilterOnTableScan(Plan queryPlan, Map> filterOnOriginPlan, + public static Plan addFilterOnTableScan(Plan queryPlan, Map> partitionOnOriginPlan, + String partitionColumn, CascadesContext parentCascadesContext) { // Firstly, construct filter form invalid partition, this filter should be added on origin plan - Plan queryPlanWithUnionFilter = queryPlan.accept(new PredicateAdder(), filterOnOriginPlan); + PredicateAddContext predicateAddContext = new PredicateAddContext(partitionOnOriginPlan, partitionColumn); + Plan queryPlanWithUnionFilter = queryPlan.accept(new PredicateAdder(), + predicateAddContext); + if (!predicateAddContext.isAddSuccess()) { + return null; + } // Deep copy the plan to avoid the plan output is the same with the later union output, this may cause // exec by mistake queryPlanWithUnionFilter = new LogicalPlanDeepCopier().deepCopy( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index 7c97b0f881..22cca77062 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -20,12 +20,15 @@ package org.apache.doris.nereids.trees.plans.commands; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.ListPartitionItem; import org.apache.doris.catalog.MTMV; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionItem; import org.apache.doris.catalog.PartitionKey; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.nereids.analyzer.UnboundRelation; import org.apache.doris.nereids.analyzer.UnboundSlot; import org.apache.doris.nereids.analyzer.UnboundTableSinkCreator; @@ -36,6 +39,7 @@ import org.apache.doris.nereids.trees.expressions.InPredicate; import org.apache.doris.nereids.trees.expressions.IsNull; import org.apache.doris.nereids.trees.expressions.LessThan; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral; import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; @@ -45,6 +49,7 @@ import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTable import org.apache.doris.nereids.trees.plans.logical.LogicalCTE; import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias; @@ -65,6 +70,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -92,7 +98,7 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { constructTableWithPredicates(mv, partitionNames, tableWithPartKey); List parts = constructPartsForMv(partitionNames); Plan plan = parser.parseSingle(mv.getQuerySql()); - plan = plan.accept(new PredicateAdder(), predicates); + plan = plan.accept(new PredicateAdder(), new PredicateAddContext(predicates)); if (plan instanceof Sink) { plan = plan.child(0); } @@ -203,14 +209,15 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { /** * Add predicates on base table when mv can partition update, Also support plan that contain cte and view */ - public static class PredicateAdder extends DefaultPlanRewriter>> { + public static class PredicateAdder extends DefaultPlanRewriter { // record view and cte name parts, these should be ignored and visit it's actual plan public Set> virtualRelationNamePartSet = new HashSet<>(); @Override - public Plan visitUnboundRelation(UnboundRelation unboundRelation, Map> predicates) { - if (predicates.isEmpty()) { + public Plan visitUnboundRelation(UnboundRelation unboundRelation, PredicateAddContext predicates) { + + if (predicates.getPredicates() == null || predicates.getPredicates().isEmpty()) { return unboundRelation; } if (virtualRelationNamePartSet.contains(unboundRelation.getNameParts())) { @@ -219,15 +226,16 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { List tableQualifier = RelationUtil.getQualifierName(ConnectContext.get(), unboundRelation.getNameParts()); TableIf table = RelationUtil.getTable(tableQualifier, Env.getCurrentEnv()); - if (predicates.containsKey(table)) { - return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.get(table))), + if (predicates.getPredicates().containsKey(table)) { + predicates.setAddSuccess(true); + return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.getPredicates().get(table))), unboundRelation); } return unboundRelation; } @Override - public Plan visitLogicalCTE(LogicalCTE cte, Map> predicates) { + public Plan visitLogicalCTE(LogicalCTE cte, PredicateAddContext predicates) { if (predicates.isEmpty()) { return cte; } @@ -240,7 +248,7 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { @Override public Plan visitLogicalSubQueryAlias(LogicalSubQueryAlias subQueryAlias, - Map> predicates) { + PredicateAddContext predicates) { if (predicates.isEmpty()) { return subQueryAlias; } @@ -250,16 +258,110 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { @Override public Plan visitLogicalCatalogRelation(LogicalCatalogRelation catalogRelation, - Map> predicates) { + PredicateAddContext predicates) { if (predicates.isEmpty()) { return catalogRelation; } - TableIf table = catalogRelation.getTable(); - if (predicates.containsKey(table)) { - return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.get(table))), - catalogRelation); + if (predicates.getPredicates() != null) { + TableIf table = catalogRelation.getTable(); + if (predicates.getPredicates().containsKey(table)) { + predicates.setAddSuccess(true); + return new LogicalFilter<>( + ImmutableSet.of(ExpressionUtils.or(predicates.getPredicates().get(table))), + catalogRelation); + } + } + if (predicates.getPartition() != null && predicates.getPartitionName() != null) { + if (!(catalogRelation instanceof LogicalOlapScan)) { + return catalogRelation; + } + for (Map.Entry> filterTableEntry : predicates.getPartition().entrySet()) { + LogicalOlapScan olapScan = (LogicalOlapScan) catalogRelation; + OlapTable targetTable = olapScan.getTable(); + if (!Objects.equals(new BaseTableInfo(targetTable), filterTableEntry.getKey())) { + continue; + } + Slot partitionSlot = null; + for (Slot slot : olapScan.getOutput()) { + if (((SlotReference) slot).getName().equals(predicates.getPartitionName())) { + partitionSlot = slot; + break; + } + } + if (partitionSlot == null) { + return catalogRelation; + } + // if partition has no data, doesn't add filter + Set partitionHasDataItems = new HashSet<>(); + for (String partitionName : filterTableEntry.getValue()) { + Partition partition = targetTable.getPartition(partitionName); + if (!targetTable.selectNonEmptyPartitionIds(Lists.newArrayList(partition.getId())).isEmpty()) { + // Add filter only when partition has filter + partitionHasDataItems.add(targetTable.getPartitionInfo().getItem(partition.getId())); + } + } + if (!partitionHasDataItems.isEmpty()) { + Set partitionExpressions = + constructPredicates(partitionHasDataItems, partitionSlot); + predicates.setAddSuccess(true); + return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(partitionExpressions)), + catalogRelation); + } + } } return catalogRelation; } } + + /** + * Predicate context, which support add predicate by expression or by partition name + * Add by predicates has high priority + */ + public static class PredicateAddContext { + + private final Map> predicates; + private final Map> partition; + private final String partitionName; + private boolean addSuccess = false; + + public PredicateAddContext(Map> predicates) { + this(predicates, null, null); + } + + public PredicateAddContext(Map> partition, + String partitionName) { + this(null, partition, partitionName); + } + + public PredicateAddContext(Map> predicates, Map> partition, + String partitionName) { + this.predicates = predicates; + this.partition = partition; + this.partitionName = partitionName; + } + + public Map> getPredicates() { + return predicates; + } + + public Map> getPartition() { + return partition; + } + + public String getPartitionName() { + return partitionName; + } + + public boolean isEmpty() { + return predicates == null && partition == null; + } + + public boolean isAddSuccess() { + return addSuccess; + } + + public void setAddSuccess(boolean addSuccess) { + this.addSuccess = addSuccess; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 7277ef7a40..b1d6d877e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1680,7 +1680,7 @@ public class SessionVariable implements Serializable, Writable { "When the materialized view is not enough to provide all the data for the query, " + "whether to allow the union of the base table and the materialized view to " + "respond to the query"}) - public boolean enableMaterializedViewUnionRewrite = false; + public boolean enableMaterializedViewUnionRewrite = true; @VariableMgr.VarAttr(name = ENABLE_MATERIALIZED_VIEW_NEST_REWRITE, needForward = true, description = {"是否允许嵌套物化视图改写", diff --git a/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out b/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out index 8559da6305..f998aaf593 100644 --- a/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out +++ b/regression-test/data/nereids_rules_p0/mv/partition_mv_rewrite.out @@ -1,29 +1,129 @@ -- This file is automatically generated. You should know what you did if you want to edit this --- !query_all_direct_before -- +-- !query_1_0_before -- 2023-10-17 2023-10-17 2 3 199.00 2023-10-18 2023-10-18 2 3 109.20 2023-10-19 2023-10-19 2 3 99.50 --- !query_all_direct_after -- +-- !query_1_0_after -- 2023-10-17 2023-10-17 2 3 199.00 2023-10-18 2023-10-18 2 3 109.20 2023-10-19 2023-10-19 2 3 99.50 --- !query_partition_before -- +-- !query_2_0_before -- 2023-10-18 2023-10-18 2 3 109.20 2023-10-19 2023-10-19 2 3 99.50 --- !query_partition_after -- +-- !query_2_0_after -- 2023-10-18 2023-10-18 2 3 109.20 2023-10-19 2023-10-19 2 3 99.50 --- !query_all_before -- +-- !query_3_0_before -- 2023-10-17 2023-10-17 2 3 199.00 2023-10-18 2023-10-18 2 3 109.20 2023-10-19 2023-10-19 2 3 99.50 --- !query_all_after -- +-- !query_3_0_after -- 2023-10-17 2023-10-17 2 3 199.00 2023-10-18 2023-10-18 2 3 109.20 2023-10-19 2023-10-19 2 3 99.50 +-- !query_4_0_before -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_4_0_after -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_5_0_before -- +2023-10-17 2023-10-17 2 3 199.00 +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 +2023-10-21 \N 2 3 \N + +-- !query_5_0_after -- +2023-10-17 2023-10-17 2 3 199.00 +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 +2023-10-21 \N 2 3 \N + +-- !query_6_0_before -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_6_0_after -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_7_0_before -- +2023-10-17 2023-10-17 2 3 199.00 +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 +2023-10-21 \N 2 3 \N + +-- !query_7_0_after -- +2023-10-17 2023-10-17 2 3 199.00 +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 +2023-10-21 \N 2 3 \N + +-- !query_8_0_before -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_8_0_after -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_9_0_before -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 +2023-10-21 \N 2 3 \N + +-- !query_9_0_after -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 +2023-10-21 \N 2 3 \N + +-- !query_10_0_before -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_10_0_after -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_11_0_before -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 +2023-10-21 \N 2 3 \N + +-- !query_11_0_after -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 +2023-10-21 \N 2 3 \N + +-- !query_12_0_before -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_12_0_after -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_14_0_before -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_14_0_after -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_16_0_before -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + +-- !query_16_0_after -- +2023-10-18 2023-10-18 2 3 109.20 +2023-10-19 2023-10-19 2 3 99.50 + diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy index 47d990ebd6..bd6f75dc30 100644 --- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy +++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy @@ -1034,6 +1034,32 @@ class Suite implements GroovyInterceptable { return debugPoint } + void waitingPartitionIsExpected(String tableName, String partitionName, boolean expectedStatus) { + Thread.sleep(2000); + String showPartitions = "show partitions from ${tableName}" + Boolean status = null; + List> result + long startTime = System.currentTimeMillis() + long timeoutTimestamp = startTime + 1 * 60 * 1000 // 1 min + do { + result = sql(showPartitions) + if (!result.isEmpty()) { + for (List row : result) { + def existPartitionName = row.get(1).toString() + if (Objects.equals(existPartitionName, partitionName)) { + def statusStr = row.get(row.size() - 2).toString() + status = Boolean.valueOf(statusStr) + } + } + } + Thread.sleep(500); + } while (timeoutTimestamp > System.currentTimeMillis() && !Objects.equals(status, expectedStatus)) + if (!Objects.equals(status, expectedStatus)) { + logger.info("partition status is not expected") + } + Assert.assertEquals(expectedStatus, status) + } + void waitingMTMVTaskFinished(String jobName) { Thread.sleep(2000); String showTasks = "select TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg from tasks('type'='mv') where JobName = '${jobName}' order by CreateTime ASC" diff --git a/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy b/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy index 1d34e9617d..ec3d2912df 100644 --- a/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy +++ b/regression-test/suites/nereids_rules_p0/mv/partition_mv_rewrite.groovy @@ -1,3 +1,5 @@ +import java.text.SimpleDateFormat + // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -22,7 +24,6 @@ suite("partition_mv_rewrite") { sql "set runtime_filter_mode=OFF" sql "SET enable_fallback_to_original_planner=false" sql "SET enable_materialized_view_rewrite=true" - sql "SET enable_nereids_timeout = false" sql """ drop table if exists orders @@ -42,7 +43,7 @@ suite("partition_mv_rewrite") { ) DUPLICATE KEY(o_orderkey, o_custkey) PARTITION BY RANGE(o_orderdate)( - FROM ('2023-10-17') TO ('2023-10-20') INTERVAL 1 DAY + FROM ('2023-10-16') TO ('2023-11-01') INTERVAL 1 DAY ) DISTRIBUTED BY HASH(o_orderkey) BUCKETS 3 PROPERTIES ( @@ -54,6 +55,7 @@ suite("partition_mv_rewrite") { drop table if exists lineitem """ + // test pre init partition sql""" CREATE TABLE IF NOT EXISTS lineitem ( l_orderkey integer not null, @@ -75,7 +77,7 @@ suite("partition_mv_rewrite") { ) DUPLICATE KEY(l_orderkey, l_partkey, l_suppkey, l_linenumber) PARTITION BY RANGE(l_shipdate) - (FROM ('2023-10-17') TO ('2023-10-20') INTERVAL 1 DAY) + (FROM ('2023-10-16') TO ('2023-11-01') INTERVAL 1 DAY) DISTRIBUTED BY HASH(l_orderkey) BUCKETS 3 PROPERTIES ( "replication_num" = "1" @@ -140,8 +142,8 @@ suite("partition_mv_rewrite") { BUILD IMMEDIATE REFRESH AUTO ON MANUAL partition by(l_shipdate) DISTRIBUTED BY RANDOM BUCKETS 2 - PROPERTIES ('replication_num' = '1') - AS + PROPERTIES ('replication_num' = '1') + AS ${mv_def_sql} """ @@ -158,37 +160,303 @@ suite("partition_mv_rewrite") { sql("${partition_sql}") contains("${mv_name}(${mv_name})") } - // partition is invalid, so can not use partition 2023-10-17 to rewrite + // base table partition data change sql """ - insert into lineitem values + insert into lineitem values (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'); """ - // wait partition is invalid - sleep(5000) - // only can use valid partition - sql "SET enable_materialized_view_union_rewrite=false" - // Test query all partition when disable enable_materialized_view_union_rewrite - order_qt_query_all_direct_before "${all_partition_sql}" + waitingPartitionIsExpected("${mv_name}", "p_20231017_20231018", false) + + sql "SET enable_materialized_view_union_rewrite=false;" + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_1_0_before "${all_partition_sql}" + sql "SET enable_materialized_view_rewrite=true" explain { sql("${all_partition_sql}") + // should rewrite fail when union rewrite disable if sub partition is invalid notContains("${mv_name}(${mv_name})") } - order_qt_query_all_direct_after "${all_partition_sql}" + order_qt_query_1_0_after "${all_partition_sql}" - // Test query part partition when disable enable_materialized_view_union_rewrite - order_qt_query_partition_before "${partition_sql}" + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_2_0_before "${partition_sql}" + sql "SET enable_materialized_view_rewrite=true" explain { sql("${partition_sql}") + // should rewrite successfully when union rewrite disable if doesn't query invalid partition contains("${mv_name}(${mv_name})") } - order_qt_query_partition_after "${partition_sql}" + order_qt_query_2_0_after "${partition_sql}" - // Test query part partition when enable enable_materialized_view_union_rewrite + // enable union rewrite sql "SET enable_materialized_view_union_rewrite=true" - order_qt_query_all_before "${all_partition_sql}" + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_3_0_before "${all_partition_sql}" + sql "SET enable_materialized_view_rewrite=true" explain { sql("${all_partition_sql}") + // should rewrite successful when union rewrite enalbe if sub partition is invalid contains("${mv_name}(${mv_name})") } - order_qt_query_all_after "${all_partition_sql}" + order_qt_query_3_0_after "${all_partition_sql}" + + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_4_0_before "${partition_sql}" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${partition_sql}") + // should rewrite successfully when union rewrite enable if doesn't query invalid partition + contains("${mv_name}(${mv_name})") + } + order_qt_query_4_0_after "${partition_sql}" + + + // base table add partition + sql "REFRESH MATERIALIZED VIEW ${mv_name} AUTO" + sql "SET enable_materialized_view_union_rewrite=false" + waitingMTMVTaskFinished(getJobName(db, mv_name)) + sql """ + insert into lineitem values + (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-21', '2023-10-21', '2023-10-21', 'a', 'b', 'yyyyyyyyy'); + """ + + + waitingPartitionIsExpected("${mv_name}", "p_20231021_20231022", false) + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_5_0_before "${all_partition_sql}" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${all_partition_sql}") + // should rewrite fail when union rewrite disable if base table add new partition + notContains("${mv_name}(${mv_name})") + } + order_qt_query_5_0_after "${all_partition_sql}" + + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_6_0_before "${partition_sql}" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${partition_sql}") + // should rewrite successfully when union rewrite disable if doesn't query new partition + contains("${mv_name}(${mv_name})") + } + order_qt_query_6_0_after "${partition_sql}" + + // enable union rewrite + sql "SET enable_materialized_view_union_rewrite=true" + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_7_0_before "${all_partition_sql}" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${all_partition_sql}") + // should rewrite successful when union rewrite enalbe if base table add new partition + contains("${mv_name}(${mv_name})") + } + order_qt_query_7_0_after "${all_partition_sql}" + + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_8_0_before "${partition_sql}" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${partition_sql}") + // should rewrite successfully when union rewrite enable if doesn't query new partition + contains("${mv_name}(${mv_name})") + } + order_qt_query_8_0_after "${partition_sql}" + + + // base table delete partition test + sql "REFRESH MATERIALIZED VIEW ${mv_name} AUTO" + sql "SET enable_materialized_view_union_rewrite=false" + waitingMTMVTaskFinished(getJobName(db, mv_name)) + sql """ ALTER TABLE lineitem DROP PARTITION IF EXISTS p_20231017 FORCE; + """ + // show partitions will cause error, tmp comment +// waitingPartitionIsExpected("${mv_name}", "p_20231017_20231018", false) + + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_9_0_before "${all_partition_sql}" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${all_partition_sql}") + // should rewrite fail when union rewrite disable if base table delete partition + notContains("${mv_name}(${mv_name})") + } + order_qt_query_9_0_after "${all_partition_sql}" + + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_10_0_before "${partition_sql}" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${partition_sql}") + // should rewrite successfully when union rewrite disable if doesn't query deleted partition + contains("${mv_name}(${mv_name})") + } + order_qt_query_10_0_after "${partition_sql}" + + // enable union rewrite + sql "SET enable_materialized_view_union_rewrite=true" + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_11_0_before "${all_partition_sql}" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${all_partition_sql}") + // should rewrite successful when union rewrite enalbe if base table delete partition + contains("${mv_name}(${mv_name})") + } + order_qt_query_11_0_after "${all_partition_sql}" + + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_12_0_before "${partition_sql}" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${partition_sql}") + // should rewrite successfully when union rewrite enable if doesn't query deleted partition + contains("${mv_name}(${mv_name})") + } + order_qt_query_12_0_after "${partition_sql}" + sql """ DROP MATERIALIZED VIEW IF EXISTS mv_10086""" + + // test mv with ttl + def today_str = new SimpleDateFormat("yyyy-MM-dd").format(new Date()).toString(); + + sql """ + drop table if exists lineitem_static; + """ + sql""" + CREATE TABLE IF NOT EXISTS lineitem_static ( + l_orderkey integer not null, + l_partkey integer not null, + l_suppkey integer not null, + l_linenumber integer not null, + l_quantity decimalv3(15,2) not null, + l_extendedprice decimalv3(15,2) not null, + l_discount decimalv3(15,2) not null, + l_tax decimalv3(15,2) not null, + l_returnflag char(1) not null, + l_linestatus char(1) not null, + l_shipdate date not null, + l_commitdate date not null, + l_receiptdate date not null, + l_shipinstruct char(25) not null, + l_shipmode char(10) not null, + l_comment varchar(44) not null + ) + DUPLICATE KEY(l_orderkey, l_partkey, l_suppkey, l_linenumber) + PARTITION BY RANGE(l_shipdate) + ( + PARTITION `p1` VALUES LESS THAN ("2023-10-18"), + PARTITION `p2` VALUES [("2023-10-18"), ("2023-10-20")), + PARTITION `other` VALUES LESS THAN (MAXVALUE) + ) + DISTRIBUTED BY HASH(l_orderkey) BUCKETS 3 + PROPERTIES ( + "replication_num" = "1" + ); + """ + sql """ + insert into lineitem_static values + (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'), + (2, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy'), + (3, 2, 3, 6, 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx'); + """ + sql """ + insert into lineitem_static values + (1, 2, 3, 4, 5.5, 6.5, 7.5, 8.5, 'o', 'k', '${today_str}', '${today_str}', '${today_str}', 'a', 'b', 'yyyyyyyyy'); + """ + + def ttl_mv_def_sql = """ + select l_shipdate, o_orderdate, l_partkey, + l_suppkey, sum(o_totalprice) as sum_total + from lineitem_static + left join orders on l_orderkey = o_orderkey and l_shipdate = o_orderdate + group by + l_shipdate, + o_orderdate, + l_partkey, + l_suppkey; + """ + def ttl_all_partition_sql = """ + select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total + from lineitem_static + left join orders on l_orderkey = o_orderkey and l_shipdate = o_orderdate + group by + l_shipdate, + o_orderdate, + l_partkey, + l_suppkey; + """ + def ttl_partition_sql = """ + select l_shipdate, o_orderdate, l_partkey, l_suppkey, sum(o_totalprice) as sum_total + from lineitem_static + left join orders on l_orderkey = o_orderkey and l_shipdate = o_orderdate + where (l_shipdate>= '2023-10-18' and l_shipdate <= '2023-10-19') + group by + l_shipdate, + o_orderdate, + l_partkey, + l_suppkey; + """ + def ttl_mv_name = "mv_10000" + + sql """analyze table lineitem_static with sync;""" + + def create_ttl_mtmv = { db_name, mv_inner_name, mv_inner_sql -> + sql """DROP MATERIALIZED VIEW IF EXISTS ${mv_inner_name}""" + sql""" + CREATE MATERIALIZED VIEW ${mv_inner_name} + BUILD IMMEDIATE REFRESH COMPLETE ON MANUAL + PARTITION BY(l_shipdate) + DISTRIBUTED BY RANDOM BUCKETS 2 + PROPERTIES ( + 'replication_num' = '1', + 'partition_sync_limit' = 2, + 'partition_sync_time_unit' = 'DAY', + 'partition_date_format' = 'yyyy-MM-dd') + AS ${mv_inner_sql} + """ + waitingMTMVTaskFinished(getJobName(db_name, mv_inner_name)) + } + + create_ttl_mtmv(db, ttl_mv_name, ttl_mv_def_sql) + + // test when mv is ttl + sql "SET enable_materialized_view_union_rewrite=false" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${ttl_all_partition_sql}") + // should rewrite fail when union rewrite disable and mv is ttl + notContains("${ttl_mv_name}(${ttl_mv_name})") + } + + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_14_0_before "${ttl_partition_sql}" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${ttl_partition_sql}") + // should rewrite fail when union rewrite disable and query the partition which is not in mv + notContains("${ttl_mv_name}(${ttl_mv_name})") + } + order_qt_query_14_0_after "${ttl_partition_sql}" + + // enable union rewrite + sql "SET enable_materialized_view_union_rewrite=true" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${ttl_all_partition_sql}") + // should rewrite successful when union rewrite enalbe and mv is ttl, query the partition which is in mv + contains("${ttl_mv_name}(${ttl_mv_name})") + } + + sql "SET enable_materialized_view_rewrite=false" + order_qt_query_16_0_before "${ttl_partition_sql}" + sql "SET enable_materialized_view_rewrite=true" + explain { + sql("${ttl_partition_sql}") + // should rewrite fail when union rewrite enalbe and query the partition which is not in mv + notContains("${ttl_mv_name}(${ttl_mv_name})") + } + order_qt_query_16_0_after "${ttl_partition_sql}" + + sql """ DROP MATERIALIZED VIEW IF EXISTS ${ttl_mv_name}""" }