[fix](nereids) unnest in-subquery with agg node in proper condition (#25800)
Consider a SQL statement that contains an IN-subquery:
SELECT count(*)
FROM sub_query_correlated_subquery6
WHERE k1 IN
(SELECT k1
FROM
(**SELECT k1,
sum(k3) AS bbb,
count(k2) AS aaa
FROM sub_query_correlated_subquery7
WHERE k1 > 0
AND k3 > 0
GROUP BY k1** ) y
WHERE y.aaa>0
AND k1>1);
The aggregate part of the subquery (marked with **) is uncorrelated, so the subquery can be unnested.
On the other hand:
SELECT count(*)
FROM sub_query_correlated_subquery6
WHERE k1 IN
(SELECT k1
FROM
(**SELECT k1,
sum(k3) AS bbb,
count(k2) AS aaa
FROM sub_query_correlated_subquery7
WHERE k1 > 0
AND k3 > 0 and sub_query_correlated_subquery6.k1 > 2
GROUP BY k1** ) y
WHERE y.aaa>0
AND k1>1);
Here the aggregate part of the subquery is correlated (it references sub_query_correlated_subquery6.k1 from the outer query), so the subquery cannot be unnested.
This commit is contained in:
@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
|
||||
import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
|
||||
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
@ -38,6 +39,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
@ -89,8 +91,7 @@ class SubExprAnalyzer extends DefaultExpressionRewriter<CascadesContext> {
|
||||
AnalyzedResult analyzedResult = analyzeSubquery(expr);
|
||||
|
||||
checkOutputColumn(analyzedResult.getLogicalPlan());
|
||||
checkHasNotAgg(analyzedResult);
|
||||
checkHasGroupBy(analyzedResult);
|
||||
checkNoCorrelatedSlotsUnderAgg(analyzedResult);
|
||||
checkRootIsLimit(analyzedResult);
|
||||
|
||||
return new InSubquery(
|
||||
@ -105,7 +106,7 @@ class SubExprAnalyzer extends DefaultExpressionRewriter<CascadesContext> {
|
||||
|
||||
checkOutputColumn(analyzedResult.getLogicalPlan());
|
||||
checkHasAgg(analyzedResult);
|
||||
checkHasGroupBy(analyzedResult);
|
||||
checkHasNoGroupBy(analyzedResult);
|
||||
|
||||
return new ScalarSubquery(analyzedResult.getLogicalPlan(), analyzedResult.getCorrelatedSlots());
|
||||
}
|
||||
@ -135,7 +136,7 @@ class SubExprAnalyzer extends DefaultExpressionRewriter<CascadesContext> {
|
||||
}
|
||||
}
|
||||
|
||||
private void checkHasGroupBy(AnalyzedResult analyzedResult) {
|
||||
private void checkHasNoGroupBy(AnalyzedResult analyzedResult) {
|
||||
if (!analyzedResult.isCorrelated()) {
|
||||
return;
|
||||
}
|
||||
@ -145,13 +146,11 @@ class SubExprAnalyzer extends DefaultExpressionRewriter<CascadesContext> {
|
||||
}
|
||||
}
|
||||
|
||||
private void checkHasNotAgg(AnalyzedResult analyzedResult) {
|
||||
if (!analyzedResult.isCorrelated()) {
|
||||
return;
|
||||
}
|
||||
if (analyzedResult.hasAgg()) {
|
||||
throw new AnalysisException("Unsupported correlated subquery with grouping and/or aggregation "
|
||||
+ analyzedResult.getLogicalPlan());
|
||||
private void checkNoCorrelatedSlotsUnderAgg(AnalyzedResult analyzedResult) {
|
||||
if (analyzedResult.hasCorrelatedSlotsUnderAgg()) {
|
||||
throw new AnalysisException(
|
||||
"Unsupported correlated subquery with grouping and/or aggregation "
|
||||
+ analyzedResult.getLogicalPlan());
|
||||
}
|
||||
}
|
||||
|
||||
@ -223,6 +222,29 @@ class SubExprAnalyzer extends DefaultExpressionRewriter<CascadesContext> {
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean hasCorrelatedSlotsUnderAgg() {
|
||||
return correlatedSlots.isEmpty() ? false
|
||||
: findAggContainsCorrelatedSlots(logicalPlan, ImmutableSet.copyOf(correlatedSlots));
|
||||
}
|
||||
|
||||
private boolean findAggContainsCorrelatedSlots(Plan rootPlan, ImmutableSet<Slot> slots) {
|
||||
ArrayDeque<Plan> planQueue = new ArrayDeque<>();
|
||||
planQueue.add(rootPlan);
|
||||
while (!planQueue.isEmpty()) {
|
||||
Plan plan = planQueue.poll();
|
||||
if (plan instanceof LogicalAggregate) {
|
||||
if (plan.containsSlots(slots)) {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
for (Plan child : plan.children()) {
|
||||
planQueue.add(child);
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean rootIsLimit() {
|
||||
return logicalPlan instanceof LogicalLimit;
|
||||
}
|
||||
|
||||
@ -38,6 +38,7 @@ import org.apache.doris.nereids.trees.expressions.literal.BooleanLiteral;
|
||||
import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
|
||||
@ -269,8 +270,10 @@ public class SubqueryToApply implements AnalysisRuleFactory {
|
||||
private boolean nonMarkJoinExistsWithAgg(SubqueryExpr exists,
|
||||
Map<SubqueryExpr, Optional<MarkJoinSlotReference>> subqueryToMarkJoinSlot) {
|
||||
return exists instanceof Exists
|
||||
&& exists.getQueryPlan().anyMatch(Aggregate.class::isInstance)
|
||||
&& !subqueryToMarkJoinSlot.get(exists).isPresent();
|
||||
&& exists.getQueryPlan()
|
||||
.anyMatch(planTreeNode -> planTreeNode instanceof LogicalAggregate
|
||||
&& ((LogicalAggregate<?>) planTreeNode).getGroupByExpressions().isEmpty())
|
||||
&& !subqueryToMarkJoinSlot.get(exists).isPresent();
|
||||
}
|
||||
|
||||
private LogicalPlan addApply(SubqueryExpr subquery, LogicalPlan childPlan,
|
||||
|
||||
@ -65,6 +65,12 @@ public interface Plan extends TreeNode<Plan> {
|
||||
return getExpressions().stream().anyMatch(Expression::hasUnbound);
|
||||
}
|
||||
|
||||
default boolean containsSlots(ImmutableSet<Slot> slots) {
|
||||
return getExpressions().stream().anyMatch(
|
||||
expression -> !Sets.intersection(slots, expression.getInputSlots()).isEmpty()
|
||||
|| children().stream().anyMatch(plan -> plan.containsSlots(slots)));
|
||||
}
|
||||
|
||||
default LogicalProperties computeLogicalProperties() {
|
||||
throw new IllegalStateException("Not support compute logical properties for " + getClass().getName());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user