[fix](agg)the intermediate slots should be materialized as output slots (#12441)

in some case, the output slots of agg info may be materialized by call SlotDescriptor's materializeSrcExpr method, but not the intermediate slots. This pr set intermediate slots materialized info to keep consistent with output slots.
This commit is contained in:
starocean999
2022-09-13 11:28:27 +08:00
committed by GitHub
parent 550b1e531b
commit 6b52e47805
9 changed files with 141 additions and 1 deletions

View File

@ -834,6 +834,30 @@ public final class AggregateInfo extends AggregateInfoBase {
}
}
public void updateMaterializedSlots() {
// why output and intermediate may have different materialized slots?
// because some slot is materialized by materializeSrcExpr method directly
// in that case, only output slots is materialized
// assume output tuple has correct marterialized infomation
// we update intermediate tuple and materializedSlots based on output tuple
materializedSlots.clear();
ArrayList<SlotDescriptor> outputSlots = outputTupleDesc.getSlots();
int groupingExprNum = groupingExprs != null ? groupingExprs.size() : 0;
Preconditions.checkState(groupingExprNum <= outputSlots.size());
for (int i = groupingExprNum; i < outputSlots.size(); ++i) {
if (outputSlots.get(i).isMaterialized()) {
materializedSlots.add(i - groupingExprNum);
}
}
ArrayList<SlotDescriptor> intermediateSlots = intermediateTupleDesc.getSlots();
Preconditions.checkState(intermediateSlots.size() == outputSlots.size());
for (int i = 0; i < outputSlots.size(); ++i) {
intermediateSlots.get(i).setIsMaterialized(outputSlots.get(i).isMaterialized());
}
intermediateTupleDesc.computeStatAndMemLayout();
}
/**
* Mark slots required for this aggregation as materialized:
* - all grouping output slots as well as grouping exprs

View File

@ -31,6 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
/**
@ -106,6 +107,16 @@ public abstract class AggregateInfoBase {
intermediateTupleDesc = createTupleDesc(analyzer, false);
if (requiresIntermediateTuple(aggregateExprs, groupingExprs.size() == 0)) {
outputTupleDesc = createTupleDesc(analyzer, true);
// save the output and intermediate slots info into global desc table
// after creaing the plan, we can call materializeIntermediateSlots method
// to set the materialized info to intermediate slots based on output slots.
ArrayList<SlotDescriptor> outputSlots = outputTupleDesc.getSlots();
ArrayList<SlotDescriptor> intermediateSlots = intermediateTupleDesc.getSlots();
HashMap<SlotDescriptor, SlotDescriptor> mapping = new HashMap<>();
for (int i = 0; i < outputSlots.size(); ++i) {
mapping.put(outputSlots.get(i), intermediateSlots.get(i));
}
analyzer.getDescTbl().addSlotMappingInfo(mapping);
} else {
outputTupleDesc = intermediateTupleDesc;
}

View File

@ -36,6 +36,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
/**
* Repository for tuple (and slot) descriptors.
@ -53,6 +54,8 @@ public class DescriptorTable {
private final IdGenerator<SlotId> slotIdGenerator = SlotId.createGenerator();
private final HashMap<SlotId, SlotDescriptor> slotDescs = Maps.newHashMap();
private final HashMap<SlotDescriptor, SlotDescriptor> outToIntermediateSlots = new HashMap<>();
public DescriptorTable() {
}
@ -166,6 +169,16 @@ public class DescriptorTable {
}
}
public void addSlotMappingInfo(Map<SlotDescriptor, SlotDescriptor> mapping) {
outToIntermediateSlots.putAll(mapping);
}
public void materializeIntermediateSlots() {
for (Map.Entry<SlotDescriptor, SlotDescriptor> entry : outToIntermediateSlots.entrySet()) {
entry.getValue().setIsMaterialized(entry.getKey().isMaterialized());
}
}
public TDescriptorTable toThrift() {
TDescriptorTable result = new TDescriptorTable();
HashSet<TableIf> referencedTbls = Sets.newHashSet();

View File

@ -1995,4 +1995,16 @@ public abstract class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
public void finalizeImplForNereids() throws AnalysisException {
throw new AnalysisException("analyze for Nereids do not implementation.");
}
public void materializeSrcExpr() {
if (this instanceof SlotRef) {
SlotRef thisRef = (SlotRef) this;
SlotDescriptor slotDesc = thisRef.getDesc();
slotDesc.setIsMaterialized(true);
slotDesc.getSourceExprs().forEach(Expr::materializeSrcExpr);
}
for (Expr child : children) {
child.materializeSrcExpr();
}
}
}

View File

@ -168,6 +168,7 @@ public class SlotDescriptor {
}
for (Expr expr : sourceExprs) {
if (!(expr instanceof SlotRef)) {
expr.materializeSrcExpr();
continue;
}
SlotRef slotRef = (SlotRef) expr;

View File

@ -248,6 +248,7 @@ public class AggregationNode extends PlanNode {
@Override
protected void toThrift(TPlanNode msg) {
aggInfo.updateMaterializedSlots();
msg.node_type = TPlanNodeType.AGGREGATION_NODE;
List<TExpr> aggregateFunctions = Lists.newArrayList();
List<TSortInfo> aggSortInfos = Lists.newArrayList();
@ -289,6 +290,7 @@ public class AggregationNode extends PlanNode {
@Override
public String getNodeExplainString(String detailPrefix, TExplainLevel detailLevel) {
aggInfo.updateMaterializedSlots();
StringBuilder output = new StringBuilder();
String nameDetail = getDisplayLabelDetail();
if (nameDetail != null) {
@ -301,7 +303,7 @@ public class AggregationNode extends PlanNode {
if (aggInfo.getAggregateExprs() != null && aggInfo.getMaterializedAggregateExprs().size() > 0) {
output.append(detailPrefix + "output: ").append(
getExplainString(aggInfo.getAggregateExprs()) + "\n");
getExplainString(aggInfo.getMaterializedAggregateExprs()) + "\n");
}
// TODO: group by can be very long. Break it into multiple lines
output.append(detailPrefix + "group by: ").append(getExplainString(aggInfo.getGroupingExprs()) + "\n");

View File

@ -171,6 +171,7 @@ public class SingleNodePlanner {
PlanNode singleNodePlan = createQueryPlan(queryStmt, analyzer,
ctx.getQueryOptions().getDefaultOrderByLimit());
Preconditions.checkNotNull(singleNodePlan);
analyzer.getDescTbl().materializeIntermediateSlots();
return singleNodePlan;
}