From 332b9cb6199bcfd533a7863bf2315e579f45c330 Mon Sep 17 00:00:00 2001 From: minghong Date: Mon, 22 Jan 2024 09:22:19 +0800 Subject: [PATCH] [opt](nereids) do not change RuntimeFilter Type from IN-OR_BLOOM to BLOOM on broadcast join (#30148) 1. do not change RuntimeFilter Type from IN-OR_BLOOM to BLOOM on broadcast join tpcds1T, q48 improved from 4.x sec to 1.x sec 2. skip some redunant runtime filter example: A join B on A.a1=B.b and A.a1 = A.a2 RF B.b->(A.a1, A.a2) however, RF(B.b->A.a2) is implied by RF(B.a->A.a1) and A.a1=A.a2 we skip RF(B.b->A.a2) Issue Number: close #xxx --- .../translator/RuntimeFilterTranslator.java | 6 ---- .../post/RuntimeFilterGenerator.java | 2 +- .../plans/physical/AbstractPhysicalJoin.java | 15 ++++++++++ .../plans/physical/AbstractPhysicalPlan.java | 21 +++++++++----- .../trees/plans/physical/RuntimeFilter.java | 4 +++ .../rf_prune/query64.out | 16 +++++------ .../shape/query64.out | 28 +++++++++---------- 7 files changed, 56 insertions(+), 36 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java index 91f9790609..30a69ff97d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/RuntimeFilterTranslator.java @@ -38,7 +38,6 @@ import org.apache.doris.planner.JoinNodeBase; import org.apache.doris.planner.RuntimeFilter.RuntimeFilterTarget; import org.apache.doris.planner.ScanNode; import org.apache.doris.qe.ConnectContext; -import org.apache.doris.qe.SessionVariable; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.thrift.TRuntimeFilterType; @@ -185,11 +184,6 @@ public class RuntimeFilterTranslator { origFilter.markFinalized(); origFilter.assignToPlanNodes(); origFilter.extractTargetsPosition(); - // Number of parallel instances are large for pipeline engine, so we prefer bloom filter. - if (origFilter.hasRemoteTargets() && origFilter.getType() == TRuntimeFilterType.IN_OR_BLOOM - && SessionVariable.enablePipelineEngine()) { - origFilter.setType(TRuntimeFilterType.BLOOM); - } return origFilter; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java index 76d189ba63..8a75ad36c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterGenerator.java @@ -567,7 +567,7 @@ public class RuntimeFilterGenerator extends PlanPostProcessor { PhysicalHashJoin join = innerEntry.getValue(); Preconditions.checkState(join != null); TRuntimeFilterType type = TRuntimeFilterType.IN_OR_BLOOM; - if (ctx.getSessionVariable().getEnablePipelineEngine()) { + if (ctx.getSessionVariable().getEnablePipelineEngine() && !join.isBroadCastJoin()) { type = TRuntimeFilterType.BLOOM; } EqualTo newEqualTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder( diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java index 16ba68aac6..7d1c65b589 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalJoin.java @@ -19,6 +19,8 @@ package org.apache.doris.nereids.trees.plans.physical; import org.apache.doris.nereids.hint.DistributeHint; import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DistributionSpec; +import org.apache.doris.nereids.properties.DistributionSpecReplicated; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; @@ -251,4 +253,17 @@ public abstract class AbstractPhysicalJoin< return Utils.toSqlString(this.getClass().getSimpleName() + "[" + id.asInt() + "]" + getGroupIdWithPrefix(), args.toArray()); } + + /** + * true if this is a broadcast join + */ + public boolean isBroadCastJoin() { + if (child(1) instanceof PhysicalDistribute) { + DistributionSpec distSpec = ((PhysicalDistribute) child(1)).getDistributionSpec(); + if (distSpec instanceof DistributionSpecReplicated) { + return true; + } + } + return false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java index a2968ca808..6a19abcc8f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/AbstractPhysicalPlan.java @@ -112,19 +112,26 @@ public abstract class AbstractPhysicalPlan extends AbstractPlan implements Physi // in-filter is not friendly to pipeline if (type == TRuntimeFilterType.IN_OR_BLOOM && ctx.getSessionVariable().getEnablePipelineEngine() - && RuntimeFilterGenerator.hasRemoteTarget(builderNode, scan)) { + && RuntimeFilterGenerator.hasRemoteTarget(builderNode, scan) + && !builderNode.isBroadCastJoin()) { type = TRuntimeFilterType.BLOOM; } org.apache.doris.nereids.trees.plans.physical.RuntimeFilter filter = ctx.getRuntimeFilterBySrcAndType(src, type, builderNode); Preconditions.checkState(scanSlot != null, "scan slot is null"); if (filter != null) { - this.addAppliedRuntimeFilter(filter); - filter.addTargetSlot(scanSlot, scan); - filter.addTargetExpression(scanSlot); - ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId()); - ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter); - ctx.setTargetsOnScanNode(ctx.getAliasTransferPair((NamedExpression) probeExpr).first, scanSlot); + if (!filter.hasTargetScan(scan)) { + // A join B on A.a1=B.b and A.a1 = A.a2 + // RF B.b->(A.a1, A.a2) + // however, RF(B.b->A.a2) is implied by RF(B.a->A.a1) and A.a1=A.a2 + // we skip RF(B.b->A.a2) + this.addAppliedRuntimeFilter(filter); + filter.addTargetSlot(scanSlot, scan); + filter.addTargetExpression(scanSlot); + ctx.addJoinToTargetMap(builderNode, scanSlot.getExprId()); + ctx.setTargetExprIdToFilter(scanSlot.getExprId(), filter); + ctx.setTargetsOnScanNode(ctx.getAliasTransferPair((NamedExpression) probeExpr).first, scanSlot); + } } else { filter = new RuntimeFilter(generator.getNextId(), src, ImmutableList.of(scanSlot), type, exprOrder, builderNode, buildSideNdv, scan); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java index a8f4c3cd7c..3a3b01daec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/RuntimeFilter.java @@ -148,6 +148,10 @@ public class RuntimeFilter { return targetScans; } + public boolean hasTargetScan(PhysicalRelation scan) { + return targetScans.contains(scan); + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query64.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query64.out index e5e915d442..367d659e25 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query64.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/rf_prune/query64.out @@ -16,22 +16,22 @@ PhysicalCteAnchor ( cteId=CTEId#1 ) --------------------------PhysicalProject ----------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk)) otherCondition=() ------------------------------PhysicalProject ---------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_addr_sk = ad1.ca_address_sk)) otherCondition=() build RFs:RF16 ss_addr_sk->[ca_address_sk] +--------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_addr_sk = ad1.ca_address_sk)) otherCondition=() build RFs:RF15 ss_addr_sk->[ca_address_sk] ----------------------------------PhysicalProject -------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF16 +------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF15 ----------------------------------PhysicalDistribute[DistributionSpecHash] ------------------------------------PhysicalProject ---------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF14 ss_item_sk->[sr_item_sk];RF15 ss_ticket_number->[sr_ticket_number] +--------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF13 ss_item_sk->[sr_item_sk];RF14 ss_ticket_number->[sr_ticket_number] ----------------------------------------PhysicalProject -------------------------------------------PhysicalOlapScan[store_returns] apply RFs: RF14 RF15 +------------------------------------------PhysicalOlapScan[store_returns] apply RFs: RF13 RF14 ----------------------------------------PhysicalDistribute[DistributionSpecHash] ------------------------------------------PhysicalProject ---------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_cdemo_sk = cd1.cd_demo_sk)) otherCondition=() build RFs:RF13 ss_cdemo_sk->[cd_demo_sk] +--------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_cdemo_sk = cd1.cd_demo_sk)) otherCondition=() build RFs:RF12 ss_cdemo_sk->[cd_demo_sk] ----------------------------------------------PhysicalDistribute[DistributionSpecHash] ------------------------------------------------PhysicalProject ---------------------------------------------------PhysicalOlapScan[customer_demographics] apply RFs: RF13 +--------------------------------------------------PhysicalOlapScan[customer_demographics] apply RFs: RF12 ----------------------------------------------PhysicalDistribute[DistributionSpecHash] -------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF11 i_item_sk->[ss_item_sk];RF12 i_item_sk->[cs_item_sk] +------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF11 i_item_sk->[ss_item_sk,cs_item_sk] --------------------------------------------------PhysicalProject ----------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_store_sk = store.s_store_sk)) otherCondition=() ------------------------------------------------------PhysicalProject @@ -53,7 +53,7 @@ PhysicalCteAnchor ( cteId=CTEId#1 ) ----------------------------------------------------------------------------------PhysicalProject ------------------------------------------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((catalog_sales.cs_item_sk = catalog_returns.cr_item_sk) and (catalog_sales.cs_order_number = catalog_returns.cr_order_number)) otherCondition=() build RFs:RF4 cr_order_number->[cs_order_number];RF5 cr_item_sk->[cs_item_sk] --------------------------------------------------------------------------------------PhysicalProject -----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4 RF5 RF12 +----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4 RF5 RF11 --------------------------------------------------------------------------------------PhysicalProject ----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_returns] ------------------------------------------------------------------PhysicalDistribute[DistributionSpecReplicated] diff --git a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query64.out b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query64.out index 43bdb50fcc..d40500b774 100644 --- a/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query64.out +++ b/regression-test/data/nereids_tpcds_shape_sf100_p0/shape/query64.out @@ -7,31 +7,31 @@ PhysicalCteAnchor ( cteId=CTEId#1 ) --------PhysicalDistribute[DistributionSpecHash] ----------hashAgg[LOCAL] ------------PhysicalProject ---------------hashJoin[INNER_JOIN] hashCondition=((customer.c_first_shipto_date_sk = d3.d_date_sk)) otherCondition=() build RFs:RF20 d_date_sk->[c_first_shipto_date_sk] +--------------hashJoin[INNER_JOIN] hashCondition=((customer.c_first_shipto_date_sk = d3.d_date_sk)) otherCondition=() build RFs:RF19 d_date_sk->[c_first_shipto_date_sk] ----------------PhysicalProject -------------------hashJoin[INNER_JOIN] hashCondition=((customer.c_first_sales_date_sk = d2.d_date_sk)) otherCondition=() build RFs:RF19 d_date_sk->[c_first_sales_date_sk] +------------------hashJoin[INNER_JOIN] hashCondition=((customer.c_first_sales_date_sk = d2.d_date_sk)) otherCondition=() build RFs:RF18 d_date_sk->[c_first_sales_date_sk] --------------------PhysicalProject -----------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) otherCondition=(( not (cd_marital_status = cd_marital_status))) build RFs:RF18 c_customer_sk->[ss_customer_sk] +----------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_customer_sk = customer.c_customer_sk)) otherCondition=(( not (cd_marital_status = cd_marital_status))) build RFs:RF17 c_customer_sk->[ss_customer_sk] ------------------------PhysicalDistribute[DistributionSpecHash] --------------------------PhysicalProject -----------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk)) otherCondition=() build RFs:RF17 p_promo_sk->[ss_promo_sk] +----------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_promo_sk = promotion.p_promo_sk)) otherCondition=() build RFs:RF16 p_promo_sk->[ss_promo_sk] ------------------------------PhysicalProject ---------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_addr_sk = ad1.ca_address_sk)) otherCondition=() build RFs:RF16 ss_addr_sk->[ca_address_sk] +--------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_addr_sk = ad1.ca_address_sk)) otherCondition=() build RFs:RF15 ss_addr_sk->[ca_address_sk] ----------------------------------PhysicalProject -------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF16 +------------------------------------PhysicalOlapScan[customer_address] apply RFs: RF15 ----------------------------------PhysicalDistribute[DistributionSpecHash] ------------------------------------PhysicalProject ---------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF14 ss_item_sk->[sr_item_sk];RF15 ss_ticket_number->[sr_ticket_number] +--------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = store_returns.sr_item_sk) and (store_sales.ss_ticket_number = store_returns.sr_ticket_number)) otherCondition=() build RFs:RF13 ss_item_sk->[sr_item_sk];RF14 ss_ticket_number->[sr_ticket_number] ----------------------------------------PhysicalProject -------------------------------------------PhysicalOlapScan[store_returns] apply RFs: RF14 RF15 +------------------------------------------PhysicalOlapScan[store_returns] apply RFs: RF13 RF14 ----------------------------------------PhysicalDistribute[DistributionSpecHash] ------------------------------------------PhysicalProject ---------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_cdemo_sk = cd1.cd_demo_sk)) otherCondition=() build RFs:RF13 ss_cdemo_sk->[cd_demo_sk] +--------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_cdemo_sk = cd1.cd_demo_sk)) otherCondition=() build RFs:RF12 ss_cdemo_sk->[cd_demo_sk] ----------------------------------------------PhysicalDistribute[DistributionSpecHash] ------------------------------------------------PhysicalProject ---------------------------------------------------PhysicalOlapScan[customer_demographics] apply RFs: RF13 +--------------------------------------------------PhysicalOlapScan[customer_demographics] apply RFs: RF12 ----------------------------------------------PhysicalDistribute[DistributionSpecHash] -------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF11 i_item_sk->[ss_item_sk];RF12 i_item_sk->[cs_item_sk] +------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = item.i_item_sk)) otherCondition=() build RFs:RF11 i_item_sk->[ss_item_sk,cs_item_sk] --------------------------------------------------PhysicalProject ----------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_store_sk = store.s_store_sk)) otherCondition=() build RFs:RF10 s_store_sk->[ss_store_sk] ------------------------------------------------------PhysicalProject @@ -43,7 +43,7 @@ PhysicalCteAnchor ( cteId=CTEId#1 ) ------------------------------------------------------------------PhysicalProject --------------------------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((store_sales.ss_item_sk = cs_ui.cs_item_sk)) otherCondition=() build RFs:RF6 cs_item_sk->[ss_item_sk] ----------------------------------------------------------------------PhysicalProject -------------------------------------------------------------------------PhysicalOlapScan[store_sales] apply RFs: RF6 RF7 RF8 RF10 RF11 RF17 RF18 +------------------------------------------------------------------------PhysicalOlapScan[store_sales] apply RFs: RF6 RF7 RF8 RF10 RF11 RF16 RF17 ----------------------------------------------------------------------PhysicalDistribute[DistributionSpecReplicated] ------------------------------------------------------------------------PhysicalProject --------------------------------------------------------------------------filter((sale > (2 * refund))) @@ -53,7 +53,7 @@ PhysicalCteAnchor ( cteId=CTEId#1 ) ----------------------------------------------------------------------------------PhysicalProject ------------------------------------------------------------------------------------hashJoin[INNER_JOIN] hashCondition=((catalog_sales.cs_item_sk = catalog_returns.cr_item_sk) and (catalog_sales.cs_order_number = catalog_returns.cr_order_number)) otherCondition=() build RFs:RF4 cr_order_number->[cs_order_number];RF5 cr_item_sk->[cs_item_sk] --------------------------------------------------------------------------------------PhysicalProject -----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4 RF5 RF12 +----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_sales] apply RFs: RF4 RF5 RF11 --------------------------------------------------------------------------------------PhysicalProject ----------------------------------------------------------------------------------------PhysicalOlapScan[catalog_returns] ------------------------------------------------------------------PhysicalDistribute[DistributionSpecReplicated] @@ -86,7 +86,7 @@ PhysicalCteAnchor ( cteId=CTEId#1 ) --------------------------------------PhysicalProject ----------------------------------------hashJoin[INNER_JOIN] hashCondition=((customer.c_current_hdemo_sk = hd2.hd_demo_sk)) otherCondition=() build RFs:RF1 hd_demo_sk->[c_current_hdemo_sk] ------------------------------------------PhysicalProject ---------------------------------------------PhysicalOlapScan[customer] apply RFs: RF1 RF2 RF3 RF19 RF20 +--------------------------------------------PhysicalOlapScan[customer] apply RFs: RF1 RF2 RF3 RF18 RF19 ------------------------------------------PhysicalDistribute[DistributionSpecReplicated] --------------------------------------------PhysicalProject ----------------------------------------------hashJoin[INNER_JOIN] hashCondition=((hd2.hd_income_band_sk = ib2.ib_income_band_sk)) otherCondition=() build RFs:RF0 ib_income_band_sk->[hd_income_band_sk]