[enhancement](nereids) support unnest subquery with group by and having clause (#32002)

This commit is contained in:
starocean999
2024-03-13 19:13:12 +08:00
committed by yiguolei
parent 56a14c912a
commit bede948029
6 changed files with 109 additions and 103 deletions

View File

@ -105,7 +105,6 @@ import org.apache.doris.nereids.rules.rewrite.PushConjunctsIntoOdbcScan;
import org.apache.doris.nereids.rules.rewrite.PushDownAggThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushDownAggThroughJoinOneSide;
import org.apache.doris.nereids.rules.rewrite.PushDownDistinctThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushDownFilterThroughAggregation;
import org.apache.doris.nereids.rules.rewrite.PushDownFilterThroughProject;
import org.apache.doris.nereids.rules.rewrite.PushDownLimit;
import org.apache.doris.nereids.rules.rewrite.PushDownLimitDistinctThroughJoin;
@ -171,51 +170,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
// after doing NormalizeAggregate in analysis job
// we need run the following 2 rules to make AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION work
bottomUp(new PullUpProjectUnderApply()),
topDown(
/*
* for subquery unnest, we need to handle SQL like
*
* SELECT *
* FROM table1 AS t1
* WHERE EXISTS
* (SELECT `pk`
* FROM table2 AS t2
* WHERE t1.pk = t2 .pk
* GROUP BY t2.pk
* HAVING t2.pk > 0) ;
*
* before:
* apply
* / \
* child Filter(t2.pk > 0)
* |
* Project(t2.pk)
* |
* agg
* |
* Project(t2.pk)
* |
* Filter(t1.pk=t2.pk)
* |
* child
*
* after:
* apply
* / \
* child agg
* |
* Project(t2.pk)
* |
* Filter(t1.pk=t2.pk and t2.pk >0)
* |
* child
*
* then PullUpCorrelatedFilterUnderApplyAggregateProject rule can match the node pattern
*/
new PushDownFilterThroughAggregation(),
new PushDownFilterThroughProject(),
new MergeFilters()
),
topDown(new PushDownFilterThroughProject()),
custom(RuleType.AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION,
AggScalarSubQueryToWindowFunction::new),
bottomUp(

View File

@ -137,7 +137,9 @@ public enum RuleType {
UN_CORRELATED_APPLY_FILTER(RuleTypeClass.REWRITE),
UN_CORRELATED_APPLY_PROJECT_FILTER(RuleTypeClass.REWRITE),
UN_CORRELATED_APPLY_AGGREGATE_FILTER(RuleTypeClass.REWRITE),
UN_CORRELATED_APPLY_FILTER_AGGREGATE_FILTER(RuleTypeClass.REWRITE),
PULL_UP_CORRELATED_FILTER_UNDER_APPLY_AGGREGATE_PROJECT(RuleTypeClass.REWRITE),
PULL_UP_CORRELATED_FILTER_UNDER_APPLY_FILTER_AGGREGATE_PROJECT(RuleTypeClass.REWRITE),
SCALAR_APPLY_TO_JOIN(RuleTypeClass.REWRITE),
IN_APPLY_TO_JOIN(RuleTypeClass.REWRITE),
EXISTS_APPLY_TO_JOIN(RuleTypeClass.REWRITE),

View File

@ -36,8 +36,10 @@ import java.util.List;
* <pre>
* before:
* apply
* / \
* Input(output:b) agg
* / \
* Input(output:b) Filter(this node's existence depends on having clause's existence)
* |
* agg
* |
* Project(output:a)
* |
@ -47,8 +49,10 @@ import java.util.List;
*
* after:
* apply
* / \
* Input(output:b) agg
* / \
* Input(output:b) Filter(this node's existence depends on having clause's existence)
* |
* agg
* |
* Filter(correlated predicate(Input.e = this.f)/Unapply predicate)
* |
@ -57,27 +61,43 @@ import java.util.List;
* child
* </pre>
*/
public class PullUpCorrelatedFilterUnderApplyAggregateProject extends OneRewriteRuleFactory {
public class PullUpCorrelatedFilterUnderApplyAggregateProject implements RewriteRuleFactory {
@Override
public Rule build() {
return logicalApply(any(), logicalAggregate(logicalProject(logicalFilter())))
.when(LogicalApply::isCorrelated).then(apply -> {
LogicalAggregate<LogicalProject<LogicalFilter<Plan>>> agg = apply.right();
public List<Rule> buildRules() {
    // Shape 1: the aggregate is the direct right child of the correlated apply.
    Rule aggregateUnderApply = logicalApply(any(), logicalAggregate(logicalProject(logicalFilter())))
            .when(LogicalApply::isCorrelated)
            .then(PullUpCorrelatedFilterUnderApplyAggregateProject::pullUpCorrelatedFilter)
            .toRule(RuleType.PULL_UP_CORRELATED_FILTER_UNDER_APPLY_AGGREGATE_PROJECT);

    // Shape 2: a filter sits between the apply and the aggregate; everything below it is the same.
    Rule filterAboveAggregate = logicalApply(any(),
            logicalFilter(logicalAggregate(logicalProject(logicalFilter()))))
            .when(LogicalApply::isCorrelated)
            .then(PullUpCorrelatedFilterUnderApplyAggregateProject::pullUpCorrelatedFilter)
            .toRule(RuleType.PULL_UP_CORRELATED_FILTER_UNDER_APPLY_FILTER_AGGREGATE_PROJECT);

    // Both shapes share the same rewrite action; only the rule type differs.
    return ImmutableList.of(aggregateUnderApply, filterAboveAggregate);
}
LogicalProject<LogicalFilter<Plan>> project = agg.child();
LogicalFilter<Plan> filter = project.child();
List<NamedExpression> newProjects = Lists.newArrayList();
newProjects.addAll(project.getProjects());
filter.child().getOutput().forEach(slot -> {
if (!newProjects.contains(slot)) {
newProjects.add(slot);
}
});
/**
 * Moves the filter that sits below the project (under the aggregate) so that it ends up directly
 * under the aggregate: agg - project - filter - child becomes agg - filter - project' - child,
 * where project' keeps the original projections and additionally exposes every output slot of
 * the filter's child that was not already projected.
 *
 * @param apply the matched apply node; its right child is either the aggregate itself or a node
 *     whose child(0) is the aggregate
 * @return the apply with the same left child and the rewritten right subtree
 */
private static LogicalApply<?, ?> pullUpCorrelatedFilter(LogicalApply<?, ?> apply) {
boolean isRightChildAgg = apply.right() instanceof LogicalAggregate;
// locate agg node: either the right child itself or the first child of the right child
// NOTE(review): unchecked casts; the generic shape is presumably guaranteed only by the
// patterns that route plans into this method - confirm no other caller exists
LogicalAggregate<LogicalProject<LogicalFilter<Plan>>> agg = isRightChildAgg
? (LogicalAggregate<LogicalProject<LogicalFilter<Plan>>>) (apply.right())
: (LogicalAggregate<LogicalProject<LogicalFilter<Plan>>>) (apply.right().child(0));
// NOTE(review): the next five lines look like the tail of the pre-change lambda body as rendered
// by the diff view (they use project/filter/newProjects before the declarations below and close
// a lambda) - presumably removed by this commit; confirm against the repository
LogicalProject<Plan> newProject = project.withProjectsAndChild(newProjects, filter.child());
LogicalFilter<Plan> newFilter = new LogicalFilter<>(filter.getConjuncts(), newProject);
LogicalAggregate<Plan> newAgg = agg.withChildren(ImmutableList.of(newFilter));
return apply.withChildren(apply.left(), newAgg);
}).toRule(RuleType.PULL_UP_CORRELATED_FILTER_UNDER_APPLY_AGGREGATE_PROJECT);
// pull up filter under the project
LogicalProject<LogicalFilter<Plan>> project = agg.child();
LogicalFilter<Plan> filter = project.child();
// start from the existing projections, in their original order
List<NamedExpression> newProjects = Lists.newArrayList();
newProjects.addAll(project.getProjects());
// filter may use all slots from its child, so add all the slots to newProjects
filter.child().getOutput().forEach(slot -> {
if (!newProjects.contains(slot)) {
newProjects.add(slot);
}
});
// rebuild bottom-up: widened project over the filter's child, then the same conjuncts on top
LogicalProject<Plan> newProject = project.withProjectsAndChild(newProjects, filter.child());
LogicalFilter<Plan> newFilter = new LogicalFilter<>(filter.getConjuncts(), newProject);
LogicalAggregate<Plan> newAgg = agg.withChildren(ImmutableList.of(newFilter));
// when a node sat between the apply and the aggregate, keep it and re-parent it over the new agg
return (LogicalApply<?, ?>) (apply.withChildren(apply.left(),
isRightChildAgg ? newAgg : apply.right().withChildren(newAgg)));
}
}

View File

@ -44,52 +44,69 @@ import java.util.Map;
* the output column is the correlated column and the input column.
* <pre>
* before:
* apply
* / \
* Input(output:b) agg(output:fn; group by:null)
* apply
* / \
* Input(output:b) Filter(this node's existence depends on having clause's existence)
* |
* agg(output:fn; group by:null)
* |
* Filter(correlated predicate(Input.e = this.f)/Unapply predicate)
*
* end:
* apply(correlated predicate(Input.e = this.f))
* / \
* Input(output:b) agg(output:fn,this.f; group by:this.f)
* Input(output:b) Filter(this node's existence depends on having clause's existence)
* |
* agg(output:fn,this.f; group by:this.f)
* |
* Filter(Uncorrelated predicate)
* </pre>
*/
public class UnCorrelatedApplyAggregateFilter extends OneRewriteRuleFactory {
public class UnCorrelatedApplyAggregateFilter implements RewriteRuleFactory {
@Override
public Rule build() {
return logicalApply(any(), logicalAggregate(logicalFilter())).when(LogicalApply::isCorrelated).then(apply -> {
LogicalAggregate<LogicalFilter<Plan>> agg = apply.right();
LogicalFilter<Plan> filter = agg.child();
Map<Boolean, List<Expression>> split = Utils.splitCorrelatedConjuncts(
filter.getConjuncts(), apply.getCorrelationSlot());
List<Expression> correlatedPredicate = split.get(true);
List<Expression> unCorrelatedPredicate = split.get(false);
public List<Rule> buildRules() {
    // Shape 1: the aggregate (over a filter) is the direct right child of the correlated apply.
    Rule aggregateUnderApply = logicalApply(any(), logicalAggregate(logicalFilter()))
            .when(LogicalApply::isCorrelated)
            .then(UnCorrelatedApplyAggregateFilter::pullUpCorrelatedFilter)
            .toRule(RuleType.UN_CORRELATED_APPLY_AGGREGATE_FILTER);

    // Shape 2: an extra filter sits between the apply and the aggregate.
    Rule filterAboveAggregate = logicalApply(any(), logicalFilter(logicalAggregate(logicalFilter())))
            .when(LogicalApply::isCorrelated)
            .then(UnCorrelatedApplyAggregateFilter::pullUpCorrelatedFilter)
            .toRule(RuleType.UN_CORRELATED_APPLY_FILTER_AGGREGATE_FILTER);

    // Both shapes share the same rewrite action; only the rule type differs.
    return ImmutableList.of(aggregateUnderApply, filterAboveAggregate);
}
// no correlated predicate is left in the filter: the rule has already been applied and moved it to the apply node
if (correlatedPredicate.isEmpty()) {
return apply;
}
/**
 * Moves the correlated conjuncts of the filter under the aggregate into the apply node.
 * The slots returned by Utils.getCorrelatedSlots for those conjuncts are prepended to the
 * aggregate's group-by list, and all resulting group-by expressions are appended to its output.
 * The remaining conjuncts stay below the aggregate.
 *
 * @param apply the matched apply node; its right child is either the aggregate itself or a node
 *     whose child(0) is the aggregate
 * @return the unchanged apply when the filter holds no correlated conjunct, otherwise a new apply
 *     carrying the correlated conjuncts and the rewritten right subtree
 */
private static LogicalApply<?, ?> pullUpCorrelatedFilter(LogicalApply<?, ?> apply) {
boolean isRightChildAgg = apply.right() instanceof LogicalAggregate;
// locate agg node: either the right child itself or the first child of the right child
// NOTE(review): unchecked casts; the generic shape is presumably guaranteed only by the
// patterns that route plans into this method - confirm no other caller exists
LogicalAggregate<LogicalFilter<Plan>> agg =
isRightChildAgg ? (LogicalAggregate<LogicalFilter<Plan>>) (apply.right())
: (LogicalAggregate<LogicalFilter<Plan>>) (apply.right().child(0));
LogicalFilter<Plan> filter = agg.child();
// split filter conjuncts to correlated and unCorrelated ones
// (the list under key true is used as the correlated part, the one under false as the rest)
Map<Boolean, List<Expression>> split =
Utils.splitCorrelatedConjuncts(filter.getConjuncts(), apply.getCorrelationSlot());
List<Expression> correlatedPredicate = split.get(true);
List<Expression> unCorrelatedPredicate = split.get(false);
// NOTE(review): the block from here down to the closing "}).toRule(...)" line looks like the tail
// of the pre-change lambda body as rendered by the diff view (it repeats the statements that follow
// and closes a lambda) - presumably removed by this commit; confirm against the repository
List<NamedExpression> newAggOutput = new ArrayList<>(agg.getOutputExpressions());
List<Expression> newGroupby = Utils.getCorrelatedSlots(correlatedPredicate,
apply.getCorrelationSlot());
newGroupby.addAll(agg.getGroupByExpressions());
newAggOutput.addAll(newGroupby.stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()));
LogicalAggregate newAgg = new LogicalAggregate<>(
newGroupby, newAggOutput,
PlanUtils.filterOrSelf(ImmutableSet.copyOf(unCorrelatedPredicate), filter.child()));
return new LogicalApply<>(apply.getCorrelationSlot(),
apply.getSubqueryExpr(),
ExpressionUtils.optionalAnd(correlatedPredicate),
apply.getMarkJoinSlotReference(),
apply.isNeedAddSubOutputToProjects(),
apply.isInProject(), apply.isMarkJoinSlotNotNull(), apply.left(), newAgg);
}).toRule(RuleType.UN_CORRELATED_APPLY_AGGREGATE_FILTER);
// no correlated conjunct is left under the aggregate - presumably this apply has already been
// rewritten by this rule and the predicate already lives on the apply node; nothing to do
if (correlatedPredicate.isEmpty()) {
return apply;
}
// pull up correlated filter into apply node
List<NamedExpression> newAggOutput = new ArrayList<>(agg.getOutputExpressions());
// group by the slots reported for the correlated conjuncts first, then the original group-by keys
List<Expression> newGroupby =
Utils.getCorrelatedSlots(correlatedPredicate, apply.getCorrelationSlot());
newGroupby.addAll(agg.getGroupByExpressions());
// NOTE(review): every group-by expression (including the original ones) is cast to NamedExpression
// and appended to the output; Class.cast throws ClassCastException otherwise, and original keys
// already present in the output would be listed twice - presumably ruled out by earlier
// normalization; confirm
newAggOutput.addAll(newGroupby.stream().map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()));
// the uncorrelated conjuncts are handed to PlanUtils.filterOrSelf together with the old filter's child
// NOTE(review): newAgg is declared with a raw LogicalAggregate type
LogicalAggregate newAgg = new LogicalAggregate<>(newGroupby, newAggOutput,
PlanUtils.filterOrSelf(ImmutableSet.copyOf(unCorrelatedPredicate), filter.child()));
// copy every other property of the original apply; only the correlated predicate and the right
// subtree change. A node that sat between the apply and the aggregate is kept and re-parented.
return new LogicalApply<>(apply.getCorrelationSlot(), apply.getSubqueryExpr(),
ExpressionUtils.optionalAnd(correlatedPredicate), apply.getMarkJoinSlotReference(),
apply.isNeedAddSubOutputToProjects(), apply.isInProject(),
apply.isMarkJoinSlotNotNull(), apply.left(),
isRightChildAgg ? newAgg : apply.right().withChildren(newAgg));
}
}