[Bug](materialized-view) enable rewrite on select materialized index with aggregate mode (#24691)
enable rewrite on select materialized index with aggregate mode
This commit is contained in:
@ -30,6 +30,7 @@ import org.apache.doris.nereids.parser.NereidsParser;
|
||||
import org.apache.doris.nereids.rules.Rule;
|
||||
import org.apache.doris.nereids.rules.RuleType;
|
||||
import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory;
|
||||
import org.apache.doris.nereids.rules.rewrite.mv.AbstractSelectMaterializedIndexRule.SlotContext;
|
||||
import org.apache.doris.nereids.trees.expressions.Alias;
|
||||
import org.apache.doris.nereids.trees.expressions.Cast;
|
||||
import org.apache.doris.nereids.trees.expressions.ExprId;
|
||||
@ -65,6 +66,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
|
||||
import org.apache.doris.nereids.types.BigIntType;
|
||||
import org.apache.doris.nereids.types.DataType;
|
||||
import org.apache.doris.nereids.types.VarcharType;
|
||||
import org.apache.doris.nereids.util.ExpressionUtils;
|
||||
import org.apache.doris.planner.PlanNode;
|
||||
@ -657,12 +659,8 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
|
||||
* 2. filter indexes that have all the required columns.
|
||||
* 3. select best index from all the candidate indexes that could use.
|
||||
*/
|
||||
private SelectResult select(
|
||||
LogicalOlapScan scan,
|
||||
Set<Slot> requiredScanOutput,
|
||||
Set<Expression> predicates,
|
||||
List<AggregateFunction> aggregateFunctions,
|
||||
List<Expression> groupingExprs,
|
||||
private SelectResult select(LogicalOlapScan scan, Set<Slot> requiredScanOutput, Set<Expression> predicates,
|
||||
List<AggregateFunction> aggregateFunctions, List<Expression> groupingExprs,
|
||||
Set<? extends Expression> requiredExpr) {
|
||||
// remove virtual slot for grouping sets.
|
||||
Set<Slot> nonVirtualRequiredScanOutput = requiredScanOutput.stream()
|
||||
@ -677,105 +675,57 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
|
||||
}
|
||||
|
||||
OlapTable table = scan.getTable();
|
||||
switch (scan.getTable().getKeysType()) {
|
||||
case AGG_KEYS:
|
||||
case UNIQUE_KEYS:
|
||||
case DUP_KEYS:
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Not supported keys type: " + scan.getTable().getKeysType());
|
||||
}
|
||||
|
||||
Map<Boolean, List<MaterializedIndex>> indexesGroupByIsBaseOrNot = table.getVisibleIndex()
|
||||
.stream()
|
||||
.collect(Collectors.groupingBy(index -> index.getId() == table.getBaseIndexId()));
|
||||
if (table.isDupKeysOrMergeOnWrite()) {
|
||||
// A duplicate-keys table can use the base index and any index whose pre-aggregation status is on.
|
||||
Set<MaterializedIndex> candidatesWithoutRewriting =
|
||||
indexesGroupByIsBaseOrNot.getOrDefault(false, ImmutableList.of())
|
||||
.stream()
|
||||
.filter(index -> checkPreAggStatus(scan, index.getId(), predicates,
|
||||
aggregateFunctions, groupingExprs).isOn())
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
// try to rewrite bitmap, hll by materialized index columns.
|
||||
List<AggRewriteResult> candidatesWithRewriting = indexesGroupByIsBaseOrNot.getOrDefault(false,
|
||||
ImmutableList.of())
|
||||
.stream()
|
||||
.filter(index -> !candidatesWithoutRewriting.contains(index))
|
||||
.map(index -> rewriteAgg(index, scan, nonVirtualRequiredScanOutput, predicates,
|
||||
aggregateFunctions,
|
||||
groupingExprs))
|
||||
.filter(aggRewriteResult -> checkPreAggStatus(scan, aggRewriteResult.index.getId(),
|
||||
predicates,
|
||||
// check pre-agg status of the aggregate functions that couldn't be rewritten.
|
||||
aggFuncsDiff(aggregateFunctions, aggRewriteResult),
|
||||
groupingExprs).isOn())
|
||||
.filter(result -> result.success)
|
||||
.collect(Collectors.toList());
|
||||
Set<MaterializedIndex> candidatesWithoutRewriting = indexesGroupByIsBaseOrNot
|
||||
.getOrDefault(false, ImmutableList.of()).stream()
|
||||
.filter(index -> preAggEnabledByHint(scan)
|
||||
|| checkPreAggStatus(scan, index.getId(), predicates, aggregateFunctions, groupingExprs).isOn())
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
List<MaterializedIndex> haveAllRequiredColumns = Streams.concat(
|
||||
candidatesWithoutRewriting.stream()
|
||||
.filter(index -> containAllRequiredColumns(index, scan, nonVirtualRequiredScanOutput,
|
||||
requiredExpr, predicates)),
|
||||
candidatesWithRewriting.stream()
|
||||
.filter(aggRewriteResult -> containAllRequiredColumns(aggRewriteResult.index, scan,
|
||||
aggRewriteResult.requiredScanOutput,
|
||||
requiredExpr.stream().map(e -> aggRewriteResult.exprRewriteMap.replaceAgg(e))
|
||||
.collect(Collectors.toSet()),
|
||||
predicates))
|
||||
.map(aggRewriteResult -> aggRewriteResult.index))
|
||||
.collect(Collectors.toList());
|
||||
// try to rewrite bitmap, hll by materialized index columns.
|
||||
List<AggRewriteResult> candidatesWithRewriting = indexesGroupByIsBaseOrNot
|
||||
.getOrDefault(false, ImmutableList.of()).stream()
|
||||
.filter(index -> !candidatesWithoutRewriting.contains(index))
|
||||
.map(index -> rewriteAgg(index, scan, nonVirtualRequiredScanOutput, predicates, aggregateFunctions,
|
||||
groupingExprs))
|
||||
.filter(aggRewriteResult -> checkPreAggStatus(scan, aggRewriteResult.index.getId(), predicates,
|
||||
// check pre-agg status of the aggregate functions that couldn't be rewritten.
|
||||
aggFuncsDiff(aggregateFunctions, aggRewriteResult), groupingExprs).isOn())
|
||||
.filter(result -> result.success).collect(Collectors.toList());
|
||||
|
||||
long selectIndexId = selectBestIndex(haveAllRequiredColumns, scan, predicates);
|
||||
Optional<AggRewriteResult> rewriteResultOpt = candidatesWithRewriting.stream()
|
||||
.filter(aggRewriteResult -> aggRewriteResult.index.getId() == selectIndexId)
|
||||
.findAny();
|
||||
// Pre-aggregation is set to `on` by default for duplicate-keys table.
|
||||
return new SelectResult(PreAggStatus.on(), selectIndexId,
|
||||
rewriteResultOpt.map(r -> r.exprRewriteMap).orElse(new ExprRewriteMap()));
|
||||
} else {
|
||||
if (scan.getPreAggStatus().isOff()) {
|
||||
return new SelectResult(scan.getPreAggStatus(),
|
||||
scan.getTable().getBaseIndexId(), new ExprRewriteMap());
|
||||
}
|
||||
|
||||
Set<MaterializedIndex> candidatesWithoutRewriting = new HashSet<>();
|
||||
|
||||
for (MaterializedIndex index : indexesGroupByIsBaseOrNot.getOrDefault(false, ImmutableList.of())) {
|
||||
final PreAggStatus preAggStatus;
|
||||
if (preAggEnabledByHint(scan)) {
|
||||
preAggStatus = PreAggStatus.on();
|
||||
} else {
|
||||
preAggStatus = checkPreAggStatus(scan, index.getId(), predicates,
|
||||
aggregateFunctions, groupingExprs);
|
||||
}
|
||||
|
||||
if (preAggStatus.isOn()) {
|
||||
candidatesWithoutRewriting.add(index);
|
||||
}
|
||||
}
|
||||
SelectResult baseIndexSelectResult = new SelectResult(
|
||||
checkPreAggStatus(scan, scan.getTable().getBaseIndexId(),
|
||||
predicates, aggregateFunctions, groupingExprs),
|
||||
scan.getTable().getBaseIndexId(), new ExprRewriteMap());
|
||||
if (candidatesWithoutRewriting.isEmpty()) {
|
||||
// return early if pre-agg status is off.
|
||||
return baseIndexSelectResult;
|
||||
} else {
|
||||
List<MaterializedIndex> rollupsWithAllRequiredCols =
|
||||
Stream.concat(candidatesWithoutRewriting.stream(), indexesGroupByIsBaseOrNot.get(true).stream())
|
||||
.filter(index -> containAllRequiredColumns(index, scan, nonVirtualRequiredScanOutput,
|
||||
requiredExpr, predicates))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
long selectedIndex = selectBestIndex(rollupsWithAllRequiredCols, scan, predicates);
|
||||
if (selectedIndex == scan.getTable().getBaseIndexId()) {
|
||||
return baseIndexSelectResult;
|
||||
}
|
||||
return new SelectResult(PreAggStatus.on(), selectedIndex, new ExprRewriteMap());
|
||||
List<MaterializedIndex> haveAllRequiredColumns = Streams.concat(
|
||||
candidatesWithoutRewriting.stream()
|
||||
.filter(index -> containAllRequiredColumns(index, scan, nonVirtualRequiredScanOutput,
|
||||
requiredExpr, predicates)),
|
||||
candidatesWithRewriting.stream()
|
||||
.filter(aggRewriteResult -> containAllRequiredColumns(aggRewriteResult.index, scan,
|
||||
aggRewriteResult.requiredScanOutput,
|
||||
requiredExpr.stream().map(e -> aggRewriteResult.exprRewriteMap.replaceAgg(e))
|
||||
.collect(Collectors.toSet()),
|
||||
predicates))
|
||||
.map(aggRewriteResult -> aggRewriteResult.index))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
long selectIndexId = selectBestIndex(haveAllRequiredColumns, scan, predicates);
|
||||
// Pre-aggregation is set to `on` by default for duplicate-keys table.
|
||||
// In other cases, when no mv is hit, pre-aggregation may be switched from on to off.
|
||||
if (!table.isDupKeysOrMergeOnWrite() && (new CheckContext(scan, selectIndexId)).isBaseIndex()) {
|
||||
PreAggStatus preagg = scan.getPreAggStatus();
|
||||
if (preagg.isOn()) {
|
||||
preagg = checkPreAggStatus(scan, scan.getTable().getBaseIndexId(), predicates, aggregateFunctions,
|
||||
groupingExprs);
|
||||
}
|
||||
return new SelectResult(preagg, selectIndexId, new ExprRewriteMap());
|
||||
}
|
||||
|
||||
Optional<AggRewriteResult> rewriteResultOpt = candidatesWithRewriting.stream()
|
||||
.filter(aggRewriteResult -> aggRewriteResult.index.getId() == selectIndexId).findAny();
|
||||
return new SelectResult(PreAggStatus.on(), selectIndexId,
|
||||
rewriteResultOpt.map(r -> r.exprRewriteMap).orElse(new ExprRewriteMap()));
|
||||
}
|
||||
|
||||
private List<AggregateFunction> aggFuncsDiff(List<AggregateFunction> aggregateFunctions,
|
||||
@ -1191,6 +1141,13 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
|
||||
}
|
||||
}
|
||||
|
||||
private static Expression castIfNeed(Expression expr, DataType targetType) {
|
||||
if (expr.getDataType().equals(targetType)) {
|
||||
return expr;
|
||||
}
|
||||
return new Cast(expr, targetType);
|
||||
}
|
||||
|
||||
private static class AggFuncRewriter extends DefaultExpressionRewriter<RewriteContext> {
|
||||
public static final AggFuncRewriter INSTANCE = new AggFuncRewriter();
|
||||
|
||||
@ -1212,7 +1169,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
|
||||
// count(distinct col) -> bitmap_union_count(mv_bitmap_union_col)
|
||||
Optional<Slot> slotOpt = ExpressionUtils.extractSlotOrCastOnSlot(count.child(0));
|
||||
|
||||
Expression expr = new ToBitmapWithCheck(new Cast(count.child(0), BigIntType.INSTANCE));
|
||||
Expression expr = new ToBitmapWithCheck(castIfNeed(count.child(0), BigIntType.INSTANCE));
|
||||
// count distinct a value column.
|
||||
if (slotOpt.isPresent() && !context.checkContext.keyNameToColumn.containsKey(
|
||||
normalizeName(expr.toSql()))) {
|
||||
@ -1425,7 +1382,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
|
||||
// ndv on a value column.
|
||||
if (slotOpt.isPresent() && !context.checkContext.keyNameToColumn.containsKey(
|
||||
normalizeName(slotOpt.get().toSql()))) {
|
||||
Expression expr = new Cast(ndv.child(), VarcharType.SYSTEM_DEFAULT);
|
||||
Expression expr = castIfNeed(ndv.child(), VarcharType.SYSTEM_DEFAULT);
|
||||
String hllUnionColumn = normalizeName(
|
||||
CreateMaterializedViewStmt.mvColumnBuilder(AggregateType.HLL_UNION,
|
||||
CreateMaterializedViewStmt.mvColumnBuilder(new HllHash(expr).toSql())));
|
||||
@ -1459,7 +1416,7 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
|
||||
Optional<Slot> slotOpt = ExpressionUtils.extractSlotOrCastOnSlot(sum.child(0));
|
||||
if (!sum.isDistinct() && slotOpt.isPresent()
|
||||
&& !context.checkContext.keyNameToColumn.containsKey(normalizeName(slotOpt.get().toSql()))) {
|
||||
Expression expr = new Cast(sum.child(), BigIntType.INSTANCE);
|
||||
Expression expr = castIfNeed(sum.child(), BigIntType.INSTANCE);
|
||||
String sumColumn = normalizeName(CreateMaterializedViewStmt.mvColumnBuilder(AggregateType.SUM,
|
||||
CreateMaterializedViewStmt.mvColumnBuilder(expr.toSql())));
|
||||
Column mvColumn = context.checkContext.getColumn(sumColumn);
|
||||
|
||||
Reference in New Issue
Block a user