From 2b62ac2fefc0520a5bc6420294440fd441269037 Mon Sep 17 00:00:00 2001 From: Shuo Wang Date: Fri, 9 Sep 2022 18:14:31 +0800 Subject: [PATCH] [Feature](Nereids) Main framework for selecting rollup index. (#12464) # Proposed changes First step of #12303 ## Problem summary This is the first step for supporting rollup index selection for aggregate/unique key OLAP table. This PR aims to select rollup index when the aggregate node is present and the aggregate function matches the value type. So pre-aggregation is turned on by default. Cases that pre-aggregation should be turned off will be addressed in the next PR. Main steps for rollup index selection: 1. filter rollup indexes with all the required columns. 2. filter rollup indexes that match the key prefix most. 3. order the rollup indexes by row count, column count, rollup index id. TODO remaining: 1. address cases that pre-aggregation should be turned off. (next PR) 2. add more test cases. Refactor - Add `Project.getSlotToProducer` to extract a map from the project output slot to its producing expression. - Add `Filter.getConjuncts` to split the filter condition to conjunctive predicates. - Move the usage of `ExpressionReplacer` to `ExpressionUtils.replace(expr, replaceMap)` to simplify the code. 
--- .../apache/doris/nereids/NereidsPlanner.java | 10 + .../nereids/glue/LogicalPlanAdapter.java | 5 + .../translator/PhysicalPlanTranslator.java | 22 +- .../doris/nereids/jobs/batch/RewriteJob.java | 2 + .../apache/doris/nereids/rules/RuleType.java | 5 + .../nereids/rules/analysis/ResolveHaving.java | 5 +- .../doris/nereids/rules/mv/SelectRollup.java | 410 ++++++++++++++++++ .../rules/rewrite/AggregateDisassemble.java | 6 +- .../rewrite/logical/NormalizeAggregate.java | 4 +- .../rewrite/logical/SwapFilterAndProject.java | 35 +- .../visitor/ExpressionReplacer.java | 39 -- .../nereids/trees/plans/algebra/Filter.java | 7 + .../nereids/trees/plans/algebra/Project.java | 24 + .../trees/plans/logical/LogicalAggregate.java | 10 +- .../trees/plans/logical/LogicalOlapScan.java | 61 ++- .../doris/nereids/util/ExpressionUtils.java | 62 +++ .../planner/MaterializedViewSelector.java | 4 + .../apache/doris/planner/OlapScanNode.java | 12 + .../PhysicalPlanTranslatorTest.java | 6 +- .../nereids/rules/mv/SelectRollupTest.java | 321 ++++++++++++++ .../nereids/trees/plans/PlanToStringTest.java | 4 +- .../doris/nereids/util/PlanChecker.java | 19 + .../doris/nereids/util/PlanConstructor.java | 8 +- .../doris/utframe/TestWithFeService.java | 26 ++ .../data/nereids_syntax_p0/rollup.out | 9 + .../suites/nereids_syntax_p0/rollup.groovy | 83 ++++ 26 files changed, 1106 insertions(+), 93 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/mv/SelectRollup.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ExpressionReplacer.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/nereids/rules/mv/SelectRollupTest.java create mode 100644 regression-test/data/nereids_syntax_p0/rollup.out create mode 100644 regression-test/suites/nereids_syntax_p0/rollup.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 7bcea8a1e5..2d3596c8ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -40,6 +40,7 @@ import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.Planner; import org.apache.doris.planner.ScanNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import java.util.ArrayList; @@ -84,6 +85,15 @@ public class NereidsPlanner extends Planner { logicalPlanAdapter.setColLabels(columnLabelList); } + @VisibleForTesting + public void plan(StatementBase queryStmt) { + try { + plan(queryStmt, statementContext.getConnectContext().getSessionVariable().toThrift()); + } catch (UserException e) { + throw new RuntimeException(e); + } + } + /** * Do analyze and optimize for query plan. * diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java index 9fd2d8667e..76d101148d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/LogicalPlanAdapter.java @@ -22,6 +22,7 @@ import org.apache.doris.analysis.OutFileClause; import org.apache.doris.analysis.Queriable; import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.analysis.StatementBase; +import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import java.util.ArrayList; @@ -82,4 +83,8 @@ public class LogicalPlanAdapter extends StatementBase implements Queriable { // TODO: generate real digest return ""; } + + public static LogicalPlanAdapter of(Plan plan) { + return new LogicalPlanAdapter((LogicalPlan) plan); + } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 7219a435a5..d47bf1c1aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -297,11 +297,25 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor> having, Resolver resolver) { LogicalAggregate aggregate = having.child(); - Expression newPredicates = ExpressionReplacer.INSTANCE - .visit(having.getPredicates(), resolver.getSubstitution()); + Expression newPredicates = ExpressionUtils.replace(having.getPredicates(), resolver.getSubstitution()); List newOutputExpressions = Streams.concat( aggregate.getOutputExpressions().stream(), resolver.getNewOutputSlots().stream() ).collect(Collectors.toList()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/mv/SelectRollup.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/mv/SelectRollup.java new file mode 100644 index 0000000000..03cc0e4b45 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/mv/SelectRollup.java @@ -0,0 +1,410 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.mv; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.nereids.annotation.Developing; +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory; +import org.apache.doris.nereids.trees.expressions.ComparisonPredicate; +import org.apache.doris.nereids.trees.expressions.EqualTo; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.InPredicate; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.NullSafeEqual; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.functions.AggregateFunction; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.trees.plans.algebra.Project; +import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; +import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.util.ExpressionUtils; + +import 
com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Select rollup. + */ +@Developing +public class SelectRollup implements RewriteRuleFactory { + /////////////////////////////////////////////////////////////////////////// + // All the patterns + /////////////////////////////////////////////////////////////////////////// + @Override + public List buildRules() { + return ImmutableList.of( + // only agg above scan + // Aggregate(Scan) + logicalAggregate(logicalOlapScan().when(LogicalOlapScan::shouldSelectRollup)).then(agg -> { + LogicalOlapScan scan = agg.child(); + return agg.withChildren( + scan.withCandidateIndexIds( + selectCandidateRollupIds( + scan, + agg.getInputSlots(), + ImmutableList.of(), + extractAggFunctionAndReplaceSlot(agg, Optional.empty()) + ) + ) + ); + }).toRule(RuleType.ROLLUP_AGG_SCAN), + + // filter could push down scan. + // Aggregate(Filter(Scan)) + logicalAggregate(logicalFilter(logicalOlapScan().when(LogicalOlapScan::shouldSelectRollup))) + .then(agg -> { + LogicalFilter filter = agg.child(); + LogicalOlapScan scan = filter.child(); + ImmutableSet requiredSlots = ImmutableSet.builder() + .addAll(agg.getInputSlots()) + .addAll(filter.getInputSlots()) + .build(); + return agg.withChildren(filter.withChildren( + scan.withCandidateIndexIds( + selectCandidateRollupIds( + scan, + requiredSlots, + filter.getConjuncts(), + extractAggFunctionAndReplaceSlot(agg, Optional.empty()) + ) + ) + )); + }).toRule(RuleType.ROLLUP_AGG_FILTER_SCAN), + + // column pruning or other projections such as alias, etc. 
+ // Aggregate(Project(Scan)) + logicalAggregate(logicalProject(logicalOlapScan().when(LogicalOlapScan::shouldSelectRollup))) + .then(agg -> { + LogicalProject project = agg.child(); + LogicalOlapScan scan = project.child(); + return agg.withChildren( + project.withChildren( + scan.withCandidateIndexIds( + selectCandidateRollupIds( + scan, + project.getInputSlots(), + ImmutableList.of(), + extractAggFunctionAndReplaceSlot(agg, + Optional.of(project)) + ) + ) + ) + ); + }).toRule(RuleType.ROLLUP_AGG_PROJECT_SCAN), + + // filter could push down and project. + // Aggregate(Project(Filter(Scan))) + logicalAggregate(logicalProject(logicalFilter(logicalOlapScan() + .when(LogicalOlapScan::shouldSelectRollup)))).then(agg -> { + LogicalProject> project = agg.child(); + LogicalFilter filter = project.child(); + LogicalOlapScan scan = filter.child(); + return agg.withChildren(project.withChildren(filter.withChildren( + scan.withCandidateIndexIds(selectCandidateRollupIds( + scan, + agg.getInputSlots(), + filter.getConjuncts(), + extractAggFunctionAndReplaceSlot(agg, Optional.of(project)) + ) + ) + ))); + }).toRule(RuleType.ROLLUP_AGG_PROJECT_FILTER_SCAN), + + // filter can't push down + // Aggregate(Filter(Project(Scan))) + logicalAggregate(logicalFilter(logicalProject(logicalOlapScan() + .when(LogicalOlapScan::shouldSelectRollup)))).then(agg -> { + LogicalFilter> filter = agg.child(); + LogicalProject project = filter.child(); + LogicalOlapScan scan = project.child(); + return agg.withChildren(filter.withChildren(project.withChildren( + scan.withCandidateIndexIds(selectCandidateRollupIds( + scan, + project.getInputSlots(), + ImmutableList.of(), + extractAggFunctionAndReplaceSlot(agg, Optional.of(project)) + ) + ) + ))); + }).toRule(RuleType.ROLLUP_AGG_FILTER_PROJECT_SCAN) + ); + } + + /////////////////////////////////////////////////////////////////////////// + // Main entrance of select rollup + /////////////////////////////////////////////////////////////////////////// + + 
/** + * Select candidate rollup ids. + *

+ * todo: 0. turn off pre agg, checking input aggregate functions and group by expressions, etc. + * 1. rollup contains all the required output slots. + * 2. match the most prefix index if pushdown predicates present. + */ + private List selectCandidateRollupIds( + LogicalOlapScan olapScan, + Set requiredScanOutput, + List predicates, + // not used now, reserved for checking aggregate function type match. + List aggregateFunctions) { + Preconditions.checkArgument(Sets.newHashSet(olapScan.getOutput()).containsAll(requiredScanOutput), + "Scan's output should contains all the input required scan output."); + + OlapTable table = olapScan.getTable(); + + // Scan slot exprId -> slot name + Map exprIdToName = olapScan.getOutput() + .stream() + .collect(Collectors.toMap(NamedExpression::getExprId, NamedExpression::getName)); + + // get required column names in metadata. + Set requiredColumnNames = requiredScanOutput + .stream() + .map(slot -> exprIdToName.get(slot.getExprId())) + .collect(Collectors.toSet()); + + // 1. filter rollup contains all the required columns by column name. + List containAllRequiredColumns = table.getVisibleIndex().stream() + .filter(rollup -> table.getSchemaByIndexId(rollup.getId(), true) + .stream() + .map(Column::getName) + .collect(Collectors.toSet()) + .containsAll(requiredColumnNames) + ).collect(Collectors.toList()); + + Map> split = filterCanUsePrefixIndexAndSplitByEquality(predicates, exprIdToName); + Set equalColNames = split.getOrDefault(true, ImmutableSet.of()); + Set nonEqualColNames = split.getOrDefault(false, ImmutableSet.of()); + + // 2. find matching key prefix most. + List matchingKeyPrefixMost; + if (!(equalColNames.isEmpty() && nonEqualColNames.isEmpty())) { + List matchingResult = matchKeyPrefixMost(table, containAllRequiredColumns, + equalColNames, nonEqualColNames); + matchingKeyPrefixMost = matchingResult.isEmpty() ? 
containAllRequiredColumns : matchingResult; + } else { + matchingKeyPrefixMost = containAllRequiredColumns; + } + + List partitionIds = olapScan.getSelectedPartitionIds(); + // 3. sort by row count, column count and index id. + return matchingKeyPrefixMost.stream() + .map(MaterializedIndex::getId) + .sorted(Comparator + // compare by row count + .comparing(rid -> partitionIds.stream() + .mapToLong(pid -> table.getPartition(pid).getIndex((Long) rid).getRowCount()) + .sum()) + // compare by column count + .thenComparing(rid -> table.getSchemaByIndexId((Long) rid).size()) + // compare by rollup index id + .thenComparing(rid -> (Long) rid)) + .collect(Collectors.toList()); + } + + /////////////////////////////////////////////////////////////////////////// + // Matching key prefix + /////////////////////////////////////////////////////////////////////////// + private List matchKeyPrefixMost( + OlapTable table, + List rollups, + Set equalColumns, + Set nonEqualColumns) { + TreeMap> collect = rollups.stream() + .collect(Collectors.toMap( + rollup -> rollupKeyPrefixMatchCount(table, rollup, equalColumns, nonEqualColumns), + Lists::newArrayList, + (l1, l2) -> { + l1.addAll(l2); + return l1; + }, + TreeMap::new) + ); + return collect.descendingMap().firstEntry().getValue(); + } + + private int rollupKeyPrefixMatchCount( + OlapTable table, + MaterializedIndex rollup, + Set equalColNames, + Set nonEqualColNames) { + int matchCount = 0; + for (Column column : table.getSchemaByIndexId(rollup.getId())) { + if (equalColNames.contains(column.getName())) { + matchCount++; + } else if (nonEqualColNames.contains(column.getName())) { + // A non-equality predicate's column still counts as a match, but it ends the key-prefix match. + matchCount++; + break; + } else { + break; + } + } + return matchCount; + } + + /////////////////////////////////////////////////////////////////////////// + // Split conjuncts into equal-to and non-equal-to.
+ /////////////////////////////////////////////////////////////////////////// + + /** + * Filter the input conjuncts to those that can use the prefix index and split them into 2 groups: equal-to or + * non-equal-to predicates on the key column. + */ + private Map> filterCanUsePrefixIndexAndSplitByEquality( + List conjunct, Map exprIdToColName) { + return conjunct.stream() + .map(expr -> PredicateChecker.canUsePrefixIndex(expr, exprIdToColName)) + .filter(result -> !result.equals(CheckResult.FAILURE)) + .collect(Collectors.groupingBy( + result -> result.type == ResultType.SUCCESS_EQUAL, + Collectors.mapping(result -> result.colName, Collectors.toSet()) + )); + } + + private enum ResultType { + FAILURE, + SUCCESS_EQUAL, + SUCCESS_NON_EQUAL, + } + + private static class CheckResult { + public static final CheckResult FAILURE = new CheckResult(null, ResultType.FAILURE); + private final String colName; + private final ResultType type; + + private CheckResult(String colName, ResultType result) { + this.colName = colName; + this.type = result; + } + + public static CheckResult createEqual(String name) { + return new CheckResult(name, ResultType.SUCCESS_EQUAL); + } + + public static CheckResult createNonEqual(String name) { + return new CheckResult(name, ResultType.SUCCESS_NON_EQUAL); + } + } + + /** + * Check if an expression could use the prefix key index.
+ */ + private static class PredicateChecker extends ExpressionVisitor> { + private static final PredicateChecker INSTANCE = new PredicateChecker(); + + private PredicateChecker() { + } + + public static CheckResult canUsePrefixIndex(Expression expression, Map exprIdToName) { + return expression.accept(INSTANCE, exprIdToName); + } + + @Override + public CheckResult visit(Expression expr, Map context) { + return CheckResult.FAILURE; + } + + @Override + public CheckResult visitInPredicate(InPredicate in, Map context) { + Optional slotOrCastOnSlot = ExpressionUtils.isSlotOrCastOnSlot(in.getCompareExpr()); + if (slotOrCastOnSlot.isPresent() && in.getOptions().stream().allMatch(Literal.class::isInstance)) { + return CheckResult.createEqual(context.get(slotOrCastOnSlot.get())); + } else { + return CheckResult.FAILURE; + } + } + + @Override + public CheckResult visitComparisonPredicate(ComparisonPredicate cp, Map context) { + if (cp instanceof EqualTo || cp instanceof NullSafeEqual) { + return check(cp, context, CheckResult::createEqual); + } else { + return check(cp, context, CheckResult::createNonEqual); + } + } + + private CheckResult check(ComparisonPredicate cp, Map exprIdToColumnName, + Function resultMapper) { + return check(cp).map(exprId -> resultMapper.apply(exprIdToColumnName.get(exprId))) + .orElse(CheckResult.FAILURE); + } + + private Optional check(ComparisonPredicate cp) { + Optional exprId = check(cp.left(), cp.right()); + return exprId.isPresent() ? exprId : check(cp.right(), cp.left()); + } + + private Optional check(Expression maybeSlot, Expression maybeConst) { + Optional exprIdOpt = ExpressionUtils.isSlotOrCastOnSlot(maybeSlot); + return exprIdOpt.isPresent() && maybeConst.isConstant() ? exprIdOpt : Optional.empty(); + } + } + + /** + * Do aggregate function extraction and replace aggregate function's input slots by underlying project. + *

+ * 1. extract aggregate functions in aggregate plan. + *

+ * 2. replace aggregate function's input slot by underlying project expression if project is present. + *

+ * For example: + *

+     * input arguments:
+     * agg: Aggregate(sum(v) as sum_value)
+     * underlying project: Project(a + b as v)
+     *
+     * output:
+     * sum(a + b)
+     * 
+ */ + private List extractAggFunctionAndReplaceSlot( + LogicalAggregate agg, + Optional> project) { + Optional> slotToProducerOpt = project.map(Project::getSlotToProducer); + return agg.getOutputExpressions().stream() + // extract aggregate functions. + .flatMap(e -> e.>collect(AggregateFunction.class::isInstance).stream()) + // replace aggregate function's input slot by its producing expression. + .map(expr -> slotToProducerOpt.map(slotToExpressions + -> (AggregateFunction) ExpressionUtils.replace(expr, slotToExpressions)) + .orElse(expr) + ) + .collect(Collectors.toList()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggregateDisassemble.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggregateDisassemble.java index 87363e4273..7d68752e07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggregateDisassemble.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggregateDisassemble.java @@ -24,10 +24,10 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.AggregateFunction; -import org.apache.doris.nereids.trees.expressions.visitor.ExpressionReplacer; import org.apache.doris.nereids.trees.plans.AggPhase; import org.apache.doris.nereids.trees.plans.GroupPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.util.ExpressionUtils; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -112,11 +112,11 @@ public class AggregateDisassemble extends OneRewriteRuleFactory { // 3. 
replace expression in globalOutputExprs and globalGroupByExprs List globalOutputExprs = aggregate.getOutputExpressions().stream() - .map(e -> ExpressionReplacer.INSTANCE.visit(e, inputSubstitutionMap)) + .map(e -> ExpressionUtils.replace(e, inputSubstitutionMap)) .map(NamedExpression.class::cast) .collect(Collectors.toList()); List globalGroupByExprs = localGroupByExprs.stream() - .map(e -> ExpressionReplacer.INSTANCE.visit(e, inputSubstitutionMap)).collect(Collectors.toList()); + .map(e -> ExpressionUtils.replace(e, inputSubstitutionMap)).collect(Collectors.toList()); // 4. generate new plan LogicalAggregate localAggregate = new LogicalAggregate<>( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/NormalizeAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/NormalizeAggregate.java index 4d42fff964..0fe139b85b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/NormalizeAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/NormalizeAggregate.java @@ -25,10 +25,10 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.AggregateFunction; -import org.apache.doris.nereids.trees.expressions.visitor.ExpressionReplacer; import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.util.ExpressionUtils; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -126,7 +126,7 @@ public class NormalizeAggregate extends OneRewriteRuleFactory { root = new LogicalAggregate<>(newKeys, newOutputs, aggregate.isDisassembled(), true, 
aggregate.getAggPhase(), root); List projections = outputs.stream() - .map(e -> ExpressionReplacer.INSTANCE.visit(e, substitutionMap)) + .map(e -> ExpressionUtils.replace(e, substitutionMap)) .map(NamedExpression.class::cast) .collect(Collectors.toList()); root = new LogicalProject<>(projections, root); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SwapFilterAndProject.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SwapFilterAndProject.java index d8c1e56c13..7d6b0e46f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SwapFilterAndProject.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SwapFilterAndProject.java @@ -20,37 +20,30 @@ package org.apache.doris.nereids.rules.rewrite.logical; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory; -import org.apache.doris.nereids.trees.expressions.Alias; -import org.apache.doris.nereids.trees.expressions.Expression; -import org.apache.doris.nereids.trees.expressions.NamedExpression; -import org.apache.doris.nereids.trees.expressions.visitor.ExpressionReplacer; import org.apache.doris.nereids.trees.plans.GroupPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; -import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import org.apache.doris.nereids.util.ExpressionUtils; /** - * Rewrite filter -> project to project -> filter. + * Push down filter through project. + * input: + * filter(a>2, b=0) -> project(c+d as a, e as b) + * output: + * project(c+d as a, e as b) -> filter(c+d>2, e=0). 
*/ public class SwapFilterAndProject extends OneRewriteRuleFactory { @Override public Rule build() { - return logicalFilter(logicalProject()).thenApply(ctx -> { - LogicalFilter> filter = ctx.root; + return logicalFilter(logicalProject()).then(filter -> { LogicalProject project = filter.child(); - List namedExpressionList = project.getProjects(); - Map slotToAlias = new HashMap<>(); - namedExpressionList.stream().filter(Alias.class::isInstance).forEach(s -> { - slotToAlias.put(s.toSlot(), ((Alias) s).child()); - }); - Expression rewrittenPredicate = ExpressionReplacer.INSTANCE.visit(filter.getPredicates(), slotToAlias); - LogicalFilter rewrittenFilter = - new LogicalFilter(rewrittenPredicate, project.child()); - return new LogicalProject(project.getProjects(), rewrittenFilter); + return new LogicalProject<>( + project.getProjects(), + new LogicalFilter<>( + ExpressionUtils.replace(filter.getPredicates(), project.getSlotToProducer()), + project.child() + ) + ); }).toRule(RuleType.SWAP_FILTER_AND_PROJECT); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ExpressionReplacer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ExpressionReplacer.java deleted file mode 100644 index f3fd35d695..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/visitor/ExpressionReplacer.java +++ /dev/null @@ -1,39 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.nereids.trees.expressions.visitor; - -import org.apache.doris.nereids.trees.expressions.Expression; - -import java.util.Map; - -/** - * replace expr nodes by substitutionMap - */ -public class ExpressionReplacer - extends DefaultExpressionRewriter> { - public static final ExpressionReplacer INSTANCE = new ExpressionReplacer(); - - @Override - public Expression visit(Expression expr, Map substitutionMap) { - if (substitutionMap.containsKey(expr)) { - return substitutionMap.get(expr); - } - return super.visit(expr, substitutionMap); - } -} - diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Filter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Filter.java index f4371eb4f7..1e4048f582 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Filter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Filter.java @@ -18,10 +18,17 @@ package org.apache.doris.nereids.trees.plans.algebra; import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.util.ExpressionUtils; + +import java.util.List; /** * Common interface for logical/physical filter. 
*/ public interface Filter { Expression getPredicates(); + + default List getConjuncts() { + return ExpressionUtils.extractConjunction(getPredicates()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Project.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Project.java index 1203998a5b..2cc9fec4b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Project.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Project.java @@ -17,13 +17,37 @@ package org.apache.doris.nereids.trees.plans.algebra; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; /** * Common interface for logical/physical project. */ public interface Project { List getProjects(); + + /** + * Generate a map whose key is the project output slot and whose value is the expression that produces the slot. + * Note that the alias is stripped off.
+ */ + default Map getSlotToProducer() { + return getProjects() + .stream() + .collect(Collectors.toMap( + NamedExpression::toSlot, + namedExpr -> { + if (namedExpr instanceof Alias) { + return ((Alias) namedExpr).child(); + } else { + return namedExpr; + } + }) + ); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java index f847bdd5d6..7b13ace319 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java @@ -39,10 +39,12 @@ import java.util.Optional; /** * Logical Aggregate plan. *

- * eg:select a, sum(b), c from table group by a, c; - * groupByExprList: Column field after group by. eg: a, c; - * outputExpressionList: Column field after select. eg: a, sum(b), c; - * partitionExprList: Column field after partition by. + * For example SQL: + *

+ * select a, sum(b), c from table group by a, c; + *

+ * groupByExpressions: Column field after group by. eg: a, c; + * outputExpressions: Column field after select. eg: a, sum(b), c; *

* Each agg node only contains the select statement field of the same layer, * and other agg nodes in the subquery contain. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java index 67436166f5..36350f8fee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java @@ -27,9 +27,11 @@ import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.commons.collections.CollectionUtils; import java.util.List; import java.util.Optional; @@ -43,37 +45,42 @@ public class LogicalOlapScan extends LogicalRelation { private final List selectedTabletId; private final boolean partitionPruned; + private final List candidateIndexIds; + private final boolean rollupSelected; + public LogicalOlapScan(OlapTable table) { this(table, ImmutableList.of()); } public LogicalOlapScan(OlapTable table, List qualifier) { this(table, qualifier, Optional.empty(), Optional.empty(), - table.getPartitionIds(), false); + table.getPartitionIds(), false, ImmutableList.of(), false); } public LogicalOlapScan(Table table, List qualifier) { this(table, qualifier, Optional.empty(), Optional.empty(), - ((OlapTable) table).getPartitionIds(), false); + ((OlapTable) table).getPartitionIds(), false, ImmutableList.of(), false); } /** * Constructor for LogicalOlapScan. 
- * - * @param table Doris table - * @param qualifier table name qualifier */ public LogicalOlapScan(Table table, List qualifier, Optional groupExpression, Optional logicalProperties, List selectedPartitionIdList, - boolean partitionPruned) { + boolean partitionPruned, List candidateIndexIds, boolean rollupSelected) { super(PlanType.LOGICAL_OLAP_SCAN, table, qualifier, groupExpression, logicalProperties, selectedPartitionIdList); - this.selectedIndexId = getTable().getBaseIndexId(); + // TODO: use CBO manner to select best index id, according to index's statistics info, + // revisit this after rollup and materialized view selection are fully supported. + this.selectedIndexId = CollectionUtils.isEmpty(candidateIndexIds) + ? getTable().getBaseIndexId() : candidateIndexIds.get(0); this.selectedTabletId = Lists.newArrayList(); for (Partition partition : getTable().getAllPartitions()) { selectedTabletId.addAll(partition.getBaseIndex().getTabletIdsInOrder()); } this.partitionPruned = partitionPruned; + this.candidateIndexIds = candidateIndexIds; + this.rollupSelected = rollupSelected; } @Override @@ -86,7 +93,9 @@ public class LogicalOlapScan extends LogicalRelation { public String toString() { return Utils.toSqlString("LogicalOlapScan", "qualified", qualifiedName(), - "output", getOutput() + "output", getOutput(), + "candidateIndexIds", candidateIndexIds, + "selectedIndexId", selectedIndexId ); } @@ -104,18 +113,23 @@ public class LogicalOlapScan extends LogicalRelation { @Override public Plan withGroupExpression(Optional groupExpression) { return new LogicalOlapScan(table, qualifier, groupExpression, Optional.of(getLogicalProperties()), - selectedPartitionIds, partitionPruned); + selectedPartitionIds, partitionPruned, candidateIndexIds, rollupSelected); } @Override public LogicalOlapScan withLogicalProperties(Optional logicalProperties) { return new LogicalOlapScan(table, qualifier, Optional.empty(), logicalProperties, selectedPartitionIds, - partitionPruned); + 
partitionPruned, candidateIndexIds, rollupSelected); } public LogicalOlapScan withSelectedPartitionId(List selectedPartitionId) { - return new LogicalOlapScan(table, qualifier, Optional.empty(), Optional.of(logicalPropertiesSupplier.get()), - selectedPartitionId, true); + return new LogicalOlapScan(table, qualifier, Optional.empty(), Optional.of(getLogicalProperties()), + selectedPartitionId, true, candidateIndexIds, rollupSelected); + } + + public LogicalOlapScan withCandidateIndexIds(List candidateIndexIds) { + return new LogicalOlapScan(table, qualifier, Optional.empty(), Optional.of(getLogicalProperties()), + selectedPartitionIds, partitionPruned, candidateIndexIds, true); } @Override @@ -134,4 +148,27 @@ public class LogicalOlapScan extends LogicalRelation { public long getSelectedIndexId() { return selectedIndexId; } + + public boolean isRollupSelected() { + return rollupSelected; + } + + /** + * Should apply {@link org.apache.doris.nereids.rules.mv.SelectRollup} or not. + */ + public boolean shouldSelectRollup() { + switch (((OlapTable) table).getKeysType()) { + case AGG_KEYS: + case UNIQUE_KEYS: + return !rollupSelected; + default: + return false; + } + } + + @VisibleForTesting + public Optional getSelectRollupName() { + return rollupSelected ? 
Optional.ofNullable(((OlapTable) table).getIndexNameById(selectedIndexId)) + : Optional.empty(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java index da5cd93bba..79493ae1c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/util/ExpressionUtils.java @@ -18,18 +18,22 @@ package org.apache.doris.nereids.util; import org.apache.doris.nereids.trees.expressions.And; +import org.apache.doris.nereids.trees.expressions.Cast; import org.apache.doris.nereids.trees.expressions.CompoundPredicate; +import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.Or; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral; +import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -163,4 +167,62 @@ public class ExpressionUtils { } return minSlot; } + + /** + * Check whether the input expression is a {@link org.apache.doris.nereids.trees.expressions.Slot} + * or at least one {@link Cast} on a {@link org.apache.doris.nereids.trees.expressions.Slot} + *

+ * for example: + * - SlotReference to a column: + * col + * - Cast on SlotReference: + * cast(int_col as string) + * cast(cast(int_col as long) as string) + * + * @param expr input expression + * @return Return Optional[ExprId] of underlying slot reference if input expression is a slot or cast on slot. + * Otherwise, return empty optional result. + */ + public static Optional isSlotOrCastOnSlot(Expression expr) { + while (expr instanceof Cast) { + expr = expr.child(0); + } + + if (expr instanceof SlotReference) { + return Optional.of(((SlotReference) expr).getExprId()); + } else { + return Optional.empty(); + } + } + + /** + * Replace expression node in the expression tree by `replaceMap` in top-down manner. + * For example. + *

+     * input expression: a > 1
+     * replaceMap: a -> b + c
+     *
+     * output:
+     * b + c > 1
+     * 
+ */ + public static Expression replace(Expression expr, Map replaceMap) { + return expr.accept(ExpressionReplacer.INSTANCE, replaceMap); + } + + private static class ExpressionReplacer + extends DefaultExpressionRewriter> { + public static final ExpressionReplacer INSTANCE = new ExpressionReplacer(); + + private ExpressionReplacer() { + } + + @Override + public Expression visit(Expression expr, Map replaceMap) { + if (replaceMap.containsKey(expr)) { + return replaceMap.get(expr); + } + return super.visit(expr, replaceMap); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java b/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java index dbdcbfb765..01e871c1b4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MaterializedViewSelector.java @@ -268,6 +268,7 @@ public class MaterializedViewSelector { return selectedIndexId; } + // Step2: check all columns in compensating predicates are available in the view output private void checkCompensatingPredicates(Set columnsInPredicates, Map candidateIndexIdToMeta) { // When the query statement does not contain any columns in predicates, all candidate index can pass this check @@ -300,6 +301,7 @@ public class MaterializedViewSelector { * @param candidateIndexIdToMeta */ + // Step3: group by list in query is the subset of group by list in view or view contains no aggregation private void checkGrouping(OlapTable table, Set columnsInGrouping, Map candidateIndexIdToMeta) { Iterator> iterator = candidateIndexIdToMeta.entrySet().iterator(); @@ -351,6 +353,7 @@ public class MaterializedViewSelector { + Joiner.on(",").join(candidateIndexIdToMeta.keySet())); } + // Step4: aggregation functions are available in the view output private void checkAggregationFunction(OlapTable table, Set aggregatedColumnsInQueryOutput, Map candidateIndexIdToMeta) throws 
AnalysisException { Iterator> iterator = candidateIndexIdToMeta.entrySet().iterator(); @@ -387,6 +390,7 @@ public class MaterializedViewSelector { + Joiner.on(",").join(candidateIndexIdToMeta.keySet())); } + // Step5: columns required to compute output expr are available in the view output private void checkOutputColumns(Set columnNamesInQueryOutput, Map candidateIndexIdToMeta) { if (columnNamesInQueryOutput == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 2ee08fb24a..089f1c2e7f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -237,6 +237,18 @@ public class OlapScanNode extends ScanNode { this.selectedPartitionIds = selectedPartitionIds; } + /** + * Only used for Nereids to set rollup or materialized view selection result. + */ + public void selectSelectIndexInfo( + long selectedIndexId, + boolean isPreAggregation, + String reasonOfPreAggregation) { + this.selectedIndexId = selectedIndexId; + this.isPreAggregation = isPreAggregation; + this.reasonOfPreAggregation = reasonOfPreAggregation; + } + /** * The function is used to directly select the index id of the base table as the * selectedIndexId. 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java index 29378e61c1..6cbd598262 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.glue.translator; +import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; @@ -30,12 +31,12 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.types.IntegerType; +import org.apache.doris.nereids.util.PlanConstructor; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanNode; import mockit.Injectable; -import mockit.Mocked; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -47,7 +48,8 @@ import java.util.Optional; public class PhysicalPlanTranslatorTest { @Test - public void testOlapPrune(@Mocked OlapTable t1, @Injectable LogicalProperties placeHolder) throws Exception { + public void testOlapPrune(@Injectable LogicalProperties placeHolder) throws Exception { + OlapTable t1 = PlanConstructor.newOlapTable(0, "t1", 0, KeysType.AGG_KEYS); List qualifier = new ArrayList<>(); qualifier.add("test"); List t1Output = new ArrayList<>(); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/mv/SelectRollupTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/mv/SelectRollupTest.java new file mode 100644 index 
0000000000..1cdf53d489 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/mv/SelectRollupTest.java @@ -0,0 +1,321 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.mv; + +import org.apache.doris.common.FeConstants; +import org.apache.doris.nereids.util.PatternMatchSupported; +import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +class SelectRollupTest extends TestWithFeService implements PatternMatchSupported { + + @Override + protected void runBeforeAll() throws Exception { + FeConstants.runningUnitTest = true; + createDatabase("test"); + connectContext.setDatabase("default_cluster:test"); + + createTable("CREATE TABLE `t` (\n" + + " `k1` int(11) NULL,\n" + + " `k2` int(11) NULL,\n" + + " `k3` int(11) NULL,\n" + + " `v1` int(11) SUM NULL\n" + + ") ENGINE=OLAP\n" + + "AGGREGATE KEY(`k1`, `k2`, `k3`)\n" + + "COMMENT 'OLAP'\n" + + "DISTRIBUTED BY HASH(`k1`) BUCKETS 3\n" + + "PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"in_memory\" = \"false\",\n" + + "\"storage_format\" = \"V2\",\n" 
+ + "\"disable_auto_compaction\" = \"false\"\n" + + ");"); + addRollup("alter table t add rollup r1(k2, v1)"); + addRollup("alter table t add rollup r2(k2, k3, v1)"); + + // + // createTable("CREATE TABLE `only_base` (\n" + // + " `k1` int(11) NULL,\n" + // + " `k2` int(11) NULL,\n" + // + " `k3` int(11) NULL,\n" + // + " `v1` int(11) SUM NULL\n" + // + ") ENGINE=OLAP\n" + // + "AGGREGATE KEY(`k1`, `k2`, k3)\n" + // + "COMMENT 'OLAP'\n" + // + "DISTRIBUTED BY HASH(`k2`) BUCKETS 3\n" + // + "PROPERTIES (\n" + // + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + // + "\"in_memory\" = \"false\",\n" + // + "\"storage_format\" = \"V2\"\n" + // + ")"); + // + // createTable("CREATE TABLE `dup_tbl` (\n" + // + " `k1` int(11) NULL,\n" + // + " `k2` int(11) NULL,\n" + // + " `k3` int(11) NULL,\n" + // + " `v1` int(11) NULL\n" + // + ") ENGINE=OLAP\n" + // + "DUPLICATE KEY(`k1`, `k2`, `k3`)\n" + // + "COMMENT 'OLAP'\n" + // + "DISTRIBUTED BY HASH(`k2`) BUCKETS 3\n" + // + "PROPERTIES (\n" + // + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + // + "\"in_memory\" = \"false\",\n" + // + "\"storage_format\" = \"V2\",\n" + // + "\"disable_auto_compaction\" = \"false\"\n" + // + ");"); + // + // createTable("create table t0 (\n" + // + " user_id int, \n" + // + " date date, \n" + // + " timestamp datetime,\n" + // + " city varchar,\n" + // + " age int,\n" + // + " gender int,\n" + // + " cost bigint sum,\n" + // + " last_visit_time datetime replace,\n" + // + " max_dwell_time int max,\n" + // + " min_dwell_time int min\n" + // + " )\n" + // + "aggregate key (user_id, date, timestamp, city, age, gender)\n" + // + "distributed by hash(user_id) buckets 3 \n" + // + "properties('replication_num' = '1')"); + // + // + // // addRollup("ALTER TABLE t1 ADD ROLLUP r1(user_id, cost)"); + // + // createTable("CREATE TABLE `t1` (\n" + // + " `k1` int(11) NULL,\n" + // + " `k2` int(11) NULL,\n" + // + " `k3` int(11) NULL,\n" + // + " `k4` int(11) NULL,\n" + // 
+ " `k5` int(11) NULL,\n" + // + " `v1` int(11) SUM NULL,\n" + // + " `v2` int(11) SUM NULL\n" + // + ") ENGINE=OLAP\n" + // + "AGGREGATE KEY(`k1`, `k2`, k3, k4, k5)\n" + // + "COMMENT 'OLAP'\n" + // + "DISTRIBUTED BY HASH(`k1`) BUCKETS 3\n" + // + "PROPERTIES (\n" + // + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + // + "\"in_memory\" = \"false\",\n" + // + "\"storage_format\" = \"V2\"\n" + // + ")"); + // addRollup("ALTER TABLE t1 ADD ROLLUP r1(k1, k2, k3, v1)"); + // addRollup("ALTER TABLE t1 ADD ROLLUP r2(k1, k2, v1)"); + // addRollup("ALTER TABLE t1 ADD ROLLUP r3(k1, v1)"); + // + // createTable("CREATE TABLE `t2` (\n" + // + " `k1` int(11) NULL,\n" + // + " `k2` int(11) NULL,\n" + // + " `k3` int(11) NULL,\n" + // + " `k4` int(11) NULL,\n" + // + " `k5` int(11) NULL,\n" + // + " `v1` int(11) SUM NULL,\n" + // + " `v2` int(11) SUM NULL\n" + // + ") ENGINE=OLAP\n" + // + "AGGREGATE KEY(`k1`, `k2`, k3, k4, k5)\n" + // + "COMMENT 'OLAP'\n" + // + "DISTRIBUTED BY HASH(`k1`) BUCKETS 3\n" + // + "PROPERTIES (\n" + // + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + // + "\"in_memory\" = \"false\",\n" + // + "\"storage_format\" = \"V2\"\n" + // + ")"); + // addRollup("ALTER TABLE t2 ADD ROLLUP r1(k1, k2, k3, v1)"); + // addRollup("ALTER TABLE t2 ADD ROLLUP r2(k1, k2, v1)"); + // addRollup("ALTER TABLE t2 ADD ROLLUP r3(k1, v1)"); + } + + @Test + public void testAggMatching() { + PlanChecker.from(connectContext) + .analyze(" select k2, sum(v1) from t group by k2") + .applyTopDown(new SelectRollup()) + .matches(logicalOlapScan().when(scan -> "r1".equals(scan.getSelectRollupName().get()))); + } + + @Test + public void testMatchingBase() { + PlanChecker.from(connectContext) + .analyze(" select k1, sum(v1) from t group by k1") + .applyTopDown(new SelectRollup()) + .matches(logicalOlapScan().when(scan -> "t".equals(scan.getSelectRollupName().get()))); + } + + @Test + void testAggFilterScan() { + PlanChecker.from(connectContext) + 
.analyze("select k2, sum(v1) from t where k3=0 group by k2") + .applyTopDown(new SelectRollup()) + .matches(logicalOlapScan().when(scan -> "r2".equals(scan.getSelectRollupName().get()))); + } + + @Test + void testTranslate() { + PlanChecker.from(connectContext) + .checkPlannerResult( + " select k2, sum(v1) from t group by k2", + planner -> { + } + ); + } + + @Disabled + @Test + public void testDup() throws Exception { + // System.out.println(getSQLPlanOrErrorMsg("select k2, sum(v1) from dup_tbl group by k2")); + PlanChecker.from(connectContext) + .checkPlannerResult( + "select k2, sum(v1) from dup_tbl group by k2", + planner -> { + }) + ; + } + + @Disabled + @Test + void testTranslate1() { + PlanChecker.from(connectContext) + .checkPlannerResult( + "select k1, v1 from only_base group by k1", + planner -> { + } + ); + } + + /////////////////////////////////////////////////////////////////////////// + // For debugging legacy rollup selecting. + // Add these cases for nereids rollup select in the future. + /////////////////////////////////////////////////////////////////////////// + @Disabled + @Test + public void test() throws Exception { + System.out.println(getSQLPlanOrErrorMsg("select k1, sum(v1) from only_base group by k1")); + } + + @Disabled + @Test + public void testLegacyRollupSelect() throws Exception { + String explain = getSQLPlanOrErrorMsg("SELECT user_id, sum(cost) FROM t0 GROUP BY user_id"); + System.out.println(explain); + } + + @Disabled + @Test + public void testDuplicateKey() throws Exception { + System.out.println(getSQLPlanOrErrorMsg("select k1, sum(v1) from detail group by k1")); + } + + // todo: add test cases + // 1. equal case: k2=0 + // 2. 
not equal case: k2 !=0 + @Disabled + @Test + public void test1() throws Exception { + // predicates pushdown scan: k2>0 + // rollup: r2 + String sql1 = "select k1, sum(v1) from t2 where k2>0 group by k1"; + System.out.println(getSQLPlanOrErrorMsg(sql1)); + } + + @Disabled + @Test + public void test2() throws Exception { + // pushdown: k2>k3 + // rollup: r1 + String sql2 = "select k1, sum(v1) from t2 where k2>k3 group by k1"; + System.out.println(getSQLPlanOrErrorMsg(sql2)); + } + + /** + *
+     * :VOlapScanNode
+     *      TABLE: t2(t2), PREAGGREGATION: OFF. Reason: conjunct on `c1` which is StorageEngine value column
+     *      PREDICATES: `k2` > 0
+     *      rollup: t2
+     *      partitions=1/1, tablets=3/3, tabletList=10017,10019,10021
+     *      cardinality=0, avgRowSize=12.0, numNodes=1
+     * 
+ */ + @Disabled + @Test + public void testFilterPushDownProject() throws Exception { + // failed to select rollup + String sql = "select c1, sum(v1) from" + + "(select k1 as c1, k2 as c2, k3 as c3, k4 as c4, v1 as v1 from t2) t" + + " where c2>0 group by c1"; + System.out.println(getSQLPlanOrErrorMsg(sql)); + } + + /** + *
+     *   0:VOlapScanNode
+     *      TABLE: t2(t2), PREAGGREGATION: OFF. Reason: aggExpr.getChild(0)[(SlotRef{slotDesc=
+     *      SlotDescriptor{id=1, parent=0, col=v1, type=INT, materialized=true, byteSize=0,
+     *      byteOffset=-1, nullIndicatorByte=0, nullIndicatorBit=0, slotIdx=0}, col=v1, label=`v1`, tblName=null}
+     *      SlotRef{slotDesc=SlotDescriptor{id=2, parent=0, col=v2, type=INT, materialized=true,
+     *      byteSize=0, byteOffset=-1, nullIndicatorByte=0, nullIndicatorBit=0, slotIdx=0}, col=v2,
+     *      label=`v2`, tblName=null})] is not SlotRef or CastExpr|CaseExpr
+     *      rollup: t2
+     *      partitions=1/1, tablets=3/3, tabletList=10017,10019,10021
+     *      cardinality=0, avgRowSize=12.0, numNodes=1
+     * 
+ */ + @Disabled + @Test + public void testMultiAggColumns() throws Exception { + System.out.println(getSQLPlanOrErrorMsg("select k1, sum(v1 + v2) from t2 group by k1")); + } + + + /////////////////////////////////////////////////////////////////////////// + // Test turn off pre-agg in legacy logic. + // Add these cases for nereids rollup select in the future. + /////////////////////////////////////////////////////////////////////////// + + /** + *
+     *   0:VOlapScanNode
+     *      TABLE: t1(t1), PREAGGREGATION: OFF. Reason: No AggregateInfo
+     * 
+ */ + @Disabled + @Test + public void testNoAgg() throws Exception { + System.out.println(getSQLPlanOrErrorMsg("select k1, v1 from t1")); + } + + @Disabled + @Test + public void testJoin() throws Exception { + System.out.println( + getSQLPlanOrErrorMsg("select t1.k1, t2.k2, sum(t1.v1), sum(t2.v1) from " + + "t1 join t2 on t1.k1=t2.k1 group by t1.k1, t2.k2;")); + } + + @Disabled + @Test + public void testOuterJoin() throws Exception { + System.out.println(getSQLPlanOrErrorMsg( + "select t1.k1, t2.k2, sum(t1.v1), sum(t2.v1) from t1 outer join t2 on t1.k1=t2.k1")); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java index 5493ca9c6a..6292a24c79 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java @@ -81,8 +81,10 @@ public class PlanToStringTest { @Test public void testLogicalOlapScan() { LogicalOlapScan plan = PlanConstructor.newLogicalOlapScan(0, "table", 0); + System.out.println(plan.toString()); Assertions.assertTrue( - plan.toString().matches("LogicalOlapScan \\( qualified=db\\.table, output=\\[id#\\d+, name#\\d+] \\)")); + plan.toString().matches("LogicalOlapScan \\( qualified=db\\.table, " + + "output=\\[id#\\d+, name#\\d+], candidateIndexIds=\\[], selectedIndexId=-1 \\)")); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java index d23d43e3e3..c0d909e2eb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java @@ -18,10 +18,14 @@ package org.apache.doris.nereids.util; import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.NereidsPlanner; +import 
org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.jobs.JobContext; import org.apache.doris.nereids.memo.Group; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.memo.Memo; +import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.pattern.GroupExpressionMatching; import org.apache.doris.nereids.pattern.MatchingContext; import org.apache.doris.nereids.pattern.PatternDescriptor; @@ -31,7 +35,9 @@ import org.apache.doris.nereids.rules.RuleFactory; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.OriginStatement; import com.google.common.base.Supplier; import com.google.common.collect.Lists; @@ -258,6 +264,15 @@ public class PlanChecker { return this; } + public PlanChecker checkPlannerResult(String sql, Consumer consumer) { + LogicalPlan parsed = new NereidsParser().parseSingle(sql); + NereidsPlanner nereidsPlanner = new NereidsPlanner( + new StatementContext(connectContext, new OriginStatement(sql, 0))); + nereidsPlanner.plan(LogicalPlanAdapter.of(parsed)); + consumer.accept(nereidsPlanner); + return this; + } + public static PlanChecker from(ConnectContext connectContext) { return new PlanChecker(connectContext); } @@ -274,4 +289,8 @@ public class PlanChecker { public CascadesContext getCascadesContext() { return cascadesContext; } + + public Plan getPlan() { + return cascadesContext.getMemo().copyOut(); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanConstructor.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanConstructor.java index 4bad76c26e..cae47c6115 100644 --- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanConstructor.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanConstructor.java @@ -74,6 +74,10 @@ public class PlanConstructor { } public static OlapTable newOlapTable(long tableId, String tableName, int hashColumn) { + return newOlapTable(tableId, tableName, hashColumn, KeysType.PRIMARY_KEYS); + } + + public static OlapTable newOlapTable(long tableId, String tableName, int hashColumn, KeysType keysType) { List columns = ImmutableList.of( new Column("id", Type.INT, true, AggregateType.NONE, "0", ""), new Column("name", Type.STRING, true, AggregateType.NONE, "", "")); @@ -82,13 +86,13 @@ public class PlanConstructor { ImmutableList.of(columns.get(hashColumn))); OlapTable table = new OlapTable(tableId, tableName, columns, - KeysType.PRIMARY_KEYS, new PartitionInfo(), hashDistributionInfo); + keysType, new PartitionInfo(), hashDistributionInfo); table.setIndexMeta(-1, tableName, table.getFullSchema(), 0, 0, (short) 0, TStorageType.COLUMN, - KeysType.PRIMARY_KEYS); + keysType); return table; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 57b68ca4bc..1e10f52a28 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -17,7 +17,9 @@ package org.apache.doris.utframe; +import org.apache.doris.alter.AlterJobV2; import org.apache.doris.analysis.AlterSqlBlockRuleStmt; +import org.apache.doris.analysis.AlterTableStmt; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreatePolicyStmt; @@ -68,6 +70,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.io.FileUtils; +import org.junit.Assert; import 
org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -499,4 +502,27 @@ public abstract class TestWithFeService { connectContext.setCurrentUserIdentity(user); connectContext.setQualifiedUser(SystemInfoService.DEFAULT_CLUSTER + ":" + userName); } + + protected void addRollup(String sql) throws Exception { + AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext); + Env.getCurrentEnv().alterTable(alterTableStmt); + checkAlterJob(); + // waiting table state to normal + Thread.sleep(100); + } + + private void checkAlterJob() throws InterruptedException { + // check alter job + Map alterJobs = Env.getCurrentEnv().getMaterializedViewHandler().getAlterJobsV2(); + for (AlterJobV2 alterJobV2 : alterJobs.values()) { + while (!alterJobV2.getJobState().isFinalState()) { + System.out.println("alter job " + alterJobV2.getDbId() + + " is running. state: " + alterJobV2.getJobState()); + Thread.sleep(100); + } + System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState()); + Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState()); + } + } + } diff --git a/regression-test/data/nereids_syntax_p0/rollup.out b/regression-test/data/nereids_syntax_p0/rollup.out new file mode 100644 index 0000000000..71d96f5919 --- /dev/null +++ b/regression-test/data/nereids_syntax_p0/rollup.out @@ -0,0 +1,9 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !rollup1 -- +2 6 +3 4 + +-- !rollup2 -- +1 6 +2 4 + diff --git a/regression-test/suites/nereids_syntax_p0/rollup.groovy b/regression-test/suites/nereids_syntax_p0/rollup.groovy new file mode 100644 index 0000000000..aabb980138 --- /dev/null +++ b/regression-test/suites/nereids_syntax_p0/rollup.groovy @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +suite("rollup") { + sql """ + CREATE TABLE `rollup_t1` ( + `k1` int(11) NULL, + `k2` int(11) NULL, + `k3` int(11) NULL, + `v1` int(11) SUM NULL + ) ENGINE=OLAP + AGGREGATE KEY(`k1`, `k2`, `k3`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2", + "disable_auto_compaction" = "false" + ); + """ + + sql "ALTER TABLE rollup_t1 ADD ROLLUP r1(k2, v1)" + + + def getJobRollupState = { tableName -> + def jobStateResult = sql """ SHOW ALTER TABLE ROLLUP WHERE TableName='${tableName}' ORDER BY CreateTime DESC LIMIT 1; """ + return jobStateResult[0][8] + } + + int max_try_secs = 60 + while (max_try_secs--) { + String res = getJobRollupState("rollup_t1") + if (res == "FINISHED") { + break + } else { + Thread.sleep(2000) + if (max_try_secs < 1) { + println "test timeout," + "state:" + res + assertEquals("FINISHED",res) + } + } + } + Thread.sleep(200) + + sql "insert into rollup_t1 values(1, 2, 3, 4)" + sql "insert into rollup_t1 values(1, 2, 3, 2)" + sql "insert into rollup_t1 values(2, 3, 4, 1)" + sql "insert into rollup_t1 values(2, 3, 4, 3)" + + sql "set enable_vectorized_engine=true" + + sql "set enable_nereids_planner=true" + + explain { + sql("select 
k2, sum(v1) from rollup_t1 group by k2") + contains("rollup_t1(r1)") + } + + explain { + sql("select k1, sum(v1) from rollup_t1 group by k1") + contains("rollup_t1(rollup_t1)") + } + + order_qt_rollup1 "select k2, sum(v1) from rollup_t1 group by k2" + + order_qt_rollup2 "select k1, sum(v1) from rollup_t1 group by k1" +}