diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 7bcea8a1e5..2d3596c8ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -40,6 +40,7 @@ import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import java.util.ArrayList; @@ -84,6 +85,15 @@ public class NereidsPlanner extends Planner { logicalPlanAdapter.setColLabels(columnLabelList); } + @VisibleForTesting + public void plan(StatementBase queryStmt) { + try { + plan(queryStmt, statementContext.getConnectContext().getSessionVariable().toThrift()); + } catch (UserException e) { + throw new RuntimeException(e); + } + } + /** * Do analyze and optimize for query plan. * diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java index 9fd2d8667e..76d101148d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java @@ -22,6 +22,7 @@ import org.apache.doris.analysis.OutFileClause; import org.apache.doris.analysis.Queriable; import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.analysis.StatementBase; +import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import java.util.ArrayList; @@ -82,4 +83,8 @@ public class LogicalPlanAdapter extends StatementBase implements Queriable { // TODO: generate real digest return ""; } + + public static LogicalPlanAdapter of(Plan plan) { + return new LogicalPlanAdapter((LogicalPlan) plan); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 7219a435a5..d47bf1c1aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -297,11 +297,25 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor> having, Resolver resolver) { LogicalAggregate aggregate = having.child(); - Expression newPredicates = ExpressionReplacer.INSTANCE - .visit(having.getPredicates(), resolver.getSubstitution()); + Expression newPredicates = ExpressionUtils.replace(having.getPredicates(), resolver.getSubstitution()); List newOutputExpressions = Streams.concat( aggregate.getOutputExpressions().stream(), resolver.getNewOutputSlots().stream() ).collect(Collectors.toList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/mv/SelectRollup.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/mv/SelectRollup.java new file mode 100644 index 0000000000..03cc0e4b45 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/mv/SelectRollup.java @@ -0,0 +1,410 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.mv; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.annotation.Developing; +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory; +import org.apache.doris.nereids.trees.expressions.ComparisonPredicate; +import org.apache.doris.nereids.trees.expressions.EqualTo; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.InPredicate; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.NullSafeEqual; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.functions.AggregateFunction; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.trees.plans.algebra.Project; +import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.util.ExpressionUtils; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Select rollup. + */ +@Developing +public class SelectRollup implements RewriteRuleFactory { + /////////////////////////////////////////////////////////////////////////// + // All the patterns + /////////////////////////////////////////////////////////////////////////// + @Override + public List buildRules() { + return ImmutableList.of( + // only agg above scan + // Aggregate(Scan) + logicalAggregate(logicalOlapScan().when(LogicalOlapScan::shouldSelectRollup)).then(agg -> { + LogicalOlapScan scan = agg.child(); + return agg.withChildren( + scan.withCandidateIndexIds( + selectCandidateRollupIds( + scan, + agg.getInputSlots(), + ImmutableList.of(), + extractAggFunctionAndReplaceSlot(agg, Optional.empty()) + ) + ) + ); + }).toRule(RuleType.ROLLUP_AGG_SCAN), + + // filter could push down scan. + // Aggregate(Filter(Scan)) + logicalAggregate(logicalFilter(logicalOlapScan().when(LogicalOlapScan::shouldSelectRollup))) + .then(agg -> { + LogicalFilter filter = agg.child(); + LogicalOlapScan scan = filter.child(); + ImmutableSet requiredSlots = ImmutableSet.builder() + .addAll(agg.getInputSlots()) + .addAll(filter.getInputSlots()) + .build(); + return agg.withChildren(filter.withChildren( + scan.withCandidateIndexIds( + selectCandidateRollupIds( + scan, + requiredSlots, + filter.getConjuncts(), + extractAggFunctionAndReplaceSlot(agg, Optional.empty()) + ) + ) + )); + }).toRule(RuleType.ROLLUP_AGG_FILTER_SCAN), + + // column pruning or other projections such as alias, etc. + // Aggregate(Project(Scan)) + logicalAggregate(logicalProject(logicalOlapScan().when(LogicalOlapScan::shouldSelectRollup))) + .then(agg -> { + LogicalProject project = agg.child(); + LogicalOlapScan scan = project.child(); + return agg.withChildren( + project.withChildren( + scan.withCandidateIndexIds( + selectCandidateRollupIds( + scan, + project.getInputSlots(), + ImmutableList.of(), + extractAggFunctionAndReplaceSlot(agg, + Optional.of(project)) + ) + ) + ) + ); + }).toRule(RuleType.ROLLUP_AGG_PROJECT_SCAN), + + // filter could push down and project. + // Aggregate(Project(Filter(Scan))) + logicalAggregate(logicalProject(logicalFilter(logicalOlapScan() + .when(LogicalOlapScan::shouldSelectRollup)))).then(agg -> { + LogicalProject> project = agg.child(); + LogicalFilter filter = project.child(); + LogicalOlapScan scan = filter.child(); + return agg.withChildren(project.withChildren(filter.withChildren( + scan.withCandidateIndexIds(selectCandidateRollupIds( + scan, + agg.getInputSlots(), + filter.getConjuncts(), + extractAggFunctionAndReplaceSlot(agg, Optional.of(project)) + ) + ) + ))); + }).toRule(RuleType.ROLLUP_AGG_PROJECT_FILTER_SCAN), + + // filter can't push down + // Aggregate(Filter(Project(Scan))) + logicalAggregate(logicalFilter(logicalProject(logicalOlapScan() + .when(LogicalOlapScan::shouldSelectRollup)))).then(agg -> { + LogicalFilter> filter = agg.child(); + LogicalProject project = filter.child(); + LogicalOlapScan scan = project.child(); + return agg.withChildren(filter.withChildren(project.withChildren( + scan.withCandidateIndexIds(selectCandidateRollupIds( + scan, + project.getInputSlots(), + ImmutableList.of(), + extractAggFunctionAndReplaceSlot(agg, Optional.of(project)) + ) + ) + ))); + }).toRule(RuleType.ROLLUP_AGG_FILTER_PROJECT_SCAN) + ); + } + + /////////////////////////////////////////////////////////////////////////// + // Main entrance of select rollup + /////////////////////////////////////////////////////////////////////////// + + /** + * Select candidate rollup ids. + *

+ * todo: 0. turn off pre agg, checking input aggregate functions and group by expressions, etc. + * 1. rollup contains all the required output slots. + * 2. match the most prefix index if pushdown predicates present. + */ + private List selectCandidateRollupIds( + LogicalOlapScan olapScan, + Set requiredScanOutput, + List predicates, + // not used now, reserved for checking aggregate function type match. + List aggregateFunctions) { + Preconditions.checkArgument(Sets.newHashSet(olapScan.getOutput()).containsAll(requiredScanOutput), + "Scan's output should contains all the input required scan output."); + + OlapTable table = olapScan.getTable(); + + // Scan slot exprId -> slot name + Map exprIdToName = olapScan.getOutput() + .stream() + .collect(Collectors.toMap(NamedExpression::getExprId, NamedExpression::getName)); + + // get required column names in metadata. + Set requiredColumnNames = requiredScanOutput + .stream() + .map(slot -> exprIdToName.get(slot.getExprId())) + .collect(Collectors.toSet()); + + // 1. filter rollup contains all the required columns by column name. + List containAllRequiredColumns = table.getVisibleIndex().stream() + .filter(rollup -> table.getSchemaByIndexId(rollup.getId(), true) + .stream() + .map(Column::getName) + .collect(Collectors.toSet()) + .containsAll(requiredColumnNames) + ).collect(Collectors.toList()); + + Map> split = filterCanUsePrefixIndexAndSplitByEquality(predicates, exprIdToName); + Set equalColNames = split.getOrDefault(true, ImmutableSet.of()); + Set nonEqualColNames = split.getOrDefault(false, ImmutableSet.of()); + + // 2. find matching key prefix most. + List matchingKeyPrefixMost; + if (!(equalColNames.isEmpty() && nonEqualColNames.isEmpty())) { + List matchingResult = matchKeyPrefixMost(table, containAllRequiredColumns, + equalColNames, nonEqualColNames); + matchingKeyPrefixMost = matchingResult.isEmpty() ? containAllRequiredColumns : matchingResult; + } else { + matchingKeyPrefixMost = containAllRequiredColumns; + } + + List partitionIds = olapScan.getSelectedPartitionIds(); + // 3. sort by row count, column count and index id. + return matchingKeyPrefixMost.stream() + .map(MaterializedIndex::getId) + .sorted(Comparator + // compare by row count + .comparing(rid -> partitionIds.stream() + .mapToLong(pid -> table.getPartition(pid).getIndex((Long) rid).getRowCount()) + .sum()) + // compare by column count + .thenComparing(rid -> table.getSchemaByIndexId((Long) rid).size()) + // compare by rollup index id + .thenComparing(rid -> (Long) rid)) + .collect(Collectors.toList()); + } + + /////////////////////////////////////////////////////////////////////////// + // Matching key prefix + /////////////////////////////////////////////////////////////////////////// + private List matchKeyPrefixMost( + OlapTable table, + List rollups, + Set equalColumns, + Set nonEqualColumns) { + TreeMap> collect = rollups.stream() + .collect(Collectors.toMap( + rollup -> rollupKeyPrefixMatchCount(table, rollup, equalColumns, nonEqualColumns), + Lists::newArrayList, + (l1, l2) -> { + l1.addAll(l2); + return l1; + }, + TreeMap::new) + ); + return collect.descendingMap().firstEntry().getValue(); + } + + private int rollupKeyPrefixMatchCount( + OlapTable table, + MaterializedIndex rollup, + Set equalColNames, + Set nonEqualColNames) { + int matchCount = 0; + for (Column column : table.getSchemaByIndexId(rollup.getId())) { + if (equalColNames.contains(column.getName())) { + matchCount++; + } else if (nonEqualColNames.contains(column.getName())) { + // Unequivalence predicate's columns can match only first column in rollup. + matchCount++; + break; + } else { + break; + } + } + return matchCount; + } + + /////////////////////////////////////////////////////////////////////////// + // Split conjuncts into equal-to and non-equal-to. + /////////////////////////////////////////////////////////////////////////// + + /** + * Filter the input conjuncts those can use prefix and split into 2 groups: is equal-to or non-equal-to predicate + * when comparing the key column. + */ + private Map> filterCanUsePrefixIndexAndSplitByEquality( + List conjunct, Map exprIdToColName) { + return conjunct.stream() + .map(expr -> PredicateChecker.canUsePrefixIndex(expr, exprIdToColName)) + .filter(result -> !result.equals(CheckResult.FAILURE)) + .collect(Collectors.groupingBy( + result -> result.type == ResultType.SUCCESS_EQUAL, + Collectors.mapping(result -> result.colName, Collectors.toSet()) + )); + } + + private enum ResultType { + FAILURE, + SUCCESS_EQUAL, + SUCCESS_NON_EQUAL, + } + + private static class CheckResult { + public static final CheckResult FAILURE = new CheckResult(null, ResultType.FAILURE); + private final String colName; + private final ResultType type; + + private CheckResult(String colName, ResultType result) { + this.colName = colName; + this.type = result; + } + + public static CheckResult createEqual(String name) { + return new CheckResult(name, ResultType.SUCCESS_EQUAL); + } + + public static CheckResult createNonEqual(String name) { + return new CheckResult(name, ResultType.SUCCESS_NON_EQUAL); + } + } + + /** + * Check if an expression could prefix key index. + */ + private static class PredicateChecker extends ExpressionVisitor> { + private static final PredicateChecker INSTANCE = new PredicateChecker(); + + private PredicateChecker() { + } + + public static CheckResult canUsePrefixIndex(Expression expression, Map exprIdToName) { + return expression.accept(INSTANCE, exprIdToName); + } + + @Override + public CheckResult visit(Expression expr, Map context) { + return CheckResult.FAILURE; + } + + @Override + public CheckResult visitInPredicate(InPredicate in, Map context) { + Optional slotOrCastOnSlot = ExpressionUtils.isSlotOrCastOnSlot(in.getCompareExpr()); + if (slotOrCastOnSlot.isPresent() && in.getOptions().stream().allMatch(Literal.class::isInstance)) { + return CheckResult.createEqual(context.get(slotOrCastOnSlot.get())); + } else { + return CheckResult.FAILURE; + } + } + + @Override + public CheckResult visitComparisonPredicate(ComparisonPredicate cp, Map context) { + if (cp instanceof EqualTo || cp instanceof NullSafeEqual) { + return check(cp, context, CheckResult::createEqual); + } else { + return check(cp, context, CheckResult::createNonEqual); + } + } + + private CheckResult check(ComparisonPredicate cp, Map exprIdToColumnName, + Function resultMapper) { + return check(cp).map(exprId -> resultMapper.apply(exprIdToColumnName.get(exprId))) + .orElse(CheckResult.FAILURE); + } + + private Optional check(ComparisonPredicate cp) { + Optional exprId = check(cp.left(), cp.right()); + return exprId.isPresent() ? exprId : check(cp.right(), cp.left()); + } + + private Optional check(Expression maybeSlot, Expression maybeConst) { + Optional exprIdOpt = ExpressionUtils.isSlotOrCastOnSlot(maybeSlot); + return exprIdOpt.isPresent() && maybeConst.isConstant() ? exprIdOpt : Optional.empty(); + } + } + + /** + * Do aggregate function extraction and replace aggregate function's input slots by underlying project. + *

+ * 1. extract aggregate functions in aggregate plan. + *

+ * 2. replace aggregate function's input slot by underlying project expression if project is present. + *

+ * For example: + *

+     * input arguments:
+     * agg: Aggregate(sum(v) as sum_value)
+     * underlying project: Project(a + b as v)
+     *
+     * output:
+     * sum(a + b)
+     * 
+ */ + private List extractAggFunctionAndReplaceSlot( + LogicalAggregate agg, + Optional> project) { + Optional> slotToProducerOpt = project.map(Project::getSlotToProducer); + return agg.getOutputExpressions().stream() + // extract aggregate functions. + .flatMap(e -> e.>collect(AggregateFunction.class::isInstance).stream()) + // replace aggregate function's input slot by its producing expression. + .map(expr -> slotToProducerOpt.map(slotToExpressions + -> (AggregateFunction) ExpressionUtils.replace(expr, slotToExpressions)) + .orElse(expr) + ) + .collect(Collectors.toList()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggregateDisassemble.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggregateDisassemble.java index 87363e4273..7d68752e07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggregateDisassemble.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggregateDisassemble.java @@ -24,10 +24,10 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.AggregateFunction; -import org.apache.doris.nereids.trees.expressions.visitor.ExpressionReplacer; import org.apache.doris.nereids.trees.plans.AggPhase; import org.apache.doris.nereids.trees.plans.GroupPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.util.ExpressionUtils; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -112,11 +112,11 @@ public class AggregateDisassemble extends OneRewriteRuleFactory { // 3. replace expression in globalOutputExprs and globalGroupByExprs List globalOutputExprs = aggregate.getOutputExpressions().stream() - .map(e -> ExpressionReplacer.INSTANCE.visit(e, inputSubstitutionMap)) + .map(e -> ExpressionUtils.replace(e, inputSubstitutionMap)) .map(NamedExpression.class::cast) .collect(Collectors.toList()); List globalGroupByExprs = localGroupByExprs.stream() - .map(e -> ExpressionReplacer.INSTANCE.visit(e, inputSubstitutionMap)).collect(Collectors.toList()); + .map(e -> ExpressionUtils.replace(e, inputSubstitutionMap)).collect(Collectors.toList()); // 4. generate new plan LogicalAggregate localAggregate = new LogicalAggregate<>( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/NormalizeAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/NormalizeAggregate.java index 4d42fff964..0fe139b85b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/NormalizeAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/NormalizeAggregate.java @@ -25,10 +25,10 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.AggregateFunction; -import org.apache.doris.nereids.trees.expressions.visitor.ExpressionReplacer; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.util.ExpressionUtils; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -126,7 +126,7 @@ public class NormalizeAggregate extends OneRewriteRuleFactory { root = new LogicalAggregate<>(newKeys, newOutputs, aggregate.isDisassembled(), true, aggregate.getAggPhase(), root); List projections = outputs.stream() - .map(e -> ExpressionReplacer.INSTANCE.visit(e, substitutionMap)) + .map(e -> ExpressionUtils.replace(e, substitutionMap)) .map(NamedExpression.class::cast) .collect(Collectors.toList()); root = new LogicalProject<>(projections, root); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SwapFilterAndProject.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SwapFilterAndProject.java index d8c1e56c13..7d6b0e46f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SwapFilterAndProject.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SwapFilterAndProject.java @@ -20,37 +20,30 @@ package org.apache.doris.nereids.rules.rewrite.logical; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory; -import org.apache.doris.nereids.trees.expressions.Alias; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.NamedExpression; -import org.apache.doris.nereids.trees.expressions.visitor.ExpressionReplacer; import org.apache.doris.nereids.trees.plans.GroupPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; -import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import org.apache.doris.nereids.util.ExpressionUtils; /** - * Rewrite filter -> project to project -> filter. + * Push down filter through project. + * input: + * filter(a>2, b=0) -> project(c+d as a, e as b) + * output: + * project(c+d as a, e as b) -> filter(c+d>2, e=0). */ public class SwapFilterAndProject extends OneRewriteRuleFactory { @Override public Rule build() { - return logicalFilter(logicalProject()).thenApply(ctx -> { - LogicalFilter> filter = ctx.root; + return logicalFilter(logicalProject()).then(filter -> { LogicalProject project = filter.child(); - List namedExpressionList = project.getProjects(); - Map slotToAlias = new HashMap<>(); - namedExpressionList.stream().filter(Alias.class::isInstance).forEach(s -> { - slotToAlias.put(s.toSlot(), ((Alias) s).child()); - }); - Expression rewrittenPredicate = ExpressionReplacer.INSTANCE.visit(filter.getPredicates(), slotToAlias); - LogicalFilter rewrittenFilter = - new LogicalFilter(rewrittenPredicate, project.child()); - return new LogicalProject(project.getProjects(), rewrittenFilter); + return new LogicalProject<>( + project.getProjects(), + new LogicalFilter<>( + ExpressionUtils.replace(filter.getPredicates(), project.getSlotToProducer()), + project.child() + ) + ); }).toRule(RuleType.SWAP_FILTER_AND_PROJECT); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ExpressionReplacer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ExpressionReplacer.java deleted file mode 100644 index f3fd35d695..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ExpressionReplacer.java +++ /dev/null @@ -1,39 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.trees.expressions.visitor; - -import org.apache.doris.nereids.trees.expressions.Expression; - -import java.util.Map; - -/** - * replace expr nodes by substitutionMap - */ -public class ExpressionReplacer - extends DefaultExpressionRewriter> { - public static final ExpressionReplacer INSTANCE = new ExpressionReplacer(); - - @Override - public Expression visit(Expression expr, Map substitutionMap) { - if (substitutionMap.containsKey(expr)) { - return substitutionMap.get(expr); - } - return super.visit(expr, substitutionMap); - } -} - diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Filter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Filter.java index f4371eb4f7..1e4048f582 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Filter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Filter.java @@ -18,10 +18,17 @@ package org.apache.doris.nereids.trees.plans.algebra; import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.util.ExpressionUtils; + +import java.util.List; /** * Common interface for logical/physical filter. */ public interface Filter { Expression getPredicates(); + + default List getConjuncts() { + return ExpressionUtils.extractConjunction(getPredicates()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Project.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Project.java index 1203998a5b..2cc9fec4b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Project.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Project.java @@ -17,13 +17,37 @@ package org.apache.doris.nereids.trees.plans.algebra; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; /** * Common interface for logical/physical project. */ public interface Project { List getProjects(); + + /** + * Generate a map that the key is the project output slot, corresponding value is the expression produces the slot. + * Note that alias is striped off. + */ + default Map getSlotToProducer() { + return getProjects() + .stream() + .collect(Collectors.toMap( + NamedExpression::toSlot, + namedExpr -> { + if (namedExpr instanceof Alias) { + return ((Alias) namedExpr).child(); + } else { + return namedExpr; + } + }) + ); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java index f847bdd5d6..7b13ace319 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java @@ -39,10 +39,12 @@ import java.util.Optional; /** * Logical Aggregate plan. *

- * eg:select a, sum(b), c from table group by a, c; - * groupByExprList: Column field after group by. eg: a, c; - * outputExpressionList: Column field after select. eg: a, sum(b), c; - * partitionExprList: Column field after partition by. + * For example SQL: + *

+ * select a, sum(b), c from table group by a, c; + *

+ * groupByExpressions: Column field after group by. eg: a, c; + * outputExpressions: Column field after select. eg: a, sum(b), c; *

* Each agg node only contains the select statement field of the same layer, * and other agg nodes in the subquery contain. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java index 67436166f5..36350f8fee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java @@ -27,9 +27,11 @@ import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.commons.collections.CollectionUtils; import java.util.List; import java.util.Optional; @@ -43,37 +45,42 @@ public class LogicalOlapScan extends LogicalRelation { private final List selectedTabletId; private final boolean partitionPruned; + private final List candidateIndexIds; + private final boolean rollupSelected; + public LogicalOlapScan(OlapTable table) { this(table, ImmutableList.of()); } public LogicalOlapScan(OlapTable table, List qualifier) { this(table, qualifier, Optional.empty(), Optional.empty(), - table.getPartitionIds(), false); + table.getPartitionIds(), false, ImmutableList.of(), false); } public LogicalOlapScan(Table table, List qualifier) { this(table, qualifier, Optional.empty(), Optional.empty(), - ((OlapTable) table).getPartitionIds(), false); + ((OlapTable) table).getPartitionIds(), false, ImmutableList.of(), false); } /** * Constructor for LogicalOlapScan. - * - * @param table Doris table - * @param qualifier table name qualifier */ public LogicalOlapScan(Table table, List qualifier, Optional groupExpression, Optional logicalProperties, List selectedPartitionIdList, - boolean partitionPruned) { + boolean partitionPruned, List candidateIndexIds, boolean rollupSelected) { super(PlanType.LOGICAL_OLAP_SCAN, table, qualifier, groupExpression, logicalProperties, selectedPartitionIdList); - this.selectedIndexId = getTable().getBaseIndexId(); + // TODO: use CBO manner to select best index id, according to index's statistics info, + // revisit this after rollup and materialized view selection are fully supported. + this.selectedIndexId = CollectionUtils.isEmpty(candidateIndexIds) + ? getTable().getBaseIndexId() : candidateIndexIds.get(0); this.selectedTabletId = Lists.newArrayList(); for (Partition partition : getTable().getAllPartitions()) { selectedTabletId.addAll(partition.getBaseIndex().getTabletIdsInOrder()); } this.partitionPruned = partitionPruned; + this.candidateIndexIds = candidateIndexIds; + this.rollupSelected = rollupSelected; } @Override @@ -86,7 +93,9 @@ public class LogicalOlapScan extends LogicalRelation { public String toString() { return Utils.toSqlString("LogicalOlapScan", "qualified", qualifiedName(), - "output", getOutput() + "output", getOutput(), + "candidateIndexIds", candidateIndexIds, + "selectedIndexId", selectedIndexId ); } @@ -104,18 +113,23 @@ public class LogicalOlapScan extends LogicalRelation { @Override public Plan withGroupExpression(Optional groupExpression) { return new LogicalOlapScan(table, qualifier, groupExpression, Optional.of(getLogicalProperties()), - selectedPartitionIds, partitionPruned); + selectedPartitionIds, partitionPruned, candidateIndexIds, rollupSelected); } @Override public LogicalOlapScan withLogicalProperties(Optional logicalProperties) { return new LogicalOlapScan(table, qualifier, Optional.empty(), logicalProperties, selectedPartitionIds, - partitionPruned); + partitionPruned, candidateIndexIds, rollupSelected); } public LogicalOlapScan withSelectedPartitionId(List selectedPartitionId) { - return new LogicalOlapScan(table, qualifier, Optional.empty(), Optional.of(logicalPropertiesSupplier.get()), - selectedPartitionId, true); + return new LogicalOlapScan(table, qualifier, Optional.empty(), Optional.of(getLogicalProperties()), + selectedPartitionId, true, candidateIndexIds, rollupSelected); + } + + public LogicalOlapScan withCandidateIndexIds(List candidateIndexIds) { + return new LogicalOlapScan(table, qualifier, Optional.empty(), Optional.of(getLogicalProperties()), + selectedPartitionIds, partitionPruned, candidateIndexIds, true); } @Override @@ -134,4 +148,27 @@ public class LogicalOlapScan extends LogicalRelation { public long getSelectedIndexId() { return selectedIndexId; } + + public boolean isRollupSelected() { + return rollupSelected; + } + + /** + * Should apply {@link org.apache.doris.nereids.rules.mv.SelectRollup} or not. + */ + public boolean shouldSelectRollup() { + switch (((OlapTable) table).getKeysType()) { + case AGG_KEYS: + case UNIQUE_KEYS: + return !rollupSelected; + default: + return false; + } + } + + @VisibleForTesting + public Optional getSelectRollupName() { + return rollupSelected ? Optional.ofNullable(((OlapTable) table).getIndexNameById(selectedIndexId)) + : Optional.empty(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java index da5cd93bba..79493ae1c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java @@ -18,18 +18,22 @@ package org.apache.doris.nereids.util; import org.apache.doris.nereids.trees.expressions.And; +import org.apache.doris.nereids.trees.expressions.Cast; import org.apache.doris.nereids.trees.expressions.CompoundPredicate; +import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.Or; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral; +import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -163,4 +167,62 @@ public class ExpressionUtils { } return minSlot; } + + /** + * Check whether the input expression is a {@link org.apache.doris.nereids.trees.expressions.Slot} + * or at least one {@link Cast} on a {@link org.apache.doris.nereids.trees.expressions.Slot} + *

+ * for example: + * - SlotReference to a column: + * col + * - Cast on SlotReference: + * cast(int_col as string) + * cast(cast(int_col as long) as string) + * + * @param expr input expression + * @return Return Optional[ExprId] of underlying slot reference if input expression is a slot or cast on slot. + * Otherwise, return empty optional result. + */ + public static Optional isSlotOrCastOnSlot(Expression expr) { + while (expr instanceof Cast) { + expr = expr.child(0); + } + + if (expr instanceof SlotReference) { + return Optional.of(((SlotReference) expr).getExprId()); + } else { + return Optional.empty(); + } + } + + /** + * Replace expression node in the expression tree by `replaceMap` in top-down manner. + * For example. + *

+     * input expression: a > 1
+     * replaceMap: a -> b + c
+     *
+     * output:
+     * b + c > 1
+     * 
+ */ + public static Expression replace(Expression expr, Map replaceMap) { + return expr.accept(ExpressionReplacer.INSTANCE, replaceMap); + } + + private static class ExpressionReplacer + extends DefaultExpressionRewriter> { + public static final ExpressionReplacer INSTANCE = new ExpressionReplacer(); + + private ExpressionReplacer() { + } + + @Override + public Expression visit(Expression expr, Map replaceMap) { + if (replaceMap.containsKey(expr)) { + return replaceMap.get(expr); + } + return super.visit(expr, replaceMap); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java index dbdcbfb765..01e871c1b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java @@ -268,6 +268,7 @@ public class MaterializedViewSelector { return selectedIndexId; } + // Step2: check all columns in compensating predicates are available in the view output private void checkCompensatingPredicates(Set columnsInPredicates, Map candidateIndexIdToMeta) { // When the query statement does not contain any columns in predicates, all candidate index can pass this check @@ -300,6 +301,7 @@ public class MaterializedViewSelector { * @param candidateIndexIdToMeta */ + // Step3: group by list in query is the subset of group by list in view or view contains no aggregation private void checkGrouping(OlapTable table, Set columnsInGrouping, Map candidateIndexIdToMeta) { Iterator> iterator = candidateIndexIdToMeta.entrySet().iterator(); @@ -351,6 +353,7 @@ public class MaterializedViewSelector { + Joiner.on(",").join(candidateIndexIdToMeta.keySet())); } + // Step4: aggregation functions are available in the view output private void checkAggregationFunction(OlapTable table, Set aggregatedColumnsInQueryOutput, Map candidateIndexIdToMeta) throws AnalysisException { Iterator> iterator = candidateIndexIdToMeta.entrySet().iterator(); @@ -387,6 +390,7 @@ public class MaterializedViewSelector { + Joiner.on(",").join(candidateIndexIdToMeta.keySet())); } + // Step5: columns required to compute output expr are available in the view output private void checkOutputColumns(Set columnNamesInQueryOutput, Map candidateIndexIdToMeta) { if (columnNamesInQueryOutput == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 2ee08fb24a..089f1c2e7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -237,6 +237,18 @@ public class OlapScanNode extends ScanNode { this.selectedPartitionIds = selectedPartitionIds; } + /** + * Only used for Neredis to set rollup or materialized view selection result. + */ + public void selectSelectIndexInfo( + long selectedIndexId, + boolean isPreAggregation, + String reasonOfPreAggregation) { + this.selectedIndexId = selectedIndexId; + this.isPreAggregation = isPreAggregation; + this.reasonOfPreAggregation = reasonOfPreAggregation; + } + /** * The function is used to directly select the index id of the base table as the * selectedIndexId. diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java index 29378e61c1..6cbd598262 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.glue.translator; +import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; @@ -30,12 +31,12 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.types.IntegerType; +import org.apache.doris.nereids.util.PlanConstructor; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanNode; import mockit.Injectable; -import mockit.Mocked; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -47,7 +48,8 @@ import java.util.Optional; public class PhysicalPlanTranslatorTest { @Test - public void testOlapPrune(@Mocked OlapTable t1, @Injectable LogicalProperties placeHolder) throws Exception { + public void testOlapPrune(@Injectable LogicalProperties placeHolder) throws Exception { + OlapTable t1 = PlanConstructor.newOlapTable(0, "t1", 0, KeysType.AGG_KEYS); List qualifier = new ArrayList<>(); qualifier.add("test"); List t1Output = new ArrayList<>(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/mv/SelectRollupTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/mv/SelectRollupTest.java new file mode 100644 index 0000000000..1cdf53d489 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/mv/SelectRollupTest.java @@ -0,0 +1,321 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.mv; + +import org.apache.doris.common.FeConstants; +import org.apache.doris.nereids.util.PatternMatchSupported; +import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +class SelectRollupTest extends TestWithFeService implements PatternMatchSupported { + + @Override + protected void runBeforeAll() throws Exception { + FeConstants.runningUnitTest = true; + createDatabase("test"); + connectContext.setDatabase("default_cluster:test"); + + createTable("CREATE TABLE `t` (\n" + + " `k1` int(11) NULL,\n" + + " `k2` int(11) NULL,\n" + + " `k3` int(11) NULL,\n" + + " `v1` int(11) SUM NULL\n" + + ") ENGINE=OLAP\n" + + "AGGREGATE KEY(`k1`, `k2`, `k3`)\n" + + "COMMENT 'OLAP'\n" + + "DISTRIBUTED BY HASH(`k1`) BUCKETS 3\n" + + "PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"in_memory\" = \"false\",\n" + + "\"storage_format\" = \"V2\",\n" + + "\"disable_auto_compaction\" = \"false\"\n" + + ");"); + addRollup("alter table t add rollup r1(k2, v1)"); + addRollup("alter table t add rollup r2(k2, k3, v1)"); + + // + // createTable("CREATE TABLE `only_base` (\n" + // + " `k1` int(11) NULL,\n" + // + " `k2` int(11) NULL,\n" + // + " `k3` int(11) NULL,\n" + // + " `v1` int(11) SUM NULL\n" + // + ") ENGINE=OLAP\n" + // + "AGGREGATE KEY(`k1`, `k2`, k3)\n" + // + "COMMENT 'OLAP'\n" + // + "DISTRIBUTED BY HASH(`k2`) BUCKETS 3\n" + // + "PROPERTIES (\n" + // + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + // + "\"in_memory\" = \"false\",\n" + // + "\"storage_format\" = \"V2\"\n" + // + ")"); + // + // createTable("CREATE TABLE `dup_tbl` (\n" + // + " `k1` int(11) NULL,\n" + // + " `k2` int(11) NULL,\n" + // + " `k3` int(11) NULL,\n" + // + " `v1` int(11) NULL\n" + // + ") ENGINE=OLAP\n" + // + "DUPLICATE KEY(`k1`, `k2`, `k3`)\n" + // + "COMMENT 'OLAP'\n" + // + "DISTRIBUTED BY HASH(`k2`) BUCKETS 3\n" + // + "PROPERTIES (\n" + // + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + // + "\"in_memory\" = \"false\",\n" + // + "\"storage_format\" = \"V2\",\n" + // + "\"disable_auto_compaction\" = \"false\"\n" + // + ");"); + // + // createTable("create table t0 (\n" + // + " user_id int, \n" + // + " date date, \n" + // + " timestamp datetime,\n" + // + " city varchar,\n" + // + " age int,\n" + // + " gender int,\n" + // + " cost bigint sum,\n" + // + " last_visit_time datetime replace,\n" + // + " max_dwell_time int max,\n" + // + " min_dwell_time int min\n" + // + " )\n" + // + "aggregate key (user_id, date, timestamp, city, age, gender)\n" + // + "distributed by hash(user_id) buckets 3 \n" + // + "properties('replication_num' = '1')"); + // + // + // // addRollup("ALTER TABLE t1 ADD ROLLUP r1(user_id, cost)"); + // + // createTable("CREATE TABLE `t1` (\n" + // + " `k1` int(11) NULL,\n" + // + " `k2` int(11) NULL,\n" + // + " `k3` int(11) NULL,\n" + // + " `k4` int(11) NULL,\n" + // + " `k5` int(11) NULL,\n" + // + " `v1` int(11) SUM NULL,\n" + // + " `v2` int(11) SUM NULL\n" + // + ") ENGINE=OLAP\n" + // + "AGGREGATE KEY(`k1`, `k2`, k3, k4, k5)\n" + // + "COMMENT 'OLAP'\n" + // + "DISTRIBUTED BY HASH(`k1`) BUCKETS 3\n" + // + "PROPERTIES (\n" + // + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + // + "\"in_memory\" = \"false\",\n" + // + "\"storage_format\" = \"V2\"\n" + // + ")"); + // addRollup("ALTER TABLE t1 ADD ROLLUP r1(k1, k2, k3, v1)"); + // addRollup("ALTER TABLE t1 ADD ROLLUP r2(k1, k2, v1)"); + // addRollup("ALTER TABLE t1 ADD ROLLUP r3(k1, v1)"); + // + // createTable("CREATE TABLE `t2` (\n" + // + " `k1` int(11) NULL,\n" + // + " `k2` int(11) NULL,\n" + // + " `k3` int(11) NULL,\n" + // + " `k4` int(11) NULL,\n" + // + " `k5` int(11) NULL,\n" + // + " `v1` int(11) SUM NULL,\n" + // + " `v2` int(11) SUM NULL\n" + // + ") ENGINE=OLAP\n" + // + "AGGREGATE KEY(`k1`, `k2`, k3, k4, k5)\n" + // + "COMMENT 'OLAP'\n" + // + "DISTRIBUTED BY HASH(`k1`) BUCKETS 3\n" + // + "PROPERTIES (\n" + // + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + // + "\"in_memory\" = \"false\",\n" + // + "\"storage_format\" = \"V2\"\n" + // + ")"); + // addRollup("ALTER TABLE t2 ADD ROLLUP r1(k1, k2, k3, v1)"); + // addRollup("ALTER TABLE t2 ADD ROLLUP r2(k1, k2, v1)"); + // addRollup("ALTER TABLE t2 ADD ROLLUP r3(k1, v1)"); + } + + @Test + public void testAggMatching() { + PlanChecker.from(connectContext) + .analyze(" select k2, sum(v1) from t group by k2") + .applyTopDown(new SelectRollup()) + .matches(logicalOlapScan().when(scan -> "r1".equals(scan.getSelectRollupName().get()))); + } + + @Test + public void testMatchingBase() { + PlanChecker.from(connectContext) + .analyze(" select k1, sum(v1) from t group by k1") + .applyTopDown(new SelectRollup()) + .matches(logicalOlapScan().when(scan -> "t".equals(scan.getSelectRollupName().get()))); + } + + @Test + void testAggFilterScan() { + PlanChecker.from(connectContext) + .analyze("select k2, sum(v1) from t where k3=0 group by k2") + .applyTopDown(new SelectRollup()) + .matches(logicalOlapScan().when(scan -> "r2".equals(scan.getSelectRollupName().get()))); + } + + @Test + void testTranslate() { + PlanChecker.from(connectContext) + .checkPlannerResult( + " select k2, sum(v1) from t group by k2", + planner -> { + } + ); + } + + @Disabled + @Test + public void testDup() throws Exception { + // System.out.println(getSQLPlanOrErrorMsg("select k2, sum(v1) from dup_tbl group by k2")); + PlanChecker.from(connectContext) + .checkPlannerResult( + "select k2, sum(v1) from dup_tbl group by k2", + planner -> { + }) + ; + } + + @Disabled + @Test + void testTranslate1() { + PlanChecker.from(connectContext) + .checkPlannerResult( + "select k1, v1 from only_base group by k1", + planner -> { + } + ); + } + + /////////////////////////////////////////////////////////////////////////// + // For debugging legacy rollup selecting. + // Add these cases for nereids rollup select in the future. + /////////////////////////////////////////////////////////////////////////// + @Disabled + @Test + public void test() throws Exception { + System.out.println(getSQLPlanOrErrorMsg("select k1, sum(v1) from only_base group by k1")); + } + + @Disabled + @Test + public void testLegacyRollupSelect() throws Exception { + String explain = getSQLPlanOrErrorMsg("SELECT user_id, sum(cost) FROM t0 GROUP BY user_id"); + System.out.println(explain); + } + + @Disabled + @Test + public void testDuplicateKey() throws Exception { + System.out.println(getSQLPlanOrErrorMsg("select k1, sum(v1) from detail group by k1")); + } + + // todo: add test cases + // 1. equal case: k2=0 + // 2. not equal case: k2 !=0 + @Disabled + @Test + public void test1() throws Exception { + // predicates pushdown scan: k2>0 + // rollup: r2 + String sql1 = "select k1, sum(v1) from t2 where k2>0 group by k1"; + System.out.println(getSQLPlanOrErrorMsg(sql1)); + } + + @Disabled + @Test + public void test2() throws Exception { + // pushdown: k2>k3 + // rollup: r1 + String sql2 = "select k1, sum(v1) from t2 where k2>k3 group by k1"; + System.out.println(getSQLPlanOrErrorMsg(sql2)); + } + + /** + *
+     * :VOlapScanNode
+     *      TABLE: t2(t2), PREAGGREGATION: OFF. Reason: conjunct on `c1` which is StorageEngine value column
+     *      PREDICATES: `k2` > 0
+     *      rollup: t2
+     *      partitions=1/1, tablets=3/3, tabletList=10017,10019,10021
+     *      cardinality=0, avgRowSize=12.0, numNodes=1
+     * 
+ */ + @Disabled + @Test + public void testFilterPushDownProject() throws Exception { + // failed to select rollup + String sql = "select c1, sum(v1) from" + + "(select k1 as c1, k2 as c2, k3 as c3, k4 as c4, v1 as v1 from t2) t" + + " where c2>0 group by c1"; + System.out.println(getSQLPlanOrErrorMsg(sql)); + } + + /** + *
+     *   0:VOlapScanNode
+     *      TABLE: t2(t2), PREAGGREGATION: OFF. Reason: aggExpr.getChild(0)[(SlotRef{slotDesc=
+     *      SlotDescriptor{id=1, parent=0, col=v1, type=INT, materialized=true, byteSize=0,
+     *      byteOffset=-1, nullIndicatorByte=0, nullIndicatorBit=0, slotIdx=0}, col=v1, label=`v1`, tblName=null}
+     *      SlotRef{slotDesc=SlotDescriptor{id=2, parent=0, col=v2, type=INT, materialized=true,
+     *      byteSize=0, byteOffset=-1, nullIndicatorByte=0, nullIndicatorBit=0, slotIdx=0}, col=v2,
+     *      label=`v2`, tblName=null})] is not SlotRef or CastExpr|CaseExpr
+     *      rollup: t2
+     *      partitions=1/1, tablets=3/3, tabletList=10017,10019,10021
+     *      cardinality=0, avgRowSize=12.0, numNodes=1
+     * 
+ */ + @Disabled + @Test + public void testMultiAggColumns() throws Exception { + System.out.println(getSQLPlanOrErrorMsg("select k1, sum(v1 + v2) from t2 group by k1")); + } + + + /////////////////////////////////////////////////////////////////////////// + // Test turn off pre-agg in legacy logic. + // Add these cases for nereids rollup select in the future. + /////////////////////////////////////////////////////////////////////////// + + /** + *
+     *   0:VOlapScanNode
+     *      TABLE: t1(t1), PREAGGREGATION: OFF. Reason: No AggregateInfo
+     * 
+ */ + @Disabled + @Test + public void testNoAgg() throws Exception { + System.out.println(getSQLPlanOrErrorMsg("select k1, v1 from t1")); + } + + @Disabled + @Test + public void testJoin() throws Exception { + System.out.println( + getSQLPlanOrErrorMsg("select t1.k1, t2.k2, sum(t1.v1), sum(t2.v1) from " + + "t1 join t2 on t1.k1=t2.k1 group by t1.k1, t2.k2;")); + } + + @Disabled + @Test + public void testOuterJoin() throws Exception { + System.out.println(getSQLPlanOrErrorMsg( + "select t1.k1, t2.k2, sum(t1.v1), sum(t2.v1) from t1 outer join t2 on t1.k1=t2.k1")); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java index 5493ca9c6a..6292a24c79 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java @@ -81,8 +81,10 @@ public class PlanToStringTest { @Test public void testLogicalOlapScan() { LogicalOlapScan plan = PlanConstructor.newLogicalOlapScan(0, "table", 0); + System.out.println(plan.toString()); Assertions.assertTrue( - plan.toString().matches("LogicalOlapScan \\( qualified=db\\.table, output=\\[id#\\d+, name#\\d+] \\)")); + plan.toString().matches("LogicalOlapScan \\( qualified=db\\.table, " + + "output=\\[id#\\d+, name#\\d+], candidateIndexIds=\\[], selectedIndexId=-1 \\)")); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java index d23d43e3e3..c0d909e2eb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java @@ -18,10 +18,14 @@ package org.apache.doris.nereids.util; import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.jobs.JobContext; import org.apache.doris.nereids.memo.Group; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.memo.Memo; +import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.pattern.GroupExpressionMatching; import org.apache.doris.nereids.pattern.MatchingContext; import org.apache.doris.nereids.pattern.PatternDescriptor; @@ -31,7 +35,9 @@ import org.apache.doris.nereids.rules.RuleFactory; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.OriginStatement; import com.google.common.base.Supplier; import com.google.common.collect.Lists; @@ -258,6 +264,15 @@ public class PlanChecker { return this; } + public PlanChecker checkPlannerResult(String sql, Consumer consumer) { + LogicalPlan parsed = new NereidsParser().parseSingle(sql); + NereidsPlanner nereidsPlanner = new NereidsPlanner( + new StatementContext(connectContext, new OriginStatement(sql, 0))); + nereidsPlanner.plan(LogicalPlanAdapter.of(parsed)); + consumer.accept(nereidsPlanner); + return this; + } + public static PlanChecker from(ConnectContext connectContext) { return new PlanChecker(connectContext); } @@ -274,4 +289,8 @@ public class PlanChecker { public CascadesContext getCascadesContext() { return cascadesContext; } + + public Plan getPlan() { + return cascadesContext.getMemo().copyOut(); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanConstructor.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanConstructor.java index 4bad76c26e..cae47c6115 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanConstructor.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanConstructor.java @@ -74,6 +74,10 @@ public class PlanConstructor { } public static OlapTable newOlapTable(long tableId, String tableName, int hashColumn) { + return newOlapTable(tableId, tableName, hashColumn, KeysType.PRIMARY_KEYS); + } + + public static OlapTable newOlapTable(long tableId, String tableName, int hashColumn, KeysType keysType) { List columns = ImmutableList.of( new Column("id", Type.INT, true, AggregateType.NONE, "0", ""), new Column("name", Type.STRING, true, AggregateType.NONE, "", "")); @@ -82,13 +86,13 @@ public class PlanConstructor { ImmutableList.of(columns.get(hashColumn))); OlapTable table = new OlapTable(tableId, tableName, columns, - KeysType.PRIMARY_KEYS, new PartitionInfo(), hashDistributionInfo); + keysType, new PartitionInfo(), hashDistributionInfo); table.setIndexMeta(-1, tableName, table.getFullSchema(), 0, 0, (short) 0, TStorageType.COLUMN, - KeysType.PRIMARY_KEYS); + keysType); return table; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 57b68ca4bc..1e10f52a28 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -17,7 +17,9 @@ package org.apache.doris.utframe; +import org.apache.doris.alter.AlterJobV2; import org.apache.doris.analysis.AlterSqlBlockRuleStmt; +import org.apache.doris.analysis.AlterTableStmt; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreatePolicyStmt; @@ -68,6 +70,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.io.FileUtils; +import org.junit.Assert; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -499,4 +502,27 @@ public abstract class TestWithFeService { connectContext.setCurrentUserIdentity(user); connectContext.setQualifiedUser(SystemInfoService.DEFAULT_CLUSTER + ":" + userName); } + + protected void addRollup(String sql) throws Exception { + AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext); + Env.getCurrentEnv().alterTable(alterTableStmt); + checkAlterJob(); + // waiting table state to normal + Thread.sleep(100); + } + + private void checkAlterJob() throws InterruptedException { + // check alter job + Map alterJobs = Env.getCurrentEnv().getMaterializedViewHandler().getAlterJobsV2(); + for (AlterJobV2 alterJobV2 : alterJobs.values()) { + while (!alterJobV2.getJobState().isFinalState()) { + System.out.println("alter job " + alterJobV2.getDbId() + + " is running. state: " + alterJobV2.getJobState()); + Thread.sleep(100); + } + System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState()); + Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState()); + } + } + } diff --git a/regression-test/data/nereids_syntax_p0/rollup.out b/regression-test/data/nereids_syntax_p0/rollup.out new file mode 100644 index 0000000000..71d96f5919 --- /dev/null +++ b/regression-test/data/nereids_syntax_p0/rollup.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !rollup1 -- +2 6 +3 4 + +-- !rollup2 -- +1 6 +2 4 + diff --git a/regression-test/suites/nereids_syntax_p0/rollup.groovy b/regression-test/suites/nereids_syntax_p0/rollup.groovy new file mode 100644 index 0000000000..aabb980138 --- /dev/null +++ b/regression-test/suites/nereids_syntax_p0/rollup.groovy @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +suite("rollup") { + sql """ + CREATE TABLE `rollup_t1` ( + `k1` int(11) NULL, + `k2` int(11) NULL, + `k3` int(11) NULL, + `v1` int(11) SUM NULL + ) ENGINE=OLAP + AGGREGATE KEY(`k1`, `k2`, `k3`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2", + "disable_auto_compaction" = "false" + ); + """ + + sql "ALTER TABLE rollup_t1 ADD ROLLUP r1(k2, v1)" + + + def getJobRollupState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE ROLLUP WHERE TableName='${tableName}' ORDER BY CreateTime DESC LIMIT 1; """ + return jobStateResult[0][8] + } + + int max_try_secs = 60 + while (max_try_secs--) { + String res = getJobRollupState("rollup_t1") + if (res == "FINISHED") { + break + } else { + Thread.sleep(2000) + if (max_try_secs < 1) { + println "test timeout," + "state:" + res + assertEquals("FINISHED",res) + } + } + } + Thread.sleep(200) + + sql "insert into rollup_t1 values(1, 2, 3, 4)" + sql "insert into rollup_t1 values(1, 2, 3, 2)" + sql "insert into rollup_t1 values(2, 3, 4, 1)" + sql "insert into rollup_t1 values(2, 3, 4, 3)" + + sql "set enable_vectorized_engine=true" + + sql "set enable_nereids_planner=true" + + explain { + sql("select k2, sum(v1) from rollup_t1 group by k2") + contains("rollup_t1(r1)") + } + + explain { + sql("select k1, sum(v1) from rollup_t1 group by k1") + contains("rollup_t1(rollup_t1)") + } + + order_qt_rollup1 "select k2, sum(v1) from rollup_t1 group by k2" + + order_qt_rollup2 "select k1, sum(v1) from rollup_t1 group by k1" +}