[fix](Nereids) merge-on-write (MoW) unique table's pre-aggregation should work like duplicate table (#17028)
This commit is contained in:
@ -2031,4 +2031,9 @@ public class OlapTable extends Table {
|
||||
return idToPartition.keySet();
|
||||
}
|
||||
|
||||
/**
 * Tells whether this table is a duplicate-keys table, or a unique-keys table
 * that has merge-on-write enabled.
 *
 * @return {@code true} if {@code getKeysType()} is {@code KeysType.DUP_KEYS}, or if it is
 *         {@code KeysType.UNIQUE_KEYS} and {@code getEnableUniqueKeyMergeOnWrite()} returns true;
 *         {@code false} otherwise
 */
public boolean isDupKeysOrMergeOnWrite() {
|
||||
return getKeysType() == KeysType.DUP_KEYS
|
||||
|| (getKeysType() == KeysType.UNIQUE_KEYS
|
||||
&& getEnableUniqueKeyMergeOnWrite());
|
||||
}
|
||||
}
|
||||
|
||||
@ -487,84 +487,84 @@ public class SelectMaterializedIndexWithAggregate extends AbstractSelectMaterial
|
||||
Preconditions.checkArgument(scan.getOutputSet().containsAll(nonVirtualRequiredScanOutput),
|
||||
String.format("Scan's output (%s) should contains all the input required scan output (%s).",
|
||||
scan.getOutput(), nonVirtualRequiredScanOutput));
|
||||
|
||||
OlapTable table = scan.getTable();
|
||||
|
||||
switch (table.getKeysType()) {
|
||||
switch (scan.getTable().getKeysType()) {
|
||||
case AGG_KEYS:
|
||||
case UNIQUE_KEYS: {
|
||||
final PreAggStatus preAggStatus;
|
||||
if (preAggEnabledByHint(scan)) {
|
||||
// PreAggStatus could be enabled by pre-aggregation hint for agg-keys and unique-keys.
|
||||
preAggStatus = PreAggStatus.on();
|
||||
} else {
|
||||
// Only checking pre-aggregation status by base index is enough for aggregate-keys and
|
||||
// unique-keys OLAP table.
|
||||
// Because the schemas in non-base materialized index are subsets of the schema of base index.
|
||||
preAggStatus = checkPreAggStatus(scan, table.getBaseIndexId(), predicates,
|
||||
aggregateFunctions, groupingExprs);
|
||||
}
|
||||
if (preAggStatus.isOff()) {
|
||||
// return early if pre agg status is off.
|
||||
return new SelectResult(preAggStatus, scan.getTable().getBaseIndexId(), new ExprRewriteMap());
|
||||
} else {
|
||||
List<MaterializedIndex> rollupsWithAllRequiredCols = table.getVisibleIndex().stream()
|
||||
.filter(index -> containAllRequiredColumns(index, scan, nonVirtualRequiredScanOutput))
|
||||
.collect(Collectors.toList());
|
||||
return new SelectResult(preAggStatus, selectBestIndex(rollupsWithAllRequiredCols, scan, predicates),
|
||||
new ExprRewriteMap());
|
||||
}
|
||||
}
|
||||
case DUP_KEYS: {
|
||||
Map<Boolean, List<MaterializedIndex>> indexesGroupByIsBaseOrNot = table.getVisibleIndex()
|
||||
.stream()
|
||||
.collect(Collectors.groupingBy(index -> index.getId() == table.getBaseIndexId()));
|
||||
|
||||
// Duplicate-keys table could use base index and indexes that pre-aggregation status is on.
|
||||
Set<MaterializedIndex> candidatesWithoutRewriting = Stream.concat(
|
||||
indexesGroupByIsBaseOrNot.get(true).stream(),
|
||||
indexesGroupByIsBaseOrNot.getOrDefault(false, ImmutableList.of())
|
||||
.stream()
|
||||
.filter(index -> checkPreAggStatus(scan, index.getId(), predicates,
|
||||
aggregateFunctions, groupingExprs).isOn())
|
||||
).collect(ImmutableSet.toImmutableSet());
|
||||
|
||||
// try to rewrite bitmap, hll by materialized index columns.
|
||||
List<AggRewriteResult> candidatesWithRewriting = indexesGroupByIsBaseOrNot.getOrDefault(false,
|
||||
ImmutableList.of())
|
||||
.stream()
|
||||
.filter(index -> !candidatesWithoutRewriting.contains(index))
|
||||
.map(index -> rewriteAgg(index, scan, nonVirtualRequiredScanOutput, predicates,
|
||||
aggregateFunctions,
|
||||
groupingExprs))
|
||||
.filter(aggRewriteResult -> checkPreAggStatus(scan, aggRewriteResult.index.getId(),
|
||||
predicates,
|
||||
// check pre-agg status of aggregate function that couldn't rewrite.
|
||||
aggFuncsDiff(aggregateFunctions, aggRewriteResult),
|
||||
groupingExprs).isOn())
|
||||
.filter(result -> result.success)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
List<MaterializedIndex> haveAllRequiredColumns = Streams.concat(
|
||||
candidatesWithoutRewriting.stream()
|
||||
.filter(index -> containAllRequiredColumns(index, scan, nonVirtualRequiredScanOutput)),
|
||||
candidatesWithRewriting
|
||||
.stream()
|
||||
.filter(aggRewriteResult -> containAllRequiredColumns(aggRewriteResult.index, scan,
|
||||
aggRewriteResult.requiredScanOutput))
|
||||
.map(aggRewriteResult -> aggRewriteResult.index)
|
||||
).collect(Collectors.toList());
|
||||
|
||||
long selectIndexId = selectBestIndex(haveAllRequiredColumns, scan, predicates);
|
||||
Optional<AggRewriteResult> rewriteResultOpt = candidatesWithRewriting.stream()
|
||||
.filter(aggRewriteResult -> aggRewriteResult.index.getId() == selectIndexId)
|
||||
.findAny();
|
||||
// Pre-aggregation is set to `on` by default for duplicate-keys table.
|
||||
return new SelectResult(PreAggStatus.on(), selectIndexId,
|
||||
rewriteResultOpt.map(r -> r.exprRewriteMap).orElse(new ExprRewriteMap()));
|
||||
}
|
||||
case UNIQUE_KEYS:
|
||||
case DUP_KEYS:
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Not supported keys type: " + table.getKeysType());
|
||||
throw new RuntimeException("Not supported keys type: " + scan.getTable().getKeysType());
|
||||
}
|
||||
if (table.isDupKeysOrMergeOnWrite()) {
|
||||
Map<Boolean, List<MaterializedIndex>> indexesGroupByIsBaseOrNot = table.getVisibleIndex()
|
||||
.stream()
|
||||
.collect(Collectors.groupingBy(index -> index.getId() == table.getBaseIndexId()));
|
||||
|
||||
// Duplicate-keys table could use base index and indexes that pre-aggregation status is on.
|
||||
Set<MaterializedIndex> candidatesWithoutRewriting = Stream.concat(
|
||||
indexesGroupByIsBaseOrNot.get(true).stream(),
|
||||
indexesGroupByIsBaseOrNot.getOrDefault(false, ImmutableList.of())
|
||||
.stream()
|
||||
.filter(index -> checkPreAggStatus(scan, index.getId(), predicates,
|
||||
aggregateFunctions, groupingExprs).isOn())
|
||||
).collect(ImmutableSet.toImmutableSet());
|
||||
|
||||
// try to rewrite bitmap, hll by materialized index columns.
|
||||
List<AggRewriteResult> candidatesWithRewriting = indexesGroupByIsBaseOrNot.getOrDefault(false,
|
||||
ImmutableList.of())
|
||||
.stream()
|
||||
.filter(index -> !candidatesWithoutRewriting.contains(index))
|
||||
.map(index -> rewriteAgg(index, scan, nonVirtualRequiredScanOutput, predicates,
|
||||
aggregateFunctions,
|
||||
groupingExprs))
|
||||
.filter(aggRewriteResult -> checkPreAggStatus(scan, aggRewriteResult.index.getId(),
|
||||
predicates,
|
||||
// check pre-agg status of aggregate function that couldn't rewrite.
|
||||
aggFuncsDiff(aggregateFunctions, aggRewriteResult),
|
||||
groupingExprs).isOn())
|
||||
.filter(result -> result.success)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
List<MaterializedIndex> haveAllRequiredColumns = Streams.concat(
|
||||
candidatesWithoutRewriting.stream()
|
||||
.filter(index -> containAllRequiredColumns(index, scan, nonVirtualRequiredScanOutput)),
|
||||
candidatesWithRewriting
|
||||
.stream()
|
||||
.filter(aggRewriteResult -> containAllRequiredColumns(aggRewriteResult.index, scan,
|
||||
aggRewriteResult.requiredScanOutput))
|
||||
.map(aggRewriteResult -> aggRewriteResult.index)
|
||||
).collect(Collectors.toList());
|
||||
|
||||
long selectIndexId = selectBestIndex(haveAllRequiredColumns, scan, predicates);
|
||||
Optional<AggRewriteResult> rewriteResultOpt = candidatesWithRewriting.stream()
|
||||
.filter(aggRewriteResult -> aggRewriteResult.index.getId() == selectIndexId)
|
||||
.findAny();
|
||||
// Pre-aggregation is set to `on` by default for duplicate-keys table.
|
||||
return new SelectResult(PreAggStatus.on(), selectIndexId,
|
||||
rewriteResultOpt.map(r -> r.exprRewriteMap).orElse(new ExprRewriteMap()));
|
||||
} else {
|
||||
final PreAggStatus preAggStatus;
|
||||
if (preAggEnabledByHint(scan)) {
|
||||
// PreAggStatus could be enabled by pre-aggregation hint for agg-keys and unique-keys.
|
||||
preAggStatus = PreAggStatus.on();
|
||||
} else {
|
||||
// Only checking pre-aggregation status by base index is enough for aggregate-keys and
|
||||
// unique-keys OLAP table.
|
||||
// Because the schemas in non-base materialized index are subsets of the schema of base index.
|
||||
preAggStatus = checkPreAggStatus(scan, table.getBaseIndexId(), predicates,
|
||||
aggregateFunctions, groupingExprs);
|
||||
}
|
||||
if (preAggStatus.isOff()) {
|
||||
// return early if pre agg status is off.
|
||||
return new SelectResult(preAggStatus, scan.getTable().getBaseIndexId(), new ExprRewriteMap());
|
||||
} else {
|
||||
List<MaterializedIndex> rollupsWithAllRequiredCols = table.getVisibleIndex().stream()
|
||||
.filter(index -> containAllRequiredColumns(index, scan, nonVirtualRequiredScanOutput))
|
||||
.collect(Collectors.toList());
|
||||
return new SelectResult(preAggStatus, selectBestIndex(rollupsWithAllRequiredCols, scan, predicates),
|
||||
new ExprRewriteMap());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -117,43 +117,47 @@ public class SelectMaterializedIndexWithoutAggregate extends AbstractSelectMater
|
||||
switch (scan.getTable().getKeysType()) {
|
||||
case AGG_KEYS:
|
||||
case UNIQUE_KEYS:
|
||||
OlapTable table = scan.getTable();
|
||||
long baseIndexId = table.getBaseIndexId();
|
||||
int baseIndexKeySize = table.getKeyColumnsByIndexId(table.getBaseIndexId()).size();
|
||||
// No aggregate on scan.
|
||||
// So only base index and indexes that have all the keys could be used.
|
||||
List<MaterializedIndex> candidates = table.getVisibleIndex().stream()
|
||||
.filter(index -> index.getId() == baseIndexId
|
||||
|| table.getKeyColumnsByIndexId(index.getId()).size() == baseIndexKeySize)
|
||||
.filter(index -> containAllRequiredColumns(index, scan, requiredScanOutputSupplier.get()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
final PreAggStatus preAggStatus;
|
||||
if (preAggEnabledByHint(scan)) {
|
||||
// PreAggStatus could be enabled by pre-aggregation hint for agg-keys and unique-keys.
|
||||
preAggStatus = PreAggStatus.on();
|
||||
} else {
|
||||
preAggStatus = PreAggStatus.off("No aggregate on scan.");
|
||||
}
|
||||
if (candidates.size() == 1) {
|
||||
// `candidates` only contains the base index.
|
||||
return scan.withMaterializedIndexSelected(preAggStatus, baseIndexId);
|
||||
} else {
|
||||
return scan.withMaterializedIndexSelected(preAggStatus,
|
||||
selectBestIndex(candidates, scan, predicatesSupplier.get()));
|
||||
}
|
||||
case DUP_KEYS:
|
||||
// Set pre-aggregation to `on` to keep consistency with legacy logic.
|
||||
List<MaterializedIndex> candidate = scan.getTable().getVisibleIndex().stream()
|
||||
.filter(index -> !indexHasAggregate(index, scan))
|
||||
.filter(index -> containAllRequiredColumns(index, scan,
|
||||
requiredScanOutputSupplier.get()))
|
||||
.collect(Collectors.toList());
|
||||
return scan.withMaterializedIndexSelected(PreAggStatus.on(),
|
||||
selectBestIndex(candidate, scan, predicatesSupplier.get()));
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Not supported keys type: " + scan.getTable().getKeysType());
|
||||
}
|
||||
if (scan.getTable().isDupKeysOrMergeOnWrite()) {
|
||||
// Set pre-aggregation to `on` to keep consistency with legacy logic.
|
||||
List<MaterializedIndex> candidate = scan.getTable().getVisibleIndex().stream()
|
||||
.filter(index -> !indexHasAggregate(index, scan))
|
||||
.filter(index -> containAllRequiredColumns(index, scan,
|
||||
requiredScanOutputSupplier.get()))
|
||||
.collect(Collectors.toList());
|
||||
return scan.withMaterializedIndexSelected(PreAggStatus.on(),
|
||||
selectBestIndex(candidate, scan, predicatesSupplier.get()));
|
||||
} else {
|
||||
OlapTable table = scan.getTable();
|
||||
long baseIndexId = table.getBaseIndexId();
|
||||
int baseIndexKeySize = table.getKeyColumnsByIndexId(table.getBaseIndexId()).size();
|
||||
// No aggregate on scan.
|
||||
// So only base index and indexes that have all the keys could be used.
|
||||
List<MaterializedIndex> candidates = table.getVisibleIndex().stream()
|
||||
.filter(index -> index.getId() == baseIndexId
|
||||
|| table.getKeyColumnsByIndexId(index.getId()).size() == baseIndexKeySize)
|
||||
.filter(index -> containAllRequiredColumns(index, scan, requiredScanOutputSupplier.get()))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
final PreAggStatus preAggStatus;
|
||||
if (preAggEnabledByHint(scan)) {
|
||||
// PreAggStatus could be enabled by pre-aggregation hint for agg-keys and unique-keys.
|
||||
preAggStatus = PreAggStatus.on();
|
||||
} else {
|
||||
preAggStatus = PreAggStatus.off("No aggregate on scan.");
|
||||
}
|
||||
if (candidates.size() == 1) {
|
||||
// `candidates` only contains the base index.
|
||||
return scan.withMaterializedIndexSelected(preAggStatus, baseIndexId);
|
||||
} else {
|
||||
return scan.withMaterializedIndexSelected(preAggStatus,
|
||||
selectBestIndex(candidates, scan, predicatesSupplier.get()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean indexHasAggregate(MaterializedIndex index, LogicalOlapScan scan) {
|
||||
|
||||
@ -469,13 +469,7 @@ public class OlapScanNode extends ScanNode {
|
||||
}
|
||||
|
||||
public boolean isDupKeysOrMergeOnWrite() {
|
||||
if (olapTable.getKeysType() == KeysType.DUP_KEYS
|
||||
|| (olapTable.getKeysType() == KeysType.UNIQUE_KEYS
|
||||
&& olapTable.getEnableUniqueKeyMergeOnWrite())) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
return olapTable.isDupKeysOrMergeOnWrite();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user