From d8f1b77cc15dfb9e68fc33336e3cf1b45df7dea2 Mon Sep 17 00:00:00 2001 From: zhengshiJ <32082872+zhengshiJ@users.noreply.github.com> Date: Sun, 22 May 2022 21:35:32 +0800 Subject: [PATCH] [improvement](planner) Backfill the original predicate pushdown code (#9703) Due to the current architecture, predicate derivation at rewrite cannot satisfy all cases, because rewrite is performed on first and then where, and when there are subqueries, all cases cannot be derived. So keep the predicate pushdown method here. eg. select * from t1 left join t2 on t1 = t2 where t1 = 1; InferFiltersRule can't infer t2 = 1, because this is out of specification. The expression(t2 = 1) can actually be deduced to push it down to the scan node. --- .../org/apache/doris/analysis/Analyzer.java | 39 +++-- .../doris/planner/PredicatePushDown.java | 151 ++++++++++++++++++ .../doris/planner/SingleNodePlanner.java | 3 + .../doris/rewrite/InferFiltersRule.java | 8 + .../doris/rewrite/InferFiltersRuleTest.java | 23 ++- 5 files changed, 213 insertions(+), 11 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index 0232f1178f..a5a18d0d3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -267,31 +267,37 @@ public class Analyzer { public final Map, JoinOperator> anyTwoTalesJoinOperator = Maps.newHashMap(); // slotEqSlotExpr: Record existing and infer equivalent connections - public final List onSlotEqSlotExpr = new ArrayList<>(); + private final List onSlotEqSlotExpr = new ArrayList<>(); // slotEqSlotDeDuplication: De-Duplication for slotEqSlotExpr - public final Set> onSlotEqSlotDeDuplication = Sets.newHashSet(); + private final Set> onSlotEqSlotDeDuplication = Sets.newHashSet(); // slotToLiteralExpr: Record existing and infer expr which slot and literal are equal - public final List onSlotToLiteralExpr = new ArrayList<>(); + private final List onSlotToLiteralExpr = new ArrayList<>(); // slotToLiteralDeDuplication: De-Duplication for slotToLiteralExpr - public final Set> onSlotToLiteralDeDuplication = Sets.newHashSet(); + private final Set> onSlotToLiteralDeDuplication = Sets.newHashSet(); // inExpr: Recoud existing and infer expr which in predicate - public final List onInExpr = new ArrayList<>(); + private final List onInExpr = new ArrayList<>(); // inExprDeDuplication: De-Duplication for inExpr - public final Set onInDeDuplication = Sets.newHashSet(); + private final Set onInDeDuplication = Sets.newHashSet(); // isNullExpr: Record existing and infer not null predicate - public final List onIsNullExpr = new ArrayList<>(); + private final List onIsNullExpr = new ArrayList<>(); //isNullDeDuplication: De-Duplication for isNullExpr - public final Set onIsNullDeDuplication = Sets.newHashSet(); + private final Set onIsNullDeDuplication = Sets.newHashSet(); + + // slotToLiteralDeDuplication: De-Duplication for slotToLiteralExpr. Contain on and where. + private final Set> globalSlotToLiteralDeDuplication = Sets.newHashSet(); + + // inExprDeDuplication: De-Duplication for inExpr. Contain on and where + private final Set globalInDeDuplication = Sets.newHashSet(); // map from slot id to the analyzer/block in which it was registered - public final Map blockBySlot = Maps.newHashMap(); + private final Map blockBySlot = Maps.newHashMap(); // Expr rewriter for normalizing and rewriting expressions. private final ExprRewriter exprRewriter; @@ -997,6 +1003,14 @@ public class Analyzer { globalState.onIsNullDeDuplication.add(expr); } + public void registerGlobalSlotToLiteralDeDuplication(Pair pair) { + globalState.globalSlotToLiteralDeDuplication.add(pair); + } + + public void registerGlobalInDeDuplication(Expr expr) { + globalState.globalInDeDuplication.add(expr); + } + public void registerConjunct(Expr e, TupleId tupleId) throws AnalysisException { final List exprs = Lists.newArrayList(); exprs.add(e); @@ -1462,6 +1476,13 @@ public class Analyzer { return Sets.newHashSet(globalState.onIsNullDeDuplication); } + public Set> getGlobalSlotToLiteralDeDuplication() { + return Sets.newHashSet(globalState.globalSlotToLiteralDeDuplication); + } + + public Set getGlobalInDeDuplication() { + return Sets.newHashSet(globalState.globalInDeDuplication); + } /** * Makes the given semi-joined tuple visible such that its slots can be referenced. * If tid is null, makes the currently visible semi-joined tuple invisible again. diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java new file mode 100644 index 0000000000..76c2ace45f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.JoinOperator; +import org.apache.doris.analysis.Predicate; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleId; +import org.apache.doris.common.Pair; + +import org.apache.directory.api.util.Strings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +/** + * Due to the current architecture, predicate derivation at rewrite cannot satisfy all cases, + * because rewrite is performed on first and then where, and when there are subqueries, all cases cannot be derived. + * So keep the predicate pushdown method here. + * + *

+ * eg: + * origin: select * from t1 left join t2 on t1 = t2 where t1 = 1; + * after: The function will be derived t2 = 1 + *

+ * + */ +public class PredicatePushDown { + private static final Logger LOG = LogManager.getLogger(PredicatePushDown.class); + + /** + * Desc: Predicate pushdown for inner and left join. + + * @param scanNode ScanNode to be judged + * @param joinOp join Operator + * @param analyzer global context + * @return {@link PlanNode} + */ + public static PlanNode visitScanNode(ScanNode scanNode, JoinOperator joinOp, Analyzer analyzer) { + switch (joinOp) { + case INNER_JOIN: + case LEFT_OUTER_JOIN: + predicateFromLeftSidePropagatesToRightSide(scanNode, analyzer); + break; + // TODO + default: + break; + } + return scanNode; + } + + private static void predicateFromLeftSidePropagatesToRightSide(ScanNode scanNode, Analyzer analyzer) { + List tupleIdList = scanNode.getTupleIds(); + if (tupleIdList.size() != 1) { + LOG.info("The predicate pushdown is not reflected " + + "because the scan node involves more then one tuple:{}", + Strings.listToString(tupleIdList)); + return; + } + TupleId rightSideTuple = tupleIdList.get(0); + List unassignedRightSideConjuncts = analyzer.getUnassignedConjuncts(scanNode); + List eqJoinPredicates = analyzer.getEqJoinConjuncts(rightSideTuple); + if (eqJoinPredicates != null) { + List allConjuncts = analyzer.getConjuncts(analyzer.getAllTupleIds()); + allConjuncts.removeAll(unassignedRightSideConjuncts); + for (Expr conjunct : allConjuncts) { + if (!Predicate.canPushDownPredicate(conjunct)) { + continue; + } + for (Expr eqJoinPredicate : eqJoinPredicates) { + // we can ensure slot is left node, because NormalizeBinaryPredicatesRule + SlotRef otherSlot = conjunct.getChild(0).unwrapSlotRef(); + + // ensure the children for eqJoinPredicate both be SlotRef + if (eqJoinPredicate.getChild(0).unwrapSlotRef() == null + || eqJoinPredicate.getChild(1).unwrapSlotRef() == null) { + continue; + } + + SlotRef leftSlot = eqJoinPredicate.getChild(0).unwrapSlotRef(); + SlotRef rightSlot = eqJoinPredicate.getChild(1).unwrapSlotRef(); + // ensure the type is match + if (!leftSlot.getDesc().getType().matchesType(rightSlot.getDesc().getType())) { + continue; + } + + // example: t1.id = t2.id and t1.id = 1 => t2.id =1 + if (otherSlot.isBound(leftSlot.getSlotId()) + && rightSlot.isBound(rightSideTuple)) { + Expr pushDownConjunct = rewritePredicate(analyzer, conjunct, rightSlot); + LOG.debug("pushDownConjunct: {}", pushDownConjunct); + if (!analyzer.getGlobalInDeDuplication().contains(pushDownConjunct) + && !analyzer.getGlobalSlotToLiteralDeDuplication() + .contains(new Pair<>(pushDownConjunct.getChild(0), pushDownConjunct.getChild(1)))) { + scanNode.addConjunct(pushDownConjunct); + } + } else if (otherSlot.isBound(rightSlot.getSlotId()) + && leftSlot.isBound(rightSideTuple)) { + Expr pushDownConjunct = rewritePredicate(analyzer, conjunct, leftSlot); + LOG.debug("pushDownConjunct: {}", pushDownConjunct); + if (!analyzer.getGlobalInDeDuplication().contains(pushDownConjunct) + && !analyzer.getGlobalSlotToLiteralDeDuplication() + .contains(new Pair<>(pushDownConjunct.getChild(0), pushDownConjunct.getChild(1)))) { + scanNode.addConjunct(pushDownConjunct); + } + } + } + } + } + } + + // Rewrite the oldPredicate with new leftChild + // For example: oldPredicate is t1.id = 1, leftChild is t2.id, will return t2.id = 1 + private static Expr rewritePredicate(Analyzer analyzer, Expr oldPredicate, Expr leftChild) { + if (oldPredicate instanceof BinaryPredicate) { + BinaryPredicate oldBP = (BinaryPredicate) oldPredicate; + BinaryPredicate bp = new BinaryPredicate(oldBP.getOp(), leftChild, oldBP.getChild(1)); + bp.analyzeNoThrow(analyzer); + return bp; + } + + if (oldPredicate instanceof InPredicate) { + InPredicate oldIP = (InPredicate) oldPredicate; + InPredicate ip = new InPredicate(leftChild, oldIP.getListChildren(), oldIP.isNotIn()); + ip.analyzeNoThrow(analyzer); + return ip; + } + + return oldPredicate; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 8d125c1204..b8ac8e7d7b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1712,6 +1712,9 @@ public class SingleNodePlanner { break; } if (scanNode instanceof OlapScanNode || scanNode instanceof EsScanNode || scanNode instanceof HiveScanNode) { + if (analyzer.enableInferPredicate()) { + PredicatePushDown.visitScanNode(scanNode, tblRef.getJoinOp(), analyzer); + } scanNode.setSortColumn(tblRef.getSortColumn()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java b/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java index 17be70c6e4..edfae20905 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java @@ -180,6 +180,7 @@ public class InferFiltersRule implements ExprRewriteRule { analyzer.registerOnSlotToLiteralDeDuplication(pair); analyzer.registerOnSlotToLiteralExpr(conjunct); } + analyzer.registerGlobalSlotToLiteralDeDuplication(pair); } } else if (conjunct.getChild(0).unwrapSlotRef() instanceof SlotRef && conjunct.getChild(1).unwrapSlotRef() instanceof SlotRef) { @@ -219,6 +220,7 @@ public class InferFiltersRule implements ExprRewriteRule { analyzer.registerInExpr(conjunct); analyzer.registerInDeDuplication(conjunct.getChild(0).unwrapSlotRef()); } + analyzer.registerGlobalInDeDuplication(conjunct.getChild(0).unwrapSlotRef()); } } } @@ -497,6 +499,9 @@ public class InferFiltersRule implements ExprRewriteRule { analyzer.registerOnSlotToLiteralDeDuplication(pair); analyzer.registerOnSlotToLiteralExpr(newBP); } + if (needAddnewExprWithState) { + analyzer.registerGlobalSlotToLiteralDeDuplication(pair); + } } } } @@ -665,6 +670,9 @@ public class InferFiltersRule implements ExprRewriteRule { analyzer.registerInDeDuplication(newIP); analyzer.registerInExpr(newIP); } + if (needAddnewExprWithState) { + analyzer.registerGlobalInDeDuplication(newIP); + } } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/rewrite/InferFiltersRuleTest.java b/fe/fe-core/src/test/java/org/apache/doris/rewrite/InferFiltersRuleTest.java index 0a402bc2e2..b71b36452f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/rewrite/InferFiltersRuleTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/rewrite/InferFiltersRuleTest.java @@ -298,7 +298,7 @@ public class InferFiltersRuleTest { } @Test - public void testOnAndWhere2TablesLeftJoin() throws Exception { + public void testOnAndWhere2TablesLeftJoin2ndIsLiteral() throws Exception { SessionVariable sessionVariable = dorisAssert.getSessionVariable(); sessionVariable.setEnableInferPredicate(true); Assert.assertTrue(sessionVariable.isEnableInferPredicate()); @@ -308,7 +308,7 @@ public class InferFiltersRuleTest { } @Test - public void testOnAndWhere2TablesInnerJoin() throws Exception { + public void testOnAndWhere2TablesInnerJoin2ndIsLiteral() throws Exception { SessionVariable sessionVariable = dorisAssert.getSessionVariable(); sessionVariable.setEnableInferPredicate(true); Assert.assertTrue(sessionVariable.isEnableInferPredicate()); @@ -317,4 +317,23 @@ public class InferFiltersRuleTest { Assert.assertTrue(planString.contains("`tb1`.`k1` = 1")); } + @Test + public void testOnAndWhere2TableLeftJoin1stIsLiteral() throws Exception { + SessionVariable sessionVariable = dorisAssert.getSessionVariable(); + sessionVariable.setEnableInferPredicate(true); + Assert.assertTrue(sessionVariable.isEnableInferPredicate()); + String query = "select * from tb1 left join tb2 on tb1.k1 = tb2.k1 where tb1.k1 = 1"; + String planString = dorisAssert.query(query).explainQuery(); + Assert.assertTrue(planString.contains("`tb2`.`k1` = 1")); + } + + @Test + public void testOnAndWhere2TablesInnerJoin1stIsLiteral() throws Exception { + SessionVariable sessionVariable = dorisAssert.getSessionVariable(); + sessionVariable.setEnableInferPredicate(true); + Assert.assertTrue(sessionVariable.isEnableInferPredicate()); + String query = "select * from tb1 inner join tb2 on tb1.k1 = tb2.k1 where tb1.k1 = 1"; + String planString = dorisAssert.query(query).explainQuery(); + Assert.assertTrue(planString.contains("`tb2`.`k1` = 1")); + } }