From 4d1f3b8abff7bd39d5ecf866f965d8ed6c9764c1 Mon Sep 17 00:00:00 2001 From: morrySnow <101034200+morrySnow@users.noreply.github.com> Date: Sun, 26 Feb 2023 22:52:50 +0800 Subject: [PATCH] [fix](Nereids) mow unique table's preagg should work like duplicate table (#17028) --- .../org/apache/doris/catalog/OlapTable.java | 5 + .../SelectMaterializedIndexWithAggregate.java | 150 +++++++++--------- ...lectMaterializedIndexWithoutAggregate.java | 70 ++++---- .../apache/doris/planner/OlapScanNode.java | 8 +- 4 files changed, 118 insertions(+), 115 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index face5c1185..7d7351b71c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -2031,4 +2031,9 @@ public class OlapTable extends Table { return idToPartition.keySet(); } + public boolean isDupKeysOrMergeOnWrite() { + return getKeysType() == KeysType.DUP_KEYS + || (getKeysType() == KeysType.UNIQUE_KEYS + && getEnableUniqueKeyMergeOnWrite()); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/mv/SelectMaterializedIndexWithAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/mv/SelectMaterializedIndexWithAggregate.java index 27143b9864..78dc66aa7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/mv/SelectMaterializedIndexWithAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/mv/SelectMaterializedIndexWithAggregate.java @@ -487,84 +487,84 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial Preconditions.checkArgument(scan.getOutputSet().containsAll(nonVirtualRequiredScanOutput), String.format("Scan's output (%s) should contains all the input required scan output (%s).", scan.getOutput(), nonVirtualRequiredScanOutput)); - OlapTable table = scan.getTable(); - 
- switch (table.getKeysType()) { + switch (scan.getTable().getKeysType()) { case AGG_KEYS: - case UNIQUE_KEYS: { - final PreAggStatus preAggStatus; - if (preAggEnabledByHint(scan)) { - // PreAggStatus could be enabled by pre-aggregation hint for agg-keys and unique-keys. - preAggStatus = PreAggStatus.on(); - } else { - // Only checking pre-aggregation status by base index is enough for aggregate-keys and - // unique-keys OLAP table. - // Because the schemas in non-base materialized index are subsets of the schema of base index. - preAggStatus = checkPreAggStatus(scan, table.getBaseIndexId(), predicates, - aggregateFunctions, groupingExprs); - } - if (preAggStatus.isOff()) { - // return early if pre agg status if off. - return new SelectResult(preAggStatus, scan.getTable().getBaseIndexId(), new ExprRewriteMap()); - } else { - List rollupsWithAllRequiredCols = table.getVisibleIndex().stream() - .filter(index -> containAllRequiredColumns(index, scan, nonVirtualRequiredScanOutput)) - .collect(Collectors.toList()); - return new SelectResult(preAggStatus, selectBestIndex(rollupsWithAllRequiredCols, scan, predicates), - new ExprRewriteMap()); - } - } - case DUP_KEYS: { - Map> indexesGroupByIsBaseOrNot = table.getVisibleIndex() - .stream() - .collect(Collectors.groupingBy(index -> index.getId() == table.getBaseIndexId())); - - // Duplicate-keys table could use base index and indexes that pre-aggregation status is on. - Set candidatesWithoutRewriting = Stream.concat( - indexesGroupByIsBaseOrNot.get(true).stream(), - indexesGroupByIsBaseOrNot.getOrDefault(false, ImmutableList.of()) - .stream() - .filter(index -> checkPreAggStatus(scan, index.getId(), predicates, - aggregateFunctions, groupingExprs).isOn()) - ).collect(ImmutableSet.toImmutableSet()); - - // try to rewrite bitmap, hll by materialized index columns. 
- List candidatesWithRewriting = indexesGroupByIsBaseOrNot.getOrDefault(false, - ImmutableList.of()) - .stream() - .filter(index -> !candidatesWithoutRewriting.contains(index)) - .map(index -> rewriteAgg(index, scan, nonVirtualRequiredScanOutput, predicates, - aggregateFunctions, - groupingExprs)) - .filter(aggRewriteResult -> checkPreAggStatus(scan, aggRewriteResult.index.getId(), - predicates, - // check pre-agg status of aggregate function that couldn't rewrite. - aggFuncsDiff(aggregateFunctions, aggRewriteResult), - groupingExprs).isOn()) - .filter(result -> result.success) - .collect(Collectors.toList()); - - List haveAllRequiredColumns = Streams.concat( - candidatesWithoutRewriting.stream() - .filter(index -> containAllRequiredColumns(index, scan, nonVirtualRequiredScanOutput)), - candidatesWithRewriting - .stream() - .filter(aggRewriteResult -> containAllRequiredColumns(aggRewriteResult.index, scan, - aggRewriteResult.requiredScanOutput)) - .map(aggRewriteResult -> aggRewriteResult.index) - ).collect(Collectors.toList()); - - long selectIndexId = selectBestIndex(haveAllRequiredColumns, scan, predicates); - Optional rewriteResultOpt = candidatesWithRewriting.stream() - .filter(aggRewriteResult -> aggRewriteResult.index.getId() == selectIndexId) - .findAny(); - // Pre-aggregation is set to `on` by default for duplicate-keys table. 
- return new SelectResult(PreAggStatus.on(), selectIndexId, - rewriteResultOpt.map(r -> r.exprRewriteMap).orElse(new ExprRewriteMap())); - } + case UNIQUE_KEYS: + case DUP_KEYS: + break; default: - throw new RuntimeException("Not supported keys type: " + table.getKeysType()); + throw new RuntimeException("Not supported keys type: " + scan.getTable().getKeysType()); + } + if (table.isDupKeysOrMergeOnWrite()) { + Map> indexesGroupByIsBaseOrNot = table.getVisibleIndex() + .stream() + .collect(Collectors.groupingBy(index -> index.getId() == table.getBaseIndexId())); + + // Duplicate-keys table could use base index and indexes that pre-aggregation status is on. + Set candidatesWithoutRewriting = Stream.concat( + indexesGroupByIsBaseOrNot.get(true).stream(), + indexesGroupByIsBaseOrNot.getOrDefault(false, ImmutableList.of()) + .stream() + .filter(index -> checkPreAggStatus(scan, index.getId(), predicates, + aggregateFunctions, groupingExprs).isOn()) + ).collect(ImmutableSet.toImmutableSet()); + + // try to rewrite bitmap, hll by materialized index columns. + List candidatesWithRewriting = indexesGroupByIsBaseOrNot.getOrDefault(false, + ImmutableList.of()) + .stream() + .filter(index -> !candidatesWithoutRewriting.contains(index)) + .map(index -> rewriteAgg(index, scan, nonVirtualRequiredScanOutput, predicates, + aggregateFunctions, + groupingExprs)) + .filter(aggRewriteResult -> checkPreAggStatus(scan, aggRewriteResult.index.getId(), + predicates, + // check pre-agg status of aggregate function that couldn't rewrite. 
+ aggFuncsDiff(aggregateFunctions, aggRewriteResult), + groupingExprs).isOn()) + .filter(result -> result.success) + .collect(Collectors.toList()); + + List haveAllRequiredColumns = Streams.concat( + candidatesWithoutRewriting.stream() + .filter(index -> containAllRequiredColumns(index, scan, nonVirtualRequiredScanOutput)), + candidatesWithRewriting + .stream() + .filter(aggRewriteResult -> containAllRequiredColumns(aggRewriteResult.index, scan, + aggRewriteResult.requiredScanOutput)) + .map(aggRewriteResult -> aggRewriteResult.index) + ).collect(Collectors.toList()); + + long selectIndexId = selectBestIndex(haveAllRequiredColumns, scan, predicates); + Optional rewriteResultOpt = candidatesWithRewriting.stream() + .filter(aggRewriteResult -> aggRewriteResult.index.getId() == selectIndexId) + .findAny(); + // Pre-aggregation is set to `on` by default for duplicate-keys table. + return new SelectResult(PreAggStatus.on(), selectIndexId, + rewriteResultOpt.map(r -> r.exprRewriteMap).orElse(new ExprRewriteMap())); + } else { + final PreAggStatus preAggStatus; + if (preAggEnabledByHint(scan)) { + // PreAggStatus could be enabled by pre-aggregation hint for agg-keys and unique-keys. + preAggStatus = PreAggStatus.on(); + } else { + // Only checking pre-aggregation status by base index is enough for aggregate-keys and + // unique-keys OLAP table. + // Because the schemas in non-base materialized index are subsets of the schema of base index. + preAggStatus = checkPreAggStatus(scan, table.getBaseIndexId(), predicates, + aggregateFunctions, groupingExprs); + } + if (preAggStatus.isOff()) { + // return early if pre agg status is off. 
+ return new SelectResult(preAggStatus, scan.getTable().getBaseIndexId(), new ExprRewriteMap()); + } else { + List rollupsWithAllRequiredCols = table.getVisibleIndex().stream() + .filter(index -> containAllRequiredColumns(index, scan, nonVirtualRequiredScanOutput)) + .collect(Collectors.toList()); + return new SelectResult(preAggStatus, selectBestIndex(rollupsWithAllRequiredCols, scan, predicates), + new ExprRewriteMap()); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/mv/SelectMaterializedIndexWithoutAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/mv/SelectMaterializedIndexWithoutAggregate.java index 3c30590a34..7134bd264a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/mv/SelectMaterializedIndexWithoutAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/mv/SelectMaterializedIndexWithoutAggregate.java @@ -117,43 +117,47 @@ public class SelectMaterializedIndexWithoutAggregate extends AbstractSelectMater switch (scan.getTable().getKeysType()) { case AGG_KEYS: case UNIQUE_KEYS: - OlapTable table = scan.getTable(); - long baseIndexId = table.getBaseIndexId(); - int baseIndexKeySize = table.getKeyColumnsByIndexId(table.getBaseIndexId()).size(); - // No aggregate on scan. - // So only base index and indexes that have all the keys could be used. - List candidates = table.getVisibleIndex().stream() - .filter(index -> index.getId() == baseIndexId - || table.getKeyColumnsByIndexId(index.getId()).size() == baseIndexKeySize) - .filter(index -> containAllRequiredColumns(index, scan, requiredScanOutputSupplier.get())) - .collect(Collectors.toList()); - - final PreAggStatus preAggStatus; - if (preAggEnabledByHint(scan)) { - // PreAggStatus could be enabled by pre-aggregation hint for agg-keys and unique-keys. 
- preAggStatus = PreAggStatus.on(); - } else { - preAggStatus = PreAggStatus.off("No aggregate on scan."); - } - if (candidates.size() == 1) { - // `candidates` only have base index. - return scan.withMaterializedIndexSelected(preAggStatus, baseIndexId); - } else { - return scan.withMaterializedIndexSelected(preAggStatus, - selectBestIndex(candidates, scan, predicatesSupplier.get())); - } case DUP_KEYS: - // Set pre-aggregation to `on` to keep consistency with legacy logic. - List candidate = scan.getTable().getVisibleIndex().stream() - .filter(index -> !indexHasAggregate(index, scan)) - .filter(index -> containAllRequiredColumns(index, scan, - requiredScanOutputSupplier.get())) - .collect(Collectors.toList()); - return scan.withMaterializedIndexSelected(PreAggStatus.on(), - selectBestIndex(candidate, scan, predicatesSupplier.get())); + break; default: throw new RuntimeException("Not supported keys type: " + scan.getTable().getKeysType()); } + if (scan.getTable().isDupKeysOrMergeOnWrite()) { + // Set pre-aggregation to `on` to keep consistency with legacy logic. + List candidate = scan.getTable().getVisibleIndex().stream() + .filter(index -> !indexHasAggregate(index, scan)) + .filter(index -> containAllRequiredColumns(index, scan, + requiredScanOutputSupplier.get())) + .collect(Collectors.toList()); + return scan.withMaterializedIndexSelected(PreAggStatus.on(), + selectBestIndex(candidate, scan, predicatesSupplier.get())); + } else { + OlapTable table = scan.getTable(); + long baseIndexId = table.getBaseIndexId(); + int baseIndexKeySize = table.getKeyColumnsByIndexId(table.getBaseIndexId()).size(); + // No aggregate on scan. + // So only base index and indexes that have all the keys could be used. 
+ List candidates = table.getVisibleIndex().stream() + .filter(index -> index.getId() == baseIndexId + || table.getKeyColumnsByIndexId(index.getId()).size() == baseIndexKeySize) + .filter(index -> containAllRequiredColumns(index, scan, requiredScanOutputSupplier.get())) + .collect(Collectors.toList()); + + final PreAggStatus preAggStatus; + if (preAggEnabledByHint(scan)) { + // PreAggStatus could be enabled by pre-aggregation hint for agg-keys and unique-keys. + preAggStatus = PreAggStatus.on(); + } else { + preAggStatus = PreAggStatus.off("No aggregate on scan."); + } + if (candidates.size() == 1) { + // `candidates` only have base index. + return scan.withMaterializedIndexSelected(preAggStatus, baseIndexId); + } else { + return scan.withMaterializedIndexSelected(preAggStatus, + selectBestIndex(candidates, scan, predicatesSupplier.get())); + } + } } private boolean indexHasAggregate(MaterializedIndex index, LogicalOlapScan scan) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 70ea2e9044..1baa1a8cf6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -469,13 +469,7 @@ public class OlapScanNode extends ScanNode { } public boolean isDupKeysOrMergeOnWrite() { - if (olapTable.getKeysType() == KeysType.DUP_KEYS - || (olapTable.getKeysType() == KeysType.UNIQUE_KEYS - && olapTable.getEnableUniqueKeyMergeOnWrite())) { - return true; - } else { - return false; - } + return olapTable.isDupKeysOrMergeOnWrite(); } @Override