[opt](nereids) enhance runtime filter pushdown (#21883)

The current runtime filter can't be pushed down into complicated plan patterns, such as a set operation used as a join child, or a CTE sender acting as the filter target before shuffling. This PR refines the push-down ability so that the filter can be pushed recursively into different layers of the plan tree, such as nested subqueries, set operations, CTE senders, etc.
This commit is contained in:
xzj7019
2023-07-21 23:31:30 +08:00
committed by GitHub
parent afeac4419f
commit 0b1c82b021
18 changed files with 574 additions and 127 deletions

View File

@ -128,6 +128,7 @@ import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.AggregationNode;
import org.apache.doris.planner.AnalyticEvalNode;
import org.apache.doris.planner.AssertNumRowsNode;
import org.apache.doris.planner.CTEScanNode;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.EmptySetNode;
@ -296,6 +297,14 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
dataStreamSink.setOutputPartition(dataPartition);
parentFragment.addChild(inputFragment);
((MultiCastPlanFragment) inputFragment).addToDest(exchangeNode);
CTEScanNode cteScanNode = context.getCteScanNodeMap().get(inputFragment.getFragmentId());
Preconditions.checkState(cteScanNode != null, "cte scan node is null");
cteScanNode.setFragment(inputFragment);
cteScanNode.setPlanNodeId(exchangeNode.getId());
context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator ->
runtimeFilterTranslator.getContext().getPlanNodeIdToCTEDataSinkMap()
.put(cteScanNode.getId(), dataStreamSink));
} else {
inputFragment.setDestination(exchangeNode);
inputFragment.setOutputPartition(dataPartition);
@ -813,6 +822,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
PhysicalCTEProducer<?> cteProducer = context.getCteProduceMap().get(cteId);
Preconditions.checkState(cteProducer != null, "invalid cteProducer");
context.getCteConsumerMap().put(cteId, cteConsumer);
// set datasink to multicast data sink but do not set target now
// target will be set when translate distribute
DataStreamSink streamSink = new DataStreamSink();
@ -821,11 +831,19 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
multiCastDataSink.getDestinations().add(Lists.newArrayList());
// update expr to slot mapping
TupleDescriptor tupleDescriptor = null;
for (Slot producerSlot : cteProducer.getOutput()) {
Slot consumerSlot = cteConsumer.getProducerToConsumerSlotMap().get(producerSlot);
SlotRef slotRef = context.findSlotRef(producerSlot.getExprId());
tupleDescriptor = slotRef.getDesc().getParent();
context.addExprIdSlotRefPair(consumerSlot.getExprId(), slotRef);
}
CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor);
context.getRuntimeTranslator().ifPresent(runtimeFilterTranslator ->
runtimeFilterTranslator.getTargetOnScanNode(cteConsumer.getRelationId()).forEach(
expr -> runtimeFilterTranslator.translateRuntimeFilterTarget(expr, cteScanNode, context)));
context.getCteScanNodeMap().put(multiCastFragment.getFragmentId(), cteScanNode);
return multiCastFragment;
}
@ -835,6 +853,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
PlanFragment child = cteProducer.child().accept(this, context);
CTEId cteId = cteProducer.getCteId();
context.getPlanFragments().remove(child);
MultiCastPlanFragment multiCastPlanFragment = new MultiCastPlanFragment(child);
MultiCastDataSink multiCastDataSink = new MultiCastDataSink();
multiCastPlanFragment.setSink(multiCastDataSink);

View File

@ -32,8 +32,10 @@ import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.VirtualSlotReference;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.planner.CTEScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNode;
@ -87,6 +89,10 @@ public class PlanTranslatorContext {
private final Map<CTEId, PhysicalCTEProducer> cteProducerMap = Maps.newHashMap();
private final Map<CTEId, PhysicalCTEConsumer> cteConsumerMap = Maps.newHashMap();
private final Map<PlanFragmentId, CTEScanNode> cteScanNodeMap = Maps.newHashMap();
/**
 * Create a translator context whose runtime filter translator is built from the
 * runtime filter context held by the given cascades context.
 *
 * @param ctx cascades context supplying the runtime filter context
 */
public PlanTranslatorContext(CascadesContext ctx) {
this.translator = new RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
}
@ -108,6 +114,14 @@ public class PlanTranslatorContext {
return cteProducerMap;
}
/**
 * Get the map from cte id to physical cte consumer.
 * The backing map itself is returned (not a copy), so callers can register entries through it.
 */
public Map<CTEId, PhysicalCTEConsumer> getCteConsumerMap() {
return cteConsumerMap;
}
/**
 * Get the map from plan fragment id to cte scan node.
 * The backing map itself is returned (not a copy), so callers can register entries through it.
 */
public Map<PlanFragmentId, CTEScanNode> getCteScanNodeMap() {
return cteScanNodeMap;
}
/**
 * Create a new tuple descriptor through this context's descriptor table.
 */
public TupleDescriptor generateTupleDesc() {
return descTable.createTupleDescriptor();
}

View File

@ -31,12 +31,15 @@ import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.planner.CTEScanNode;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.HashJoinNode;
import org.apache.doris.planner.HashJoinNode.DistributionMode;
import org.apache.doris.planner.JoinNodeBase;
import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.collect.ImmutableList;
@ -122,6 +125,7 @@ public class RuntimeFilterTranslator {
hasInvalidTarget = true;
break;
}
ScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(curTargetExpr);
Expr targetExpr;
if (filter.getType() == TRuntimeFilterType.BITMAP) {
if (curTargetExpression.equals(curTargetExpr)) {
@ -141,7 +145,6 @@ public class RuntimeFilterTranslator {
SlotRef targetSlot = target.getSrcSlotRef();
TupleId targetTupleId = targetSlot.getDesc().getParent().getId();
SlotId targetSlotId = targetSlot.getSlotId();
ScanNode scanNode = context.getScanNodeOfLegacyRuntimeFilterTarget().get(curTargetExpr);
scanNodeList.add(scanNode);
targetExprList.add(targetExpr);
targetTupleIdMapList.add(ImmutableMap.of(targetTupleId, ImmutableList.of(targetSlotId)));
@ -157,7 +160,8 @@ public class RuntimeFilterTranslator {
//bitmap rf requires isBroadCast=false, it always requires merge filter
origFilter.setIsBroadcast(false);
}
boolean isLocalTarget = scanNodeList.stream().allMatch(e -> e.getFragmentId().equals(node.getFragmentId()));
boolean isLocalTarget = scanNodeList.stream().allMatch(e ->
!(e instanceof CTEScanNode) && e.getFragmentId().equals(node.getFragmentId()));
for (int i = 0; i < targetExprList.size(); i++) {
ScanNode scanNode = scanNodeList.get(i);
Expr targetExpr = targetExprList.get(i);
@ -165,7 +169,15 @@ public class RuntimeFilterTranslator {
scanNode, targetExpr, true, isLocalTarget));
}
origFilter.setBitmapFilterNotIn(filter.isBitmapFilterNotIn());
context.getLegacyFilters().add(finalize(origFilter));
org.apache.doris.planner.RuntimeFilter finalizedFilter = finalize(origFilter);
scanNodeList.stream().filter(e -> e.getStatisticalType() == StatisticalType.CTE_SCAN_NODE)
.forEach(f -> {
DataStreamSink sink = context.getPlanNodeIdToCTEDataSinkMap().get(f.getId());
if (sink != null) {
sink.addRuntimeFilter(finalizedFilter);
}
});
context.getLegacyFilters().add(finalizedFilter);
}
}

View File

@ -33,6 +33,8 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.RuntimeFilterGenerator.FilterSizeLimits;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.planner.ScanNode;
@ -60,6 +62,7 @@ public class RuntimeFilterContext {
// exprId of target to runtime filter.
private final Map<ExprId, List<RuntimeFilter>> targetExprIdToFilter = Maps.newHashMap();
private final Map<PlanNodeId, DataStreamSink> planNodeIdToCTEDataSinkMap = Maps.newHashMap();
private final Map<Plan, List<ExprId>> joinToTargetExprId = Maps.newHashMap();
// olap scan node that contains target of a runtime filter.
@ -165,6 +168,10 @@ public class RuntimeFilterContext {
return exprIdToOlapScanNodeSlotRef;
}
/**
 * Get the map from plan node id to the data stream sink of a cte.
 * The backing map itself is returned (not a copy), so callers can register entries through it.
 */
public Map<PlanNodeId, DataStreamSink> getPlanNodeIdToCTEDataSinkMap() {
return planNodeIdToCTEDataSinkMap;
}
/**
 * Get the alias transfer map: named expression to a (physical relation, slot) pair.
 * The backing map itself is returned (not a copy).
 */
public Map<NamedExpression, Pair<PhysicalRelation, Slot>> getAliasTransferMap() {
return aliasTransferMap;
}

View File

@ -39,16 +39,13 @@ import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion;
import org.apache.doris.nereids.trees.plans.physical.RuntimeFilter;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.JoinUtils;
@ -152,8 +149,14 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
@Override
public PhysicalCTEProducer visitPhysicalCTEProducer(PhysicalCTEProducer producer, CascadesContext context) {
CTEId id = producer.getCteId();
context.getRuntimeFilterContext().getCteProduceMap().put(id, producer);
CTEId cteId = producer.getCteId();
context.getRuntimeFilterContext().getCteProduceMap().put(cteId, producer);
Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE();
if (!processedCTE.contains(cteId)) {
PhysicalPlan inputPlanNode = (PhysicalPlan) producer.child(0);
inputPlanNode.accept(this, context);
processedCTE.add(cteId);
}
return producer;
}
@ -194,7 +197,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
TRuntimeFilterType type = TRuntimeFilterType.BITMAP;
Set<Slot> targetSlots = bitmapContains.child(1).getInputSlots();
for (Slot targetSlot : targetSlots) {
if (!checkCanPushDownFromJoinType(join, ctx, targetSlot)) {
if (!checkPushDownPreconditions(join, ctx, targetSlot)) {
continue;
}
Slot olapScanSlot = aliasTransferMap.get(targetSlot).second;
@ -251,7 +254,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
return buildColStats.isUnKnown ? -1 : Math.max(1, (long) buildColStats.ndv);
}
private static Slot checkTargetChild(Expression leftChild) {
public static Slot checkTargetChild(Expression leftChild) {
Expression expression = ExpressionUtils.getExpressionCoveredByCast(leftChild);
return expression instanceof Slot ? ((Slot) expression) : null;
}
@ -262,8 +265,6 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
.filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
.collect(Collectors.toList());
// TODO: some complex situation cannot be handled now, see testPushDownThroughJoin.
// we will support it in later version.
for (int i = 0; i < join.getHashJoinConjuncts().size(); i++) {
EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
(EqualTo) join.getHashJoinConjuncts().get(i), join.left().getOutputSet()));
@ -272,112 +273,9 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
if (type == TRuntimeFilterType.BITMAP) {
continue;
}
if (join.left() instanceof PhysicalUnion
|| join.left() instanceof PhysicalIntersect
|| join.left() instanceof PhysicalExcept) {
doPushDownIntoSetOperation(join, ctx, equalTo, type, i);
} else {
doPushDownBasic(join, context, ctx, equalTo, type, i);
}
}
}
}
private void doPushDownBasic(PhysicalHashJoin<? extends Plan, ? extends Plan> join, CascadesContext context,
RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
// currently, we can ensure children in the two side are corresponding to the equal_to's.
// so right maybe an expression and left is a slot
Slot unwrappedSlot = checkTargetChild(equalTo.left());
// aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
// contains join with denied join type. for example: a left join b on a.id = b.id
if (!checkCanPushDownFromJoinType(join, ctx, unwrappedSlot)) {
return;
}
Slot olapScanSlot = aliasTransferMap.get(unwrappedSlot).second;
PhysicalRelation scan = aliasTransferMap.get(unwrappedSlot).first;
Preconditions.checkState(olapScanSlot != null && scan != null);
if (scan instanceof PhysicalCTEConsumer) {
Set<CTEId> processedCTE = context.getRuntimeFilterContext().getProcessedCTE();
CTEId cteId = ((PhysicalCTEConsumer) scan).getCteId();
if (!processedCTE.contains(cteId)) {
PhysicalCTEProducer cteProducer = context.getRuntimeFilterContext()
.getCteProduceMap().get(cteId);
PhysicalPlan inputPlanNode = (PhysicalPlan) cteProducer.child(0);
// process cte producer self recursively
inputPlanNode.accept(this, context);
processedCTE.add(cteId);
}
} else {
// in-filter is not friendly to pipeline
if (type == TRuntimeFilterType.IN_OR_BLOOM
&& ctx.getSessionVariable().getEnablePipelineEngine()
&& hasRemoteTarget(join, scan)) {
type = TRuntimeFilterType.BLOOM;
}
long buildSideNdv = getBuildSideNdv(join, equalTo);
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
equalTo.right(), ImmutableList.of(olapScanSlot), type, exprOrder, join, buildSideNdv);
ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
ctx.setTargetsOnScanNode(aliasTransferMap.get(unwrappedSlot).first.getRelationId(), olapScanSlot);
}
}
private void doPushDownIntoSetOperation(PhysicalHashJoin<? extends Plan, ? extends Plan> join,
RuntimeFilterContext ctx, EqualTo equalTo, TRuntimeFilterType type, int exprOrder) {
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
List<Slot> targetList = new ArrayList<>();
int projIndex = -1;
for (int j = 0; j < join.left().children().size(); j++) {
PhysicalPlan child = (PhysicalPlan) join.left().child(j);
if (child instanceof PhysicalProject) {
PhysicalProject project = (PhysicalProject) child;
Slot leftSlot = checkTargetChild(equalTo.left());
if (leftSlot == null) {
break;
}
for (int k = 0; projIndex < 0 && k < project.getProjects().size(); k++) {
NamedExpression expr = (NamedExpression) project.getProjects().get(k);
if (expr.getName().equals(leftSlot.getName())) {
projIndex = k;
break;
}
}
Preconditions.checkState(projIndex >= 0
&& projIndex < project.getProjects().size());
NamedExpression targetExpr = (NamedExpression) project.getProjects().get(projIndex);
SlotReference origSlot = null;
if (targetExpr instanceof Alias) {
origSlot = (SlotReference) targetExpr.child(0);
} else {
origSlot = (SlotReference) targetExpr;
}
Slot olapScanSlot = aliasTransferMap.get(origSlot).second;
if (!checkCanPushDownFromJoinType(join, ctx, olapScanSlot)) {
continue;
}
PhysicalRelation scan = aliasTransferMap.get(origSlot).first;
if (type == TRuntimeFilterType.IN_OR_BLOOM
&& ctx.getSessionVariable().getEnablePipelineEngine()
&& hasRemoteTarget(join, scan)) {
type = TRuntimeFilterType.BLOOM;
}
targetList.add(olapScanSlot);
ctx.addJoinToTargetMap(join, olapScanSlot.getExprId());
ctx.setTargetsOnScanNode(aliasTransferMap.get(origSlot).first.getRelationId(), olapScanSlot);
}
}
if (!targetList.isEmpty()) {
long buildSideNdv = getBuildSideNdv(join, equalTo);
RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
equalTo.right(), targetList, type, exprOrder, join, buildSideNdv);
for (int j = 0; j < targetList.size(); j++) {
ctx.setTargetExprIdToFilter(targetList.get(j).getExprId(), filter);
long buildSideNdv = getBuildSideNdv(join, equalTo);
join.pushDownRuntimeFilter(context, generator, join, equalTo.right(),
equalTo.left(), type, buildSideNdv, i);
}
}
}
@ -580,7 +478,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
Slot unwrappedSlot = checkTargetChild(equalTo.left());
// aliasTransMap doesn't contain the key, means that the path from the olap scan to the join
// contains join with denied join type. for example: a left join b on a.id = b.id
if (!checkCanPushDownFromJoinType(join, ctx, unwrappedSlot)) {
if (!checkPushDownPreconditions(join, ctx, unwrappedSlot)) {
return;
}
Slot cteSlot = aliasTransferMap.get(unwrappedSlot).second;
@ -626,8 +524,11 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
}
}
private boolean checkCanPushDownFromJoinType(AbstractPhysicalJoin physicalJoin,
RuntimeFilterContext ctx, Slot slot) {
/**
* Check runtime filter push down pre-conditions, such as builder side join type, etc.
*/
public static boolean checkPushDownPreconditions(AbstractPhysicalJoin physicalJoin,
RuntimeFilterContext ctx, Slot slot) {
Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
if (slot == null || !aliasTransferMap.containsKey(slot)) {
return false;
@ -688,7 +589,47 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
}
}
private boolean hasRemoteTarget(AbstractPlan join, AbstractPlan scan) {
/**
 * Check whether the given relation is among the relation nodes collected from the
 * plan tree rooted at {@code root}.
 */
public static boolean isCoveredByPlanNode(PhysicalPlan root, PhysicalRelation relation) {
    Set<PhysicalRelation> reachableRelations = new HashSet<>();
    // gather every relation under root, then test membership
    RuntimeFilterGenerator.getAllScanInfo(root, reachableRelations);
    boolean covered = reachableRelations.contains(relation);
    return covered;
}
/**
 * Collect all relation nodes found under the given root plan into {@code scans}.
 * A relation node is recorded as-is and its children are not visited.
 */
public static void getAllScanInfo(PhysicalPlan root, Set<PhysicalRelation> scans) {
    if (root instanceof PhysicalRelation) {
        scans.add((PhysicalRelation) root);
        return;
    }
    // not a relation: descend into every child in order
    root.children().forEach(child -> getAllScanInfo((PhysicalPlan) child, scans));
}
/**
 * Check whether the plan rooted at {@code root} is, or contains, a cte consumer.
 */
public static boolean hasCTEConsumerDescendant(PhysicalPlan root) {
    if (root instanceof PhysicalCTEConsumer) {
        return true;
    }
    // true as soon as any child subtree contains a cte consumer; false for a leaf
    return root.children().stream()
            .anyMatch(child -> hasCTEConsumerDescendant((PhysicalPlan) child));
}
/**
* Check whether runtime filter target is remote or local
*/
public static boolean hasRemoteTarget(AbstractPlan join, AbstractPlan scan) {
if (scan instanceof PhysicalCTEConsumer) {
return true;
} else {

View File

@ -17,16 +17,30 @@
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
import org.apache.doris.nereids.processor.post.RuntimeFilterGenerator;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
@ -57,6 +71,56 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi
return physicalProperties;
}
/**
 * Pushing down runtime filter into different plan node, such as olap scan node, cte sender node, etc.
 *
 * <p>The filter is first offered to every child. Only when no child accepts it, and the target
 * relation of the probe slot is located under this node, is the filter registered here.
 *
 * @param context cascades context holding the runtime filter context
 * @param generator id generator used to allocate the id of a newly created runtime filter
 * @param builderNode the join that builds the runtime filter
 * @param src build side expression of the runtime filter
 * @param probeExpr probe side expression; a slot, possibly wrapped in a cast
 * @param type requested runtime filter type
 * @param buildSideNdv ndv of the build side passed through to the runtime filter
 * @param exprOrder order of the join conjunct the filter is derived from
 * @return true if the filter was pushed down by a child or registered by this node
 */
public boolean pushDownRuntimeFilter(CascadesContext context, IdGenerator<RuntimeFilterId> generator,
        AbstractPhysicalJoin builderNode,
        Expression src, Expression probeExpr,
        TRuntimeFilterType type, long buildSideNdv, int exprOrder) {
    RuntimeFilterContext ctx = context.getRuntimeFilterContext();
    Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
    // currently, we can ensure the children on the two sides correspond to the equal_to's sides,
    // so the right side may be an expression while the left side is a slot
    Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr);

    // if aliasTransferMap doesn't contain the key, the path from the olap scan to the join
    // contains a join with a denied join type. for example: a left join b on a.id = b.id
    if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) {
        return false;
    }

    // offer the filter to every child; do not short-circuit so that all children get a chance
    boolean pushedDown = false;
    for (Object child : children) {
        AbstractPhysicalPlan childPlan = (AbstractPhysicalPlan) child;
        pushedDown |= childPlan.pushDownRuntimeFilter(context, generator, builderNode, src, probeExpr,
                type, buildSideNdv, exprOrder);
    }
    if (pushedDown) {
        return true;
    }

    Pair<PhysicalRelation, Slot> target = aliasTransferMap.get(probeSlot);
    Slot olapScanSlot = target.second;
    PhysicalRelation scan = target.first;
    if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) {
        return false;
    }
    Preconditions.checkState(olapScanSlot != null && scan != null);
    // in-filter is not friendly to pipeline
    if (type == TRuntimeFilterType.IN_OR_BLOOM
            && ctx.getSessionVariable().getEnablePipelineEngine()
            && RuntimeFilterGenerator.hasRemoteTarget(builderNode, scan)) {
        type = TRuntimeFilterType.BLOOM;
    }
    RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
            src, ImmutableList.of(olapScanSlot), type, exprOrder, builderNode, buildSideNdv);
    ctx.addJoinToTargetMap(builderNode, olapScanSlot.getExprId());
    ctx.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
    // use the relation already resolved through probeSlot: probeExpr itself may be a cast
    // around the slot, which is not a key of aliasTransferMap and would yield a null pair
    ctx.setTargetsOnScanNode(scan.getRelationId(), olapScanSlot);
    return true;
}
@Override
public Plan getExplainPlan(ConnectContext ctx) {
return this;

View File

@ -17,17 +17,22 @@
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
@ -134,4 +139,14 @@ public class PhysicalCTEConsumer extends PhysicalRelation {
return Utils.toSqlString("PhysicalCteConsumer",
"cteId", cteId);
}
/**
 * Runtime filter push-down hook for a cte consumer.
 * No filter is registered here; the method unconditionally reports that nothing was pushed down.
 *
 * @return always false
 */
@Override
public boolean pushDownRuntimeFilter(CascadesContext context, IdGenerator<RuntimeFilterId> generator,
AbstractPhysicalJoin builderNode,
Expression src, Expression probeExpr,
TRuntimeFilterType type, long buildSideNdv, int exprOrder) {
// TODO: current cte internal pushing down is too complicated and it is not convenient to move the logic here.
// will refine it in the future.
return false;
}
}

View File

@ -17,22 +17,32 @@
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
import org.apache.doris.nereids.processor.post.RuntimeFilterGenerator;
import org.apache.doris.nereids.properties.DistributionSpec;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.json.JSONObject;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
@ -118,4 +128,32 @@ public class PhysicalDistribute<CHILD_TYPE extends Plan> extends PhysicalUnary<C
return new PhysicalDistribute<>(distributionSpec, groupExpression,
getLogicalProperties(), physicalProperties, statistics, child());
}
/**
 * Forward the runtime filter through this distribute node to its only child, provided the
 * push-down preconditions hold and the target relation of the probe slot is under this node.
 *
 * @return true if the child pushed the filter down, false otherwise
 */
@Override
public boolean pushDownRuntimeFilter(CascadesContext context, IdGenerator<RuntimeFilterId> generator,
        AbstractPhysicalJoin builderNode, Expression src, Expression probeExpr,
        TRuntimeFilterType type, long buildSideNdv, int exprOrder) {
    RuntimeFilterContext rfContext = context.getRuntimeFilterContext();
    // the probe side is expected to be a slot, possibly covered by a cast
    Slot targetSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr);
    // stop early when the slot is missing or not registered in the alias transfer map
    if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, rfContext, targetSlot)) {
        return false;
    }

    Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasToTarget = rfContext.getAliasTransferMap();
    PhysicalRelation targetRelation = aliasToTarget.get(targetSlot).first;
    if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, targetRelation)) {
        return false;
    }

    // TODO: global rf need merge stage which is heavy
    // add some rule, such as bc only is allowed for
    // pushing down through distribute, currently always pushing.
    return ((AbstractPhysicalPlan) child(0)).pushDownRuntimeFilter(context, generator, builderNode,
            src, probeExpr, type, buildSideNdv, exprOrder);
}
}

View File

@ -17,13 +17,19 @@
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
import org.apache.doris.nereids.processor.post.RuntimeFilterGenerator;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.properties.RequireProperties;
import org.apache.doris.nereids.properties.RequirePropertiesSupplier;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.AggPhase;
@ -32,12 +38,15 @@ import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -283,4 +292,30 @@ public class PhysicalHashAggregate<CHILD_TYPE extends Plan> extends PhysicalUnar
builder.append(getAggPhase()).append("]");
return builder.toString();
}
/**
 * Forward the runtime filter through this aggregate node to its only child, provided the
 * push-down preconditions hold and the target relation of the probe slot is under this node.
 *
 * @return true if the child pushed the filter down, false otherwise
 */
@Override
public boolean pushDownRuntimeFilter(CascadesContext context, IdGenerator<RuntimeFilterId> generator,
        AbstractPhysicalJoin builderNode, Expression src, Expression probeExpr,
        TRuntimeFilterType type, long buildSideNdv, int exprOrder) {
    RuntimeFilterContext ctx = context.getRuntimeFilterContext();
    // the probe side is expected to be a slot, possibly covered by a cast
    Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr);

    // the coverage test is evaluated only after the preconditions passed, because the
    // preconditions guarantee that the alias transfer map holds an entry for the slot
    boolean reachable = RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)
            && RuntimeFilterGenerator.isCoveredByPlanNode(this,
                    ctx.getAliasTransferMap().get(probeSlot).first);
    if (!reachable) {
        return false;
    }

    AbstractPhysicalPlan input = (AbstractPhysicalPlan) child(0);
    return input.pushDownRuntimeFilter(context, generator, builderNode,
            src, probeExpr, type, buildSideNdv, exprOrder);
}
}

View File

@ -17,13 +17,19 @@
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
import org.apache.doris.nereids.processor.post.RuntimeFilterGenerator;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
@ -31,13 +37,17 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -189,6 +199,73 @@ public class PhysicalHashJoin<
groupExpression, getLogicalProperties(), physicalProperties, statistics, left(), right());
}
/**
 * Offers a runtime filter built by {@code builderNode} to this join's subtree.
 *
 * <p>Both children are always asked to take the filter first. If neither accepts it and the
 * probe slot resolves to a relation covered by this join, the filter is registered at this
 * join against that relation's slot.
 *
 * <p>NOTE(review): the precondition and coverage checks run after the children were asked, so
 * this method can return {@code false} although a child reported success. Confirm this is
 * intended.
 *
 * @param context cascades context that supplies the runtime filter context
 * @param generator source of the id for a filter created at this join
 * @param builderNode join that builds the filter
 * @param srcExpr source expression handed to the created {@link RuntimeFilter}
 * @param probeExpr probe-side expression; resolved to a slot via
 *     {@code RuntimeFilterGenerator.checkTargetChild}
 * @param type requested filter type; IN_OR_BLOOM is replaced by BLOOM for remote targets when
 *     the pipeline engine is enabled
 * @param buildSideNdv forwarded unchanged to the created filter and to the children
 * @param exprOrder forwarded unchanged to the created filter and to the children
 * @return {@code true} if a child accepted the filter or it was registered here
 */
@Override
public boolean pushDownRuntimeFilter(CascadesContext context, IdGenerator<RuntimeFilterId> generator,
        AbstractPhysicalJoin builderNode, Expression srcExpr, Expression probeExpr,
        TRuntimeFilterType type, long buildSideNdv, int exprOrder) {
    RuntimeFilterContext rfContext = context.getRuntimeFilterContext();
    Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = rfContext.getAliasTransferMap();

    // If both sides of the builder join contain a cte consumer (for example both src slot and
    // target slot come from a cte, or two sub-queries both contain a cte), the filter is
    // disabled: such a cross-cte filter makes one cte side wait a long time for the other
    // consumer to finish, leaving the filter in a not-ready state.
    AbstractPhysicalPlan builderLeft = (AbstractPhysicalPlan) builderNode.child(0);
    AbstractPhysicalPlan builderRight = (AbstractPhysicalPlan) builderNode.child(1);
    Preconditions.checkState(builderLeft != null && builderRight != null,
            "builder join node child node is null");
    boolean builderSpansCteOnBothSides = RuntimeFilterGenerator.hasCTEConsumerDescendant(builderLeft)
            && RuntimeFilterGenerator.hasCTEConsumerDescendant(builderRight);
    if (builderSpansCteOnBothSides) {
        return false;
    }

    AbstractPhysicalPlan leftChild = (AbstractPhysicalPlan) child(0);
    AbstractPhysicalPlan rightChild = (AbstractPhysicalPlan) child(1);
    Preconditions.checkState(leftChild != null && rightChild != null,
            "join child node is null");
    // Ask both children unconditionally; the right side is tried even if the left one accepted.
    boolean acceptedByLeft = leftChild.pushDownRuntimeFilter(context, generator, builderNode,
            srcExpr, probeExpr, type, buildSideNdv, exprOrder);
    boolean acceptedByRight = rightChild.pushDownRuntimeFilter(context, generator, builderNode,
            srcExpr, probeExpr, type, buildSideNdv, exprOrder);
    boolean acceptedByChild = acceptedByLeft || acceptedByRight;

    // currently, we can ensure children in the two side are corresponding to the equal_to's.
    // so right maybe an expression and left is a slot
    Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr);

    // When aliasTransferMap lacks the key, the path from the olap scan to the join contains a
    // join with a denied join type, for example: a left join b on a.id = b.id
    if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, rfContext, probeSlot)) {
        return false;
    }

    Pair<PhysicalRelation, Slot> target = aliasTransferMap.get(probeSlot);
    Slot olapScanSlot = target.second;
    PhysicalRelation scan = target.first;
    Preconditions.checkState(olapScanSlot != null && scan != null);
    if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) {
        return false;
    }

    // TODO: if the filter can't be pushed into the join's children, try to
    // find a possible chance in an upper layer
    if (acceptedByChild) {
        return true;
    }

    // in-filter is not friendly to pipeline
    TRuntimeFilterType effectiveType = type;
    if (type == TRuntimeFilterType.IN_OR_BLOOM
            && rfContext.getSessionVariable().getEnablePipelineEngine()
            && RuntimeFilterGenerator.hasRemoteTarget(this, scan)) {
        effectiveType = TRuntimeFilterType.BLOOM;
    }

    RuntimeFilter filter = new RuntimeFilter(generator.getNextId(),
            srcExpr, ImmutableList.of(olapScanSlot), effectiveType, exprOrder, this, buildSideNdv);
    rfContext.addJoinToTargetMap(this, olapScanSlot.getExprId());
    rfContext.setTargetExprIdToFilter(olapScanSlot.getExprId(), filter);
    rfContext.setTargetsOnScanNode(scan.getRelationId(), olapScanSlot);
    return true;
}
private class ExprComparator implements Comparator<Expression> {
@Override
public int compare(Expression e1, Expression e2) {

View File

@ -17,22 +17,32 @@
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
import org.apache.doris.nereids.processor.post.RuntimeFilterGenerator;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@ -146,4 +156,55 @@ public class PhysicalProject<CHILD_TYPE extends Plan> extends PhysicalUnary<CHIL
child
);
}
/**
 * Forwards a runtime filter through this project to its child.
 *
 * <p>When the probe slot currently resolves to a {@link PhysicalCTEConsumer}, the probe
 * expression is first rewritten to the project expression whose name equals the probe slot's
 * name (unwrapping an {@link Alias}), and the push-down preconditions are checked again on the
 * rewritten expression.
 *
 * @param context cascades context that supplies the runtime filter context
 * @param generator forwarded unchanged to the child
 * @param builderNode join that builds the filter
 * @param src forwarded unchanged to the child
 * @param probeExpr probe-side expression; may be replaced as described above
 * @param type forwarded unchanged to the child
 * @param buildSideNdv forwarded unchanged to the child
 * @param exprOrder forwarded unchanged to the child
 * @return the child's push-down result, or {@code false} when a precondition or coverage check
 *     fails or no project expression matches the probe slot by name
 */
@Override
public boolean pushDownRuntimeFilter(CascadesContext context, IdGenerator<RuntimeFilterId> generator,
        AbstractPhysicalJoin builderNode, Expression src, Expression probeExpr,
        TRuntimeFilterType type, long buildSideNdv, int exprOrder) {
    RuntimeFilterContext ctx = context.getRuntimeFilterContext();
    Map<NamedExpression, Pair<PhysicalRelation, Slot>> aliasTransferMap = ctx.getAliasTransferMap();
    // currently, we can ensure children in the two side are corresponding to the equal_to's.
    // so right maybe an expression and left is a slot
    Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr);
    // When aliasTransferMap lacks the key, the path from the olap scan to the join contains a
    // join with a denied join type, for example: a left join b on a.id = b.id
    if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) {
        return false;
    }
    PhysicalRelation scan = aliasTransferMap.get(probeSlot).first;
    if (scan instanceof PhysicalCTEConsumer) {
        // Re-target the probe expression to the matching project expression.
        Expression newProbeExpr = null;
        for (NamedExpression project : getProjects()) {
            if (project.getName().equals(probeSlot.getName())) {
                newProbeExpr = project;
                break;
            }
        }
        if (newProbeExpr == null) {
            // the pushed down path can't contain the probe expr
            return false;
        }
        if (newProbeExpr instanceof Alias) {
            // An alias may wrap any expression, not only a named one, so it is kept as a plain
            // Expression instead of being cast to NamedExpression (which could throw
            // ClassCastException). The checks below vet it exactly as they vet the incoming
            // probeExpr above.
            newProbeExpr = newProbeExpr.child(0);
        }
        Slot newProbeSlot = RuntimeFilterGenerator.checkTargetChild(newProbeExpr);
        if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, newProbeSlot)) {
            return false;
        }
        scan = aliasTransferMap.get(newProbeSlot).first;
        probeExpr = newProbeExpr;
    }
    if (!RuntimeFilterGenerator.isCoveredByPlanNode(this, scan)) {
        return false;
    }
    AbstractPhysicalPlan child = (AbstractPhysicalPlan) child(0);
    return child.pushDownRuntimeFilter(context, generator, builderNode,
            src, probeExpr, type, buildSideNdv, exprOrder);
}
}

View File

@ -17,9 +17,14 @@
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
import org.apache.doris.nereids.processor.post.RuntimeFilterGenerator;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
@ -28,7 +33,9 @@ import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.thrift.TRuntimeFilterType;
import com.google.common.collect.ImmutableList;
@ -130,4 +137,44 @@ public abstract class PhysicalSetOperation extends AbstractPhysicalPlan implemen
return children.size();
}
/**
 * Offers a runtime filter to every child of this set operation.
 *
 * <p>Only children that are a {@link PhysicalProject}, directly or beneath a single
 * {@link PhysicalDistribute}, are considered. For each of them the probe expression is mapped
 * to the project expression whose name equals the probe slot's name (unwrapping an
 * {@link Alias}) before the request is forwarded. Other children are skipped.
 *
 * @param context cascades context that supplies the runtime filter context
 * @param generator forwarded unchanged to the children
 * @param builderNode join that builds the filter
 * @param src forwarded unchanged to the children
 * @param probeExpr probe-side expression, resolved to a slot for name matching
 * @param type forwarded unchanged to the children
 * @param buildSideNdv forwarded unchanged to the children
 * @param exprOrder forwarded unchanged to the children
 * @return {@code true} if at least one child accepted the filter
 */
@Override
public boolean pushDownRuntimeFilter(CascadesContext context, IdGenerator<RuntimeFilterId> generator,
        AbstractPhysicalJoin builderNode,
        Expression src, Expression probeExpr,
        TRuntimeFilterType type, long buildSideNdv, int exprOrder) {
    RuntimeFilterContext ctx = context.getRuntimeFilterContext();
    boolean pushedDown = false;
    for (int i = 0; i < this.children().size(); i++) {
        AbstractPhysicalPlan child = (AbstractPhysicalPlan) this.child(i);
        // TODO: replace this special logic with dynamic handling
        if (child instanceof PhysicalDistribute) {
            child = (AbstractPhysicalPlan) child.child(0);
        }
        if (!(child instanceof PhysicalProject)) {
            continue;
        }
        PhysicalProject<?> project = (PhysicalProject<?>) child;
        Slot probeSlot = RuntimeFilterGenerator.checkTargetChild(probeExpr);
        if (!RuntimeFilterGenerator.checkPushDownPreconditions(builderNode, ctx, probeSlot)) {
            continue;
        }
        // Find the project expression that carries the probe slot's name.
        Expression newProbeExpr = null;
        for (NamedExpression candidate : project.getProjects()) {
            if (candidate.getName().equals(probeSlot.getName())) {
                newProbeExpr = candidate;
                break;
            }
        }
        if (newProbeExpr == null) {
            continue;
        }
        if (newProbeExpr instanceof Alias) {
            // An alias may wrap any expression, so it is kept as a plain Expression rather than
            // cast to NamedExpression (which could throw ClassCastException). The child
            // receives it as its probeExpr and validates it itself.
            newProbeExpr = newProbeExpr.child(0);
        }
        pushedDown |= child.pushDownRuntimeFilter(context, generator, builderNode, src,
                newProbeExpr, type, buildSideNdv, exprOrder);
    }
    return pushedDown;
}
}

View File

@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.UserException;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TScanRangeLocations;
import java.util.List;
/**
 * A wrapper around a cte consumer that makes it convenient to collect cte target
 * information. It performs no scan work of its own: thrift serialization and scan range
 * creation are deliberate no-ops.
 */
public class CTEScanNode extends ScanNode {
    // Placeholder id used until the real one is supplied through setPlanNodeId.
    private static final PlanNodeId UNINITIAL_PLANNODEID = new PlanNodeId(-1);

    /**
     * Creates a node that carries the placeholder plan node id.
     *
     * @param desc tuple descriptor passed to the {@link ScanNode} constructor
     */
    public CTEScanNode(TupleDescriptor desc) {
        this(UNINITIAL_PLANNODEID, desc);
    }

    /**
     * Creates a node with an explicit plan node id.
     *
     * @param id plan node id of this node
     * @param desc tuple descriptor passed to the {@link ScanNode} constructor
     */
    public CTEScanNode(PlanNodeId id, TupleDescriptor desc) {
        super(id, desc, "CTEScanNode", StatisticalType.CTE_SCAN_NODE);
    }

    /** Replaces this node's plan node id with {@code id}. */
    public void setPlanNodeId(PlanNodeId id) {
        this.id = id;
    }

    @Override
    protected void toThrift(TPlanNode msg) {
        // Intentionally empty: this node is only a wrapper.
    }

    @Override
    protected void createScanRangeLocations() throws UserException {
        // Intentionally empty: this node is only a wrapper.
    }

    /** Always returns {@code null}; this wrapper has no scan ranges. */
    @Override
    public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
        return null;
    }
}

View File

@ -28,9 +28,11 @@ import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TDataStreamSink;
import org.apache.doris.thrift.TExplainLevel;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
/**
@ -48,6 +50,8 @@ public class DataStreamSink extends DataSink {
protected List<Expr> conjuncts = Lists.newArrayList();
protected List<RuntimeFilter> runtimeFilters = Lists.newArrayList();
public DataStreamSink() {
}
@ -102,6 +106,18 @@ public class DataStreamSink extends DataSink {
this.conjuncts.add(conjunct);
}
/**
 * Returns the runtime filters attached to this sink. The internal list itself is returned,
 * not a copy.
 */
public List<RuntimeFilter> getRuntimeFilters() {
return runtimeFilters;
}
/**
 * Replaces the runtime filters attached to this sink.
 *
 * <p>The given list is stored as is (no copy). A {@code null} argument is stored as a new
 * empty list, because the explain-string code and {@code addRuntimeFilter} dereference the
 * field without a null check.
 *
 * @param runtimeFilters new filter list, or {@code null} for none
 */
public void setRuntimeFilters(List<RuntimeFilter> runtimeFilters) {
    if (runtimeFilters == null) {
        this.runtimeFilters = Lists.newArrayList();
    } else {
        this.runtimeFilters = runtimeFilters;
    }
}
/**
 * Appends {@code runtimeFilter} to the list of runtime filters attached to this sink.
 */
public void addRuntimeFilter(RuntimeFilter runtimeFilter) {
this.runtimeFilters.add(runtimeFilter);
}
@Override
public String getExplainString(String prefix, TExplainLevel explainLevel) {
StringBuilder strBuilder = new StringBuilder();
@ -114,6 +130,10 @@ public class DataStreamSink extends DataSink {
Expr expr = PlanNode.convertConjunctsToAndCompoundPredicate(conjuncts);
strBuilder.append(prefix).append(" CONJUNCTS: ").append(expr.toSql()).append("\n");
}
if (!runtimeFilters.isEmpty()) {
strBuilder.append(prefix).append(" runtime filters: ");
strBuilder.append(getRuntimeFilterExplainString(false, false));
}
if (!CollectionUtils.isEmpty(projections)) {
strBuilder.append(prefix).append(" PROJECTIONS: ")
.append(PlanNode.getExplainString(projections)).append("\n");
@ -124,6 +144,34 @@ public class DataStreamSink extends DataSink {
return strBuilder.toString();
}
/**
 * Renders the runtime filters attached to this sink as a single explain line.
 *
 * <p>Each filter contributes its id. Unless {@code isBrief} is set, the id is followed by the
 * lower-cased filter type in brackets and then either {@code " <- "} plus the source
 * expression and size figures (build side) or {@code " -> "} plus the target expression
 * looked up by this sink's exchange node id (target side).
 *
 * @param isBuildNode whether to print the build-side form instead of the target-side form
 * @param isBrief whether to print filter ids only
 * @return the entries joined by {@code ", "} and ended with a newline, or an empty string
 *     when there are no filters
 */
protected String getRuntimeFilterExplainString(boolean isBuildNode, boolean isBrief) {
    if (runtimeFilters.isEmpty()) {
        return "";
    }
    List<String> filtersStr = new ArrayList<>(runtimeFilters.size());
    for (RuntimeFilter filter : runtimeFilters) {
        StringBuilder filterStr = new StringBuilder();
        filterStr.append(filter.getFilterId());
        if (!isBrief) {
            // Locale.ROOT keeps the type name stable whatever the JVM default locale is
            // (a Turkish locale, for instance, lower-cases 'I' to a dotless 'i').
            filterStr.append("[");
            filterStr.append(filter.getType().toString().toLowerCase(Locale.ROOT));
            filterStr.append("]");
            if (isBuildNode) {
                filterStr.append(" <- ");
                filterStr.append(filter.getSrcExpr().toSql());
                filterStr.append("(").append(filter.getEstimateNdv()).append("/")
                        .append(filter.getExpectFilterSizeBytes()).append("/")
                        .append(filter.getFilterSizeBytes()).append(")");
            } else {
                filterStr.append(" -> ");
                filterStr.append(filter.getTargetExpr(getExchNodeId()).toSql());
            }
        }
        filtersStr.add(filterStr.toString());
    }
    return Joiner.on(", ").join(filtersStr) + "\n";
}
@Override
protected TDataSink toThrift() {
TDataSink result = new TDataSink(TDataSinkType.DATA_STREAM_SINK);
@ -142,6 +190,12 @@ public class DataStreamSink extends DataSink {
if (outputTupleDesc != null) {
tStreamSink.setOutputTupleId(outputTupleDesc.getId().asInt());
}
if (runtimeFilters != null) {
for (RuntimeFilter rf : runtimeFilters) {
tStreamSink.addToRuntimeFilters(rf.toThrift());
}
}
result.setStreamSink(tStreamSink);
return result;
}

View File

@ -747,7 +747,7 @@ public class HashJoinNode extends JoinNodeBase {
output.append(detailPrefix).append(
String.format("cardinality=%,d", cardinality)).append("\n");
if (!runtimeFilters.isEmpty()) {
output.append(detailPrefix).append("Build RFs: ");
output.append(detailPrefix).append("runtime filters: ");
output.append(getRuntimeFilterExplainString(true, true));
}
return output.toString();

View File

@ -33,7 +33,6 @@ import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPlanFragment;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
@ -171,8 +170,8 @@ public class PlanFragment extends TreeNode<PlanFragment> {
public PlanFragment(PlanFragmentId id, PlanNode root, DataPartition partition,
        Set<RuntimeFilterId> builderRuntimeFilterIds, Set<RuntimeFilterId> targetRuntimeFilterIds) {
    // Delegates the basic setup to the three-argument constructor, then records the runtime
    // filter ids this fragment builds and targets.
    this(id, root, partition);
    // Each id set is copied once into a mutable HashSet, so later changes to the caller's
    // sets are not reflected in this fragment.
    this.builderRuntimeFilterIds = new HashSet<>(builderRuntimeFilterIds);
    this.targetRuntimeFilterIds = new HashSet<>(targetRuntimeFilterIds);
}
/**

View File

@ -123,7 +123,8 @@ public final class RuntimeFilter {
public RuntimeFilterTarget(ScanNode targetNode, Expr targetExpr,
boolean isBoundByKeyColumns, boolean isLocalTarget) {
Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds()));
Preconditions.checkState(targetExpr.isBoundByTupleIds(targetNode.getTupleIds())
|| targetNode instanceof CTEScanNode);
this.node = targetNode;
this.expr = targetExpr;
this.isBoundByKeyColumns = isBoundByKeyColumns;

View File

@ -22,6 +22,7 @@ public enum StatisticalType {
AGG_NODE,
ANALYTIC_EVAL_NODE,
ASSERT_NUM_ROWS_NODE,
CTE_SCAN_NODE,
BROKER_SCAN_NODE,
NESTED_LOOP_JOIN_NODE,
EMPTY_SET_NODE,