[runtimefilter](nereids) push down RF into CTE producer (#30568)

* push down RF into CTE producer
minghong
2024-02-02 14:42:56 +08:00
committed by yiguolei
parent 9889683ae3
commit e5bdc369e2
174 changed files with 691 additions and 668 deletions

View File

@ -67,6 +67,7 @@ public class PlanPostProcessors {
builder.add(new FragmentProcessor());
if (!cascadesContext.getConnectContext().getSessionVariable().getRuntimeFilterMode()
.toUpperCase().equals(TRuntimeFilterMode.OFF.name())) {
builder.add(new RegisterParent());
builder.add(new RuntimeFilterGenerator());
if (ConnectContext.get().getSessionVariable().enableRuntimeFilterPrune) {
builder.add(new RuntimeFilterPruner());

View File

@ -0,0 +1,36 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.processor.post;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.util.MutableState;
/**
* set parent for the tree nodes
*/
public class RegisterParent extends PlanPostProcessor {
@Override
public Plan visit(Plan plan, CascadesContext context) {
for (Plan child : plan.children()) {
child.setMutableState(MutableState.KEY_PARENT, plan);
child.accept(this, context);
}
return plan;
}
}
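
To illustrate the mechanism, here is a minimal standalone sketch of the same pattern, using a toy Node type and a plain map in place of the Doris Plan/MutableState API (all names below are illustrative): a pre-order pass stores each child's parent under a well-known key so later passes can walk upward.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy analog of RegisterParent: a pre-order pass that records each child's parent
// in the child's own mutable-state map so that later passes can navigate bottom-up.
public class RegisterParentSketch {
    static final String KEY_PARENT = "parent"; // stand-in for MutableState.KEY_PARENT

    static class Node {
        final String name;
        final List<Node> children = new ArrayList<>();
        final Map<String, Object> mutableState = new HashMap<>();

        Node(String name, Node... kids) {
            this.name = name;
            for (Node kid : kids) {
                children.add(kid);
            }
        }
    }

    // mirrors RegisterParent.visit: set the parent of every child, then recurse
    static void registerParents(Node plan) {
        for (Node child : plan.children) {
            child.mutableState.put(KEY_PARENT, plan);
            registerParents(child);
        }
    }

    public static void main(String[] args) {
        Node scan = new Node("scan");
        Node join = new Node("join", new Node("producer"), scan);
        Node root = new Node("root", join);
        registerParents(root);
        System.out.println(((Node) scan.mutableState.get(KEY_PARENT)).name); // join
    }
}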

View File

@ -18,11 +18,9 @@
package org.apache.doris.nereids.processor.post;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.expressions.EqualPredicate;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
@ -37,7 +35,6 @@ import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TRuntimeFilterType;
@ -97,10 +94,6 @@ public class RuntimeFilterContext {
public List<RuntimeFilter> prunedRF = Lists.newArrayList();
public final List<Plan> needRfPlans = Lists.newArrayList();
private final IdGenerator<RuntimeFilterId> generator = RuntimeFilterId.createGenerator();
// exprId of target to runtime filter.
private final Map<ExprId, List<RuntimeFilter>> targetExprIdToFilter = Maps.newHashMap();
@ -128,20 +121,11 @@ public class RuntimeFilterContext {
private final Map<Plan, EffectiveSrcType> effectiveSrcNodes = Maps.newHashMap();
// map from a cte to its related joins, from which a common runtime filter can be extracted into the cte
private final Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = Maps.newLinkedHashMap();
// cte candidates into which a common runtime filter can be pushed down from outside
private final Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = Maps.newLinkedHashMap();
private final Map<CTEId, PhysicalCTEProducer> cteProducerMap = Maps.newLinkedHashMap();
// cte whose runtime filter has been extracted
private final Set<CTEId> processedCTE = Sets.newHashSet();
// cte whose outer runtime filter has been pushed down into
private final Set<CTEId> pushedDownCTE = Sets.newHashSet();
private final SessionVariable sessionVariable;
private final FilterSizeLimits limits;
@ -189,22 +173,10 @@ public class RuntimeFilterContext {
return cteProducerMap;
}
public Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> getCteRFPushDownMap() {
return cteRFPushDownMap;
}
public Map<CTEId, Set<PhysicalHashJoin>> getCteToJoinsMap() {
return cteToJoinsMap;
}
public Set<CTEId> getProcessedCTE() {
return processedCTE;
}
public Set<CTEId> getPushedDownCTE() {
return pushedDownCTE;
}
public void setTargetExprIdToFilter(ExprId id, RuntimeFilter filter) {
Preconditions.checkArgument(filter.getTargetSlots().stream().anyMatch(expr -> expr.getExprId() == id));
this.targetExprIdToFilter.computeIfAbsent(id, k -> Lists.newArrayList()).add(filter);
@ -230,14 +202,6 @@ public class RuntimeFilterContext {
rf.getTargetScans().get(i).removeAppliedRuntimeFilter(rf);
}
}
// for (Slot target : rf.getTargetSlots()) {
// if (target.getExprId().equals(targetId)) {
// Pair<PhysicalRelation, Slot> pair = aliasTransferMap.get(target);
// if (pair != null) {
// pair.first.removeAppliedRuntimeFilter(rf);
// }
// }
// }
iter.remove();
prunedRF.add(rf);
}

View File

@ -39,7 +39,6 @@ import org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContain
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
@ -65,13 +64,13 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -99,6 +98,137 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
private final IdGenerator<RuntimeFilterId> generator = RuntimeFilterId.createGenerator();
@Override
public Plan processRoot(Plan plan, CascadesContext ctx) {
Plan result = plan.accept(this, ctx);
// cte rf
RuntimeFilterContext rfCtx = ctx.getRuntimeFilterContext();
int cteCount = rfCtx.getProcessedCTE().size();
if (cteCount != 0) {
Map<CTEId, Set<PhysicalCTEConsumer>> cteIdToConsumersWithRF = Maps.newHashMap();
Map<CTEId, List<RuntimeFilter>> cteToRFsMap = Maps.newHashMap();
Map<PhysicalCTEConsumer, Set<RuntimeFilter>> consumerToRFs = Maps.newHashMap();
Map<PhysicalCTEConsumer, Set<Expression>> consumerToSrcExpression = Maps.newHashMap();
List<RuntimeFilter> allRFs = rfCtx.getNereidsRuntimeFilter();
for (RuntimeFilter rf : allRFs) {
for (PhysicalRelation rel : rf.getTargetScans()) {
if (rel instanceof PhysicalCTEConsumer) {
PhysicalCTEConsumer consumer = (PhysicalCTEConsumer) rel;
CTEId cteId = consumer.getCteId();
cteToRFsMap.computeIfAbsent(cteId, key -> Lists.newArrayList()).add(rf);
cteIdToConsumersWithRF.computeIfAbsent(cteId, key -> Sets.newHashSet()).add(consumer);
consumerToRFs.computeIfAbsent(consumer, key -> Sets.newHashSet()).add(rf);
consumerToSrcExpression.computeIfAbsent(consumer, key -> Sets.newHashSet())
.add(rf.getSrcExpr());
}
}
}
for (CTEId cteId : rfCtx.getCteProduceMap().keySet()) {
// if any consumer does not have an RF, the RF cannot be pushed down.
// cteIdToConsumersWithRF.get(cteId).size() cannot be 1, otherwise this cte would have been inlined.
if (cteIdToConsumersWithRF.get(cteId) != null
&& ctx.getCteIdToConsumers().get(cteId).size() == cteIdToConsumersWithRF.get(cteId).size()
&& cteIdToConsumersWithRF.get(cteId).size() >= 2) {
// check if there is a common srcExpr among all the consumers
Set<PhysicalCTEConsumer> consumers = cteIdToConsumersWithRF.get(cteId);
PhysicalCTEConsumer consumer0 = consumers.iterator().next();
Set<Expression> candidateSrcExpressions = consumerToSrcExpression.get(consumer0);
for (PhysicalCTEConsumer currentConsumer : consumers) {
Set<Expression> srcExpressionsOnCurrentConsumer = consumerToSrcExpression.get(currentConsumer);
candidateSrcExpressions.retainAll(srcExpressionsOnCurrentConsumer);
if (candidateSrcExpressions.isEmpty()) {
break;
}
}
if (!candidateSrcExpressions.isEmpty()) {
// find RFs to push down
for (Expression srcExpr : candidateSrcExpressions) {
List<RuntimeFilter> rfsToPushDown = Lists.newArrayList();
for (PhysicalCTEConsumer consumer : cteIdToConsumersWithRF.get(cteId)) {
for (RuntimeFilter rf : consumerToRFs.get(consumer)) {
if (rf.getSrcExpr().equals(srcExpr)) {
rfsToPushDown.add(rf);
}
}
}
if (rfsToPushDown.isEmpty()) {
break;
}
// the right-most deep buildNode among rfsToPushDown is used as the buildNode of the pushed-down rf;
// since the srcExprs are the same, all buildNodes of rfsToPushDown lie on the same tree path,
// so the rf with the longest ancestor list is the one whose build node is the right-most deep.
RuntimeFilter rightDeep = rfsToPushDown.get(0);
List<Plan> rightDeepAncestors = rfsToPushDown.get(0).getBuilderNode().getAncestors();
int rightDeepAncestorsSize = rightDeepAncestors.size();
RuntimeFilter leftTop = rfsToPushDown.get(0);
int leftTopAncestorsSize = rightDeepAncestorsSize;
for (RuntimeFilter rf : rfsToPushDown) {
List<Plan> ancestors = rf.getBuilderNode().getAncestors();
int currentAncestorsSize = ancestors.size();
if (currentAncestorsSize > rightDeepAncestorsSize) {
rightDeep = rf;
rightDeepAncestorsSize = currentAncestorsSize;
rightDeepAncestors = ancestors;
}
if (currentAncestorsSize < leftTopAncestorsSize) {
leftTopAncestorsSize = currentAncestorsSize;
leftTop = rf;
}
}
Preconditions.checkArgument(rightDeepAncestors.contains(leftTop.getBuilderNode()));
// check that the nodes between the right-deep and left-top build nodes are SPJ and contain no denied (non-mark) join
boolean valid = true;
for (Plan cursor : rightDeepAncestors) {
if (cursor.equals(leftTop.getBuilderNode())) {
break;
}
// valid = valid && SPJ_PLAN.contains(cursor.getClass());
if (cursor instanceof AbstractPhysicalJoin) {
AbstractPhysicalJoin cursorJoin = (AbstractPhysicalJoin) cursor;
valid = (!RuntimeFilterGenerator.DENIED_JOIN_TYPES
.contains(cursorJoin.getJoinType())
|| cursorJoin.isMarkJoin()) && valid;
}
if (!valid) {
break;
}
}
if (!valid) {
break;
}
Expression rightDeepTargetExpressionOnCTE = null;
int targetCount = rightDeep.getTargetExpressions().size();
for (int i = 0; i < targetCount; i++) {
PhysicalRelation rel = rightDeep.getTargetScans().get(i);
if (rel instanceof PhysicalCTEConsumer
&& ((PhysicalCTEConsumer) rel).getCteId().equals(cteId)) {
rightDeepTargetExpressionOnCTE = rightDeep.getTargetExpressions().get(i);
break;
}
}
boolean pushedDown = doPushDownIntoCTEProducerInternal(
rightDeep,
rightDeepTargetExpressionOnCTE,
rfCtx,
rfCtx.getCteProduceMap().get(cteId)
);
if (pushedDown) {
rfCtx.removeFilter(
rightDeepTargetExpressionOnCTE.getInputSlotExprIds().iterator().next(),
(PhysicalHashJoin) rightDeep.getBuilderNode());
}
}
}
}
}
}
return result;
}
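
The key precondition in processRoot is that every consumer of the CTE is filtered from the same build-side source expression; otherwise pushing one runtime filter into the producer could wrongly filter some consumers. A minimal sketch of that intersection step, with plain strings standing in for Nereids Expression objects (names are illustrative):

import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Sketch of the "common srcExpr across all consumers" check in processRoot:
// only source expressions that appear for every consumer remain candidates.
public class CommonSrcExprSketch {
    public static void main(String[] args) {
        Map<String, Set<String>> consumerToSrcExpr = new LinkedHashMap<>();
        consumerToSrcExpr.put("cte1_consumer1", new HashSet<>(Set.of("t1.c1", "t3.c5")));
        consumerToSrcExpr.put("cte1_consumer2", new HashSet<>(Set.of("t1.c1")));

        Set<String> candidates = null;
        for (Set<String> srcExprs : consumerToSrcExpr.values()) {
            if (candidates == null) {
                candidates = new HashSet<>(srcExprs); // start from the first consumer
            } else {
                candidates.retainAll(srcExprs); // same retainAll-based intersection as above
            }
            if (candidates.isEmpty()) {
                break; // no shared source expression, nothing can be pushed into the producer
            }
        }
        System.out.println(candidates); // [t1.c1]
    }
}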
/**
* the runtime filter generator run at the phase of post process and plan translation of nereids planner.
* post process:
@ -117,18 +247,40 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
@Override
public PhysicalPlan visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
join.right().accept(this, context);
join.left().accept(this, context);
if (RuntimeFilterGenerator.DENIED_JOIN_TYPES.contains(join.getJoinType()) || join.isMarkJoin()) {
join.right().getOutput().forEach(slot ->
context.getRuntimeFilterContext().aliasTransferMapRemove(slot));
}
collectPushDownCTEInfos(join, context);
if (!getPushDownCTECandidates(ctx).isEmpty()) {
pushDownRuntimeFilterIntoCTE(ctx);
} else {
pushDownRuntimeFilterCommon(join, context);
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
.filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
.collect(Collectors.toList());
List<Expression> hashJoinConjuncts = join.getHashJoinConjuncts().stream().collect(Collectors.toList());
boolean buildSideContainsConsumer = hasCTEConsumerDescendant((PhysicalPlan) join.right());
for (int i = 0; i < hashJoinConjuncts.size(); i++) {
// BE does not support RFs generated from NullSafeEqual, so skip them
if (hashJoinConjuncts.get(i) instanceof EqualTo) {
EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
(EqualTo) hashJoinConjuncts.get(i), join.left().getOutputSet()));
for (TRuntimeFilterType type : legalTypes) {
//bitmap rf is generated by nested loop join.
if (type == TRuntimeFilterType.BITMAP) {
continue;
}
long buildSideNdv = getBuildSideNdv(join, equalTo);
Pair<PhysicalRelation, Slot> pair = ctx.getAliasTransferMap().get(equalTo.right());
// CteConsumer is not allowed to generate RF in order to avoid RF cycle.
if ((pair == null && buildSideContainsConsumer)
|| (pair != null && pair.first instanceof PhysicalCTEConsumer)) {
continue;
}
join.pushDownRuntimeFilter(context, generator, join, equalTo.right(),
equalTo.left(), type, buildSideNdv, i);
}
}
}
return join;
}
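
The legalTypes computation above treats the session variable runtimeFilterType as a bitmask over runtime filter kinds. A small standalone sketch of the same filtering; the bit values here are illustrative rather than the actual TRuntimeFilterType constants:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Sketch of deriving the enabled runtime filter types from a session-level bitmask.
public class RuntimeFilterTypeMaskSketch {
    // illustrative bit values only; the real constants live in TRuntimeFilterType
    enum FilterType {
        IN(1), BLOOM(2), MIN_MAX(4), IN_OR_BLOOM(8), BITMAP(16);

        final int value;

        FilterType(int value) {
            this.value = value;
        }
    }

    public static void main(String[] args) {
        int sessionMask = FilterType.IN_OR_BLOOM.value | FilterType.MIN_MAX.value;
        List<FilterType> legalTypes = Arrays.stream(FilterType.values())
                .filter(type -> (type.value & sessionMask) > 0)
                .collect(Collectors.toList());
        System.out.println(legalTypes); // [MIN_MAX, IN_OR_BLOOM]
    }
}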
@ -362,235 +514,18 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
return expression instanceof Slot ? ((Slot) expression) : null;
}
private void pushDownRuntimeFilterCommon(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
.filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
.collect(Collectors.toList());
List<Expression> hashJoinConjuncts = join.getHashJoinConjuncts().stream().collect(Collectors.toList());
for (int i = 0; i < hashJoinConjuncts.size(); i++) {
// BE does not support RFs generated from NullSafeEqual, so skip them
if (hashJoinConjuncts.get(i) instanceof EqualTo) {
EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
(EqualTo) hashJoinConjuncts.get(i), join.left().getOutputSet()));
for (TRuntimeFilterType type : legalTypes) {
//bitmap rf is generated by nested loop join.
if (type == TRuntimeFilterType.BITMAP) {
continue;
}
long buildSideNdv = getBuildSideNdv(join, equalTo);
join.pushDownRuntimeFilter(context, generator, join, equalTo.right(),
equalTo.left(), type, buildSideNdv, i);
}
}
}
}
private void collectPushDownCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
Set<CTEId> cteIds = new LinkedHashSet<>(); // use LinkedHashSet to make runtime filter order stable
PhysicalPlan leftChild = (PhysicalPlan) join.left();
PhysicalPlan rightChild = (PhysicalPlan) join.right();
Preconditions.checkState(leftChild != null && rightChild != null);
boolean leftHasCTE = hasCTEConsumerUnderJoin(leftChild, cteIds);
boolean rightHasCTE = hasCTEConsumerUnderJoin(rightChild, cteIds);
// only support single cte in join currently
if ((leftHasCTE && !rightHasCTE) || (!leftHasCTE && rightHasCTE)) {
for (CTEId id : cteIds) {
if (ctx.getCteToJoinsMap().get(id) == null) {
// use LinkedHashSet to make runtime filter order stable
Set<PhysicalHashJoin> newJoin = new LinkedHashSet<>();
newJoin.add(join);
ctx.getCteToJoinsMap().put(id, newJoin);
} else {
ctx.getCteToJoinsMap().get(id).add(join);
}
}
}
if (!ctx.getCteToJoinsMap().isEmpty()) {
analyzeRuntimeFilterPushDownIntoCTEInfos(join, context);
}
}
private List<CTEId> getPushDownCTECandidates(RuntimeFilterContext ctx) {
List<CTEId> candidates = new ArrayList<>();
Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
CTEId cteId = entry.getKey().getCteId();
if (ctx.getPushedDownCTE().contains(cteId)) {
continue;
}
candidates.add(cteId);
}
return candidates;
}
private boolean hasCTEConsumerUnderJoin(PhysicalPlan root, Set<CTEId> cteIds) {
if (root instanceof PhysicalCTEConsumer) {
cteIds.add(((PhysicalCTEConsumer) root).getCteId());
return true;
} else if (root.children().size() != 1) {
// only collect cte in one side
return false;
} else if (root instanceof PhysicalDistribute
|| root instanceof PhysicalFilter
|| root instanceof PhysicalProject) {
// only collect cte as single child node under join
return hasCTEConsumerUnderJoin((PhysicalPlan) root.child(0), cteIds);
} else {
return false;
}
}
private void analyzeRuntimeFilterPushDownIntoCTEInfos(PhysicalHashJoin<? extends Plan, ? extends Plan> curJoin,
CascadesContext context) {
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
Map<CTEId, Set<PhysicalHashJoin>> cteToJoinsMap = ctx.getCteToJoinsMap();
for (Map.Entry<CTEId, Set<PhysicalHashJoin>> entry : cteToJoinsMap.entrySet()) {
CTEId cteId = entry.getKey();
Set<PhysicalHashJoin> joinSet = entry.getValue();
if (joinSet.contains(curJoin)) {
// skip current join
continue;
}
Set<LogicalCTEConsumer> cteSet = context.getCteIdToConsumers().get(cteId);
Preconditions.checkState(!cteSet.isEmpty());
String cteName = cteSet.iterator().next().getName();
// preconditions for rf pushing into cte producer:
// multiple joins whose join conditions are on the same column of the same cte, and
// the other side of these join conditions is the same column of the same table, or
// those columns are in the same equal set, for example under an equal join condition
// case 1: two joins with t1.c1 = cte1_consumer1.c1 and t1.c1 = cte1_consumer2.c1 conditions
// rf of t1.c1 can be pushed down into cte1 producer.
// ----------------------hashJoin(t1.c1 = cte2_consumer1.c1)
// ----------------------------CteConsumer[cteId= ( CTEId#1=] )
// ----------------------------PhysicalOlapScan[t1]
// ----------------------hashJoin(t1.c1 = cte2_consumer2.c1)
// ----------------------------CteConsumer[cteId= ( CTEId#1=] )
// ----------------------------PhysicalOlapScan[t1]
// case 2: two joins with t1.c1 = cte2_consumer1.c1 and t2.c2 = cte2_consumer2.c1 and another equal join
// condition t1.c1 = t2.c2, which means t1.c1 and t2.c2 are in the same equal set.
// rf of t1.c1 and t2.c2 can be pushed down into cte2 producer.
// --------------------hashJoin(t1.c1 = t2.c2)
// ----------------------hashJoin(t2.c2 = cte2_consumer1.c1)
// ----------------------------CteConsumer[cteId= ( CTEId#1=] )
// ----------------------------PhysicalOlapScan[t2]
// ----------------------hashJoin(t1.c1 = cte2_consumer2.c1)
// ----------------------------CteConsumer[cteId= ( CTEId#1=] )
// ----------------------------PhysicalOlapScan[t1]
if (joinSet.size() != cteSet.size()) {
continue;
}
List<EqualTo> equalTos = new ArrayList<>();
Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = new LinkedHashMap<>();
for (PhysicalHashJoin join : joinSet) {
// precondition:
// 1. no non-equal join condition
// 2. only equalTo and slotReference both sides
// 3. only support one join condition (will be refined further)
if (join.getOtherJoinConjuncts().size() > 1
|| join.getHashJoinConjuncts().size() != 1
|| !(join.getHashJoinConjuncts().get(0) instanceof EqualTo)) {
break;
} else {
EqualTo equalTo = join.getEqualToConjuncts().get(0);
equalTos.add(equalTo);
equalCondToJoinMap.put(equalTo, join);
}
}
if (joinSet.size() == equalTos.size()) {
int matchNum = 0;
Set<String> cteNameSet = new HashSet<>();
Set<SlotReference> anotherSideSlotSet = new HashSet<>();
for (EqualTo equalTo : equalTos) {
SlotReference left = (SlotReference) equalTo.left();
SlotReference right = (SlotReference) equalTo.right();
if (left.getQualifier().size() == 1 && left.getQualifier().get(0).equals(cteName)) {
matchNum += 1;
anotherSideSlotSet.add(right);
cteNameSet.add(left.getQualifiedName());
} else if (right.getQualifier().size() == 1 && right.getQualifier().get(0).equals(cteName)) {
matchNum += 1;
anotherSideSlotSet.add(left);
cteNameSet.add(right.getQualifiedName());
}
}
if (matchNum == equalTos.size() && cteNameSet.size() == 1) {
// means all join condition points to the same cte on the same cte column.
// collect the other side columns besides cte column side.
Preconditions.checkState(equalTos.size() == equalCondToJoinMap.size(),
"equalTos.size() != equalCondToJoinMap.size()");
PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext().getCteProduceMap().get(cteId);
if (anotherSideSlotSet.size() == 1) {
// meet requirement for pushing down into cte producer
ctx.getCteRFPushDownMap().put(cteProducer, equalCondToJoinMap);
} else {
// check further whether the join upper side can bring equal set, which
// indicating actually the same runtime filter build side
// see above case 2 for reference
boolean inSameEqualSet = false;
for (EqualTo e : curJoin.getEqualToConjuncts()) {
if (e instanceof EqualTo) {
SlotReference oneSide = (SlotReference) e.left();
SlotReference anotherSide = (SlotReference) e.right();
if (anotherSideSlotSet.contains(oneSide) && anotherSideSlotSet.contains(anotherSide)) {
inSameEqualSet = true;
break;
}
}
}
if (inSameEqualSet) {
ctx.getCteRFPushDownMap().put(cteProducer, equalCondToJoinMap);
}
}
}
}
}
}
private void pushDownRuntimeFilterIntoCTE(RuntimeFilterContext ctx) {
Map<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> cteRFPushDownMap = ctx.getCteRFPushDownMap();
for (Map.Entry<PhysicalCTEProducer, Map<EqualTo, PhysicalHashJoin>> entry : cteRFPushDownMap.entrySet()) {
PhysicalCTEProducer cteProducer = entry.getKey();
Preconditions.checkState(cteProducer != null);
if (ctx.getPushedDownCTE().contains(cteProducer.getCteId())) {
continue;
}
Map<EqualTo, PhysicalHashJoin> equalCondToJoinMap = entry.getValue();
for (Map.Entry<EqualTo, PhysicalHashJoin> innerEntry : equalCondToJoinMap.entrySet()) {
EqualTo equalTo = innerEntry.getKey();
PhysicalHashJoin<? extends Plan, ? extends Plan> join = innerEntry.getValue();
Preconditions.checkState(join != null);
TRuntimeFilterType type = TRuntimeFilterType.IN_OR_BLOOM;
if (ctx.getSessionVariable().getEnablePipelineEngine() && !join.isBroadCastJoin()) {
type = TRuntimeFilterType.BLOOM;
}
EqualTo newEqualTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
equalTo, join.child(0).getOutputSet()));
doPushDownIntoCTEProducerInternal(join, ctx, newEqualTo, type, cteProducer);
}
ctx.getPushedDownCTE().add(cteProducer.getCteId());
}
}
private void doPushDownIntoCTEProducerInternal(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, PhysicalCTEProducer cteProducer) {
private boolean doPushDownIntoCTEProducerInternal(RuntimeFilter rf, Expression targetExpression,
RuntimeFilterContext ctx, PhysicalCTEProducer cteProducer) {
PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
Slot unwrappedSlot = checkTargetChild(equalTo.left());
Slot unwrappedSlot = checkTargetChild(targetExpression);
// aliasTransMap doesn't contain the key, means that the path from the scan to the join
// contains join with denied join type. for example: a left join b on a.id = b.id
if (!checkPushDownPreconditionsForJoin(join, ctx, unwrappedSlot)) {
return;
if (!checkPushDownPreconditionsForJoin(rf.getBuilderNode(), ctx, unwrappedSlot)) {
return false;
}
Slot cteSlot = ctx.getAliasTransferPair(unwrappedSlot).second;
PhysicalRelation cteNode = ctx.getAliasTransferPair(unwrappedSlot).first;
long buildSideNdv = getBuildSideNdv(join, equalTo);
long buildSideNdv = rf.getBuildSideNdv();
if (cteNode instanceof PhysicalCTEConsumer && inputPlanNode instanceof PhysicalProject) {
PhysicalProject project = (PhysicalProject) inputPlanNode;
NamedExpression targetExpr = null;
@ -602,12 +537,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
}
}
Preconditions.checkState(targetExpr != null);
if (!(targetExpr instanceof SlotReference)) {
// if not SlotReference, skip the push down
return;
} else if (!checkCanPushDownIntoBasicTable(project)) {
return;
} else {
if (targetExpr instanceof SlotReference && checkCanPushDownIntoBasicTable(project)) {
Map<Slot, PhysicalRelation> pushDownBasicTableInfos = getPushDownBasicTablesInfos(project,
(SlotReference) targetExpr, ctx);
if (!pushDownBasicTableInfos.isEmpty()) {
@ -623,22 +553,24 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
targetList.add(targetSlot);
targetExpressions.add(targetSlot);
targetNodes.add(scan);
ctx.addJoinToTargetMap(join, targetSlot.getExprId());
ctx.addJoinToTargetMap(rf.getBuilderNode(), targetSlot.getExprId());
ctx.setTargetsOnScanNode(scan, targetSlot);
}
// build multi-target runtime filter
// since always on different join, set the expr_order as 0
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
equalTo.right(), targetList, targetExpressions, type, 0, join, buildSideNdv, true,
rf.getSrcExpr(), targetList, targetExpressions, rf.getType(), rf.getExprOrder(),
rf.getBuilderNode(), buildSideNdv, rf.isBloomFilterSizeCalculatedByNdv(),
cteNode);
targetNodes.forEach(node -> node.addAppliedRuntimeFilter(filter));
for (Slot slot : targetList) {
ctx.setTargetExprIdToFilter(slot.getExprId(), filter);
}
ctx.setRuntimeFilterIdentityToFilter(equalTo.right(), type, join, filter);
ctx.setRuntimeFilterIdentityToFilter(rf.getSrcExpr(), rf.getType(), rf.getBuilderNode(), filter);
return true;
}
}
}
return false;
}
@Override

View File

@ -26,16 +26,21 @@ import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Statistics;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.util.List;
@ -58,6 +63,9 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
@Override
public Plan visit(Plan plan, CascadesContext context) {
if (!plan.children().isEmpty()) {
Preconditions.checkArgument(plan.children().size() == 1,
plan.getClass().getSimpleName()
+ " has more than one child, needs its own visitor implementation");
plan.child(0).accept(this, context);
if (context.getRuntimeFilterContext().isEffectiveSrcNode(plan.child(0))) {
RuntimeFilterContext.EffectiveSrcType childType = context.getRuntimeFilterContext()
@ -68,6 +76,45 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
return plan;
}
@Override
public PhysicalSetOperation visitPhysicalSetOperation(PhysicalSetOperation setOperation, CascadesContext context) {
for (Plan child : setOperation.children()) {
child.accept(this, context);
}
return setOperation;
}
@Override
public PhysicalIntersect visitPhysicalIntersect(PhysicalIntersect intersect, CascadesContext context) {
for (Plan child : intersect.children()) {
child.accept(this, context);
}
context.getRuntimeFilterContext().addEffectiveSrcNode(intersect, RuntimeFilterContext.EffectiveSrcType.NATIVE);
return intersect;
}
@Override
public PhysicalNestedLoopJoin visitPhysicalNestedLoopJoin(
PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> join,
CascadesContext context) {
join.right().accept(this, context);
join.left().accept(this, context);
if (context.getRuntimeFilterContext().isEffectiveSrcNode(join.child(0))) {
RuntimeFilterContext.EffectiveSrcType childType = context.getRuntimeFilterContext()
.getEffectiveSrcType(join.child(0));
context.getRuntimeFilterContext().addEffectiveSrcNode(join, childType);
}
return join;
}
@Override
public PhysicalCTEAnchor visitPhysicalCTEAnchor(PhysicalCTEAnchor<? extends Plan, ? extends Plan> cteAnchor,
CascadesContext context) {
cteAnchor.child(0).accept(this, context);
cteAnchor.child(1).accept(this, context);
return cteAnchor;
}
@Override
public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext context) {
topN.child().accept(this, context);

View File

@ -35,6 +35,7 @@ import org.apache.doris.statistics.Statistics;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Lists;
import org.json.JSONArray;
import org.json.JSONObject;
@ -191,4 +192,18 @@ public abstract class AbstractPlan extends AbstractTreeNode<Plan> implements Pla
public int getId() {
return id.asInt();
}
/**
* Get the ancestors of this node in the plan tree, including the node itself,
* ordered from this node up to the root. Parent pointers must have been
* registered beforehand (see RegisterParent).
*/
public List<Plan> getAncestors() {
List<Plan> ancestors = Lists.newArrayList();
ancestors.add(this);
Optional<Object> parent = this.getMutableState(MutableState.KEY_PARENT);
while (parent.isPresent()) {
ancestors.add((Plan) parent.get());
parent = ((Plan) parent.get()).getMutableState(MutableState.KEY_PARENT);
}
return ancestors;
}
}
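
A small standalone sketch of the same upward walk, using a plain parent map instead of the Plan mutable state (names are illustrative); it shows the ordering that processRoot relies on when comparing ancestor-list lengths: the node itself comes first, the root last.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Sketch of a getAncestors-style walk over registered parent pointers:
// the result starts with the node itself and ends at the root.
public class AncestorsSketch {
    public static void main(String[] args) {
        Map<String, String> parentOf = new HashMap<>();
        parentOf.put("scan", "join");
        parentOf.put("join", "root");

        List<String> ancestors = new ArrayList<>();
        String current = "scan";
        ancestors.add(current);
        Optional<String> parent = Optional.ofNullable(parentOf.get(current));
        while (parent.isPresent()) {
            ancestors.add(parent.get());
            parent = Optional.ofNullable(parentOf.get(parent.get()));
        }
        System.out.println(ancestors); // [scan, join, root]
    }
}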

View File

@ -25,7 +25,6 @@ import org.apache.doris.nereids.processor.post.RuntimeFilterGenerator;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Explainable;
@ -129,7 +128,7 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
filter.addTargetSlot(scanSlot, probeExpr, scan);
ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(ctx.getAliasTransferPair((NamedExpression) probeExpr).first, scanSlot);
ctx.setTargetsOnScanNode(ctx.getAliasTransferPair(probeSlot).first, scanSlot);
}
} else {
filter = new RuntimeFilter(generator.getNextId(),
@ -139,7 +138,7 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
this.addAppliedRuntimeFilter(filter);
ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId());
ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(ctx.getAliasTransferPair((NamedExpression) probeSlot).first, scanSlot);
ctx.setTargetsOnScanNode(ctx.getAliasTransferPair(probeSlot).first, scanSlot);
ctx.setRuntimeFilterIdentityToFilter(src, type, builderNode, filter);
}
return true;

View File

@ -102,8 +102,13 @@ public class PhysicalCTEConsumer extends PhysicalRelation {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
if (!getAppliedRuntimeFilters().isEmpty()) {
getAppliedRuntimeFilters()
.stream().forEach(rf -> builder.append(" RF").append(rf.getId().asInt()));
}
return Utils.toSqlString("PhysicalCTEConsumer[" + id.asInt() + "]",
"cteId", cteId);
"stats", getStats(), "cteId", cteId, "RFs", builder);
}
@Override
@ -136,8 +141,15 @@ public class PhysicalCTEConsumer extends PhysicalRelation {
@Override
public String shapeInfo() {
return Utils.toSqlString("PhysicalCteConsumer",
"cteId", cteId);
StringBuilder shapeBuilder = new StringBuilder();
shapeBuilder.append(Utils.toSqlString("PhysicalCteConsumer",
"cteId", cteId));
if (!getAppliedRuntimeFilters().isEmpty()) {
shapeBuilder.append(" apply RFs:");
getAppliedRuntimeFilters()
.stream().forEach(rf -> shapeBuilder.append(" RF").append(rf.getId().asInt()));
}
return shapeBuilder.toString();
}
@Override

View File

@ -70,8 +70,8 @@ public class PhysicalDistribute<CHILD_TYPE extends Plan> extends PhysicalUnary<C
@Override
public String toString() {
return Utils.toSqlString("PhysicalDistribute[" + id.asInt() + "]" + getGroupIdWithPrefix(),
"distributionSpec", distributionSpec,
"stats", statistics
"stats", statistics,
"distributionSpec", distributionSpec
);
}

View File

@ -76,8 +76,8 @@ public class PhysicalFilter<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD
@Override
public String toString() {
return Utils.toSqlString("PhysicalFilter[" + id.asInt() + "]" + getGroupIdWithPrefix(),
"predicates", getPredicate(),
"stats", statistics
"stats", statistics,
"predicates", getPredicate()
);
}

View File

@ -223,10 +223,10 @@ public class PhysicalHashJoin<
AbstractPhysicalPlan builderRightNode = (AbstractPhysicalPlan) builderNode.child(1);
Preconditions.checkState(builderLeftNode != null && builderRightNode != null,
"builder join node child node is null");
if (RuntimeFilterGenerator.hasCTEConsumerDescendant(builderLeftNode)
&& RuntimeFilterGenerator.hasCTEConsumerDescendant(builderRightNode)) {
return false;
}
// if (RuntimeFilterGenerator.hasCTEConsumerDescendant(builderLeftNode)
// && RuntimeFilterGenerator.hasCTEConsumerDescendant(builderRightNode)) {
// return false;
// }
boolean pushedDown = false;
AbstractPhysicalPlan leftNode = (AbstractPhysicalPlan) child(0);
@ -235,23 +235,25 @@ public class PhysicalHashJoin<
"join child node is null");
Set<Expression> probExprList = Sets.newHashSet(probeExpr);
Pair<PhysicalRelation, Slot> pair = ctx.getAliasTransferMap().get(probeExpr);
PhysicalRelation target1 = (pair == null) ? null : pair.first;
Pair<PhysicalRelation, Slot> srcPair = ctx.getAliasTransferMap().get(srcExpr);
PhysicalRelation srcNode = (srcPair == null) ? null : srcPair.first;
Pair<PhysicalRelation, Slot> targetPair = ctx.getAliasTransferMap().get(probeExpr);
// when probeExpr is output slot of setOperator, targetPair is null
PhysicalRelation target1 = (targetPair == null) ? null : targetPair.first;
PhysicalRelation target2 = null;
pair = ctx.getAliasTransferMap().get(srcExpr);
PhysicalRelation srcNode = (pair == null) ? null : pair.first;
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable().expandRuntimeFilterByInnerJoin) {
if (!this.equals(builderNode) && this.getJoinType() == JoinType.INNER_JOIN) {
if (!this.equals(builderNode)
&& (this.getJoinType() == JoinType.INNER_JOIN || this.getJoinType().isSemiJoin())) {
for (Expression expr : this.getHashJoinConjuncts()) {
EqualPredicate equalTo = (EqualPredicate) expr;
if (probeExpr.equals(equalTo.left())) {
probExprList.add(equalTo.right());
pair = ctx.getAliasTransferMap().get(equalTo.right());
target2 = (pair == null) ? null : pair.first;
targetPair = ctx.getAliasTransferMap().get(equalTo.right());
target2 = (targetPair == null) ? null : targetPair.first;
} else if (probeExpr.equals(equalTo.right())) {
probExprList.add(equalTo.left());
pair = ctx.getAliasTransferMap().get(equalTo.left());
target2 = (pair == null) ? null : pair.first;
targetPair = ctx.getAliasTransferMap().get(equalTo.left());
target2 = (targetPair == null) ? null : targetPair.first;
}
if (target2 != null) {
ctx.getExpandedRF().add(

View File

@ -24,7 +24,6 @@ import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.TableSample;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.PreAggStatus;
@ -120,9 +119,13 @@ public class PhysicalOlapScan extends PhysicalCatalogRelation implements OlapSca
@Override
public String toString() {
return Utils.toSqlString("PhysicalOlapScan[" + relationId.asInt() + "]" + getGroupIdWithPrefix(),
"qualified", Utils.qualifiedName(qualifier, table.getName()),
"stats", statistics, "fr", getMutableState(AbstractPlan.FRAGMENT_ID)
StringBuilder builder = new StringBuilder();
if (!getAppliedRuntimeFilters().isEmpty()) {
getAppliedRuntimeFilters()
.stream().forEach(rf -> builder.append(" RF").append(rf.getId().asInt()));
}
return Utils.toSqlString("PhysicalOlapScan[" + table.getName() + "]" + getGroupIdWithPrefix(),
"stats", statistics, "RFs", builder
);
}

View File

@ -33,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.statistics.Statistics;
@ -77,8 +78,8 @@ public class PhysicalProject<CHILD_TYPE extends Plan> extends PhysicalUnary<CHIL
@Override
public String toString() {
return Utils.toSqlString("PhysicalProject[" + id.asInt() + "]" + getGroupIdWithPrefix(),
"projects", projects,
"stats", statistics
"stats", statistics, "projects", projects
);
}
@ -187,7 +188,12 @@ public class PhysicalProject<CHILD_TYPE extends Plan> extends PhysicalUnary<CHIL
}
NamedExpression newProbeExpr = this.getProjects().get(projIndex);
if (newProbeExpr instanceof Alias) {
newProbeExpr = (NamedExpression) newProbeExpr.child(0);
Expression child = ExpressionUtils.getExpressionCoveredByCast(newProbeExpr.child(0));
if (child instanceof NamedExpression) {
newProbeExpr = (NamedExpression) child;
} else {
return false;
}
}
Slot newProbeSlot = RuntimeFilterGenerator.checkTargetChild(newProbeExpr);
if (!RuntimeFilterGenerator.checkPushDownPreconditionsForJoin(builderNode, ctx, newProbeSlot)) {

View File

@ -141,15 +141,21 @@ public class Utils {
}
for (int i = 0; i < variables.length - 1; i += 2) {
stringBuilder.append(variables[i]).append("=").append(variables[i + 1]);
if (i < variables.length - 2) {
stringBuilder.append(", ");
if (! "".equals(toStringOrNull(variables[i + 1]))) {
if (i != 0) {
stringBuilder.append(", ");
}
stringBuilder.append(toStringOrNull(variables[i])).append("=").append(toStringOrNull(variables[i + 1]));
}
}
return stringBuilder.append(" )").toString();
}
private static String toStringOrNull(Object obj) {
return obj == null ? "null" : obj.toString();
}
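
With this change, toSqlString renders values null-safely and drops key/value pairs whose rendered value is empty, so plan strings no longer show empty entries such as an RFs field with no runtime filters. A minimal sketch of that formatting rule (class name and output layout here are illustrative, not the exact Doris implementation):

// Sketch of the toSqlString behavior after this change: null-safe rendering,
// and key/value pairs whose rendered value is empty are omitted entirely.
public class ToSqlStringSketch {
    static String toStringOrNull(Object obj) {
        return obj == null ? "null" : obj.toString();
    }

    static String toSqlString(String planName, Object... variables) {
        StringBuilder sb = new StringBuilder(planName).append(" ( ");
        boolean first = true;
        for (int i = 0; i < variables.length - 1; i += 2) {
            String value = toStringOrNull(variables[i + 1]);
            if (!"".equals(value)) {
                if (!first) {
                    sb.append(", ");
                }
                sb.append(toStringOrNull(variables[i])).append("=").append(value);
                first = false;
            }
        }
        return sb.append(" )").toString();
    }

    public static void main(String[] args) {
        // the empty "RFs" value is dropped from the output
        System.out.println(toSqlString("PhysicalCteConsumer", "cteId", "CTEId#1", "RFs", ""));
        // prints: PhysicalCteConsumer ( cteId=CTEId#1 )
    }
}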
/**
* Get the correlated columns that belong to the subquery,
* that is, the correlated columns that can be resolved within the subquery.

View File

@ -147,7 +147,7 @@ public class Statistics {
return "-Infinite";
}
DecimalFormat format = new DecimalFormat("#,###.##");
return format.format(rowCount) + " " + widthInJoinCluster;
return format.format(rowCount);
}
public int getBENumber() {