[feature](Nereids) support materialized index selection (#13416)

This PR unifies the selection of rollup indexes and materialized view indexes into a single piece of logic, referred to as materialized index selection.

Main steps:

### Find candidate indexes
1. When an aggregate sits on top of the scan, selection is handled in `SelectMaterializedIndexWithAggregate`. The candidates are the base index and the indexes whose pre-aggregation can be turned on; the pre-aggregation status is determined by the aggregate functions, grouping expressions, and pushdown predicates.
2. When no aggregate sits on top of the scan node, selection is handled in `SelectMaterializedIndexWithoutAggregate`. The candidates are the base index and the indexes that contain all the key columns. A simplified sketch of this candidate-finding step follows the list.
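
The sketch below illustrates the candidate-finding decision with standalone placeholder types; `CandidateIndexSketch`, `Index`, and `preAggOn` are illustrative stand-ins rather than the actual Doris classes, and the pre-aggregation check itself is stubbed out:

```java
import java.util.ArrayList;
import java.util.List;

public class CandidateIndexSketch {

    enum KeysType { AGG_KEYS, UNIQUE_KEYS, DUP_KEYS }

    /** Minimal placeholder for a materialized index; only what the sketch needs. */
    static class Index {
        final long id;
        final boolean isBase;

        Index(long id, boolean isBase) {
            this.id = id;
            this.isBase = isBase;
        }
    }

    /** Stand-in for the real pre-aggregation check (aggregate functions, grouping exprs, predicates). */
    static boolean preAggOn(Index index) {
        return true;
    }

    /**
     * For AGG_KEYS/UNIQUE_KEYS tables the pre-agg check runs against the base index; if it is off,
     * only the base index is kept. For DUP_KEYS tables the candidates are the base index plus every
     * other index whose own pre-agg check passes.
     */
    static List<Index> findCandidates(KeysType keysType, Index baseIndex, List<Index> visibleIndexes) {
        List<Index> candidates = new ArrayList<>();
        switch (keysType) {
            case AGG_KEYS:
            case UNIQUE_KEYS:
                if (preAggOn(baseIndex)) {
                    candidates.addAll(visibleIndexes);
                } else {
                    candidates.add(baseIndex); // pre-agg is off: fall back to the base index only
                }
                break;
            case DUP_KEYS:
                candidates.add(baseIndex);
                for (Index index : visibleIndexes) {
                    if (!index.isBase && preAggOn(index)) {
                        candidates.add(index);
                    }
                }
                break;
            default:
                throw new IllegalStateException("Not supported keys type: " + keysType);
        }
        return candidates;
    }
}
```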

### Filter and order the candidate indexes
1. Keep only the indexes that contain all the required scan output columns.
2. Among those, keep the indexes whose key prefix matches the pushdown predicates the most.
3. Order the remaining indexes by row count, then column count, then index id (all ascending). A sketch of the filter-and-order step is shown after this list.
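
A minimal sketch of steps 1 and 3 (the prefix-index matching of step 2 is omitted), again using placeholder types rather than the real `MaterializedIndex`/`OlapTable` classes:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class FilterAndOrderSketch {

    /** Placeholder descriptor; the real rule reads this information from the table metadata. */
    static class IndexInfo {
        final long id;
        final Set<String> columnNames;
        final long rowCount;

        IndexInfo(long id, Set<String> columnNames, long rowCount) {
            this.id = id;
            this.columnNames = columnNames;
            this.rowCount = rowCount;
        }
    }

    /** Keep indexes covering all required columns, then order by row count, column count, index id. */
    static List<Long> filterAndOrder(List<IndexInfo> candidates, Set<String> requiredColumns) {
        return candidates.stream()
                .filter(index -> index.columnNames.containsAll(requiredColumns))
                .sorted(Comparator
                        .comparingLong((IndexInfo index) -> index.rowCount)
                        .thenComparingInt(index -> index.columnNames.size())
                        .thenComparingLong(index -> index.id))
                .map(index -> index.id)
                .collect(Collectors.toList());
    }
}
```

With this ordering, the candidate with the fewest rows (and, on ties, the fewest columns) sorts first in the resulting id list.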
Author: Shuo Wang
Date: 2022-10-25 19:25:58 +08:00 (committed by GitHub)
Parent: bd884d3298
Commit: d6c3470c8d
13 changed files with 1796 additions and 408 deletions


@ -33,7 +33,6 @@ import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.properties.DistributionSpecHash;
import org.apache.doris.nereids.properties.DistributionSpecHash.ShuffleType;
import org.apache.doris.nereids.properties.OrderKey;
@ -320,22 +319,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
tupleDescriptor.setRef(tableRef);
olapScanNode.setSelectedPartitionIds(olapScan.getSelectedPartitionIds());
// TODO: Unify the logic here for all the table types once aggregate/unique key types are fully supported.
switch (olapScan.getTable().getKeysType()) {
case AGG_KEYS:
case UNIQUE_KEYS:
// TODO: Improve complete info for aggregate and unique key types table.
case DUP_KEYS:
PreAggStatus preAgg = olapScan.getPreAggStatus();
olapScanNode.setSelectedIndexInfo(olapScan.getSelectedIndexId(), preAgg.isOn(), preAgg.getOffReason());
break;
case DUP_KEYS:
try {
olapScanNode.updateScanRangeInfoByNewMVSelector(olapScan.getSelectedIndexId(), true, "");
olapScanNode.setIsPreAggregation(true, "");
} catch (Exception e) {
throw new AnalysisException(e.getMessage());
}
break;
default:
throw new RuntimeException("Not supported key type: " + olapScan.getTable().getKeysType());
}


@ -22,8 +22,8 @@ import org.apache.doris.nereids.jobs.Job;
import org.apache.doris.nereids.rules.RuleSet;
import org.apache.doris.nereids.rules.expression.rewrite.ExpressionNormalization;
import org.apache.doris.nereids.rules.expression.rewrite.ExpressionOptimization;
import org.apache.doris.nereids.rules.mv.SelectRollupWithAggregate;
import org.apache.doris.nereids.rules.mv.SelectRollupWithoutAggregate;
import org.apache.doris.nereids.rules.mv.SelectMaterializedIndexWithAggregate;
import org.apache.doris.nereids.rules.mv.SelectMaterializedIndexWithoutAggregate;
import org.apache.doris.nereids.rules.rewrite.logical.ColumnPruning;
import org.apache.doris.nereids.rules.rewrite.logical.EliminateFilter;
import org.apache.doris.nereids.rules.rewrite.logical.EliminateLimit;
@ -73,8 +73,8 @@ public class NereidsRewriteJobExecutor extends BatchRulesJob {
.add(topDownBatch(ImmutableList.of(new EliminateLimit())))
.add(topDownBatch(ImmutableList.of(new EliminateFilter())))
.add(topDownBatch(ImmutableList.of(new PruneOlapScanPartition())))
.add(topDownBatch(ImmutableList.of(new SelectRollupWithAggregate())))
.add(topDownBatch(ImmutableList.of(new SelectRollupWithoutAggregate())))
.add(topDownBatch(ImmutableList.of(new SelectMaterializedIndexWithAggregate())))
.add(topDownBatch(ImmutableList.of(new SelectMaterializedIndexWithoutAggregate())))
.build();
rulesJob.addAll(jobs);


@ -108,12 +108,16 @@ public enum RuleType {
ELIMINATE_FILTER(RuleTypeClass.REWRITE),
ELIMINATE_OUTER(RuleTypeClass.REWRITE),
FIND_HASH_CONDITION_FOR_JOIN(RuleTypeClass.REWRITE),
ROLLUP_AGG_SCAN(RuleTypeClass.REWRITE),
ROLLUP_AGG_FILTER_SCAN(RuleTypeClass.REWRITE),
ROLLUP_AGG_PROJECT_SCAN(RuleTypeClass.REWRITE),
ROLLUP_AGG_PROJECT_FILTER_SCAN(RuleTypeClass.REWRITE),
ROLLUP_AGG_FILTER_PROJECT_SCAN(RuleTypeClass.REWRITE),
ROLLUP_WITH_OUT_AGG(RuleTypeClass.REWRITE),
MATERIALIZED_INDEX_AGG_SCAN(RuleTypeClass.REWRITE),
MATERIALIZED_INDEX_AGG_FILTER_SCAN(RuleTypeClass.REWRITE),
MATERIALIZED_INDEX_AGG_PROJECT_SCAN(RuleTypeClass.REWRITE),
MATERIALIZED_INDEX_AGG_PROJECT_FILTER_SCAN(RuleTypeClass.REWRITE),
MATERIALIZED_INDEX_AGG_FILTER_PROJECT_SCAN(RuleTypeClass.REWRITE),
MATERIALIZED_INDEX_SCAN(RuleTypeClass.REWRITE),
MATERIALIZED_INDEX_FILTER_SCAN(RuleTypeClass.REWRITE),
MATERIALIZED_INDEX_PROJECT_SCAN(RuleTypeClass.REWRITE),
MATERIALIZED_INDEX_PROJECT_FILTER_SCAN(RuleTypeClass.REWRITE),
MATERIALIZED_INDEX_FILTER_PROJECT_SCAN(RuleTypeClass.REWRITE),
OLAP_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE),
EXTRACT_SINGLE_TABLE_EXPRESSION_FROM_DISJUNCTION(RuleTypeClass.REWRITE),
REWRITE_SENTINEL(RuleTypeClass.REWRITE),


@ -0,0 +1,273 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.mv;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.InPredicate;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.NullSafeEqual;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.util.ExpressionUtils;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Base class for selecting materialized index rules.
*/
public abstract class AbstractSelectMaterializedIndexRule {
protected boolean shouldSelectIndex(LogicalOlapScan scan) {
switch (scan.getTable().getKeysType()) {
case AGG_KEYS:
case UNIQUE_KEYS:
case DUP_KEYS:
return !scan.isIndexSelected();
default:
return false;
}
}
/**
* 1. indexes have all the required columns.
* 2. find matching key prefix most.
* 3. sort by row count, column count and index id.
*/
protected List<Long> filterAndOrder(
Stream<MaterializedIndex> candidates,
LogicalOlapScan scan,
Set<Slot> requiredScanOutput,
List<Expression> predicates) {
OlapTable table = scan.getTable();
// Scan slot exprId -> slot name
Map<ExprId, String> exprIdToName = scan.getOutput()
.stream()
.collect(Collectors.toMap(NamedExpression::getExprId, NamedExpression::getName));
// get required column names in metadata.
Set<String> requiredColumnNames = requiredScanOutput
.stream()
.map(slot -> exprIdToName.get(slot.getExprId()))
.collect(Collectors.toSet());
// 1. filter index contains all the required columns by column name.
List<MaterializedIndex> containAllRequiredColumns = candidates
.filter(index -> table.getSchemaByIndexId(index.getId(), true)
.stream()
.map(Column::getName)
.collect(Collectors.toSet())
.containsAll(requiredColumnNames)
).collect(Collectors.toList());
// 2. find matching key prefix most.
List<MaterializedIndex> matchingKeyPrefixMost = matchPrefixMost(scan, containAllRequiredColumns, predicates,
exprIdToName);
List<Long> partitionIds = scan.getSelectedPartitionIds();
// 3. sort by row count, column count and index id.
return matchingKeyPrefixMost.stream()
.map(MaterializedIndex::getId)
.sorted(Comparator
// compare by row count
.comparing(rid -> partitionIds.stream()
.mapToLong(pid -> table.getPartition(pid).getIndex((Long) rid).getRowCount())
.sum())
// compare by column count
.thenComparing(rid -> table.getSchemaByIndexId((Long) rid).size())
// compare by index id
.thenComparing(rid -> (Long) rid))
.collect(Collectors.toList());
}
protected List<MaterializedIndex> matchPrefixMost(
LogicalOlapScan scan,
List<MaterializedIndex> candidate,
List<Expression> predicates,
Map<ExprId, String> exprIdToName) {
Map<Boolean, Set<String>> split = filterCanUsePrefixIndexAndSplitByEquality(predicates, exprIdToName);
Set<String> equalColNames = split.getOrDefault(true, ImmutableSet.of());
Set<String> nonEqualColNames = split.getOrDefault(false, ImmutableSet.of());
if (!(equalColNames.isEmpty() && nonEqualColNames.isEmpty())) {
List<MaterializedIndex> matchingResult = matchKeyPrefixMost(scan.getTable(), candidate,
equalColNames, nonEqualColNames);
return matchingResult.isEmpty() ? candidate : matchingResult;
} else {
return candidate;
}
}
///////////////////////////////////////////////////////////////////////////
// Split conjuncts into equal-to and non-equal-to.
///////////////////////////////////////////////////////////////////////////
/**
* Filter the input conjuncts those can use prefix and split into 2 groups: is equal-to or non-equal-to predicate
* when comparing the key column.
*/
private Map<Boolean, Set<String>> filterCanUsePrefixIndexAndSplitByEquality(
List<Expression> conjunct, Map<ExprId, String> exprIdToColName) {
return conjunct.stream()
.map(expr -> PredicateChecker.canUsePrefixIndex(expr, exprIdToColName))
.filter(result -> !result.equals(PrefixIndexCheckResult.FAILURE))
.collect(Collectors.groupingBy(
result -> result.type == ResultType.SUCCESS_EQUAL,
Collectors.mapping(result -> result.colName, Collectors.toSet())
));
}
private enum ResultType {
FAILURE,
SUCCESS_EQUAL,
SUCCESS_NON_EQUAL,
}
private static class PrefixIndexCheckResult {
public static final PrefixIndexCheckResult FAILURE = new PrefixIndexCheckResult(null, ResultType.FAILURE);
private final String colName;
private final ResultType type;
private PrefixIndexCheckResult(String colName, ResultType result) {
this.colName = colName;
this.type = result;
}
public static PrefixIndexCheckResult createEqual(String name) {
return new PrefixIndexCheckResult(name, ResultType.SUCCESS_EQUAL);
}
public static PrefixIndexCheckResult createNonEqual(String name) {
return new PrefixIndexCheckResult(name, ResultType.SUCCESS_NON_EQUAL);
}
}
/**
* Check if an expression could prefix key index.
*/
private static class PredicateChecker extends ExpressionVisitor<PrefixIndexCheckResult, Map<ExprId, String>> {
private static final PredicateChecker INSTANCE = new PredicateChecker();
private PredicateChecker() {
}
public static PrefixIndexCheckResult canUsePrefixIndex(Expression expression,
Map<ExprId, String> exprIdToName) {
return expression.accept(INSTANCE, exprIdToName);
}
@Override
public PrefixIndexCheckResult visit(Expression expr, Map<ExprId, String> context) {
return PrefixIndexCheckResult.FAILURE;
}
@Override
public PrefixIndexCheckResult visitInPredicate(InPredicate in, Map<ExprId, String> context) {
Optional<ExprId> slotOrCastOnSlot = ExpressionUtils.isSlotOrCastOnSlot(in.getCompareExpr());
if (slotOrCastOnSlot.isPresent() && in.getOptions().stream().allMatch(Literal.class::isInstance)) {
return PrefixIndexCheckResult.createEqual(context.get(slotOrCastOnSlot.get()));
} else {
return PrefixIndexCheckResult.FAILURE;
}
}
@Override
public PrefixIndexCheckResult visitComparisonPredicate(ComparisonPredicate cp, Map<ExprId, String> context) {
if (cp instanceof EqualTo || cp instanceof NullSafeEqual) {
return check(cp, context, PrefixIndexCheckResult::createEqual);
} else {
return check(cp, context, PrefixIndexCheckResult::createNonEqual);
}
}
private PrefixIndexCheckResult check(ComparisonPredicate cp, Map<ExprId, String> exprIdToColumnName,
Function<String, PrefixIndexCheckResult> resultMapper) {
return check(cp).map(exprId -> resultMapper.apply(exprIdToColumnName.get(exprId)))
.orElse(PrefixIndexCheckResult.FAILURE);
}
private Optional<ExprId> check(ComparisonPredicate cp) {
Optional<ExprId> exprId = check(cp.left(), cp.right());
return exprId.isPresent() ? exprId : check(cp.right(), cp.left());
}
private Optional<ExprId> check(Expression maybeSlot, Expression maybeConst) {
Optional<ExprId> exprIdOpt = ExpressionUtils.isSlotOrCastOnSlot(maybeSlot);
return exprIdOpt.isPresent() && maybeConst.isConstant() ? exprIdOpt : Optional.empty();
}
}
///////////////////////////////////////////////////////////////////////////
// Matching key prefix
///////////////////////////////////////////////////////////////////////////
private List<MaterializedIndex> matchKeyPrefixMost(
OlapTable table,
List<MaterializedIndex> indexes,
Set<String> equalColumns,
Set<String> nonEqualColumns) {
TreeMap<Integer, List<MaterializedIndex>> collect = indexes.stream()
.collect(Collectors.toMap(
index -> indexKeyPrefixMatchCount(table, index, equalColumns, nonEqualColumns),
Lists::newArrayList,
(l1, l2) -> {
l1.addAll(l2);
return l1;
},
TreeMap::new)
);
return collect.descendingMap().firstEntry().getValue();
}
private int indexKeyPrefixMatchCount(
OlapTable table,
MaterializedIndex index,
Set<String> equalColNames,
Set<String> nonEqualColNames) {
int matchCount = 0;
for (Column column : table.getSchemaByIndexId(index.getId())) {
if (equalColNames.contains(column.getName())) {
matchCount++;
} else if (nonEqualColNames.contains(column.getName())) {
// Unequivalence predicate's columns can match only first column in index.
matchCount++;
break;
} else {
break;
}
}
return matchCount;
}
}


@ -26,19 +26,15 @@ import org.apache.doris.nereids.annotation.Developing;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.InPredicate;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.NullSafeEqual;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
import org.apache.doris.nereids.trees.expressions.functions.agg.Max;
import org.apache.doris.nereids.trees.expressions.functions.agg.Min;
import org.apache.doris.nereids.trees.expressions.functions.agg.Sum;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.algebra.Project;
@ -50,24 +46,26 @@ import org.apache.doris.nereids.util.ExpressionUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Select rollup index when aggregate is present.
* Select materialized index, i.e., both for rollup and materialized view when aggregate is present.
* TODO: optimize queries with aggregate not on top of scan directly, e.g., aggregate -> join -> scan
* to use materialized index.
*/
@Developing
public class SelectRollupWithAggregate implements RewriteRuleFactory {
public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterializedIndexRule
implements RewriteRuleFactory {
///////////////////////////////////////////////////////////////////////////
// All the patterns
///////////////////////////////////////////////////////////////////////////
@ -76,9 +74,9 @@ public class SelectRollupWithAggregate implements RewriteRuleFactory {
return ImmutableList.of(
// only agg above scan
// Aggregate(Scan)
logicalAggregate(logicalOlapScan().when(LogicalOlapScan::shouldSelectRollup)).then(agg -> {
logicalAggregate(logicalOlapScan().when(this::shouldSelectIndex)).then(agg -> {
LogicalOlapScan scan = agg.child();
Pair<PreAggStatus, List<Long>> result = selectCandidateRollupIds(
Pair<PreAggStatus, List<Long>> result = select(
scan,
agg.getInputSlots(),
ImmutableList.of(),
@ -87,11 +85,11 @@ public class SelectRollupWithAggregate implements RewriteRuleFactory {
return agg.withChildren(
scan.withMaterializedIndexSelected(result.key(), result.value())
);
}).toRule(RuleType.ROLLUP_AGG_SCAN),
}).toRule(RuleType.MATERIALIZED_INDEX_AGG_SCAN),
// filter could push down scan.
// Aggregate(Filter(Scan))
logicalAggregate(logicalFilter(logicalOlapScan().when(LogicalOlapScan::shouldSelectRollup)))
logicalAggregate(logicalFilter(logicalOlapScan().when(this::shouldSelectIndex)))
.then(agg -> {
LogicalFilter<LogicalOlapScan> filter = agg.child();
LogicalOlapScan scan = filter.child();
@ -100,7 +98,7 @@ public class SelectRollupWithAggregate implements RewriteRuleFactory {
.addAll(filter.getInputSlots())
.build();
Pair<PreAggStatus, List<Long>> result = selectCandidateRollupIds(
Pair<PreAggStatus, List<Long>> result = select(
scan,
requiredSlots,
filter.getConjuncts(),
@ -110,15 +108,15 @@ public class SelectRollupWithAggregate implements RewriteRuleFactory {
return agg.withChildren(filter.withChildren(
scan.withMaterializedIndexSelected(result.key(), result.value())
));
}).toRule(RuleType.ROLLUP_AGG_FILTER_SCAN),
}).toRule(RuleType.MATERIALIZED_INDEX_AGG_FILTER_SCAN),
// column pruning or other projections such as alias, etc.
// Aggregate(Project(Scan))
logicalAggregate(logicalProject(logicalOlapScan().when(LogicalOlapScan::shouldSelectRollup)))
logicalAggregate(logicalProject(logicalOlapScan().when(this::shouldSelectIndex)))
.then(agg -> {
LogicalProject<LogicalOlapScan> project = agg.child();
LogicalOlapScan scan = project.child();
Pair<PreAggStatus, List<Long>> result = selectCandidateRollupIds(
Pair<PreAggStatus, List<Long>> result = select(
scan,
project.getInputSlots(),
ImmutableList.of(),
@ -131,18 +129,21 @@ public class SelectRollupWithAggregate implements RewriteRuleFactory {
scan.withMaterializedIndexSelected(result.key(), result.value())
)
);
}).toRule(RuleType.ROLLUP_AGG_PROJECT_SCAN),
}).toRule(RuleType.MATERIALIZED_INDEX_AGG_PROJECT_SCAN),
// filter could push down and project.
// Aggregate(Project(Filter(Scan)))
logicalAggregate(logicalProject(logicalFilter(logicalOlapScan()
.when(LogicalOlapScan::shouldSelectRollup)))).then(agg -> {
.when(this::shouldSelectIndex)))).then(agg -> {
LogicalProject<LogicalFilter<LogicalOlapScan>> project = agg.child();
LogicalFilter<LogicalOlapScan> filter = project.child();
LogicalOlapScan scan = filter.child();
Pair<PreAggStatus, List<Long>> result = selectCandidateRollupIds(
Set<Slot> requiredSlots = Stream.concat(
project.getInputSlots().stream(), filter.getInputSlots().stream())
.collect(Collectors.toSet());
Pair<PreAggStatus, List<Long>> result = select(
scan,
agg.getInputSlots(),
requiredSlots,
filter.getConjuncts(),
extractAggFunctionAndReplaceSlot(agg, Optional.of(project)),
ExpressionUtils.replace(agg.getGroupByExpressions(),
@ -151,16 +152,16 @@ public class SelectRollupWithAggregate implements RewriteRuleFactory {
return agg.withChildren(project.withChildren(filter.withChildren(
scan.withMaterializedIndexSelected(result.key(), result.value())
)));
}).toRule(RuleType.ROLLUP_AGG_PROJECT_FILTER_SCAN),
}).toRule(RuleType.MATERIALIZED_INDEX_AGG_PROJECT_FILTER_SCAN),
// filter can't push down
// Aggregate(Filter(Project(Scan)))
logicalAggregate(logicalFilter(logicalProject(logicalOlapScan()
.when(LogicalOlapScan::shouldSelectRollup)))).then(agg -> {
.when(this::shouldSelectIndex)))).then(agg -> {
LogicalFilter<LogicalProject<LogicalOlapScan>> filter = agg.child();
LogicalProject<LogicalOlapScan> project = filter.child();
LogicalOlapScan scan = project.child();
Pair<PreAggStatus, List<Long>> result = selectCandidateRollupIds(
Pair<PreAggStatus, List<Long>> result = select(
scan,
project.getInputSlots(),
ImmutableList.of(),
@ -171,23 +172,22 @@ public class SelectRollupWithAggregate implements RewriteRuleFactory {
return agg.withChildren(filter.withChildren(project.withChildren(
scan.withMaterializedIndexSelected(result.key(), result.value())
)));
}).toRule(RuleType.ROLLUP_AGG_FILTER_PROJECT_SCAN)
}).toRule(RuleType.MATERIALIZED_INDEX_AGG_FILTER_PROJECT_SCAN)
);
}
///////////////////////////////////////////////////////////////////////////
// Main entrance of select rollup
// Main entrance of select materialized index.
///////////////////////////////////////////////////////////////////////////
/**
* Select candidate rollup ids.
* Select materialized index ids.
* <p>
* 0. turn off pre agg, checking input aggregate functions and group by expressions, etc.
* 1. rollup contains all the required output slots.
* 2. match the most prefix index if pushdown predicates present.
* 3. sort the result matching rollup index ids.
* 1. find candidate indexes by pre-agg status: checking input aggregate functions and group by expressions
* and pushdown predicates.
* 2. filter and order the candidate indexes.
*/
private Pair<PreAggStatus, List<Long>> selectCandidateRollupIds(
private Pair<PreAggStatus, List<Long>> select(
LogicalOlapScan scan,
Set<Slot> requiredScanOutput,
List<Expression> predicates,
@ -197,204 +197,46 @@ public class SelectRollupWithAggregate implements RewriteRuleFactory {
String.format("Scan's output (%s) should contains all the input required scan output (%s).",
scan.getOutput(), requiredScanOutput));
// 0. maybe turn off pre agg.
PreAggStatus preAggStatus = checkPreAggStatus(scan, predicates, aggregateFunctions, groupingExprs);
if (preAggStatus.isOff()) {
// return early if pre agg status is off.
return Pair.of(preAggStatus, ImmutableList.of(scan.getTable().getBaseIndexId()));
}
OlapTable table = scan.getTable();
// Scan slot exprId -> slot name
Map<ExprId, String> exprIdToName = scan.getOutput()
.stream()
.collect(Collectors.toMap(NamedExpression::getExprId, NamedExpression::getName));
// get required column names in metadata.
Set<String> requiredColumnNames = requiredScanOutput
.stream()
.map(slot -> exprIdToName.get(slot.getExprId()))
.collect(Collectors.toSet());
// 1. filter rollup contains all the required columns by column name.
List<MaterializedIndex> containAllRequiredColumns = table.getVisibleIndex().stream()
.filter(rollup -> table.getSchemaByIndexId(rollup.getId(), true)
// 0. check pre-aggregation status.
final PreAggStatus preAggStatus;
final Stream<MaterializedIndex> checkPreAggResult;
switch (table.getKeysType()) {
case AGG_KEYS:
case UNIQUE_KEYS:
// Check pre-aggregation status by base index for aggregate-keys and unique-keys OLAP table.
preAggStatus = checkPreAggStatus(scan, table.getBaseIndexId(), predicates,
aggregateFunctions, groupingExprs);
if (preAggStatus.isOff()) {
// return early if pre agg status is off.
return Pair.of(preAggStatus, ImmutableList.of(scan.getTable().getBaseIndexId()));
}
checkPreAggResult = table.getVisibleIndex().stream();
break;
case DUP_KEYS:
Map<Boolean, List<MaterializedIndex>> indexesGroupByIsBaseOrNot = table.getVisibleIndex()
.stream()
.map(Column::getName)
.collect(Collectors.toSet())
.containsAll(requiredColumnNames)
).collect(Collectors.toList());
.collect(Collectors.groupingBy(index -> index.getId() == table.getBaseIndexId()));
Map<Boolean, Set<String>> split = filterCanUsePrefixIndexAndSplitByEquality(predicates, exprIdToName);
Set<String> equalColNames = split.getOrDefault(true, ImmutableSet.of());
Set<String> nonEqualColNames = split.getOrDefault(false, ImmutableSet.of());
// 2. find matching key prefix most.
List<MaterializedIndex> matchingKeyPrefixMost;
if (!(equalColNames.isEmpty() && nonEqualColNames.isEmpty())) {
List<MaterializedIndex> matchingResult = matchKeyPrefixMost(table, containAllRequiredColumns,
equalColNames, nonEqualColNames);
matchingKeyPrefixMost = matchingResult.isEmpty() ? containAllRequiredColumns : matchingResult;
} else {
matchingKeyPrefixMost = containAllRequiredColumns;
}
List<Long> partitionIds = scan.getSelectedPartitionIds();
// 3. sort by row count, column count and index id.
List<Long> sortedIndexId = matchingKeyPrefixMost.stream()
.map(MaterializedIndex::getId)
.sorted(Comparator
// compare by row count
.comparing(rid -> partitionIds.stream()
.mapToLong(pid -> table.getPartition(pid).getIndex((Long) rid).getRowCount())
.sum())
// compare by column count
.thenComparing(rid -> table.getSchemaByIndexId((Long) rid).size())
// compare by rollup index id
.thenComparing(rid -> (Long) rid))
.collect(Collectors.toList());
return Pair.of(preAggStatus, sortedIndexId);
}
///////////////////////////////////////////////////////////////////////////
// Matching key prefix
///////////////////////////////////////////////////////////////////////////
private List<MaterializedIndex> matchKeyPrefixMost(
OlapTable table,
List<MaterializedIndex> rollups,
Set<String> equalColumns,
Set<String> nonEqualColumns) {
TreeMap<Integer, List<MaterializedIndex>> collect = rollups.stream()
.collect(Collectors.toMap(
rollup -> rollupKeyPrefixMatchCount(table, rollup, equalColumns, nonEqualColumns),
Lists::newArrayList,
(l1, l2) -> {
l1.addAll(l2);
return l1;
},
TreeMap::new)
// Duplicate-keys table could use base index and indexes that pre-aggregation status is on.
checkPreAggResult = Stream.concat(
indexesGroupByIsBaseOrNot.get(true).stream(),
indexesGroupByIsBaseOrNot.getOrDefault(false, ImmutableList.of())
.stream()
.filter(index -> checkPreAggStatus(scan, index.getId(), predicates,
aggregateFunctions, groupingExprs).isOn())
);
return collect.descendingMap().firstEntry().getValue();
}
private int rollupKeyPrefixMatchCount(
OlapTable table,
MaterializedIndex rollup,
Set<String> equalColNames,
Set<String> nonEqualColNames) {
int matchCount = 0;
for (Column column : table.getSchemaByIndexId(rollup.getId())) {
if (equalColNames.contains(column.getName())) {
matchCount++;
} else if (nonEqualColNames.contains(column.getName())) {
// Unequivalence predicate's columns can match only first column in rollup.
matchCount++;
// Pre-aggregation is set to `on` by default for duplicate-keys table.
preAggStatus = PreAggStatus.on();
break;
} else {
break;
}
}
return matchCount;
}
///////////////////////////////////////////////////////////////////////////
// Split conjuncts into equal-to and non-equal-to.
///////////////////////////////////////////////////////////////////////////
/**
* Filter the input conjuncts those can use prefix and split into 2 groups: is equal-to or non-equal-to predicate
* when comparing the key column.
*/
private Map<Boolean, Set<String>> filterCanUsePrefixIndexAndSplitByEquality(
List<Expression> conjunct, Map<ExprId, String> exprIdToColName) {
return conjunct.stream()
.map(expr -> PredicateChecker.canUsePrefixIndex(expr, exprIdToColName))
.filter(result -> !result.equals(PrefixIndexCheckResult.FAILURE))
.collect(Collectors.groupingBy(
result -> result.type == ResultType.SUCCESS_EQUAL,
Collectors.mapping(result -> result.colName, Collectors.toSet())
));
}
private enum ResultType {
FAILURE,
SUCCESS_EQUAL,
SUCCESS_NON_EQUAL,
}
private static class PrefixIndexCheckResult {
public static final PrefixIndexCheckResult FAILURE = new PrefixIndexCheckResult(null, ResultType.FAILURE);
private final String colName;
private final ResultType type;
private PrefixIndexCheckResult(String colName, ResultType result) {
this.colName = colName;
this.type = result;
default:
throw new RuntimeException("Not supported keys type: " + table.getKeysType());
}
public static PrefixIndexCheckResult createEqual(String name) {
return new PrefixIndexCheckResult(name, ResultType.SUCCESS_EQUAL);
}
public static PrefixIndexCheckResult createNonEqual(String name) {
return new PrefixIndexCheckResult(name, ResultType.SUCCESS_NON_EQUAL);
}
}
/**
* Check if an expression could prefix key index.
*/
private static class PredicateChecker extends ExpressionVisitor<PrefixIndexCheckResult, Map<ExprId, String>> {
private static final PredicateChecker INSTANCE = new PredicateChecker();
private PredicateChecker() {
}
public static PrefixIndexCheckResult canUsePrefixIndex(Expression expression,
Map<ExprId, String> exprIdToName) {
return expression.accept(INSTANCE, exprIdToName);
}
@Override
public PrefixIndexCheckResult visit(Expression expr, Map<ExprId, String> context) {
return PrefixIndexCheckResult.FAILURE;
}
@Override
public PrefixIndexCheckResult visitInPredicate(InPredicate in, Map<ExprId, String> context) {
Optional<ExprId> slotOrCastOnSlot = ExpressionUtils.isSlotOrCastOnSlot(in.getCompareExpr());
if (slotOrCastOnSlot.isPresent() && in.getOptions().stream().allMatch(Literal.class::isInstance)) {
return PrefixIndexCheckResult.createEqual(context.get(slotOrCastOnSlot.get()));
} else {
return PrefixIndexCheckResult.FAILURE;
}
}
@Override
public PrefixIndexCheckResult visitComparisonPredicate(ComparisonPredicate cp, Map<ExprId, String> context) {
if (cp instanceof EqualTo || cp instanceof NullSafeEqual) {
return check(cp, context, PrefixIndexCheckResult::createEqual);
} else {
return check(cp, context, PrefixIndexCheckResult::createNonEqual);
}
}
private PrefixIndexCheckResult check(ComparisonPredicate cp, Map<ExprId, String> exprIdToColumnName,
Function<String, PrefixIndexCheckResult> resultMapper) {
return check(cp).map(exprId -> resultMapper.apply(exprIdToColumnName.get(exprId)))
.orElse(PrefixIndexCheckResult.FAILURE);
}
private Optional<ExprId> check(ComparisonPredicate cp) {
Optional<ExprId> exprId = check(cp.left(), cp.right());
return exprId.isPresent() ? exprId : check(cp.right(), cp.left());
}
private Optional<ExprId> check(Expression maybeSlot, Expression maybeConst) {
Optional<ExprId> exprIdOpt = ExpressionUtils.isSlotOrCastOnSlot(maybeSlot);
return exprIdOpt.isPresent() && maybeConst.isConstant() ? exprIdOpt : Optional.empty();
}
List<Long> sortedIndexId = filterAndOrder(checkPreAggResult, scan, requiredScanOutput, predicates);
return Pair.of(preAggStatus, sortedIndexId);
}
/**
@ -434,10 +276,11 @@ public class SelectRollupWithAggregate implements RewriteRuleFactory {
///////////////////////////////////////////////////////////////////////////
private PreAggStatus checkPreAggStatus(
LogicalOlapScan olapScan,
long indexId,
List<Expression> predicates,
List<AggregateFunction> aggregateFuncs,
List<Expression> groupingExprs) {
CheckContext checkContext = new CheckContext(olapScan);
CheckContext checkContext = new CheckContext(olapScan, indexId);
return checkAggregateFunctions(aggregateFuncs, checkContext)
.offOrElse(() -> checkGroupingExprs(groupingExprs, checkContext))
.offOrElse(() -> checkPredicates(predicates, checkContext));
@ -491,6 +334,20 @@ public class SelectRollupWithAggregate implements RewriteRuleFactory {
return checkAggFunc(sum, AggregateType.SUM, extractSlotId(sum.child()), context, false);
}
// TODO: select count(xxx) for duplicated-keys table.
@Override
public PreAggStatus visitCount(Count count, CheckContext context) {
// Now count(distinct key_column) is only supported for aggregate-keys and unique-keys OLAP table.
if (count.isDistinct()) {
Optional<ExprId> exprIdOpt = extractSlotId(count.child(0));
if (exprIdOpt.isPresent() && context.exprIdToKeyColumn.containsKey(exprIdOpt.get())) {
return PreAggStatus.on();
}
}
return PreAggStatus.off(String.format(
"Count distinct is only valid for key columns, but meet %s.", count.toSql()));
}
private PreAggStatus checkAggFunc(
AggregateFunction aggFunc,
AggregateType matchingAggType,
@ -532,18 +389,17 @@ public class SelectRollupWithAggregate implements RewriteRuleFactory {
public final Map<ExprId, Column> exprIdToKeyColumn;
public final Map<ExprId, Column> exprIdToValueColumn;
public CheckContext(LogicalOlapScan scan) {
public CheckContext(LogicalOlapScan scan, long indexId) {
// map<is_key, map<column_name, column>>
Map<Boolean, Map<String, Column>> nameToColumnGroupingByIsKey = scan.getTable().getBaseSchema()
Map<Boolean, Map<String, Column>> nameToColumnGroupingByIsKey
= scan.getTable().getSchemaByIndexId(indexId)
.stream()
.collect(Collectors.groupingBy(
Column::isKey,
Collectors.mapping(Function.identity(),
Collectors.toMap(Column::getName, Function.identity())
)
Collectors.toMap(Column::getName, Function.identity())
));
Map<String, Column> keyNameToColumn = nameToColumnGroupingByIsKey.get(true);
Map<String, Column> valueNameToColumn = nameToColumnGroupingByIsKey.get(false);
Map<String, Column> valueNameToColumn = nameToColumnGroupingByIsKey.getOrDefault(false, ImmutableMap.of());
Map<String, ExprId> nameToExprId = scan.getOutput()
.stream()
.collect(Collectors.toMap(
@ -571,7 +427,7 @@ public class SelectRollupWithAggregate implements RewriteRuleFactory {
private PreAggStatus checkGroupingExprs(
List<Expression> groupingExprs,
CheckContext checkContext) {
return checkHasNoValueTypeColumn(groupingExprs, checkContext,
return disablePreAggIfContainsAnyValueColumn(groupingExprs, checkContext,
"Grouping expression %s contains value column %s");
}
@ -581,14 +437,15 @@ public class SelectRollupWithAggregate implements RewriteRuleFactory {
private PreAggStatus checkPredicates(
List<Expression> predicates,
CheckContext checkContext) {
return checkHasNoValueTypeColumn(predicates, checkContext,
return disablePreAggIfContainsAnyValueColumn(predicates, checkContext,
"Predicate %s contains value column %s");
}
/**
* Check the input expressions have no referenced slot to underlying value type column.
*/
private PreAggStatus checkHasNoValueTypeColumn(List<Expression> exprs, CheckContext ctx, String errorMsg) {
private PreAggStatus disablePreAggIfContainsAnyValueColumn(List<Expression> exprs, CheckContext ctx,
String errorMsg) {
Map<ExprId, Column> exprIdToValueColumn = ctx.exprIdToValueColumn;
return exprs.stream()
.map(expr -> expr.getInputSlots()


@ -0,0 +1,147 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.mv;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* Select materialized index, i.e., both for rollup and materialized view when aggregate is not present.
* <p>
* Scan OLAP table with aggregate is handled in {@link SelectMaterializedIndexWithAggregate}.
* <p>
* Note that we should first apply {@link SelectMaterializedIndexWithAggregate} and then
* {@link SelectMaterializedIndexWithoutAggregate}.
* Besides, these two rules should run in isolated batches, thus when enter this rule, it's guaranteed that there is
* no aggregation on top of the scan.
*/
public class SelectMaterializedIndexWithoutAggregate extends AbstractSelectMaterializedIndexRule
implements RewriteRuleFactory {
@Override
public List<Rule> buildRules() {
return ImmutableList.of(
// project with pushdown filter.
// Project(Filter(Scan))
logicalProject(logicalFilter(logicalOlapScan().when(this::shouldSelectIndex)))
.then(project -> {
LogicalFilter<LogicalOlapScan> filter = project.child();
LogicalOlapScan scan = filter.child();
return project.withChildren(filter.withChildren(
select(scan, project::getInputSlots, filter::getConjuncts)));
}).toRule(RuleType.MATERIALIZED_INDEX_PROJECT_FILTER_SCAN),
// project with filter that cannot be pushdown.
// Filter(Project(Scan))
logicalFilter(logicalProject(logicalOlapScan().when(this::shouldSelectIndex)))
.then(filter -> {
LogicalProject<LogicalOlapScan> project = filter.child();
LogicalOlapScan scan = project.child();
return filter.withChildren(project.withChildren(
select(scan, project::getInputSlots, ImmutableList::of)
));
}).toRule(RuleType.MATERIALIZED_INDEX_FILTER_PROJECT_SCAN),
// scan with filters could be pushdown.
// Filter(Scan)
logicalFilter(logicalOlapScan().when(this::shouldSelectIndex))
.then(filter -> {
LogicalOlapScan scan = filter.child();
return filter.withChildren(select(scan, ImmutableSet::of, filter::getConjuncts));
})
.toRule(RuleType.MATERIALIZED_INDEX_FILTER_SCAN),
// project and scan.
// Project(Scan)
logicalProject(logicalOlapScan().when(this::shouldSelectIndex))
.then(project -> {
LogicalOlapScan scan = project.child();
return project.withChildren(
select(scan, project::getInputSlots, ImmutableList::of));
})
.toRule(RuleType.MATERIALIZED_INDEX_PROJECT_SCAN),
// only scan.
logicalOlapScan()
.when(this::shouldSelectIndex)
.then(scan -> select(scan, scan::getOutputSet, ImmutableList::of))
.toRule(RuleType.MATERIALIZED_INDEX_SCAN)
);
}
/**
* Select materialized index when aggregate node is not present.
*
* @param scan Scan node.
* @param requiredScanOutputSupplier Supplier to get the required scan output.
* @param predicatesSupplier Supplier to get pushdown predicates.
* @return Result scan node.
*/
private LogicalOlapScan select(
LogicalOlapScan scan,
Supplier<Set<Slot>> requiredScanOutputSupplier,
Supplier<List<Expression>> predicatesSupplier) {
switch (scan.getTable().getKeysType()) {
case AGG_KEYS:
case UNIQUE_KEYS:
OlapTable table = scan.getTable();
long baseIndexId = table.getBaseIndexId();
int baseIndexKeySize = table.getKeyColumnsByIndexId(table.getBaseIndexId()).size();
// No aggregate on scan.
// So only base index and indexes that have all the keys could be used.
List<MaterializedIndex> candidates = table.getVisibleIndex().stream()
.filter(index -> index.getId() == baseIndexId
|| table.getKeyColumnsByIndexId(index.getId()).size() == baseIndexKeySize)
.collect(Collectors.toList());
PreAggStatus preAgg = PreAggStatus.off("No aggregate on scan.");
if (candidates.size() == 1) {
// `candidates` only have base index.
return scan.withMaterializedIndexSelected(preAgg, ImmutableList.of(baseIndexId));
} else {
return scan.withMaterializedIndexSelected(preAgg,
filterAndOrder(candidates.stream(), scan, requiredScanOutputSupplier.get(),
predicatesSupplier.get()));
}
case DUP_KEYS:
// Set pre-aggregation to `on` to keep consistency with legacy logic.
return scan.withMaterializedIndexSelected(PreAggStatus.on(),
filterAndOrder(scan.getTable().getVisibleIndex().stream(), scan,
requiredScanOutputSupplier.get(),
predicatesSupplier.get()));
default:
throw new RuntimeException("Not supported keys type: " + scan.getTable().getKeysType());
}
}
}


@ -1,60 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.mv;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import com.google.common.collect.ImmutableList;
/**
* Select rollup index when aggregate is not present.
* <p>
* Scan OLAP table with aggregate is handled in {@link SelectRollupWithAggregate}. This rule is to disable
* pre-aggregation for OLAP scan when there is no aggregate plan.
* <p>
* Note that we should first apply {@link SelectRollupWithAggregate} and then {@link SelectRollupWithoutAggregate}.
* Besides, these two rules should run in isolated batches, thus when enter this rule, it's guaranteed that there is
* no aggregation on top of the scan.
*/
public class SelectRollupWithoutAggregate extends OneRewriteRuleFactory {
@Override
public Rule build() {
return logicalOlapScan()
.whenNot(LogicalOlapScan::isRollupSelected)
.then(this::scanWithoutAggregate)
.toRule(RuleType.ROLLUP_WITH_OUT_AGG);
}
private LogicalOlapScan scanWithoutAggregate(LogicalOlapScan scan) {
switch (scan.getTable().getKeysType()) {
case AGG_KEYS:
case UNIQUE_KEYS:
return scan.withMaterializedIndexSelected(PreAggStatus.off("No aggregate on scan."),
ImmutableList.of(scan.getTable().getBaseIndexId()));
default:
// Set pre-aggregation to `on` to keep consistency with legacy logic.
return scan.withMaterializedIndexSelected(PreAggStatus.on(),
ImmutableList.of(scan.getTable().getBaseIndexId()));
}
}
}


@ -22,7 +22,6 @@ import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.rules.mv.SelectRollupWithAggregate;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
@ -50,7 +49,7 @@ public class LogicalOlapScan extends LogicalRelation {
private final boolean partitionPruned;
private final List<Long> candidateIndexIds;
private final boolean rollupSelected;
private final boolean indexSelected;
private final PreAggStatus preAggStatus;
@ -74,7 +73,7 @@ public class LogicalOlapScan extends LogicalRelation {
public LogicalOlapScan(RelationId id, Table table, List<String> qualifier,
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
List<Long> selectedPartitionIdList, boolean partitionPruned, List<Long> candidateIndexIds,
boolean rollupSelected, PreAggStatus preAggStatus) {
boolean indexSelected, PreAggStatus preAggStatus) {
super(id, PlanType.LOGICAL_OLAP_SCAN, table, qualifier,
groupExpression, logicalProperties, selectedPartitionIdList);
// TODO: use CBO manner to select best index id, according to index's statistics info,
@ -87,7 +86,7 @@ public class LogicalOlapScan extends LogicalRelation {
}
this.partitionPruned = partitionPruned;
this.candidateIndexIds = candidateIndexIds;
this.rollupSelected = rollupSelected;
this.indexSelected = indexSelected;
this.preAggStatus = preAggStatus;
}
@ -128,18 +127,18 @@ public class LogicalOlapScan extends LogicalRelation {
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalOlapScan(id, table, qualifier, groupExpression, Optional.of(getLogicalProperties()),
selectedPartitionIds, partitionPruned, candidateIndexIds, rollupSelected, preAggStatus);
selectedPartitionIds, partitionPruned, candidateIndexIds, indexSelected, preAggStatus);
}
@Override
public LogicalOlapScan withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
return new LogicalOlapScan(id, table, qualifier, Optional.empty(), logicalProperties, selectedPartitionIds,
partitionPruned, candidateIndexIds, rollupSelected, preAggStatus);
partitionPruned, candidateIndexIds, indexSelected, preAggStatus);
}
public LogicalOlapScan withSelectedPartitionId(List<Long> selectedPartitionId) {
return new LogicalOlapScan(id, table, qualifier, Optional.empty(), Optional.of(getLogicalProperties()),
selectedPartitionId, true, candidateIndexIds, rollupSelected, preAggStatus);
selectedPartitionId, true, candidateIndexIds, indexSelected, preAggStatus);
}
public LogicalOlapScan withMaterializedIndexSelected(PreAggStatus preAgg, List<Long> candidateIndexIds) {
@ -164,30 +163,17 @@ public class LogicalOlapScan extends LogicalRelation {
return selectedIndexId;
}
public boolean isRollupSelected() {
return rollupSelected;
public boolean isIndexSelected() {
return indexSelected;
}
public PreAggStatus getPreAggStatus() {
return preAggStatus;
}
/**
* Should apply {@link SelectRollupWithAggregate} or not.
*/
public boolean shouldSelectRollup() {
switch (((OlapTable) table).getKeysType()) {
case AGG_KEYS:
case UNIQUE_KEYS:
return !rollupSelected;
default:
return false;
}
}
@VisibleForTesting
public Optional<String> getSelectRollupName() {
return rollupSelected ? Optional.ofNullable(((OlapTable) table).getIndexNameById(selectedIndexId))
public Optional<String> getSelectedMaterializedIndexName() {
return indexSelected ? Optional.ofNullable(((OlapTable) table).getIndexNameById(selectedIndexId))
: Optional.empty();
}
}


@ -1203,4 +1203,9 @@ public class OlapScanNode extends ScanNode {
public String getReasonOfPreAggregation() {
return reasonOfPreAggregation;
}
@VisibleForTesting
public String getSelectedIndexName() {
return olapTable.getIndexNameById(selectedIndexId);
}
}


@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.mv;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.utframe.TestWithFeService;
import org.junit.jupiter.api.Assertions;
import java.util.List;
import java.util.function.Consumer;
/**
* Base class to test selecting materialized index.
*/
public abstract class BaseMaterializedIndexSelectTest extends TestWithFeService {
protected void singleTableTest(String sql, String indexName, boolean preAgg) {
singleTableTest(sql, scan -> {
Assertions.assertEquals(preAgg, scan.isPreAggregation());
Assertions.assertEquals(indexName, scan.getSelectedIndexName());
});
}
protected void singleTableTest(String sql, Consumer<OlapScanNode> scanConsumer) {
PlanChecker.from(connectContext).checkPlannerResult(sql, planner -> {
List<ScanNode> scans = planner.getScanNodes();
Assertions.assertEquals(1, scans.size());
ScanNode scanNode = scans.get(0);
Assertions.assertTrue(scanNode instanceof OlapScanNode);
OlapScanNode olapScan = (OlapScanNode) scanNode;
scanConsumer.accept(olapScan);
});
}
}

File diff suppressed because it is too large.


@ -21,22 +21,22 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.util.PatternMatchSupported;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.utframe.TestWithFeService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
class SelectRollupIndexTest extends BaseMaterializedIndexSelectTest implements PatternMatchSupported {
class SelectRollupTest extends TestWithFeService implements PatternMatchSupported {
@Override
protected void beforeCreatingConnectContext() throws Exception {
FeConstants.default_scheduler_interval_millisecond = 10;
FeConstants.runningUnitTest = true;
}
@Override
protected void runBeforeAll() throws Exception {
FeConstants.runningUnitTest = true;
createDatabase("test");
connectContext.setDatabase("default_cluster:test");
useDatabase("test");
createTable("CREATE TABLE `t` (\n"
+ " `k1` int(11) NULL,\n"
@ -57,6 +57,26 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
// waiting table state to normal
Thread.sleep(500);
addRollup("alter table t add rollup r2(k2, k3, v1)");
addRollup("alter table t add rollup r3(k2)");
addRollup("alter table t add rollup r4(k2, k3)");
createTable("CREATE TABLE `t1` (\n"
+ " `k1` int(11) NULL,\n"
+ " `k2` int(11) NULL,\n"
+ " `v1` int(11) SUM NULL\n"
+ ") ENGINE=OLAP\n"
+ "AGGREGATE KEY(`k1`, `k2`)\n"
+ "COMMENT 'OLAP'\n"
+ "DISTRIBUTED BY HASH(`k1`) BUCKETS 3\n"
+ "PROPERTIES (\n"
+ "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+ "\"in_memory\" = \"false\",\n"
+ "\"storage_format\" = \"V2\",\n"
+ "\"disable_auto_compaction\" = \"false\"\n"
+ ");");
addRollup("alter table t1 add rollup r1(k1)");
addRollup("alter table t1 add rollup r2(k2, v1)");
addRollup("alter table t1 add rollup r3(k1, k2)");
createTable("CREATE TABLE `duplicate_tbl` (\n"
+ " `k1` int(11) NULL,\n"
@ -77,24 +97,17 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
@Test
public void testAggMatching() {
PlanChecker.from(connectContext)
.analyze(" select k2, sum(v1) from t group by k2")
.applyTopDown(new SelectRollupWithAggregate())
.matches(logicalOlapScan().when(scan -> {
Assertions.assertTrue(scan.getPreAggStatus().isOn());
Assertions.assertEquals("r1", scan.getSelectRollupName().get());
return true;
}));
singleTableTest("select k2, sum(v1) from t group by k2", "r1", true);
}
@Test
public void testMatchingBase() {
PlanChecker.from(connectContext)
.analyze(" select k1, sum(v1) from t group by k1")
.applyTopDown(new SelectRollupWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.matches(logicalOlapScan().when(scan -> {
Assertions.assertTrue(scan.getPreAggStatus().isOn());
Assertions.assertEquals("t", scan.getSelectRollupName().get());
Assertions.assertEquals("t", scan.getSelectedMaterializedIndexName().get());
return true;
}));
}
@ -103,10 +116,10 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
void testAggFilterScan() {
PlanChecker.from(connectContext)
.analyze("select k2, sum(v1) from t where k3=0 group by k2")
.applyTopDown(new SelectRollupWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.matches(logicalOlapScan().when(scan -> {
Assertions.assertTrue(scan.getPreAggStatus().isOn());
Assertions.assertEquals("r2", scan.getSelectRollupName().get());
Assertions.assertEquals("r2", scan.getSelectedMaterializedIndexName().get());
return true;
}));
}
@ -118,29 +131,22 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
@Test
public void testTranslateWhenPreAggIsOff() {
PlanChecker.from(connectContext).checkPlannerResult(
"select k2, min(v1) from t group by k2",
planner -> {
List<ScanNode> scans = planner.getScanNodes();
Assertions.assertEquals(1, scans.size());
ScanNode scanNode = scans.get(0);
Assertions.assertTrue(scanNode instanceof OlapScanNode);
OlapScanNode olapScan = (OlapScanNode) scanNode;
Assertions.assertFalse(olapScan.isPreAggregation());
Assertions.assertEquals("Aggregate operator don't match, "
+ "aggregate function: min(v1), column aggregate type: SUM",
olapScan.getReasonOfPreAggregation());
});
singleTableTest("select k2, min(v1) from t group by k2", scan -> {
Assertions.assertFalse(scan.isPreAggregation());
Assertions.assertEquals("Aggregate operator don't match, "
+ "aggregate function: min(v1), column aggregate type: SUM",
scan.getReasonOfPreAggregation());
});
}
@Test
public void testWithEqualFilter() {
PlanChecker.from(connectContext)
.analyze("select k2, sum(v1) from t where k3=0 group by k2")
.applyTopDown(new SelectRollupWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.matches(logicalOlapScan().when(scan -> {
Assertions.assertTrue(scan.getPreAggStatus().isOn());
Assertions.assertEquals("r2", scan.getSelectRollupName().get());
Assertions.assertEquals("r2", scan.getSelectedMaterializedIndexName().get());
return true;
}));
}
@ -149,10 +155,10 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
public void testWithNonEqualFilter() {
PlanChecker.from(connectContext)
.analyze("select k2, sum(v1) from t where k3>0 group by k2")
.applyTopDown(new SelectRollupWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.matches(logicalOlapScan().when(scan -> {
Assertions.assertTrue(scan.getPreAggStatus().isOn());
Assertions.assertEquals("r2", scan.getSelectRollupName().get());
Assertions.assertEquals("r2", scan.getSelectedMaterializedIndexName().get());
return true;
}));
}
@ -161,10 +167,10 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
public void testWithFilter() {
PlanChecker.from(connectContext)
.analyze("select k2, sum(v1) from t where k2>3 group by k3")
.applyTopDown(new SelectRollupWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.matches(logicalOlapScan().when(scan -> {
Assertions.assertTrue(scan.getPreAggStatus().isOn());
Assertions.assertEquals("r2", scan.getSelectRollupName().get());
Assertions.assertEquals("r2", scan.getSelectedMaterializedIndexName().get());
return true;
}));
}
@ -176,11 +182,11 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
+ " where c3>0 group by c2";
PlanChecker.from(connectContext)
.analyze(sql)
.applyTopDown(new SelectRollupWithAggregate())
.applyTopDown(new SelectRollupWithoutAggregate())
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithoutAggregate())
.matches(logicalOlapScan().when(scan -> {
Assertions.assertTrue(scan.getPreAggStatus().isOn());
Assertions.assertEquals("r2", scan.getSelectRollupName().get());
Assertions.assertEquals("r2", scan.getSelectedMaterializedIndexName().get());
return true;
}));
}
@ -193,8 +199,8 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
public void testNoAggregate() {
PlanChecker.from(connectContext)
.analyze("select k1, v1 from t")
.applyTopDown(new SelectRollupWithAggregate())
.applyTopDown(new SelectRollupWithoutAggregate())
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithoutAggregate())
.matches(logicalOlapScan().when(scan -> {
PreAggStatus preAgg = scan.getPreAggStatus();
Assertions.assertTrue(preAgg.isOff());
@ -207,8 +213,8 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
public void testAggregateTypeNotMatch() {
PlanChecker.from(connectContext)
.analyze("select k1, min(v1) from t group by k1")
.applyTopDown(new SelectRollupWithAggregate())
.applyTopDown(new SelectRollupWithoutAggregate())
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithoutAggregate())
.matches(logicalOlapScan().when(scan -> {
PreAggStatus preAgg = scan.getPreAggStatus();
Assertions.assertTrue(preAgg.isOff());
@ -222,8 +228,8 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
public void testInvalidSlotInAggFunction() {
PlanChecker.from(connectContext)
.analyze("select k1, sum(v1 + 1) from t group by k1")
.applyTopDown(new SelectRollupWithAggregate())
.applyTopDown(new SelectRollupWithoutAggregate())
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithoutAggregate())
.matches(logicalOlapScan().when(scan -> {
PreAggStatus preAgg = scan.getPreAggStatus();
Assertions.assertTrue(preAgg.isOff());
@ -237,8 +243,8 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
public void testKeyColumnInAggFunction() {
PlanChecker.from(connectContext)
.analyze("select k1, sum(k2) from t group by k1")
.applyTopDown(new SelectRollupWithAggregate())
.applyTopDown(new SelectRollupWithoutAggregate())
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithoutAggregate())
.matches(logicalOlapScan().when(scan -> {
PreAggStatus preAgg = scan.getPreAggStatus();
Assertions.assertTrue(preAgg.isOff());
@ -252,12 +258,12 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
public void testMaxCanUseKeyColumn() {
PlanChecker.from(connectContext)
.analyze("select k2, max(k3) from t group by k3")
.applyTopDown(new SelectRollupWithAggregate())
.applyTopDown(new SelectRollupWithoutAggregate())
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithoutAggregate())
.matches(logicalOlapScan().when(scan -> {
PreAggStatus preAgg = scan.getPreAggStatus();
Assertions.assertTrue(preAgg.isOn());
Assertions.assertEquals("r2", scan.getSelectRollupName().get());
Assertions.assertEquals("r4", scan.getSelectedMaterializedIndexName().get());
return true;
}));
}
@ -266,12 +272,12 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
public void testMinCanUseKeyColumn() {
PlanChecker.from(connectContext)
.analyze("select k2, min(k3) from t group by k3")
.applyTopDown(new SelectRollupWithAggregate())
.applyTopDown(new SelectRollupWithoutAggregate())
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithoutAggregate())
.matches(logicalOlapScan().when(scan -> {
PreAggStatus preAgg = scan.getPreAggStatus();
Assertions.assertTrue(preAgg.isOn());
Assertions.assertEquals("r2", scan.getSelectRollupName().get());
Assertions.assertEquals("r4", scan.getSelectedMaterializedIndexName().get());
return true;
}));
}
@ -280,8 +286,8 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
public void testDuplicatePreAggOn() {
PlanChecker.from(connectContext)
.analyze("select k1, sum(k1) from duplicate_tbl group by k1")
.applyTopDown(new SelectRollupWithAggregate())
.applyTopDown(new SelectRollupWithoutAggregate())
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithoutAggregate())
.matches(logicalOlapScan().when(scan -> {
PreAggStatus preAgg = scan.getPreAggStatus();
Assertions.assertTrue(preAgg.isOn());
@ -293,8 +299,8 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
public void testDuplicatePreAggOnEvenWithoutAggregate() {
PlanChecker.from(connectContext)
.analyze("select k1, v1 from duplicate_tbl")
.applyTopDown(new SelectRollupWithAggregate())
.applyTopDown(new SelectRollupWithoutAggregate())
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithoutAggregate())
.matches(logicalOlapScan().when(scan -> {
PreAggStatus preAgg = scan.getPreAggStatus();
Assertions.assertTrue(preAgg.isOn());
@ -302,4 +308,71 @@ class SelectRollupTest extends TestWithFeService implements PatternMatchSupporte
}));
}
@Test
public void testKeysOnlyQuery() throws Exception {
singleTableTest("select k1 from t1", "r3", false);
singleTableTest("select k2 from t1", "r3", false);
singleTableTest("select k1, k2 from t1", "r3", false);
singleTableTest("select k1 from t1 group by k1", "r1", true);
singleTableTest("select k2 from t1 group by k2", "r2", true);
singleTableTest("select k1, k2 from t1 group by k1, k2", "r3", true);
}
/**
* Rollup with all the keys should be used.
*/
@Test
public void testRollupWithAllTheKeys() throws Exception {
createTable(" CREATE TABLE `t4` (\n"
+ " `k1` int(11) NULL,\n"
+ " `k2` int(11) NULL,\n"
+ " `v1` int(11) SUM NULL,\n"
+ " `v2` int(11) SUM NULL\n"
+ ") ENGINE=OLAP\n"
+ "AGGREGATE KEY(`k1`, `k2`)\n"
+ "COMMENT 'OLAP'\n"
+ "DISTRIBUTED BY HASH(`k1`) BUCKETS 3\n"
+ "PROPERTIES (\n"
+ "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+ "\"in_memory\" = \"false\",\n"
+ "\"storage_format\" = \"V2\",\n"
+ "\"disable_auto_compaction\" = \"false\"\n"
+ ");");
addRollup("alter table t4 add rollup r1(k1, k2, v1)");
singleTableTest("select k1, k2, v1 from t4", "r1", false);
singleTableTest("select k1, k2, sum(v1) from t4 group by k1, k2", "r1", true);
singleTableTest("select k1, v1 from t4", "r1", false);
singleTableTest("select k1, sum(v1) from t4 group by k1", "r1", true);
}
@Test
public void testComplexGroupingExpr() throws Exception {
singleTableTest("select k2 + 1, sum(v1) from t group by k2 + 1", "r1", true);
}
@Test
public void testCountDistinctKeyColumn() {
singleTableTest("select k2, count(distinct k3) from t group by k2", "r4", true);
}
@Test
public void testCountDistinctValueColumn() {
singleTableTest("select k1, count(distinct v1) from from t group by k1", scan -> {
Assertions.assertFalse(scan.isPreAggregation());
Assertions.assertEquals("Count distinct is only valid for key columns, but meet count(distinct v1).",
scan.getReasonOfPreAggregation());
Assertions.assertEquals("t", scan.getSelectedIndexName());
});
}
@Test
public void testOnlyValueColumn1() throws Exception {
singleTableTest("select sum(v1) from t", "r1", true);
}
@Test
public void testOnlyValueColumn2() throws Exception {
singleTableTest("select v1 from t", "t", false);
}
}
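Note: the new tests above lean on the singleTableTest helper, whose definition sits outside this hunk. The following is a minimal sketch of what its two overloads presumably look like, with names and signatures inferred purely from the call sites in this file (so treat it as an assumption, not the committed implementation); it assumes java.util.List, java.util.function.Consumer and the PlanChecker/OlapScanNode types already imported by this test class.

// Sketch only: inferred from call sites such as
// singleTableTest("select k1 from t1", "r3", false) and singleTableTest(sql, scan -> {...}).
protected void singleTableTest(String sql, String expectedIndexName, boolean preAggOn) {
    singleTableTest(sql, scan -> {
        Assertions.assertEquals(preAggOn, scan.isPreAggregation());
        Assertions.assertEquals(expectedIndexName, scan.getSelectedIndexName());
    });
}

protected void singleTableTest(String sql, Consumer<OlapScanNode> check) {
    PlanChecker.from(connectContext).checkPlannerResult(sql, planner -> {
        List<ScanNode> scans = planner.getScanNodes();
        Assertions.assertEquals(1, scans.size());
        ScanNode scanNode = scans.get(0);
        Assertions.assertTrue(scanNode instanceof OlapScanNode);
        check.accept((OlapScanNode) scanNode);
    });
}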
View File
@ -22,6 +22,7 @@ import org.apache.doris.analysis.AlterSqlBlockRuleStmt;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.CreatePolicyStmt;
import org.apache.doris.analysis.CreateSqlBlockRuleStmt;
import org.apache.doris.analysis.CreateTableAsSelectStmt;
@ -29,6 +30,7 @@ import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.CreateViewStmt;
import org.apache.doris.analysis.DropPolicyStmt;
import org.apache.doris.analysis.DropSqlBlockRuleStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.ExplainOptions;
import org.apache.doris.analysis.ShowCreateTableStmt;
import org.apache.doris.analysis.SqlParser;
@ -73,7 +75,6 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@ -116,6 +117,7 @@ public abstract class TestWithFeService {
@BeforeAll
public final void beforeAll() throws Exception {
beforeCreatingConnectContext();
connectContext = createDefaultCtx();
createDorisCluster();
runBeforeAll();
@ -134,6 +136,10 @@ public abstract class TestWithFeService {
runBeforeEach();
}
protected void beforeCreatingConnectContext() throws Exception {
}
protected void runBeforeAll() throws Exception {
}
@ -454,6 +460,12 @@ public abstract class TestWithFeService {
createTables(sql);
}
public void dropTable(String table, boolean force) throws Exception {
DropTableStmt dropTableStmt = (DropTableStmt) parseAndAnalyzeStmt(
"drop table " + table + (force ? " force" : "") + ";", connectContext);
Env.getCurrentEnv().dropTable(dropTableStmt);
}
public void createTableAsSelect(String sql) throws Exception {
CreateTableAsSelectStmt createTableAsSelectStmt = (CreateTableAsSelectStmt) parseAndAnalyzeStmt(sql);
Env.getCurrentEnv().createTableAsSelect(createTableAsSelectStmt);
@ -524,7 +536,16 @@ public abstract class TestWithFeService {
Thread.sleep(100);
}
private void checkAlterJob() throws InterruptedException, MetaNotFoundException {
protected void createMv(String sql) throws Exception {
CreateMaterializedViewStmt createMaterializedViewStmt =
(CreateMaterializedViewStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
Env.getCurrentEnv().createMaterializedView(createMaterializedViewStmt);
checkAlterJob();
// wait for the table state to become normal
Thread.sleep(100);
}
private void checkAlterJob() throws InterruptedException {
// check alter job
Map<Long, AlterJobV2> alterJobs = Env.getCurrentEnv().getMaterializedViewHandler().getAlterJobsV2();
for (AlterJobV2 alterJobV2 : alterJobs.values()) {
@ -534,17 +555,23 @@ public abstract class TestWithFeService {
Thread.sleep(100);
}
System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState());
Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
Assertions.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
// Add table state check in case of below Exception:
// there is still a short gap between "job finish" and "table become normal",
// so if user send next alter job right after the "job finish",
// it may encounter "table's state not NORMAL" error.
Database db =
Env.getCurrentInternalCatalog().getDbOrMetaException(alterJobV2.getDbId());
OlapTable tbl = (OlapTable) db.getTableOrMetaException(alterJobV2.getTableId(), Table.TableType.OLAP);
while (tbl.getState() != OlapTable.OlapTableState.NORMAL) {
Thread.sleep(1000);
try {
// Add table state check in case of below Exception:
// there is still a short gap between "job finish" and "table become normal",
// so if user send next alter job right after the "job finish",
// it may encounter "table's state not NORMAL" error.
Database db =
Env.getCurrentInternalCatalog().getDbOrMetaException(alterJobV2.getDbId());
OlapTable tbl = (OlapTable) db.getTableOrMetaException(alterJobV2.getTableId(), Table.TableType.OLAP);
while (tbl.getState() != OlapTable.OlapTableState.NORMAL) {
Thread.sleep(1000);
}
} catch (MetaNotFoundException e) {
// Sometimes the table has already been dropped by a test, but the corresponding alter job has not been deleted yet.
// Ignore this error.
System.out.println(e.getMessage());
}
}
}
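Usage note: the createMv and dropTable helpers added above are meant to be driven from test setup and teardown code. A minimal, hypothetical sketch of a subclass using them follows; the database, table, and MV names are made up for illustration, and createDatabase/useDatabase/createTable are assumed to be pre-existing helpers of TestWithFeService.

// Illustrative only; not part of this change.
public class MvHelperUsageExampleTest extends TestWithFeService {
    @Override
    protected void runBeforeAll() throws Exception {
        createDatabase("test_mv_db");
        useDatabase("test_mv_db");
        createTable("CREATE TABLE `t_example` (`k1` int NULL, `k2` int NULL, `v1` int SUM NULL) "
                + "ENGINE=OLAP AGGREGATE KEY(`k1`, `k2`) DISTRIBUTED BY HASH(`k1`) BUCKETS 3 "
                + "PROPERTIES (\"replication_allocation\" = \"tag.location.default: 1\")");
        // createMv blocks until the alter job finishes (see checkAlterJob above).
        createMv("create materialized view mv_k1_sum_v1 as select k1, sum(v1) from t_example group by k1");
    }

    @Test
    public void testDropAndRecreate() throws Exception {
        // Force-drop so a lingering alter job is tolerated by the MetaNotFoundException handling above.
        dropTable("t_example", true);
        // ... recreate the table / MV here and re-run the planner assertions.
    }
}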