From 6b52e4780594150719c49563d50a0b1af404155e Mon Sep 17 00:00:00 2001 From: starocean999 <40539150+starocean999@users.noreply.github.com> Date: Tue, 13 Sep 2022 11:28:27 +0800 Subject: [PATCH] [fix](agg)the intermediate slots should be materialized as output slots (#12441) In some cases, the output slots of agg info may be materialized by calling SlotDescriptor's materializeSrcExpr method, while the intermediate slots are not. This PR sets the intermediate slots' materialized info so that it stays consistent with the output slots. --- .../apache/doris/analysis/AggregateInfo.java | 24 +++++++ .../doris/analysis/AggregateInfoBase.java | 11 +++ .../doris/analysis/DescriptorTable.java | 13 ++++ .../java/org/apache/doris/analysis/Expr.java | 12 ++++ .../apache/doris/analysis/SlotDescriptor.java | 1 + .../apache/doris/planner/AggregationNode.java | 4 +- .../doris/planner/SingleNodePlanner.java | 1 + .../correctness_p0/test_subquery_with_agg.out | 6 ++ .../test_subquery_with_agg.groovy | 70 +++++++++++++++++++ 9 files changed, 141 insertions(+), 1 deletion(-) create mode 100644 regression-test/data/correctness_p0/test_subquery_with_agg.out create mode 100644 regression-test/suites/correctness_p0/test_subquery_with_agg.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java index 77c91131a2..d4137f148e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfo.java @@ -834,6 +834,30 @@ public final class AggregateInfo extends AggregateInfoBase { } } + public void updateMaterializedSlots() { + // why output and intermediate may have different materialized slots? 
+ // because some slots are materialized by materializeSrcExpr method directly + // in that case, only the output slots are materialized + // assume output tuple has correct materialized information + // we update intermediate tuple and materializedSlots based on output tuple + materializedSlots.clear(); + ArrayList outputSlots = outputTupleDesc.getSlots(); + int groupingExprNum = groupingExprs != null ? groupingExprs.size() : 0; + Preconditions.checkState(groupingExprNum <= outputSlots.size()); + for (int i = groupingExprNum; i < outputSlots.size(); ++i) { + if (outputSlots.get(i).isMaterialized()) { + materializedSlots.add(i - groupingExprNum); + } + } + + ArrayList intermediateSlots = intermediateTupleDesc.getSlots(); + Preconditions.checkState(intermediateSlots.size() == outputSlots.size()); + for (int i = 0; i < outputSlots.size(); ++i) { + intermediateSlots.get(i).setIsMaterialized(outputSlots.get(i).isMaterialized()); + } + intermediateTupleDesc.computeStatAndMemLayout(); + } + /** * Mark slots required for this aggregation as materialized: * - all grouping output slots as well as grouping exprs diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java index 88cd05c78f..a8d4aef161 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AggregateInfoBase.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; /** @@ -106,6 +107,16 @@ public abstract class AggregateInfoBase { intermediateTupleDesc = createTupleDesc(analyzer, false); if (requiresIntermediateTuple(aggregateExprs, groupingExprs.size() == 0)) { outputTupleDesc = createTupleDesc(analyzer, true); + // save the output and intermediate slots info into global desc table + // after creating the plan, we can call 
materializeIntermediateSlots method + // to set the materialized info to intermediate slots based on output slots. + ArrayList outputSlots = outputTupleDesc.getSlots(); + ArrayList intermediateSlots = intermediateTupleDesc.getSlots(); + HashMap mapping = new HashMap<>(); + for (int i = 0; i < outputSlots.size(); ++i) { + mapping.put(outputSlots.get(i), intermediateSlots.get(i)); + } + analyzer.getDescTbl().addSlotMappingInfo(mapping); } else { outputTupleDesc = intermediateTupleDesc; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java index fe2b6c3abf..dcf05ad6d3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java @@ -36,6 +36,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; /** * Repository for tuple (and slot) descriptors. 
@@ -53,6 +54,8 @@ public class DescriptorTable { private final IdGenerator slotIdGenerator = SlotId.createGenerator(); private final HashMap slotDescs = Maps.newHashMap(); + private final HashMap outToIntermediateSlots = new HashMap<>(); + public DescriptorTable() { } @@ -166,6 +169,16 @@ public class DescriptorTable { } } + public void addSlotMappingInfo(Map mapping) { + outToIntermediateSlots.putAll(mapping); + } + + public void materializeIntermediateSlots() { + for (Map.Entry entry : outToIntermediateSlots.entrySet()) { + entry.getValue().setIsMaterialized(entry.getKey().isMaterialized()); + } + } + public TDescriptorTable toThrift() { TDescriptorTable result = new TDescriptorTable(); HashSet referencedTbls = Sets.newHashSet(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index 746137af2c..d0a07c68e7 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -1995,4 +1995,16 @@ public abstract class Expr extends TreeNode implements ParseNode, Cloneabl public void finalizeImplForNereids() throws AnalysisException { throw new AnalysisException("analyze for Nereids do not implementation."); } + + public void materializeSrcExpr() { + if (this instanceof SlotRef) { + SlotRef thisRef = (SlotRef) this; + SlotDescriptor slotDesc = thisRef.getDesc(); + slotDesc.setIsMaterialized(true); + slotDesc.getSourceExprs().forEach(Expr::materializeSrcExpr); + } + for (Expr child : children) { + child.materializeSrcExpr(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java index 330bcea5e2..8541fc4343 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SlotDescriptor.java @@ -168,6 +168,7 @@ public class 
SlotDescriptor { } for (Expr expr : sourceExprs) { if (!(expr instanceof SlotRef)) { + expr.materializeSrcExpr(); continue; } SlotRef slotRef = (SlotRef) expr; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java index aca046c7a1..a1c6b19d1c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java @@ -248,6 +248,7 @@ public class AggregationNode extends PlanNode { @Override protected void toThrift(TPlanNode msg) { + aggInfo.updateMaterializedSlots(); msg.node_type = TPlanNodeType.AGGREGATION_NODE; List aggregateFunctions = Lists.newArrayList(); List aggSortInfos = Lists.newArrayList(); @@ -289,6 +290,7 @@ public class AggregationNode extends PlanNode { @Override public String getNodeExplainString(String detailPrefix, TExplainLevel detailLevel) { + aggInfo.updateMaterializedSlots(); StringBuilder output = new StringBuilder(); String nameDetail = getDisplayLabelDetail(); if (nameDetail != null) { @@ -301,7 +303,7 @@ public class AggregationNode extends PlanNode { if (aggInfo.getAggregateExprs() != null && aggInfo.getMaterializedAggregateExprs().size() > 0) { output.append(detailPrefix + "output: ").append( - getExplainString(aggInfo.getAggregateExprs()) + "\n"); + getExplainString(aggInfo.getMaterializedAggregateExprs()) + "\n"); } // TODO: group by can be very long. 
Break it into multiple lines output.append(detailPrefix + "group by: ").append(getExplainString(aggInfo.getGroupingExprs()) + "\n"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index faa1aebe63..d201b21938 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -171,6 +171,7 @@ public class SingleNodePlanner { PlanNode singleNodePlan = createQueryPlan(queryStmt, analyzer, ctx.getQueryOptions().getDefaultOrderByLimit()); Preconditions.checkNotNull(singleNodePlan); + analyzer.getDescTbl().materializeIntermediateSlots(); return singleNodePlan; } diff --git a/regression-test/data/correctness_p0/test_subquery_with_agg.out b/regression-test/data/correctness_p0/test_subquery_with_agg.out new file mode 100644 index 0000000000..4c443b8493 --- /dev/null +++ b/regression-test/data/correctness_p0/test_subquery_with_agg.out @@ -0,0 +1,6 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select -- +1 +2 +3 + diff --git a/regression-test/suites/correctness_p0/test_subquery_with_agg.groovy b/regression-test/suites/correctness_p0/test_subquery_with_agg.groovy new file mode 100644 index 0000000000..49362ba86e --- /dev/null +++ b/regression-test/suites/correctness_p0/test_subquery_with_agg.groovy @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_subquery_with_agg") { + sql """ + drop table if exists agg_subquery_table; + """ + + sql """ + CREATE TABLE agg_subquery_table + ( + gid varchar(50) NOT NULL, + num int(11) SUM NOT NULL DEFAULT "0", + id_bitmap bitmap BITMAP_UNION NOT NULL + ) ENGINE = OLAP + AGGREGATE KEY(gid) + DISTRIBUTED BY HASH(gid) BUCKETS 4 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + sql """ + INSERT INTO agg_subquery_table VALUES + ('1',4,to_bitmap(7)), + ('2',5,to_bitmap(8)), + ('3',6,to_bitmap(9)); + """ + + qt_select """ + SELECT + subq_1.gid AS c0 + FROM + agg_subquery_table AS subq_1 + WHERE + EXISTS ( + SELECT + ref_2.amt AS c2 + FROM + ( + SELECT + bitmap_union_count(id_bitmap) AS unt, + sum(num) AS amt + FROM + agg_subquery_table + ) AS ref_2 + ) + order by + subq_1.gid; + """ + + sql """ + drop table if exists agg_subquery_table; + """ + +}