[fix](Nereids) producer to consumer should be multimap in cte (#39850) (#39867)

This commit is contained in:
morrySnow
2024-08-23 23:25:11 +08:00
committed by GitHub
parent 508c7a7040
commit 9d597bde68
10 changed files with 100 additions and 64 deletions

View File

@ -71,6 +71,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -617,8 +618,8 @@ public class CascadesContext implements ScheduleContext {
return this.statementContext.getConsumerIdToFilters();
}
public void addCTEConsumerGroup(CTEId cteId, Group g, Map<Slot, Slot> producerSlotToConsumerSlot) {
List<Pair<Map<Slot, Slot>, Group>> consumerGroups =
public void addCTEConsumerGroup(CTEId cteId, Group g, Multimap<Slot, Slot> producerSlotToConsumerSlot) {
List<Pair<Multimap<Slot, Slot>, Group>> consumerGroups =
this.statementContext.getCteIdToConsumerGroup().computeIfAbsent(cteId, k -> new ArrayList<>());
consumerGroups.add(Pair.of(producerSlotToConsumerSlot, g));
}
@ -627,12 +628,18 @@ public class CascadesContext implements ScheduleContext {
* Update CTE consumer group as producer's stats update
*/
public void updateConsumerStats(CTEId cteId, Statistics statistics) {
List<Pair<Map<Slot, Slot>, Group>> consumerGroups = this.statementContext.getCteIdToConsumerGroup().get(cteId);
for (Pair<Map<Slot, Slot>, Group> p : consumerGroups) {
Map<Slot, Slot> producerSlotToConsumerSlot = p.first;
List<Pair<Multimap<Slot, Slot>, Group>> consumerGroups
= this.statementContext.getCteIdToConsumerGroup().get(cteId);
for (Pair<Multimap<Slot, Slot>, Group> p : consumerGroups) {
Multimap<Slot, Slot> producerSlotToConsumerSlot = p.first;
Statistics updatedConsumerStats = new StatisticsBuilder(statistics).build();
for (Entry<Expression, ColumnStatistic> entry : statistics.columnStatistics().entrySet()) {
updatedConsumerStats.addColumnStats(producerSlotToConsumerSlot.get(entry.getKey()), entry.getValue());
if (!(entry.getKey() instanceof Slot)) {
continue;
}
for (Slot consumer : producerSlotToConsumerSlot.get((Slot) entry.getKey())) {
updatedConsumerStats.addColumnStats(consumer, entry.getValue());
}
}
p.value().setStatistics(updatedConsumerStats);
}

View File

@ -53,6 +53,7 @@ import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -114,7 +115,7 @@ public class StatementContext implements Closeable {
private final Map<CTEId, Set<Slot>> cteIdToOutputIds = new HashMap<>();
private final Map<RelationId, Set<Expression>> consumerIdToFilters = new HashMap<>();
// Used to update consumer's stats
private final Map<CTEId, List<Pair<Map<Slot, Slot>, Group>>> cteIdToConsumerGroup = new HashMap<>();
private final Map<CTEId, List<Pair<Multimap<Slot, Slot>, Group>>> cteIdToConsumerGroup = new HashMap<>();
private final Map<CTEId, LogicalPlan> rewrittenCteProducer = new HashMap<>();
private final Map<CTEId, LogicalPlan> rewrittenCteConsumer = new HashMap<>();
private final Set<String> viewDdlSqlSet = Sets.newHashSet();
@ -354,7 +355,7 @@ public class StatementContext implements Closeable {
return idToPlaceholderRealExpr;
}
public Map<CTEId, List<Pair<Map<Slot, Slot>, Group>>> getCteIdToConsumerGroup() {
public Map<CTEId, List<Pair<Multimap<Slot, Slot>, Group>>> getCteIdToConsumerGroup() {
return cteIdToConsumerGroup;
}

View File

@ -1173,12 +1173,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
for (Slot producerSlot : cteProducer.getOutput()) {
SlotRef slotRef = context.findSlotRef(producerSlot.getExprId());
tupleDescriptor = slotRef.getDesc().getParent();
Slot consumerSlot = cteConsumer.getProducerToConsumerSlotMap().get(producerSlot);
// consumerSlot could be null if we prune partial consumers' columns
if (consumerSlot == null) {
continue;
for (Slot consumerSlot : cteConsumer.getProducerToConsumerSlotMap().get(producerSlot)) {
context.addExprIdSlotRefPair(consumerSlot.getExprId(), slotRef);
}
context.addExprIdSlotRefPair(consumerSlot.getExprId(), slotRef);
}
CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor);
context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator ->

View File

@ -51,8 +51,10 @@ import org.apache.doris.nereids.util.ExpressionUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import java.util.LinkedHashMap;
import java.util.List;
@ -261,14 +263,15 @@ public class AdjustNullable extends DefaultPlanRewriter<Map<ExprId, Slot>> imple
@Override
public Plan visitLogicalCTEConsumer(LogicalCTEConsumer cteConsumer, Map<ExprId, Slot> replaceMap) {
Map<Slot, Slot> consumerToProducerOutputMap = new LinkedHashMap<>();
Map<Slot, Slot> producerToConsumerOutputMap = new LinkedHashMap<>();
Multimap<Slot, Slot> producerToConsumerOutputMap = LinkedHashMultimap.create();
for (Slot producerOutputSlot : cteConsumer.getConsumerToProducerOutputMap().values()) {
Slot newProducerOutputSlot = updateExpression(producerOutputSlot, replaceMap);
Slot newConsumerOutputSlot = cteConsumer.getProducerToConsumerOutputMap().get(producerOutputSlot)
.withNullable(newProducerOutputSlot.nullable());
producerToConsumerOutputMap.put(newProducerOutputSlot, newConsumerOutputSlot);
consumerToProducerOutputMap.put(newConsumerOutputSlot, newProducerOutputSlot);
replaceMap.put(newConsumerOutputSlot.getExprId(), newConsumerOutputSlot);
for (Slot consumerOutputSlot : cteConsumer.getProducerToConsumerOutputMap().get(producerOutputSlot)) {
Slot newConsumerOutputSlot = consumerOutputSlot.withNullable(newProducerOutputSlot.nullable());
producerToConsumerOutputMap.put(newProducerOutputSlot, newConsumerOutputSlot);
consumerToProducerOutputMap.put(newConsumerOutputSlot, newProducerOutputSlot);
replaceMap.put(newConsumerOutputSlot.getExprId(), newConsumerOutputSlot);
}
}
return cteConsumer.withTwoMaps(consumerToProducerOutputMap, producerToConsumerOutputMap);
}

View File

@ -208,11 +208,11 @@ public class OrExpansion extends DefaultPlanRewriter<OrExpandsionContext> implem
private Map<Slot, Slot> constructReplaceMap(LogicalCTEConsumer leftConsumer, Map<Slot, Slot> leftCloneToLeft,
LogicalCTEConsumer rightConsumer, Map<Slot, Slot> rightCloneToRight) {
Map<Slot, Slot> replaced = new HashMap<>();
for (Entry<Slot, Slot> entry : leftConsumer.getProducerToConsumerOutputMap().entrySet()) {
replaced.put(leftCloneToLeft.get(entry.getKey()), entry.getValue());
for (Entry<Slot, Slot> entry : leftConsumer.getConsumerToProducerOutputMap().entrySet()) {
replaced.put(leftCloneToLeft.get(entry.getValue()), entry.getKey());
}
for (Entry<Slot, Slot> entry : rightConsumer.getProducerToConsumerOutputMap().entrySet()) {
replaced.put(rightCloneToRight.get(entry.getKey()), entry.getValue());
for (Entry<Slot, Slot> entry : rightConsumer.getConsumerToProducerOutputMap().entrySet()) {
replaced.put(rightCloneToRight.get(entry.getValue()), entry.getKey());
}
return replaced;
}

View File

@ -59,8 +59,10 @@ import org.apache.doris.nereids.util.ExpressionUtils;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.util.Collections;
@ -394,7 +396,7 @@ public class VariantSubPathPruning extends DefaultPlanRewriter<PruneContext> imp
return cteConsumer;
}
Map<Slot, Slot> consumerToProducerOutputMap = Maps.newHashMap();
Map<Slot, Slot> producerToConsumerOutputMap = Maps.newHashMap();
Multimap<Slot, Slot> producerToConsumerOutputMap = LinkedHashMultimap.create();
Map<Slot, Map<List<String>, SlotReference>> oriSlotToSubPathToSlot = Maps.newHashMap();
for (Map.Entry<Slot, Slot> consumerToProducer : cteConsumer.getConsumerToProducerOutputMap().entrySet()) {
Slot consumer = consumerToProducer.getKey();

View File

@ -63,6 +63,8 @@ import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import java.util.LinkedHashMap;
import java.util.List;
@ -397,7 +399,7 @@ public class LogicalPlanDeepCopier extends DefaultPlanRewriter<DeepCopierContext
return context.getRelationReplaceMap().get(cteConsumer.getRelationId());
}
Map<Slot, Slot> consumerToProducerOutputMap = new LinkedHashMap<>();
Map<Slot, Slot> producerToConsumerOutputMap = new LinkedHashMap<>();
Multimap<Slot, Slot> producerToConsumerOutputMap = LinkedHashMultimap.create();
for (Slot consumerOutput : cteConsumer.getOutput()) {
Slot newOutput = (Slot) ExpressionDeepCopier.INSTANCE.deepCopy(consumerOutput, context);
consumerToProducerOutputMap.put(newOutput, cteConsumer.getProducerSlot(consumerOutput));

View File

@ -33,6 +33,10 @@ import org.apache.doris.nereids.util.Utils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import java.util.LinkedHashMap;
import java.util.List;
@ -50,20 +54,15 @@ public class LogicalCTEConsumer extends LogicalRelation implements BlockFuncDeps
private final String name;
private final CTEId cteId;
private final Map<Slot, Slot> consumerToProducerOutputMap;
private final Map<Slot, Slot> producerToConsumerOutputMap;
private final Multimap<Slot, Slot> producerToConsumerOutputMap;
/**
* Logical CTE consumer.
*/
public LogicalCTEConsumer(RelationId relationId, CTEId cteId, String name,
Map<Slot, Slot> consumerToProducerOutputMap, Map<Slot, Slot> producerToConsumerOutputMap) {
super(relationId, PlanType.LOGICAL_CTE_CONSUMER, Optional.empty(), Optional.empty());
this.cteId = Objects.requireNonNull(cteId, "cteId should not null");
this.name = Objects.requireNonNull(name, "name should not null");
this.consumerToProducerOutputMap = Objects.requireNonNull(consumerToProducerOutputMap,
"consumerToProducerOutputMap should not null");
this.producerToConsumerOutputMap = Objects.requireNonNull(producerToConsumerOutputMap,
"producerToConsumerOutputMap should not null");
Map<Slot, Slot> consumerToProducerOutputMap, Multimap<Slot, Slot> producerToConsumerOutputMap) {
this(relationId, cteId, name, consumerToProducerOutputMap, producerToConsumerOutputMap,
Optional.empty(), Optional.empty());
}
/**
@ -73,24 +72,31 @@ public class LogicalCTEConsumer extends LogicalRelation implements BlockFuncDeps
super(relationId, PlanType.LOGICAL_CTE_CONSUMER, Optional.empty(), Optional.empty());
this.cteId = Objects.requireNonNull(cteId, "cteId should not null");
this.name = Objects.requireNonNull(name, "name should not null");
this.consumerToProducerOutputMap = new LinkedHashMap<>();
this.producerToConsumerOutputMap = new LinkedHashMap<>();
initOutputMaps(producerPlan);
ImmutableMap.Builder<Slot, Slot> cToPBuilder = ImmutableMap.builder();
ImmutableMultimap.Builder<Slot, Slot> pToCBuilder = ImmutableMultimap.builder();
List<Slot> producerOutput = producerPlan.getOutput();
for (Slot producerOutputSlot : producerOutput) {
Slot consumerSlot = generateConsumerSlot(this.name, producerOutputSlot);
cToPBuilder.put(consumerSlot, producerOutputSlot);
pToCBuilder.put(producerOutputSlot, consumerSlot);
}
consumerToProducerOutputMap = cToPBuilder.build();
producerToConsumerOutputMap = pToCBuilder.build();
}
/**
* Logical CTE consumer.
*/
public LogicalCTEConsumer(RelationId relationId, CTEId cteId, String name,
Map<Slot, Slot> consumerToProducerOutputMap, Map<Slot, Slot> producerToConsumerOutputMap,
Map<Slot, Slot> consumerToProducerOutputMap, Multimap<Slot, Slot> producerToConsumerOutputMap,
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties) {
super(relationId, PlanType.LOGICAL_CTE_CONSUMER, groupExpression, logicalProperties);
this.cteId = Objects.requireNonNull(cteId, "cteId should not null");
this.name = Objects.requireNonNull(name, "name should not null");
this.consumerToProducerOutputMap = Objects.requireNonNull(consumerToProducerOutputMap,
"consumerToProducerOutputMap should not null");
this.producerToConsumerOutputMap = Objects.requireNonNull(producerToConsumerOutputMap,
"producerToConsumerOutputMap should not null");
this.consumerToProducerOutputMap = ImmutableMap.copyOf(Objects.requireNonNull(consumerToProducerOutputMap,
"consumerToProducerOutputMap should not null"));
this.producerToConsumerOutputMap = ImmutableMultimap.copyOf(Objects.requireNonNull(producerToConsumerOutputMap,
"producerToConsumerOutputMap should not null"));
}
/**
@ -107,20 +113,11 @@ public class LogicalCTEConsumer extends LogicalRelation implements BlockFuncDeps
slotRef != null ? Optional.of(slotRef.getInternalName()) : Optional.empty());
}
private void initOutputMaps(LogicalPlan childPlan) {
List<Slot> producerOutput = childPlan.getOutput();
for (Slot producerOutputSlot : producerOutput) {
Slot consumerSlot = generateConsumerSlot(this.name, producerOutputSlot);
producerToConsumerOutputMap.put(producerOutputSlot, consumerSlot);
consumerToProducerOutputMap.put(consumerSlot, producerOutputSlot);
}
}
public Map<Slot, Slot> getConsumerToProducerOutputMap() {
return consumerToProducerOutputMap;
}
public Map<Slot, Slot> getProducerToConsumerOutputMap() {
public Multimap<Slot, Slot> getProducerToConsumerOutputMap() {
return producerToConsumerOutputMap;
}
@ -129,7 +126,8 @@ public class LogicalCTEConsumer extends LogicalRelation implements BlockFuncDeps
return visitor.visitLogicalCTEConsumer(this, context);
}
public Plan withTwoMaps(Map<Slot, Slot> consumerToProducerOutputMap, Map<Slot, Slot> producerToConsumerOutputMap) {
public Plan withTwoMaps(Map<Slot, Slot> consumerToProducerOutputMap,
Multimap<Slot, Slot> producerToConsumerOutputMap) {
return new LogicalCTEConsumer(relationId, cteId, name,
consumerToProducerOutputMap, producerToConsumerOutputMap);
}
@ -162,7 +160,8 @@ public class LogicalCTEConsumer extends LogicalRelation implements BlockFuncDeps
@Override
public Plan pruneOutputs(List<NamedExpression> prunedOutputs) {
Map<Slot, Slot> consumerToProducerOutputMap = new LinkedHashMap<>(this.consumerToProducerOutputMap.size());
Map<Slot, Slot> producerToConsumerOutputMap = new LinkedHashMap<>(this.consumerToProducerOutputMap.size());
Multimap<Slot, Slot> producerToConsumerOutputMap = LinkedHashMultimap.create(
this.consumerToProducerOutputMap.size(), this.consumerToProducerOutputMap.size());
for (Entry<Slot, Slot> consumerToProducerSlot : this.consumerToProducerOutputMap.entrySet()) {
if (prunedOutputs.contains(consumerToProducerSlot.getKey())) {
consumerToProducerOutputMap.put(consumerToProducerSlot.getKey(), consumerToProducerSlot.getValue());

View File

@ -31,6 +31,8 @@ import org.apache.doris.statistics.Statistics;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import java.util.List;
import java.util.Map;
@ -43,14 +45,14 @@ import java.util.Optional;
public class PhysicalCTEConsumer extends PhysicalRelation {
private final CTEId cteId;
private final Map<Slot, Slot> producerToConsumerSlotMap;
private final Multimap<Slot, Slot> producerToConsumerSlotMap;
private final Map<Slot, Slot> consumerToProducerSlotMap;
/**
* Constructor
*/
public PhysicalCTEConsumer(RelationId relationId, CTEId cteId, Map<Slot, Slot> consumerToProducerSlotMap,
Map<Slot, Slot> producerToConsumerSlotMap, LogicalProperties logicalProperties) {
Multimap<Slot, Slot> producerToConsumerSlotMap, LogicalProperties logicalProperties) {
this(relationId, cteId, consumerToProducerSlotMap, producerToConsumerSlotMap,
Optional.empty(), logicalProperties);
}
@ -59,7 +61,7 @@ public class PhysicalCTEConsumer extends PhysicalRelation {
* Constructor
*/
public PhysicalCTEConsumer(RelationId relationId, CTEId cteId,
Map<Slot, Slot> consumerToProducerSlotMap, Map<Slot, Slot> producerToConsumerSlotMap,
Map<Slot, Slot> consumerToProducerSlotMap, Multimap<Slot, Slot> producerToConsumerSlotMap,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties) {
this(relationId, cteId, consumerToProducerSlotMap, producerToConsumerSlotMap,
groupExpression, logicalProperties, null, null);
@ -69,14 +71,14 @@ public class PhysicalCTEConsumer extends PhysicalRelation {
* Constructor
*/
public PhysicalCTEConsumer(RelationId relationId, CTEId cteId, Map<Slot, Slot> consumerToProducerSlotMap,
Map<Slot, Slot> producerToConsumerSlotMap, Optional<GroupExpression> groupExpression,
Multimap<Slot, Slot> producerToConsumerSlotMap, Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics) {
super(relationId, PlanType.PHYSICAL_CTE_CONSUMER, groupExpression,
logicalProperties, physicalProperties, statistics);
this.cteId = cteId;
this.consumerToProducerSlotMap = ImmutableMap.copyOf(Objects.requireNonNull(
consumerToProducerSlotMap, "consumerToProducerSlotMap should not null"));
this.producerToConsumerSlotMap = ImmutableMap.copyOf(Objects.requireNonNull(
this.producerToConsumerSlotMap = ImmutableMultimap.copyOf(Objects.requireNonNull(
producerToConsumerSlotMap, "consumerToProducerSlotMap should not null"));
}
@ -84,7 +86,7 @@ public class PhysicalCTEConsumer extends PhysicalRelation {
return cteId;
}
public Map<Slot, Slot> getProducerToConsumerSlotMap() {
public Multimap<Slot, Slot> getProducerToConsumerSlotMap() {
return producerToConsumerSlotMap;
}
@ -99,8 +101,7 @@ public class PhysicalCTEConsumer extends PhysicalRelation {
public String toString() {
StringBuilder builder = new StringBuilder();
if (!getAppliedRuntimeFilters().isEmpty()) {
getAppliedRuntimeFilters()
.stream().forEach(rf -> builder.append(" RF").append(rf.getId().asInt()));
getAppliedRuntimeFilters().forEach(rf -> builder.append(" RF").append(rf.getId().asInt()));
}
return Utils.toSqlString("PhysicalCTEConsumer[" + id.asInt() + "]",
"stats", getStats(), "cteId", cteId, "RFs", builder, "map", consumerToProducerSlotMap);
@ -141,8 +142,7 @@ public class PhysicalCTEConsumer extends PhysicalRelation {
"cteId", cteId));
if (!getAppliedRuntimeFilters().isEmpty()) {
shapeBuilder.append(" apply RFs:");
getAppliedRuntimeFilters()
.stream().forEach(rf -> shapeBuilder.append(" RF").append(rf.getId().asInt()));
getAppliedRuntimeFilters().forEach(rf -> shapeBuilder.append(" RF").append(rf.getId().asInt()));
}
return shapeBuilder.toString();
}