[opt](Nereids) opt distinct agg without group by key plan #32236
This commit is contained in:
@ -107,12 +107,23 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
|
||||
if (!agg.getAggregateParam().canBeBanned) {
|
||||
return true;
|
||||
}
|
||||
// forbid one phase agg on distribute and three or four stage distinct agg inter by distribute
|
||||
if ((agg.getAggMode() == AggMode.INPUT_TO_RESULT || agg.getAggMode() == AggMode.BUFFER_TO_BUFFER)
|
||||
&& children.get(0).getPlan() instanceof PhysicalDistribute) {
|
||||
// forbid one phase agg on distribute
|
||||
if (agg.getAggMode() == AggMode.INPUT_TO_RESULT && children.get(0).getPlan() instanceof PhysicalDistribute) {
|
||||
// this means one stage gather agg, usually bad pattern
|
||||
return false;
|
||||
}
|
||||
// forbid three or four stage distinct agg inter by distribute
|
||||
if (agg.getAggMode() == AggMode.BUFFER_TO_BUFFER && children.get(0).getPlan() instanceof PhysicalDistribute) {
|
||||
// if distinct without group by key, we prefer three or four stage distinct agg
|
||||
// because the second phase of multi-distinct only have one instance, and it is slow generally.
|
||||
if (agg.getGroupByExpressions().size() == 1
|
||||
&& agg.getOutputExpressions().size() == 1) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
||||
}
|
||||
|
||||
// forbid TWO_PHASE_AGGREGATE_WITH_DISTINCT after shuffle
|
||||
// TODO: this is forbid good plan after cte reuse by mistake
|
||||
if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER
|
||||
@ -160,6 +171,11 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// if distinct without group by key, we prefer three or four stage distinct agg
|
||||
// because the second phase of multi-distinct only have one instance, and it is slow generally.
|
||||
if (agg.getOutputExpressions().size() == 1 && agg.getGroupByExpressions().isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
// process must shuffle
|
||||
|
||||
@ -30,7 +30,6 @@ import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
|
||||
import org.apache.doris.nereids.trees.expressions.WindowExpression;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.Literal;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
|
||||
@ -113,27 +112,27 @@ public class NormalizeAggregate implements RewriteRuleFactory, NormalizeToSlot {
|
||||
|
||||
private LogicalPlan normalizeAgg(LogicalAggregate<Plan> aggregate, Optional<LogicalHaving<?>> having) {
|
||||
// The LogicalAggregate node may contain window agg functions and usual agg functions
|
||||
// we call window agg functions as window-agg and usual agg functions as trival-agg for short
|
||||
// we call window agg functions as window-agg and usual agg functions as trivial-agg for short
|
||||
// This rule simplify LogicalAggregate node by:
|
||||
// 1. Push down some exprs from old LogicalAggregate node to a new child LogicalProject Node,
|
||||
// 2. create a new LogicalAggregate with normalized group by exprs and trival-aggs
|
||||
// 2. create a new LogicalAggregate with normalized group by exprs and trivial-aggs
|
||||
// 3. Pull up normalized old LogicalAggregate's output exprs to a new parent LogicalProject Node
|
||||
// Push down exprs:
|
||||
// 1. all group by exprs
|
||||
// 2. child contains subquery expr in trival-agg
|
||||
// 3. child contains window expr in trival-agg
|
||||
// 4. all input slots of trival-agg
|
||||
// 5. expr(including subquery) in distinct trival-agg
|
||||
// 2. child contains subquery expr in trivial-agg
|
||||
// 3. child contains window expr in trivial-agg
|
||||
// 4. all input slots of trivial-agg
|
||||
// 5. expr(including subquery) in distinct trivial-agg
|
||||
// Normalize LogicalAggregate's output.
|
||||
// 1. normalize group by exprs by outputs of bottom LogicalProject
|
||||
// 2. normalize trival-aggs by outputs of bottom LogicalProject
|
||||
// 2. normalize trivial-aggs by outputs of bottom LogicalProject
|
||||
// 3. build normalized agg outputs
|
||||
// Pull up exprs:
|
||||
// normalize all output exprs in old LogicalAggregate to build a parent project node, typically includes:
|
||||
// 1. simple slots
|
||||
// 2. aliases
|
||||
// a. alias with no aggs child
|
||||
// b. alias with trival-agg child
|
||||
// b. alias with trivial-agg child
|
||||
// c. alias with window-agg
|
||||
|
||||
// Push down exprs:
|
||||
@ -141,13 +140,13 @@ public class NormalizeAggregate implements RewriteRuleFactory, NormalizeToSlot {
|
||||
Set<Expression> groupingByExprs =
|
||||
ImmutableSet.copyOf(aggregate.getGroupByExpressions());
|
||||
|
||||
// collect all trival-agg
|
||||
// collect all trivial-agg
|
||||
List<NamedExpression> aggregateOutput = aggregate.getOutputExpressions();
|
||||
List<AggregateFunction> aggFuncs = CollectNonWindowedAggFuncs.collect(aggregateOutput);
|
||||
|
||||
// split non-distinct agg child as two part
|
||||
// TRUE part 1: need push down itself, if it contains subqury or window expression
|
||||
// FALSE part 2: need push down its input slots, if it DOES NOT contain subqury or window expression
|
||||
// TRUE part 1: need push down itself, if it contains subquery or window expression
|
||||
// FALSE part 2: need push down its input slots, if it DOES NOT contain subquery or window expression
|
||||
Map<Boolean, Set<Expression>> categorizedNoDistinctAggsChildren = aggFuncs.stream()
|
||||
.filter(aggFunc -> !aggFunc.isDistinct())
|
||||
.flatMap(agg -> agg.children().stream())
|
||||
@ -159,10 +158,9 @@ public class NormalizeAggregate implements RewriteRuleFactory, NormalizeToSlot {
|
||||
// TRUE part 1: need push down itself, if it is NOT SlotReference or Literal
|
||||
// FALSE part 2: need push down its input slots, if it is SlotReference or Literal
|
||||
Map<Boolean, Set<Expression>> categorizedDistinctAggsChildren = aggFuncs.stream()
|
||||
.filter(aggFunc -> aggFunc.isDistinct()).flatMap(agg -> agg.children().stream())
|
||||
.collect(Collectors.groupingBy(
|
||||
child -> !(child instanceof SlotReference || child instanceof Literal),
|
||||
Collectors.toSet()));
|
||||
.filter(AggregateFunction::isDistinct)
|
||||
.flatMap(agg -> agg.children().stream())
|
||||
.collect(Collectors.groupingBy(child -> !(child instanceof SlotReference), Collectors.toSet()));
|
||||
|
||||
Set<Expression> needPushSelf = Sets.union(
|
||||
categorizedNoDistinctAggsChildren.getOrDefault(true, new HashSet<>()),
|
||||
@ -176,20 +174,20 @@ public class NormalizeAggregate implements RewriteRuleFactory, NormalizeToSlot {
|
||||
|
||||
// push down 3 kinds of exprs, these pushed exprs will be used to normalize agg output later
|
||||
// 1. group by exprs
|
||||
// 2. trivalAgg children
|
||||
// 3. trivalAgg input slots
|
||||
// 2. trivialAgg children
|
||||
// 3. trivialAgg input slots
|
||||
Set<Expression> allPushDownExprs =
|
||||
Sets.union(groupingByExprs, Sets.union(needPushSelf, needPushInputSlots));
|
||||
NormalizeToSlotContext bottomSlotContext =
|
||||
NormalizeToSlotContext.buildContext(existsAlias, allPushDownExprs);
|
||||
Set<NamedExpression> pushedGroupByExprs =
|
||||
bottomSlotContext.pushDownToNamedExpression(groupingByExprs);
|
||||
Set<NamedExpression> pushedTrivalAggChildren =
|
||||
Set<NamedExpression> pushedTrivialAggChildren =
|
||||
bottomSlotContext.pushDownToNamedExpression(needPushSelf);
|
||||
Set<NamedExpression> pushedTrivalAggInputSlots =
|
||||
Set<NamedExpression> pushedTrivialAggInputSlots =
|
||||
bottomSlotContext.pushDownToNamedExpression(needPushInputSlots);
|
||||
Set<NamedExpression> bottomProjects = Sets.union(pushedGroupByExprs,
|
||||
Sets.union(pushedTrivalAggChildren, pushedTrivalAggInputSlots));
|
||||
Sets.union(pushedTrivialAggChildren, pushedTrivialAggInputSlots));
|
||||
|
||||
// create bottom project
|
||||
Plan bottomPlan;
|
||||
@ -215,7 +213,7 @@ public class NormalizeAggregate implements RewriteRuleFactory, NormalizeToSlot {
|
||||
List<Expression> normalizedGroupExprs =
|
||||
bottomSlotContext.normalizeToUseSlotRef(groupingByExprs);
|
||||
|
||||
// normalize trival-aggs by bottomProjects
|
||||
// normalize trivial-aggs by bottomProjects
|
||||
List<AggregateFunction> normalizedAggFuncs =
|
||||
bottomSlotContext.normalizeToUseSlotRef(aggFuncs);
|
||||
if (normalizedAggFuncs.stream().anyMatch(agg -> !agg.children().isEmpty()
|
||||
@ -237,7 +235,7 @@ public class NormalizeAggregate implements RewriteRuleFactory, NormalizeToSlot {
|
||||
.build();
|
||||
|
||||
// create new agg node
|
||||
LogicalAggregate newAggregate =
|
||||
LogicalAggregate<?> newAggregate =
|
||||
aggregate.withNormalized(normalizedGroupExprs, normalizedAggOutput, bottomPlan);
|
||||
|
||||
// create upper projects by normalize all output exprs in old LogicalAggregate
|
||||
|
||||
Reference in New Issue
Block a user