[feature](Nereids) support push down no group agg to olap scan (#14683)

Use the zone map to evaluate aggregates that have no GROUP BY expressions.
Supported aggregate functions:
- count
- min
- max

implementation in legacy planner: #12881
This commit is contained in:
morrySnow
2022-12-08 15:34:39 +08:00
committed by GitHub
parent 2fb896d916
commit 1887881a61
16 changed files with 506 additions and 44 deletions

View File

@ -378,6 +378,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
tupleDescriptor.setRef(tableRef);
olapScanNode.setSelectedPartitionIds(olapScan.getSelectedPartitionIds());
olapScanNode.setSampleTabletIds(olapScan.getSelectedTabletIds());
olapScanNode.setPushDownAggNoGrouping(olapScan.getPushDownAggOperator().toThrift());
switch (olapScan.getTable().getKeysType()) {
case AGG_KEYS:

View File

@ -39,6 +39,7 @@ import org.apache.doris.nereids.rules.rewrite.logical.MergeProjects;
import org.apache.doris.nereids.rules.rewrite.logical.NormalizeAggregate;
import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanPartition;
import org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanTablet;
import org.apache.doris.nereids.rules.rewrite.logical.PushAggregateToOlapScan;
import org.apache.doris.nereids.rules.rewrite.logical.PushFilterInsideJoin;
import org.apache.doris.nereids.rules.rewrite.logical.ReorderJoin;
@ -94,6 +95,7 @@ public class NereidsRewriteJobExecutor extends BatchRulesJob {
// to avoid two consecutive same project appear when we do optimization.
.add(topDownBatch(ImmutableList.of(new EliminateGroupByConstant())))
.add(topDownBatch(ImmutableList.of(new EliminateUnnecessaryProject())))
.add(topDownBatch(ImmutableList.of(new PushAggregateToOlapScan())))
// this rule batch must keep at the end of rewrite to do some plan check
.add(bottomUpBatch(ImmutableList.of(new CheckAfterRewrite())))
.build();

View File

@ -104,7 +104,7 @@ public enum RuleType {
PUSHDOWN_FILTER_THROUGH_LEFT_SEMI_JOIN(RuleTypeClass.REWRITE),
PUSH_FILTER_INSIDE_JOIN(RuleTypeClass.REWRITE),
PUSHDOWN_FILTER_THROUGH_PROJECT(RuleTypeClass.REWRITE),
PUSHDOWN_PROJECT_THROUGHT_LIMIT(RuleTypeClass.REWRITE),
PUSHDOWN_PROJECT_THROUGH_LIMIT(RuleTypeClass.REWRITE),
// column prune rules,
COLUMN_PRUNE_AGGREGATION_CHILD(RuleTypeClass.REWRITE),
COLUMN_PRUNE_FILTER_CHILD(RuleTypeClass.REWRITE),
@ -139,8 +139,8 @@ public enum RuleType {
MATERIALIZED_INDEX_FILTER_PROJECT_SCAN(RuleTypeClass.REWRITE),
OLAP_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE),
OLAP_SCAN_TABLET_PRUNE(RuleTypeClass.REWRITE),
PUSH_AGGREGATE_TO_OLAP_SCAN(RuleTypeClass.REWRITE),
EXTRACT_SINGLE_TABLE_EXPRESSION_FROM_DISJUNCTION(RuleTypeClass.REWRITE),
REWRITE_SENTINEL(RuleTypeClass.REWRITE),
// limit push down
PUSH_LIMIT_THROUGH_JOIN(RuleTypeClass.REWRITE),

View File

@ -49,10 +49,11 @@ public class LogicalOlapScanToPhysicalOlapScan extends OneImplementationRuleFact
olapScan.getTable(),
olapScan.getQualifier(),
olapScan.getSelectedIndexId(),
olapScan.getSelectedTabletId(),
olapScan.getSelectedTabletIds(),
olapScan.getSelectedPartitionIds(),
convertDistribution(olapScan),
olapScan.getPreAggStatus(),
olapScan.getPushDownAggOperator(),
Optional.empty(),
olapScan.getLogicalProperties())
).toRule(RuleType.LOGICAL_OLAP_SCAN_TO_PHYSICAL_OLAP_SCAN_RULE);

View File

@ -0,0 +1,197 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite.logical;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
import org.apache.doris.nereids.trees.expressions.functions.agg.Max;
import org.apache.doris.nereids.trees.expressions.functions.agg.Min;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PushDownAggOperator;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.types.ArrayType;
import org.apache.doris.nereids.types.BitmapType;
import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.HllType;
import org.apache.doris.nereids.types.StringType;
import org.apache.doris.nereids.types.coercion.CharacterType;
import org.apache.doris.nereids.types.coercion.NumericType;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Push an aggregate without group by expressions down to the olap scan.
 *
 * <p>The rule matches {@code aggregate -> olapScan} and {@code aggregate -> project -> olapScan}.
 * It never removes the aggregate: on success it only replaces the scan with a copy that
 * carries the merged {@link PushDownAggOperator}; on failure the plan is returned untouched.
 *
 * <p>Only {@code count}, {@code min} and {@code max} with at most one argument are accepted,
 * and the argument must be a slot (optionally wrapped in a numeric-to-numeric cast) that maps
 * back to a slot of the scan.
 */
public class PushAggregateToOlapScan implements RewriteRuleFactory {

    // The zone map max length of CharFamily is 512, do not
    // over the length: https://github.com/apache/doris/pull/6293
    private static final int MAX_ZONE_MAP_CHAR_LENGTH = 512;

    @Override
    public List<Rule> buildRules() {
        return ImmutableList.of(
                // aggregate directly on the scan: every scan output maps to itself.
                logicalAggregate(logicalOlapScan())
                        .when(aggregate -> check(aggregate, aggregate.child()))
                        .then(aggregate -> {
                            LogicalOlapScan olapScan = aggregate.child();
                            Map<Slot, Slot> projections = Maps.newHashMap();
                            olapScan.getOutput().forEach(s -> projections.put(s, s));
                            LogicalOlapScan pushed = pushAggregateToOlapScan(aggregate, olapScan, projections);
                            if (pushed == olapScan) {
                                return aggregate;
                            } else {
                                return aggregate.withChildren(pushed);
                            }
                        })
                        .toRule(RuleType.PUSH_AGGREGATE_TO_OLAP_SCAN),
                // aggregate on a project on the scan: additionally map every alias of a plain slot
                // back to the aliased slot. Aliases of complex expressions are left out on purpose,
                // so aggregates over them are not pushed.
                logicalAggregate(logicalProject(logicalOlapScan()))
                        .when(aggregate -> check(aggregate, aggregate.child().child()))
                        .then(aggregate -> {
                            LogicalProject<LogicalOlapScan> project = aggregate.child();
                            LogicalOlapScan olapScan = project.child();
                            Map<Slot, Slot> projections = Maps.newHashMap();
                            olapScan.getOutput().forEach(s -> projections.put(s, s));
                            project.getProjects().stream()
                                    .filter(Alias.class::isInstance)
                                    .map(Alias.class::cast)
                                    .filter(alias -> alias.child() instanceof Slot)
                                    .forEach(alias -> projections.put(alias.toSlot(), (Slot) alias.child()));
                            LogicalOlapScan pushed = pushAggregateToOlapScan(aggregate, olapScan, projections);
                            if (pushed == olapScan) {
                                return aggregate;
                            } else {
                                return aggregate.withChildren(project.withChildren(pushed));
                            }
                        })
                        .toRule(RuleType.PUSH_AGGREGATE_TO_OLAP_SCAN)
        );
    }

    /**
     * Cheap pre-conditions that do not depend on the aggregate arguments.
     *
     * @param aggregate the matched aggregate
     * @param olapScan the scan below the aggregate (possibly below a project)
     * @return true if the rule may try to push the aggregate to {@code olapScan}
     */
    private boolean check(LogicalAggregate<? extends Plan> aggregate, LogicalOlapScan olapScan) {
        // session variables
        if (ConnectContext.get() != null && !ConnectContext.get().getSessionVariable().enablePushDownNoGroupAgg()) {
            return false;
        }

        // olap scan: never push twice, never push to unique / primary key tables
        if (olapScan.isAggPushed()) {
            return false;
        }
        KeysType keysType = olapScan.getTable().getKeysType();
        if (keysType == KeysType.UNIQUE_KEYS || keysType == KeysType.PRIMARY_KEYS) {
            return false;
        }

        // aggregate: no group by, only count / min / max with at most one argument
        if (!aggregate.getGroupByExpressions().isEmpty()) {
            return false;
        }
        List<AggregateFunction> aggregateFunctions = collectAggregateFunctions(aggregate);
        if (aggregateFunctions.stream().anyMatch(af -> af.arity() > 1)) {
            return false;
        }
        if (!aggregateFunctions.stream()
                .allMatch(af -> af instanceof Count || af instanceof Min || af instanceof Max)) {
            return false;
        }

        // both: count is only pushed to duplicate key tables
        if (aggregateFunctions.stream().anyMatch(Count.class::isInstance) && keysType != KeysType.DUP_KEYS) {
            return false;
        }

        return true;
    }

    /**
     * Try to attach the push down operator required by {@code aggregate} to {@code olapScan}.
     *
     * @param aggregate the aggregate whose functions should be pushed
     * @param olapScan the scan to push to
     * @param projections maps every slot visible to the aggregate to the scan slot it stands for
     * @return a new scan carrying the merged operator, or the very same {@code olapScan} instance
     *         if any argument of any aggregate function prevents the push down
     */
    private LogicalOlapScan pushAggregateToOlapScan(
            LogicalAggregate<? extends Plan> aggregate,
            LogicalOlapScan olapScan,
            Map<Slot, Slot> projections) {
        List<AggregateFunction> aggregateFunctions = collectAggregateFunctions(aggregate);

        // Merge all functions first. The functions are collected from sets, so their order is not
        // meaningful; validating every argument against the final operator (instead of the
        // operator accumulated so far) makes the decision independent of that order.
        PushDownAggOperator pushDownAggOperator = olapScan.getPushDownAggOperator();
        for (AggregateFunction aggregateFunction : aggregateFunctions) {
            pushDownAggOperator = pushDownAggOperator.merge(aggregateFunction.getName());
        }

        for (AggregateFunction aggregateFunction : aggregateFunctions) {
            if (!isArgumentPushable(aggregateFunction, pushDownAggOperator, projections)) {
                return olapScan;
            }
        }
        return olapScan.withPushDownAggregateOperator(pushDownAggOperator);
    }

    /** Collect every aggregate function that appears in the output expressions of the aggregate. */
    private static List<AggregateFunction> collectAggregateFunctions(LogicalAggregate<? extends Plan> aggregate) {
        return aggregate.getOutputExpressions().stream()
                .<Set<AggregateFunction>>map(e -> e.collect(AggregateFunction.class::isInstance))
                .flatMap(Set::stream)
                .collect(Collectors.toList());
    }

    /** Check that the argument of one aggregate function resolves to a scan slot usable under the operator. */
    private static boolean isArgumentPushable(
            AggregateFunction aggregateFunction,
            PushDownAggOperator pushDownAggOperator,
            Map<Slot, Slot> projections) {
        // no argument at all (e.g. count(*)): nothing to validate
        if (aggregateFunction.arity() == 0) {
            return true;
        }

        Expression child = aggregateFunction.child(0);
        Slot slot;
        if (child instanceof Slot) {
            slot = (Slot) child;
        } else if (child instanceof Cast && child.child(0) instanceof SlotReference
                && child.getDataType() instanceof NumericType
                && child.child(0).getDataType() instanceof NumericType) {
            // only a numeric to numeric cast directly on a slot is looked through
            slot = (Slot) child.child(0);
        } else {
            return false;
        }

        // replace by SlotReference in olap table. check no complex project on this SlotReference.
        Slot scanSlot = projections.get(slot);
        if (scanSlot == null) {
            return false;
        }
        return isZoneMapCompatible(scanSlot, pushDownAggOperator);
    }

    /** Check the type and nullability restrictions of a scan slot for the given operator. */
    private static boolean isZoneMapCompatible(Slot scanSlot, PushDownAggOperator pushDownAggOperator) {
        DataType dataType = scanSlot.getDataType();
        if (pushDownAggOperator.containsMinMax()
                && (dataType instanceof ArrayType
                || dataType instanceof HllType
                || dataType instanceof BitmapType
                || dataType instanceof StringType)) {
            return false;
        }

        // a negative length is rejected together with an over-long one
        if (dataType instanceof CharacterType
                && (((CharacterType) dataType).getLen() > MAX_ZONE_MAP_CHAR_LENGTH
                || ((CharacterType) dataType).getLen() < 0)) {
            return false;
        }

        // NOTE(review): presumably a pushed count cannot skip NULL values, so nullable
        // columns are refused whenever a count is part of the operator -- confirm with BE.
        if (pushDownAggOperator.containsCount() && scanSlot.nullable()) {
            return false;
        }
        return true;
    }
}

View File

@ -54,6 +54,6 @@ public class PushdownProjectThroughLimit extends OneRewriteRuleFactory {
return new LogicalLimit<LogicalProject<GroupPlan>>(logicalLimit.getLimit(),
logicalLimit.getOffset(), new LogicalProject<>(logicalProject.getProjects(),
logicalLimit.child()));
}).toRule(RuleType.PUSHDOWN_PROJECT_THROUGHT_LIMIT);
}).toRule(RuleType.PUSHDOWN_PROJECT_THROUGH_LIMIT);
}
}

View File

@ -0,0 +1,70 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans;
import org.apache.doris.thrift.TPushAggOp;
/**
 * use for push down agg without group by exprs to olap scan.
 *
 * <p>Instances are the four shared constants below; all comparisons in this class are
 * identity comparisons against them, so the constants must never be reassigned.
 */
public class PushDownAggOperator {
    /** nothing is pushed down. */
    public static final PushDownAggOperator NONE = new PushDownAggOperator(TPushAggOp.NONE);
    /** only min and / or max are pushed down. */
    public static final PushDownAggOperator MIN_MAX = new PushDownAggOperator(TPushAggOp.MINMAX);
    /** only count is pushed down. */
    public static final PushDownAggOperator COUNT = new PushDownAggOperator(TPushAggOp.COUNT);
    /** count together with min and / or max is pushed down. */
    public static final PushDownAggOperator MIX = new PushDownAggOperator(TPushAggOp.MIX);

    private final TPushAggOp thriftOperator;

    private PushDownAggOperator(TPushAggOp thriftOperator) {
        this.thriftOperator = thriftOperator;
    }

    /**
     * merge operator.
     *
     * <p>{@code functionName} is mapped to {@link #COUNT} if it equals "COUNT" ignoring case;
     * every other name (including {@code null}) is mapped to {@link #MIN_MAX}, so callers
     * must only pass count, min or max.
     *
     * @param functionName name of the aggregate function to add
     * @return the mapped operator if this is {@link #NONE} or already equal to it,
     *         otherwise {@link #MIX}
     */
    public PushDownAggOperator merge(String functionName) {
        PushDownAggOperator newOne = "COUNT".equalsIgnoreCase(functionName) ? COUNT : MIN_MAX;
        if (this == NONE || this == newOne) {
            return newOne;
        }
        return MIX;
    }

    /** Returns the thrift operator this instance was created with. */
    public TPushAggOp toThrift() {
        return thriftOperator;
    }

    /** Returns true for {@link #MIN_MAX} and {@link #MIX}. */
    public boolean containsMinMax() {
        return this == MIN_MAX || this == MIX;
    }

    /** Returns true for {@link #COUNT} and {@link #MIX}. */
    public boolean containsCount() {
        return this == COUNT || this == MIX;
    }

    @Override
    public String toString() {
        return thriftOperator.toString();
    }
}

View File

@ -27,6 +27,7 @@ import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.PushDownAggOperator;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
@ -47,15 +48,18 @@ import java.util.Optional;
public class LogicalOlapScan extends LogicalRelation implements CatalogRelation {
private final long selectedIndexId;
private final ImmutableList<Long> selectedTabletId;
private final List<Long> selectedTabletIds;
private final boolean partitionPruned;
private final boolean tabletPruned;
private final ImmutableList<Long> candidateIndexIds;
private final List<Long> candidateIndexIds;
private final boolean indexSelected;
private final PreAggStatus preAggStatus;
private final boolean aggPushed;
private final PushDownAggOperator pushDownAggOperator;
/**
 * Creates a scan for {@code table} by delegating to the three-argument constructor
 * with an empty immutable list as the third argument.
 */
public LogicalOlapScan(RelationId id, OlapTable table) {
this(id, table, ImmutableList.of());
}
@ -63,13 +67,13 @@ public class LogicalOlapScan extends LogicalRelation implements CatalogRelation
public LogicalOlapScan(RelationId id, OlapTable table, List<String> qualifier) {
this(id, table, qualifier, Optional.empty(), Optional.empty(),
table.getPartitionIds(), false, ImmutableList.of(), false,
ImmutableList.of(), false, PreAggStatus.on());
ImmutableList.of(), false, PreAggStatus.on(), false, PushDownAggOperator.NONE);
}
public LogicalOlapScan(RelationId id, Table table, List<String> qualifier) {
this(id, table, qualifier, Optional.empty(), Optional.empty(),
((OlapTable) table).getPartitionIds(), false, ImmutableList.of(), false,
ImmutableList.of(), false, PreAggStatus.on());
ImmutableList.of(), false, PreAggStatus.on(), false, PushDownAggOperator.NONE);
}
/**
@ -78,20 +82,23 @@ public class LogicalOlapScan extends LogicalRelation implements CatalogRelation
public LogicalOlapScan(RelationId id, Table table, List<String> qualifier,
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
List<Long> selectedPartitionIds, boolean partitionPruned,
ImmutableList<Long> selectedTabletIds, boolean tabletPruned,
List<Long> candidateIndexIds, boolean indexSelected, PreAggStatus preAggStatus) {
List<Long> selectedTabletIds, boolean tabletPruned,
List<Long> candidateIndexIds, boolean indexSelected, PreAggStatus preAggStatus,
boolean aggPushed, PushDownAggOperator pushDownAggOperator) {
super(id, PlanType.LOGICAL_OLAP_SCAN, table, qualifier,
groupExpression, logicalProperties, selectedPartitionIds);
// TODO: use CBO manner to select best index id, according to index's statistics info,
// revisit this after rollup and materialized view selection are fully supported.
this.selectedIndexId = CollectionUtils.isEmpty(candidateIndexIds)
? getTable().getBaseIndexId() : candidateIndexIds.get(0);
this.selectedTabletId = selectedTabletIds;
this.selectedTabletIds = ImmutableList.copyOf(selectedTabletIds);
this.partitionPruned = partitionPruned;
this.tabletPruned = tabletPruned;
this.candidateIndexIds = ImmutableList.copyOf(candidateIndexIds);
this.indexSelected = indexSelected;
this.preAggStatus = preAggStatus;
this.aggPushed = aggPushed;
this.pushDownAggOperator = pushDownAggOperator;
}
@Override
@ -114,7 +121,8 @@ public class LogicalOlapScan extends LogicalRelation implements CatalogRelation
"output", getOutput(),
"candidateIndexIds", candidateIndexIds,
"selectedIndexId", selectedIndexId,
"preAgg", preAggStatus
"preAgg", preAggStatus,
"pushAgg", pushDownAggOperator
);
}
@ -128,44 +136,51 @@ public class LogicalOlapScan extends LogicalRelation implements CatalogRelation
}
return Objects.equals(selectedPartitionIds, ((LogicalOlapScan) o).selectedPartitionIds)
&& Objects.equals(candidateIndexIds, ((LogicalOlapScan) o).candidateIndexIds)
&& Objects.equals(selectedTabletId, ((LogicalOlapScan) o).selectedTabletId);
&& Objects.equals(selectedTabletIds, ((LogicalOlapScan) o).selectedTabletIds)
&& Objects.equals(pushDownAggOperator, ((LogicalOlapScan) o).pushDownAggOperator);
}
@Override
public int hashCode() {
return Objects.hash(id, selectedPartitionIds, candidateIndexIds, selectedTabletId);
return Objects.hash(id, selectedPartitionIds, candidateIndexIds, selectedTabletIds, pushDownAggOperator);
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalOlapScan(id, table, qualifier, groupExpression, Optional.of(getLogicalProperties()),
selectedPartitionIds, partitionPruned, selectedTabletId, tabletPruned,
candidateIndexIds, indexSelected, preAggStatus);
selectedPartitionIds, partitionPruned, selectedTabletIds, tabletPruned,
candidateIndexIds, indexSelected, preAggStatus, aggPushed, pushDownAggOperator);
}
@Override
public LogicalOlapScan withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
return new LogicalOlapScan(id, table, qualifier, Optional.empty(), logicalProperties,
selectedPartitionIds, partitionPruned, selectedTabletId, tabletPruned,
candidateIndexIds, indexSelected, preAggStatus);
selectedPartitionIds, partitionPruned, selectedTabletIds, tabletPruned,
candidateIndexIds, indexSelected, preAggStatus, aggPushed, pushDownAggOperator);
}
public LogicalOlapScan withSelectedPartitionIds(List<Long> selectedPartitionIds) {
return new LogicalOlapScan(id, table, qualifier, Optional.empty(), Optional.of(getLogicalProperties()),
selectedPartitionIds, true, selectedTabletId, tabletPruned,
candidateIndexIds, indexSelected, preAggStatus);
selectedPartitionIds, true, selectedTabletIds, tabletPruned,
candidateIndexIds, indexSelected, preAggStatus, aggPushed, pushDownAggOperator);
}
public LogicalOlapScan withMaterializedIndexSelected(PreAggStatus preAgg, List<Long> candidateIndexIds) {
return new LogicalOlapScan(id, table, qualifier, Optional.empty(), Optional.of(getLogicalProperties()),
selectedPartitionIds, partitionPruned, selectedTabletId, tabletPruned,
candidateIndexIds, true, preAgg);
selectedPartitionIds, partitionPruned, selectedTabletIds, tabletPruned,
candidateIndexIds, true, preAgg, aggPushed, pushDownAggOperator);
}
public LogicalOlapScan withSelectedTabletIds(ImmutableList<Long> selectedTabletIds) {
public LogicalOlapScan withSelectedTabletIds(List<Long> selectedTabletIds) {
return new LogicalOlapScan(id, table, qualifier, Optional.empty(), Optional.of(getLogicalProperties()),
selectedPartitionIds, partitionPruned, selectedTabletIds, true,
candidateIndexIds, indexSelected, preAggStatus);
candidateIndexIds, indexSelected, preAggStatus, aggPushed, pushDownAggOperator);
}
/**
 * Returns a copy of this scan that carries {@code pushDownAggOperator} and is marked as
 * agg-pushed.
 *
 * <p>Partition, tablet and index selection state is carried over unchanged; in particular
 * {@code tabletPruned} keeps its current value, because pushing an aggregate does not prune
 * any tablet.
 *
 * @param pushDownAggOperator the operator to attach to the new scan
 * @return the new scan, without group expression
 */
public LogicalOlapScan withPushDownAggregateOperator(PushDownAggOperator pushDownAggOperator) {
    return new LogicalOlapScan(id, table, qualifier, Optional.empty(), Optional.of(getLogicalProperties()),
            selectedPartitionIds, partitionPruned, selectedTabletIds, tabletPruned,
            candidateIndexIds, indexSelected, preAggStatus, true, pushDownAggOperator);
}
@Override
@ -181,8 +196,8 @@ public class LogicalOlapScan extends LogicalRelation implements CatalogRelation
return tabletPruned;
}
public List<Long> getSelectedTabletId() {
return selectedTabletId;
public List<Long> getSelectedTabletIds() {
return selectedTabletIds;
}
public long getSelectedIndexId() {
@ -197,6 +212,14 @@ public class LogicalOlapScan extends LogicalRelation implements CatalogRelation
return preAggStatus;
}
/** Returns the {@code aggPushed} flag this scan was constructed with. */
public boolean isAggPushed() {
    return this.aggPushed;
}
/** Returns the aggregate push-down operator this scan was constructed with. */
public PushDownAggOperator getPushDownAggOperator() {
    return this.pushDownAggOperator;
}
@VisibleForTesting
public Optional<String> getSelectedMaterializedIndexName() {
return indexSelected ? Optional.ofNullable(((OlapTable) table).getIndexNameById(selectedIndexId))

View File

@ -24,6 +24,7 @@ import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.PushDownAggOperator;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
@ -45,13 +46,15 @@ public class PhysicalOlapScan extends PhysicalRelation {
private final ImmutableList<Long> selectedTabletIds;
private final ImmutableList<Long> selectedPartitionIds;
private final PreAggStatus preAggStatus;
private final PushDownAggOperator pushDownAggOperator;
/**
* Constructor for PhysicalOlapScan.
*/
public PhysicalOlapScan(RelationId id, OlapTable olapTable, List<String> qualifier, long selectedIndexId,
List<Long> selectedTabletIds, List<Long> selectedPartitionIds, DistributionSpec distributionSpec,
PreAggStatus preAggStatus, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties) {
PreAggStatus preAggStatus, PushDownAggOperator pushDownAggOperator,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties) {
super(id, PlanType.PHYSICAL_OLAP_SCAN, qualifier, groupExpression, logicalProperties);
this.olapTable = olapTable;
this.selectedIndexId = selectedIndexId;
@ -59,6 +62,7 @@ public class PhysicalOlapScan extends PhysicalRelation {
this.selectedPartitionIds = ImmutableList.copyOf(selectedPartitionIds);
this.distributionSpec = distributionSpec;
this.preAggStatus = preAggStatus;
this.pushDownAggOperator = pushDownAggOperator;
}
/**
@ -66,7 +70,8 @@ public class PhysicalOlapScan extends PhysicalRelation {
*/
public PhysicalOlapScan(RelationId id, OlapTable olapTable, List<String> qualifier, long selectedIndexId,
List<Long> selectedTabletIds, List<Long> selectedPartitionIds, DistributionSpec distributionSpec,
PreAggStatus preAggStatus, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
PreAggStatus preAggStatus, PushDownAggOperator pushDownAggOperator,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
PhysicalProperties physicalProperties, StatsDeriveResult statsDeriveResult) {
super(id, PlanType.PHYSICAL_OLAP_SCAN, qualifier, groupExpression, logicalProperties, physicalProperties,
statsDeriveResult);
@ -76,6 +81,7 @@ public class PhysicalOlapScan extends PhysicalRelation {
this.selectedPartitionIds = ImmutableList.copyOf(selectedPartitionIds);
this.distributionSpec = distributionSpec;
this.preAggStatus = preAggStatus;
this.pushDownAggOperator = pushDownAggOperator;
}
public long getSelectedIndexId() {
@ -103,6 +109,10 @@ public class PhysicalOlapScan extends PhysicalRelation {
return preAggStatus;
}
/** Returns the aggregate push-down operator this physical scan was constructed with. */
public PushDownAggOperator getPushDownAggOperator() {
    return this.pushDownAggOperator;
}
@Override
public String toString() {
return Utils.toSqlString("PhysicalOlapScan",
@ -140,20 +150,23 @@ public class PhysicalOlapScan extends PhysicalRelation {
@Override
public PhysicalOlapScan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalOlapScan(id, olapTable, qualifier, selectedIndexId, selectedTabletIds,
selectedPartitionIds, distributionSpec, preAggStatus, groupExpression, getLogicalProperties());
selectedPartitionIds, distributionSpec, preAggStatus, pushDownAggOperator,
groupExpression, getLogicalProperties());
}
@Override
public PhysicalOlapScan withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
return new PhysicalOlapScan(id, olapTable, qualifier, selectedIndexId, selectedTabletIds,
selectedPartitionIds, distributionSpec, preAggStatus, Optional.empty(), logicalProperties.get());
selectedPartitionIds, distributionSpec, preAggStatus, pushDownAggOperator,
Optional.empty(), logicalProperties.get());
}
@Override
public PhysicalOlapScan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
StatsDeriveResult statsDeriveResult) {
public PhysicalOlapScan withPhysicalPropertiesAndStats(
PhysicalProperties physicalProperties, StatsDeriveResult statsDeriveResult) {
return new PhysicalOlapScan(id, olapTable, qualifier, selectedIndexId, selectedTabletIds,
selectedPartitionIds, distributionSpec, preAggStatus, Optional.empty(), getLogicalProperties(),
selectedPartitionIds, distributionSpec, preAggStatus, pushDownAggOperator,
Optional.empty(), getLogicalProperties(),
physicalProperties, statsDeriveResult);
}
}

View File

@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.PushDownAggOperator;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
@ -63,7 +64,7 @@ public class PhysicalPlanTranslatorTest {
t1Output.add(col3);
LogicalProperties t1Properties = new LogicalProperties(() -> t1Output);
PhysicalOlapScan scan = new PhysicalOlapScan(RelationId.createGenerator().getNextId(), t1, qualifier, 0L,
Collections.emptyList(), Collections.emptyList(), null, PreAggStatus.on(),
Collections.emptyList(), Collections.emptyList(), null, PreAggStatus.on(), PushDownAggOperator.NONE,
Optional.empty(),
t1Properties);
Literal t1FilterRight = new IntegerLiteral(1);

View File

@ -27,6 +27,7 @@ import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
import org.apache.doris.nereids.trees.plans.PushDownAggOperator;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
@ -75,9 +76,8 @@ public class MergeProjectPostProcessTest {
t1Output.add(c);
LogicalProperties t1Properties = new LogicalProperties(() -> t1Output);
PhysicalOlapScan scan = new PhysicalOlapScan(RelationId.createGenerator().getNextId(), t1, qualifier, 0L,
Collections.emptyList(), Collections.emptyList(), null, PreAggStatus.on(),
Optional.empty(),
t1Properties);
Collections.emptyList(), Collections.emptyList(), null, PreAggStatus.on(), PushDownAggOperator.NONE,
Optional.empty(), t1Properties);
Alias x = new Alias(a, "x");
List<NamedExpression> projList3 = Lists.newArrayList(x, b, c);
PhysicalProject proj3 = new PhysicalProject(projList3, placeHolder, scan);

View File

@ -152,13 +152,13 @@ public class PruneOlapScanTabletTest {
LogicalFilter<LogicalOlapScan> filter = new LogicalFilter<>(expr,
new LogicalOlapScan(RelationId.createGenerator().getNextId(), olapTable));
Assertions.assertEquals(0, filter.child().getSelectedTabletId().size());
Assertions.assertEquals(0, filter.child().getSelectedTabletIds().size());
CascadesContext context = MemoTestUtils.createCascadesContext(filter);
context.topDownRewrite(ImmutableList.of(new PruneOlapScanTablet().build()));
LogicalFilter<LogicalOlapScan> filter1 = ((LogicalFilter<LogicalOlapScan>) context.getMemo().copyOut());
LogicalOlapScan olapScan = filter1.child();
Assertions.assertEquals(19, olapScan.getSelectedTabletId().size());
Assertions.assertEquals(19, olapScan.getSelectedTabletIds().size());
}
}

View File

@ -0,0 +1,154 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite.logical;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
import org.apache.doris.nereids.trees.expressions.functions.agg.Max;
import org.apache.doris.nereids.trees.expressions.functions.agg.Min;
import org.apache.doris.nereids.trees.expressions.functions.scalar.Ln;
import org.apache.doris.nereids.trees.plans.PushDownAggOperator;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.util.MemoTestUtils;
import org.apache.doris.nereids.util.PlanConstructor;
import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.Collections;
public class PushAggregateToOlapScanTest {
@Test
public void testWithoutProject() {
    LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(1, "tbl", 0);

    // min max: a lone min() on a scan column is recorded as MIN_MAX
    LogicalAggregate<LogicalOlapScan> minAggregate = new LogicalAggregate<>(
            Collections.emptyList(),
            ImmutableList.of(new Alias(new Min(scan.getOutput().get(0)), "min")),
            scan);
    CascadesContext minContext = MemoTestUtils.createCascadesContext(minAggregate);
    minContext.topDownRewrite(new PushAggregateToOlapScan());
    LogicalOlapScan minScan = (LogicalOlapScan) (minContext.getMemo().copyOut().child(0));
    Assertions.assertTrue(minScan.isAggPushed());
    Assertions.assertEquals(PushDownAggOperator.MIN_MAX, minScan.getPushDownAggOperator());

    // count: a lone count() on a scan column is recorded as COUNT
    LogicalAggregate<LogicalOlapScan> countAggregate = new LogicalAggregate<>(
            Collections.emptyList(),
            ImmutableList.of(new Alias(new Count(scan.getOutput().get(0)), "count")),
            scan);
    CascadesContext countContext = MemoTestUtils.createCascadesContext(countAggregate);
    countContext.topDownRewrite(new PushAggregateToOlapScan());
    LogicalOlapScan countScan = (LogicalOlapScan) (countContext.getMemo().copyOut().child(0));
    Assertions.assertTrue(countScan.isAggPushed());
    Assertions.assertEquals(PushDownAggOperator.COUNT, countScan.getPushDownAggOperator());

    // mix: count() together with max() is recorded as MIX
    LogicalAggregate<LogicalOlapScan> mixAggregate = new LogicalAggregate<>(
            Collections.emptyList(),
            ImmutableList.of(new Alias(new Count(scan.getOutput().get(0)), "count"),
                    new Alias(new Max(scan.getOutput().get(0)), "max")),
            scan);
    CascadesContext mixContext = MemoTestUtils.createCascadesContext(mixAggregate);
    mixContext.topDownRewrite(new PushAggregateToOlapScan());
    LogicalOlapScan mixScan = (LogicalOlapScan) (mixContext.getMemo().copyOut().child(0));
    Assertions.assertTrue(mixScan.isAggPushed());
    Assertions.assertEquals(PushDownAggOperator.MIX, mixScan.getPushDownAggOperator());
}
@Test
public void testWithProject() {
    LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(1, "tbl", 0);
    // the project forwards the first scan column unchanged
    LogicalProject<LogicalOlapScan> projectOverScan = new LogicalProject<>(
            ImmutableList.of(scan.getOutput().get(0)), scan);

    // min max: the pushed scan sits two levels below the root (aggregate -> project -> scan)
    LogicalAggregate<LogicalProject<LogicalOlapScan>> minAggregate = new LogicalAggregate<>(
            Collections.emptyList(),
            ImmutableList.of(new Alias(new Min(projectOverScan.getOutput().get(0)), "min")),
            projectOverScan);
    CascadesContext minContext = MemoTestUtils.createCascadesContext(minAggregate);
    minContext.topDownRewrite(new PushAggregateToOlapScan());
    LogicalOlapScan minScan = (LogicalOlapScan) (minContext.getMemo().copyOut().child(0).child(0));
    Assertions.assertTrue(minScan.isAggPushed());
    Assertions.assertEquals(PushDownAggOperator.MIN_MAX, minScan.getPushDownAggOperator());

    // count
    LogicalAggregate<LogicalProject<LogicalOlapScan>> countAggregate = new LogicalAggregate<>(
            Collections.emptyList(),
            ImmutableList.of(new Alias(new Count(projectOverScan.getOutput().get(0)), "count")),
            projectOverScan);
    CascadesContext countContext = MemoTestUtils.createCascadesContext(countAggregate);
    countContext.topDownRewrite(new PushAggregateToOlapScan());
    LogicalOlapScan countScan = (LogicalOlapScan) (countContext.getMemo().copyOut().child(0).child(0));
    Assertions.assertTrue(countScan.isAggPushed());
    Assertions.assertEquals(PushDownAggOperator.COUNT, countScan.getPushDownAggOperator());

    // mix: max() takes the scan's own output slot while count() takes the project's output
    LogicalAggregate<LogicalProject<LogicalOlapScan>> mixAggregate = new LogicalAggregate<>(
            Collections.emptyList(),
            ImmutableList.of(new Alias(new Count(projectOverScan.getOutput().get(0)), "count"),
                    new Alias(new Max(scan.getOutput().get(0)), "max")),
            projectOverScan);
    CascadesContext mixContext = MemoTestUtils.createCascadesContext(mixAggregate);
    mixContext.topDownRewrite(new PushAggregateToOlapScan());
    LogicalOlapScan mixScan = (LogicalOlapScan) (mixContext.getMemo().copyOut().child(0).child(0));
    Assertions.assertTrue(mixScan.isAggPushed());
    Assertions.assertEquals(PushDownAggOperator.MIX, mixScan.getPushDownAggOperator());
}
/**
 * Negative case: the project between the aggregate and the scan wraps the scan slot in
 * {@code Ln(...)} under an alias, and the test expects the scan to remain un-pushed
 * ({@link PushDownAggOperator#NONE}) after {@link PushAggregateToOlapScan} runs.
 */
@Test
void testProjectionCheck() {
    LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(1, "tbl", 0);

    // Project a computed expression over the first scan column instead of the bare slot.
    Alias lnOfFirstColumn = new Alias(new Ln(scan.getOutput().get(0)), "alias");
    LogicalProject<LogicalOlapScan> computedProject =
            new LogicalProject<>(ImmutableList.of(lnOfFirstColumn), scan);

    // Aggregate without group-by keys: min over the project's single output.
    LogicalAggregate<LogicalProject<LogicalOlapScan>> minAggregate = new LogicalAggregate<>(
            Collections.emptyList(),
            ImmutableList.of(new Alias(new Min(computedProject.getOutput().get(0)), "min")),
            computedProject);

    CascadesContext cascadesContext = MemoTestUtils.createCascadesContext(minAggregate);
    cascadesContext.topDownRewrite(new PushAggregateToOlapScan());

    // Plan shape after copy-out: aggregate -> project -> scan.
    LogicalOlapScan scanAfterRewrite =
            (LogicalOlapScan) cascadesContext.getMemo().copyOut().child(0).child(0);
    Assertions.assertFalse(scanAfterRewrite.isAggPushed());
    Assertions.assertEquals(PushDownAggOperator.NONE, scanAfterRewrite.getPushDownAggOperator());
}
}

View File

@ -251,16 +251,16 @@ public class PlanEqualsTest {
PhysicalOlapScan actual = new PhysicalOlapScan(id, olapTable, Lists.newArrayList("a"),
olapTable.getBaseIndexId(), selectedTabletId, olapTable.getPartitionIds(), distributionSpecHash,
PreAggStatus.on(), Optional.empty(), logicalProperties);
PreAggStatus.on(), PushDownAggOperator.NONE, Optional.empty(), logicalProperties);
PhysicalOlapScan expected = new PhysicalOlapScan(id, olapTable, Lists.newArrayList("a"),
olapTable.getBaseIndexId(), selectedTabletId, olapTable.getPartitionIds(), distributionSpecHash,
PreAggStatus.on(), Optional.empty(), logicalProperties);
PreAggStatus.on(), PushDownAggOperator.NONE, Optional.empty(), logicalProperties);
Assertions.assertEquals(expected, actual);
PhysicalOlapScan unexpected = new PhysicalOlapScan(id, olapTable, Lists.newArrayList("b"),
olapTable.getBaseIndexId(), selectedTabletId, olapTable.getPartitionIds(), distributionSpecHash,
PreAggStatus.on(), Optional.empty(), logicalProperties);
PreAggStatus.on(), PushDownAggOperator.NONE, Optional.empty(), logicalProperties);
Assertions.assertNotEquals(unexpected, actual);
}

View File

@ -81,7 +81,7 @@ public class PlanToStringTest {
LogicalOlapScan plan = PlanConstructor.newLogicalOlapScan(0, "table", 0);
Assertions.assertTrue(
plan.toString().matches("LogicalOlapScan \\( qualified=db\\.table, "
+ "output=\\[id#\\d+, name#\\d+], candidateIndexIds=\\[], selectedIndexId=-1, preAgg=ON \\)"));
+ "output=\\[id#\\d+, name#\\d+], candidateIndexIds=\\[], selectedIndexId=-1, preAgg=ON, pushAgg=NONE \\)"));
}
@Test

View File

@ -77,7 +77,7 @@ public class PlanConstructor {
}
/**
 * Convenience overload that delegates to
 * {@code newOlapTable(tableId, tableName, hashColumn, keysType)} with
 * {@link KeysType#DUP_KEYS} as the keys type.
 *
 * @param tableId id passed through to the four-argument overload
 * @param tableName name passed through to the four-argument overload
 * @param hashColumn hash column argument passed through to the four-argument overload
 * @return the table produced by the four-argument overload
 */
public static OlapTable newOlapTable(long tableId, String tableName, int hashColumn) {
    // Exactly one return: a second, superseded return (PRIMARY_KEYS) would be unreachable
    // code, which Java rejects at compile time.
    return newOlapTable(tableId, tableName, hashColumn, KeysType.DUP_KEYS);
}
public static OlapTable newOlapTable(long tableId, String tableName, int hashColumn, KeysType keysType) {