[enhancement](Nereids): refactor eliminating inner join by foreign key (#28816)
This commit is contained in:
@ -29,24 +29,17 @@ import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.expressions.functions.ExpressionTrait;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
|
||||
import org.apache.doris.nereids.util.ImmutableEquivalenceSet;
|
||||
|
||||
import com.google.common.collect.BiMap;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableMap.Builder;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
@ -54,7 +47,6 @@ import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Nullable;
|
||||
@ -80,7 +72,9 @@ public class EliminateJoinByFK extends DefaultPlanRewriter<JobContext> implement
|
||||
@Override
|
||||
public Plan visit(Plan plan, ForeignKeyContext context) {
|
||||
Plan newPlan = visitChildren(this, plan, context);
|
||||
context.passThroughPlan(plan);
|
||||
// always expire primary key except filter, project and join.
|
||||
// always keep foreign key alive
|
||||
context.expirePrimaryKey(plan);
|
||||
return newPlan;
|
||||
}
|
||||
|
||||
@ -97,35 +91,29 @@ public class EliminateJoinByFK extends DefaultPlanRewriter<JobContext> implement
|
||||
return relation;
|
||||
}
|
||||
|
||||
private boolean canEliminate(LogicalJoin<?, ?> join, BiMap<Slot, Slot> equalSlots, ForeignKeyContext context) {
|
||||
private boolean canEliminate(LogicalJoin<?, ?> join, Map<Slot, Slot> primaryToForeign,
|
||||
ForeignKeyContext context) {
|
||||
if (!join.getOtherJoinConjuncts().isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
if (!join.getJoinType().isInnerJoin() && !join.getJoinType().isSemiJoin()) {
|
||||
return false;
|
||||
}
|
||||
return context.satisfyConstraint(equalSlots, join);
|
||||
return context.satisfyConstraint(primaryToForeign, join);
|
||||
}
|
||||
|
||||
private boolean isForeignKeyAndUnique(Plan plan,
|
||||
Set<Slot> keySet, ForeignKeyContext context) {
|
||||
boolean unique = keySet.stream()
|
||||
.allMatch(s -> plan.getLogicalProperties().getFunctionalDependencies().isUnique(s));
|
||||
return unique && context.isForeignKey(keySet);
|
||||
}
|
||||
|
||||
private @Nullable Map<Expression, Expression> tryGetOutputToChildMap(Plan child,
|
||||
Set<Slot> output, BiMap<Slot, Slot> equalSlots) {
|
||||
Set<Slot> residual = Sets.difference(output, child.getOutputSet());
|
||||
if (equalSlots.keySet().containsAll(residual)) {
|
||||
return residual.stream()
|
||||
.collect(ImmutableMap.toImmutableMap(e -> e, equalSlots::get));
|
||||
private @Nullable Map<Expression, Expression> tryMapOutputToForeignPlan(Plan foreignPlan,
|
||||
Set<Slot> output, Map<Slot, Slot> primaryToForeign) {
|
||||
Set<Slot> residualPrimary = Sets.difference(output, foreignPlan.getOutputSet());
|
||||
ImmutableMap.Builder<Expression, Expression> builder = new ImmutableMap.Builder<>();
|
||||
for (Slot slot : residualPrimary) {
|
||||
if (primaryToForeign.containsKey(slot)) {
|
||||
builder.put(slot, primaryToForeign.get(slot));
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
if (equalSlots.values().containsAll(residual)) {
|
||||
return residual.stream()
|
||||
.collect(ImmutableMap.toImmutableMap(e -> e, e -> equalSlots.inverse().get(e)));
|
||||
}
|
||||
return null;
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
private Plan applyNullCompensationFilter(Plan child, Set<Slot> childSlots) {
|
||||
@ -139,47 +127,56 @@ public class EliminateJoinByFK extends DefaultPlanRewriter<JobContext> implement
|
||||
return new LogicalFilter<>(predicates, child);
|
||||
}
|
||||
|
||||
private @Nullable Plan tryConstructPlanWithJoinChild(LogicalProject<LogicalJoin<?, ?>> project, Plan child,
|
||||
BiMap<Slot, Slot> equalSlots, ForeignKeyContext context) {
|
||||
private Plan tryEliminatePrimaryPlan(LogicalProject<LogicalJoin<?, ?>> project,
|
||||
Plan foreignPlan, Set<Slot> foreignKeys,
|
||||
Map<Slot, Slot> primaryToForeign, ForeignKeyContext context) {
|
||||
Set<Slot> output = project.getInputSlots();
|
||||
Set<Slot> keySet = child.getOutputSet().containsAll(equalSlots.keySet())
|
||||
? equalSlots.keySet()
|
||||
: equalSlots.values();
|
||||
Map<Expression, Expression> outputToRight = tryGetOutputToChildMap(child, output, equalSlots);
|
||||
if (outputToRight != null && isForeignKeyAndUnique(child, keySet, context)) {
|
||||
Map<Expression, Expression> outputToForeign =
|
||||
tryMapOutputToForeignPlan(foreignPlan, output, primaryToForeign);
|
||||
if (outputToForeign != null && canEliminate(project.child(), primaryToForeign, context)) {
|
||||
List<NamedExpression> newProjects = project.getProjects().stream()
|
||||
.map(e -> outputToRight.containsKey(e)
|
||||
? new Alias(e.getExprId(), outputToRight.get(e), e.toSql())
|
||||
: (NamedExpression) e.rewriteUp(s -> outputToRight.getOrDefault(s, s)))
|
||||
.map(e -> outputToForeign.containsKey(e)
|
||||
? new Alias(e.getExprId(), outputToForeign.get(e), e.toSql())
|
||||
: (NamedExpression) e.rewriteUp(s -> outputToForeign.getOrDefault(s, s)))
|
||||
.collect(ImmutableList.toImmutableList());
|
||||
return project.withProjects(newProjects)
|
||||
.withChildren(applyNullCompensationFilter(child, keySet));
|
||||
.withChildren(applyNullCompensationFilter(foreignPlan, foreignKeys));
|
||||
}
|
||||
return null;
|
||||
return project;
|
||||
}
|
||||
|
||||
private @Nullable Map<Slot, Slot> mapPrimaryToForeign(ImmutableEquivalenceSet<Slot> equivalenceSet,
|
||||
Set<Slot> foreignKeys) {
|
||||
ImmutableMap.Builder<Slot, Slot> builder = new ImmutableMap.Builder<>();
|
||||
for (Slot foreignSlot : foreignKeys) {
|
||||
Set<Slot> primarySlots = equivalenceSet.calEqualSet(foreignSlot);
|
||||
if (primarySlots.size() != 1) {
|
||||
return null;
|
||||
}
|
||||
builder.put(primarySlots.iterator().next(), foreignSlot);
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
// Right now we only support eliminate inner join, which should meet the following condition:
|
||||
// 1. only contain null-reject equal condition, and which all meet fk-pk constraint
|
||||
// 2. only output foreign table output or can be converted to foreign table output
|
||||
// 3. foreign key is unique
|
||||
// 4. if foreign key is null, add a isNotNull predicate for null-reject join condition
|
||||
private Plan eliminateJoin(LogicalProject<LogicalJoin<?, ?>> project, ForeignKeyContext context) {
|
||||
LogicalJoin<?, ?> join = project.child();
|
||||
ImmutableEquivalenceSet<Slot> equalSet = join.getEqualSlots();
|
||||
BiMap<Slot, Slot> equalSlots = equalSet.tryToMap();
|
||||
if (equalSlots == null) {
|
||||
return project;
|
||||
}
|
||||
if (!canEliminate(join, equalSlots, context)) {
|
||||
return project;
|
||||
}
|
||||
Plan keepLeft = tryConstructPlanWithJoinChild(project, join.left(), equalSlots, context);
|
||||
if (keepLeft != null) {
|
||||
return keepLeft;
|
||||
}
|
||||
Plan keepRight = tryConstructPlanWithJoinChild(project, join.right(), equalSlots, context);
|
||||
if (keepRight != null) {
|
||||
return keepRight;
|
||||
Set<Slot> leftSlots = Sets.intersection(join.left().getOutputSet(), equalSet.getAllItemSet());
|
||||
Set<Slot> rightSlots = Sets.intersection(join.right().getOutputSet(), equalSet.getAllItemSet());
|
||||
if (context.isForeignKey(leftSlots) && context.isPrimaryKey(rightSlots)) {
|
||||
Map<Slot, Slot> primaryToForeignSlot = mapPrimaryToForeign(equalSet, leftSlots);
|
||||
if (primaryToForeignSlot != null) {
|
||||
return tryEliminatePrimaryPlan(project, join.left(), leftSlots, primaryToForeignSlot, context);
|
||||
}
|
||||
} else if (context.isForeignKey(rightSlots) && context.isPrimaryKey(leftSlots)) {
|
||||
Map<Slot, Slot> primaryToForeignSlot = mapPrimaryToForeign(equalSet, rightSlots);
|
||||
if (primaryToForeignSlot != null) {
|
||||
return tryEliminatePrimaryPlan(project, join.right(), rightSlots, primaryToForeignSlot, context);
|
||||
}
|
||||
}
|
||||
return project;
|
||||
}
|
||||
@ -197,32 +194,29 @@ public class EliminateJoinByFK extends DefaultPlanRewriter<JobContext> implement
|
||||
}
|
||||
return project;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plan visitLogicalJoin(LogicalJoin<?, ?> join, ForeignKeyContext context) {
|
||||
Plan plan = visitChildren(this, join, context);
|
||||
context.addJoin(join);
|
||||
return plan;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plan visitLogicalFilter(LogicalFilter<?> filter, ForeignKeyContext context) {
|
||||
Plan plan = visitChildren(this, filter, context);
|
||||
context.addFilter(filter);
|
||||
return plan;
|
||||
}
|
||||
}
|
||||
|
||||
private static class ForeignKeyContext {
|
||||
static Set<Class<?>> propagatePrimaryKeyOperator = ImmutableSet
|
||||
.<Class<?>>builder()
|
||||
.add(LogicalProject.class)
|
||||
.add(LogicalSort.class)
|
||||
.add(LogicalJoin.class)
|
||||
.build();
|
||||
static Set<Class<?>> propagateForeignKeyOperator = ImmutableSet
|
||||
.<Class<?>>builder()
|
||||
.add(LogicalProject.class)
|
||||
.add(LogicalSort.class)
|
||||
.add(LogicalJoin.class)
|
||||
.add(LogicalFilter.class)
|
||||
.add(LogicalTopN.class)
|
||||
.add(LogicalLimit.class)
|
||||
.add(LogicalAggregate.class)
|
||||
.add(LogicalWindow.class)
|
||||
.build();
|
||||
Set<Map<Column, Column>> constraints = new HashSet<>();
|
||||
Set<Column> foreignKeys = new HashSet<>();
|
||||
Set<Column> primaryKeys = new HashSet<>();
|
||||
Map<Slot, Column> slotToColumn = new HashMap<>();
|
||||
Map<Column, Set<LogicalJoin<?, ?>>> columnWithJoin = new HashMap<>();
|
||||
Map<Column, Set<Expression>> columnWithPredicates = new HashMap<>();
|
||||
Map<Slot, Set<LogicalJoin<?, ?>>> slotWithJoin = new HashMap<>();
|
||||
Map<Slot, Set<Expression>> slotWithPredicates = new HashMap<>();
|
||||
|
||||
public void putAllForeignKeys(TableIf table) {
|
||||
table.getForeignKeyConstraints().forEach(c -> {
|
||||
@ -238,6 +232,11 @@ public class EliminateJoinByFK extends DefaultPlanRewriter<JobContext> implement
|
||||
key.stream().map(s -> slotToColumn.get(s)).collect(Collectors.toSet()));
|
||||
}
|
||||
|
||||
public boolean isPrimaryKey(Set<Slot> key) {
|
||||
return primaryKeys.containsAll(
|
||||
key.stream().map(s -> slotToColumn.get(s)).collect(Collectors.toSet()));
|
||||
}
|
||||
|
||||
public void putSlot(SlotReference slot) {
|
||||
if (!slot.getColumn().isPresent()) {
|
||||
return;
|
||||
@ -252,81 +251,62 @@ public class EliminateJoinByFK extends DefaultPlanRewriter<JobContext> implement
|
||||
}
|
||||
}
|
||||
|
||||
public void passThroughPlan(Plan plan) {
|
||||
Set<Column> output = plan.getOutput().stream()
|
||||
public void addFilter(LogicalFilter<?> filter) {
|
||||
filter.getOutput().stream()
|
||||
.filter(slotToColumn::containsKey)
|
||||
.map(s -> slotToColumn.get(s))
|
||||
.collect(ImmutableSet.toImmutableSet());
|
||||
if (plan instanceof LogicalJoin) {
|
||||
output.forEach(c ->
|
||||
columnWithJoin.computeIfAbsent(c, v -> Sets.newHashSet((LogicalJoin<?, ?>) plan)));
|
||||
return;
|
||||
}
|
||||
if (plan instanceof LogicalFilter) {
|
||||
output.forEach(c -> {
|
||||
columnWithPredicates.computeIfAbsent(c, v -> new HashSet<>());
|
||||
columnWithPredicates.get(c).addAll(((LogicalFilter<?>) plan).getConjuncts());
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (!propagatePrimaryKeyOperator.contains(plan.getClass())) {
|
||||
output.forEach(primaryKeys::remove);
|
||||
}
|
||||
if (!propagateForeignKeyOperator.contains(plan.getClass())) {
|
||||
output.forEach(foreignKeys::remove);
|
||||
}
|
||||
.forEach(slot -> {
|
||||
slotWithPredicates.computeIfAbsent(slot, v -> new HashSet<>());
|
||||
slotWithPredicates.get(slot).addAll(filter.getConjuncts());
|
||||
});
|
||||
}
|
||||
|
||||
public boolean satisfyConstraint(BiMap<Slot, Slot> equalSlots, LogicalJoin<?, ?> join) {
|
||||
ImmutableMap.Builder<Column, Column> foreignToPrimaryBuilder = new Builder<>();
|
||||
for (Entry<Slot, Slot> entry : equalSlots.entrySet()) {
|
||||
Slot left = entry.getKey();
|
||||
Slot right = entry.getValue();
|
||||
if (!slotToColumn.containsKey(left) || !slotToColumn.containsKey(right)) {
|
||||
return false;
|
||||
}
|
||||
Column leftColumn = slotToColumn.get(left);
|
||||
Column rightColumn = slotToColumn.get(right);
|
||||
if (foreignKeys.contains(leftColumn)) {
|
||||
foreignToPrimaryBuilder.put(leftColumn, rightColumn);
|
||||
} else if (foreignKeys.contains(rightColumn)) {
|
||||
foreignToPrimaryBuilder.put(rightColumn, leftColumn);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Map<Column, Column> foreignToPrimary = foreignToPrimaryBuilder.build();
|
||||
public void addJoin(LogicalJoin<?, ?> join) {
|
||||
join.getOutput().stream()
|
||||
.filter(slotToColumn::containsKey)
|
||||
.forEach(slot ->
|
||||
slotWithJoin.computeIfAbsent(slot, v -> Sets.newHashSet((join))));
|
||||
}
|
||||
|
||||
public void expirePrimaryKey(Plan plan) {
|
||||
plan.getOutput().stream()
|
||||
.filter(slotToColumn::containsKey)
|
||||
.map(s -> slotToColumn.get(s))
|
||||
.forEach(primaryKeys::remove);
|
||||
}
|
||||
|
||||
public boolean satisfyConstraint(Map<Slot, Slot> primaryToForeign, LogicalJoin<?, ?> join) {
|
||||
Map<Column, Column> foreignToPrimary = primaryToForeign.entrySet().stream()
|
||||
.collect(ImmutableMap.toImmutableMap(
|
||||
e -> slotToColumn.get(e.getValue()),
|
||||
e -> slotToColumn.get(e.getKey())));
|
||||
// The primary key can only contain join that may be eliminated
|
||||
if (!foreignToPrimary.values().stream().allMatch(p ->
|
||||
columnWithJoin.get(p).size() == 1 && columnWithJoin.get(p).iterator().next() == join)) {
|
||||
if (!primaryToForeign.keySet().stream().allMatch(p ->
|
||||
slotWithJoin.get(p).size() == 1 && slotWithJoin.get(p).iterator().next() == join)) {
|
||||
return false;
|
||||
}
|
||||
// The foreign key's filters must contain primary filters
|
||||
if (!isPredicateCompatible(equalSlots, foreignToPrimary)) {
|
||||
if (!isPredicateCompatible(primaryToForeign)) {
|
||||
return false;
|
||||
}
|
||||
return constraints.contains(foreignToPrimary);
|
||||
}
|
||||
|
||||
private boolean isPredicateCompatible(BiMap<Slot, Slot> equalSlots, Map<Column, Column> foreignToPrimary) {
|
||||
return foreignToPrimary.entrySet().stream().allMatch(fp -> {
|
||||
BiMap<Slot, Slot> primarySlotToForeign = equalSlots.keySet().stream()
|
||||
.map(slotToColumn::get)
|
||||
.anyMatch(primaryKeys::contains)
|
||||
? equalSlots :
|
||||
equalSlots.inverse();
|
||||
if (!columnWithPredicates.containsKey(fp.getValue())) {
|
||||
// When predicates of foreign keys is a subset of that of primary keys
|
||||
private boolean isPredicateCompatible(Map<Slot, Slot> primaryToForeign) {
|
||||
return primaryToForeign.entrySet().stream().allMatch(pf -> {
|
||||
// There is no predicate in primary key
|
||||
if (!slotWithPredicates.containsKey(pf.getKey()) || slotWithPredicates.get(pf.getKey()).isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
Set<Expression> primaryPredicates = columnWithPredicates.get(fp.getValue()).stream()
|
||||
.map(e -> e.rewriteUp(
|
||||
s -> s instanceof Slot ? primarySlotToForeign.getOrDefault(s, (Slot) s) : s))
|
||||
.collect(Collectors.toSet());
|
||||
if (columnWithPredicates.get(fp.getKey()) == null && !columnWithPredicates.isEmpty()) {
|
||||
// There are some predicates in primary key but there is no predicate in foreign key
|
||||
if (slotWithPredicates.containsKey(pf.getValue()) && slotWithPredicates.get(pf.getValue()).isEmpty()) {
|
||||
return false;
|
||||
} else {
|
||||
return columnWithPredicates.get(fp.getKey()).containsAll(primaryPredicates);
|
||||
}
|
||||
Set<Expression> primaryPredicates = slotWithPredicates.get(pf.getKey()).stream()
|
||||
.map(e -> e.rewriteUp(
|
||||
s -> s instanceof Slot ? primaryToForeign.getOrDefault(s, (Slot) s) : s))
|
||||
.collect(Collectors.toSet());
|
||||
return slotWithPredicates.get(pf.getValue()).containsAll(primaryPredicates);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,15 +17,12 @@
|
||||
|
||||
package org.apache.doris.nereids.util;
|
||||
|
||||
import com.google.common.collect.BiMap;
|
||||
import com.google.common.collect.ImmutableBiMap;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* EquivalenceSet
|
||||
@ -77,25 +74,16 @@ public class ImmutableEquivalenceSet<T> {
|
||||
}
|
||||
|
||||
/**
|
||||
* cal equal set for a
|
||||
* cal equal set for a except self
|
||||
*/
|
||||
public Set<T> calEqualSet(T a) {
|
||||
T ra = root.get(a);
|
||||
return root.keySet().stream()
|
||||
.filter(t -> root.get(t).equals(ra))
|
||||
.filter(t -> root.get(t).equals(ra) && !t.equals(a))
|
||||
.collect(ImmutableSet.toImmutableSet());
|
||||
}
|
||||
|
||||
/**
|
||||
* try to convert it to a map, such as a = b c = d.
|
||||
* When meets a = b a = c, return null
|
||||
*/
|
||||
public @Nullable BiMap<T, T> tryToMap() {
|
||||
if (root.values().stream().distinct().count() * 2 != root.size()) {
|
||||
return null;
|
||||
}
|
||||
return root.keySet().stream()
|
||||
.filter(t -> !root.get(t).equals(t))
|
||||
.collect(ImmutableBiMap.toImmutableBiMap(t -> t, root::get));
|
||||
public Set<T> getAllItemSet() {
|
||||
return ImmutableSet.copyOf(root.keySet());
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user