diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index 0232f1178f..a5a18d0d3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -267,31 +267,37 @@ public class Analyzer { public final Map, JoinOperator> anyTwoTalesJoinOperator = Maps.newHashMap(); // slotEqSlotExpr: Record existing and infer equivalent connections - public final List onSlotEqSlotExpr = new ArrayList<>(); + private final List onSlotEqSlotExpr = new ArrayList<>(); // slotEqSlotDeDuplication: De-Duplication for slotEqSlotExpr - public final Set> onSlotEqSlotDeDuplication = Sets.newHashSet(); + private final Set> onSlotEqSlotDeDuplication = Sets.newHashSet(); // slotToLiteralExpr: Record existing and infer expr which slot and literal are equal - public final List onSlotToLiteralExpr = new ArrayList<>(); + private final List onSlotToLiteralExpr = new ArrayList<>(); // slotToLiteralDeDuplication: De-Duplication for slotToLiteralExpr - public final Set> onSlotToLiteralDeDuplication = Sets.newHashSet(); + private final Set> onSlotToLiteralDeDuplication = Sets.newHashSet(); // inExpr: Recoud existing and infer expr which in predicate - public final List onInExpr = new ArrayList<>(); + private final List onInExpr = new ArrayList<>(); // inExprDeDuplication: De-Duplication for inExpr - public final Set onInDeDuplication = Sets.newHashSet(); + private final Set onInDeDuplication = Sets.newHashSet(); // isNullExpr: Record existing and infer not null predicate - public final List onIsNullExpr = new ArrayList<>(); + private final List onIsNullExpr = new ArrayList<>(); //isNullDeDuplication: De-Duplication for isNullExpr - public final Set onIsNullDeDuplication = Sets.newHashSet(); + private final Set onIsNullDeDuplication = Sets.newHashSet(); + + // globalSlotToLiteralDeDuplication: De-Duplication for slotToLiteralExpr. 
Covers predicates from both the ON and WHERE clauses. + private final Set> globalSlotToLiteralDeDuplication = Sets.newHashSet(); + + // globalInDeDuplication: De-Duplication for inExpr. Covers predicates from both the ON and WHERE clauses. + private final Set globalInDeDuplication = Sets.newHashSet(); // map from slot id to the analyzer/block in which it was registered - public final Map blockBySlot = Maps.newHashMap(); + private final Map blockBySlot = Maps.newHashMap(); // Expr rewriter for normalizing and rewriting expressions. private final ExprRewriter exprRewriter; @@ -997,6 +1003,14 @@ public class Analyzer { globalState.onIsNullDeDuplication.add(expr); } + public void registerGlobalSlotToLiteralDeDuplication(Pair pair) { + globalState.globalSlotToLiteralDeDuplication.add(pair); + } + + public void registerGlobalInDeDuplication(Expr expr) { + globalState.globalInDeDuplication.add(expr); + } + public void registerConjunct(Expr e, TupleId tupleId) throws AnalysisException { final List exprs = Lists.newArrayList(); exprs.add(e); @@ -1462,6 +1476,13 @@ public class Analyzer { return Sets.newHashSet(globalState.onIsNullDeDuplication); } + public Set> getGlobalSlotToLiteralDeDuplication() { + return Sets.newHashSet(globalState.globalSlotToLiteralDeDuplication); + } + + public Set getGlobalInDeDuplication() { + return Sets.newHashSet(globalState.globalInDeDuplication); + } /** * Makes the given semi-joined tuple visible such that its slots can be referenced. * If tid is null, makes the currently visible semi-joined tuple invisible again. diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java new file mode 100644 index 0000000000..76c2ace45f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.JoinOperator; +import org.apache.doris.analysis.Predicate; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleId; +import org.apache.doris.common.Pair; + +import org.apache.directory.api.util.Strings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +/** + * With the current architecture, predicate derivation in the rewrite phase cannot cover every case: + * the rewrite handles the ON clause first and the WHERE clause afterwards, and when subqueries are involved + * some predicates cannot be derived there. + * Therefore the planner-side predicate pushdown is kept here. + * + *

+ * eg: + * origin: select * from t1 left join t2 on t1.id = t2.id where t1.id = 1; + * after: the predicate t2.id = 1 is derived and pushed down to the scan of t2 + *

+ * + */ +public class PredicatePushDown { + private static final Logger LOG = LogManager.getLogger(PredicatePushDown.class); + + /** + * Desc: Predicate pushdown for inner and left join. + + * @param scanNode ScanNode to be judged + * @param joinOp join Operator + * @param analyzer global context + * @return {@link PlanNode} + */ + public static PlanNode visitScanNode(ScanNode scanNode, JoinOperator joinOp, Analyzer analyzer) { + switch (joinOp) { + case INNER_JOIN: + case LEFT_OUTER_JOIN: + predicateFromLeftSidePropagatesToRightSide(scanNode, analyzer); + break; + // TODO + default: + break; + } + return scanNode; + } + + private static void predicateFromLeftSidePropagatesToRightSide(ScanNode scanNode, Analyzer analyzer) { + List tupleIdList = scanNode.getTupleIds(); + if (tupleIdList.size() != 1) { + LOG.info("The predicate pushdown is not reflected " + + "because the scan node involves more then one tuple:{}", + Strings.listToString(tupleIdList)); + return; + } + TupleId rightSideTuple = tupleIdList.get(0); + List unassignedRightSideConjuncts = analyzer.getUnassignedConjuncts(scanNode); + List eqJoinPredicates = analyzer.getEqJoinConjuncts(rightSideTuple); + if (eqJoinPredicates != null) { + List allConjuncts = analyzer.getConjuncts(analyzer.getAllTupleIds()); + allConjuncts.removeAll(unassignedRightSideConjuncts); + for (Expr conjunct : allConjuncts) { + if (!Predicate.canPushDownPredicate(conjunct)) { + continue; + } + for (Expr eqJoinPredicate : eqJoinPredicates) { + // we can ensure slot is left node, because NormalizeBinaryPredicatesRule + SlotRef otherSlot = conjunct.getChild(0).unwrapSlotRef(); + + // ensure the children for eqJoinPredicate both be SlotRef + if (eqJoinPredicate.getChild(0).unwrapSlotRef() == null + || eqJoinPredicate.getChild(1).unwrapSlotRef() == null) { + continue; + } + + SlotRef leftSlot = eqJoinPredicate.getChild(0).unwrapSlotRef(); + SlotRef rightSlot = eqJoinPredicate.getChild(1).unwrapSlotRef(); + // ensure the type is match + if 
(!leftSlot.getDesc().getType().matchesType(rightSlot.getDesc().getType())) { + continue; + } + + // example: t1.id = t2.id and t1.id = 1 => t2.id =1 + if (otherSlot.isBound(leftSlot.getSlotId()) + && rightSlot.isBound(rightSideTuple)) { + Expr pushDownConjunct = rewritePredicate(analyzer, conjunct, rightSlot); + LOG.debug("pushDownConjunct: {}", pushDownConjunct); + if (!analyzer.getGlobalInDeDuplication().contains(pushDownConjunct) + && !analyzer.getGlobalSlotToLiteralDeDuplication() + .contains(new Pair<>(pushDownConjunct.getChild(0), pushDownConjunct.getChild(1)))) { + scanNode.addConjunct(pushDownConjunct); + } + } else if (otherSlot.isBound(rightSlot.getSlotId()) + && leftSlot.isBound(rightSideTuple)) { + Expr pushDownConjunct = rewritePredicate(analyzer, conjunct, leftSlot); + LOG.debug("pushDownConjunct: {}", pushDownConjunct); + if (!analyzer.getGlobalInDeDuplication().contains(pushDownConjunct) + && !analyzer.getGlobalSlotToLiteralDeDuplication() + .contains(new Pair<>(pushDownConjunct.getChild(0), pushDownConjunct.getChild(1)))) { + scanNode.addConjunct(pushDownConjunct); + } + } + } + } + } + } + + // Rewrite the oldPredicate with new leftChild + // For example: oldPredicate is t1.id = 1, leftChild is t2.id, will return t2.id = 1 + private static Expr rewritePredicate(Analyzer analyzer, Expr oldPredicate, Expr leftChild) { + if (oldPredicate instanceof BinaryPredicate) { + BinaryPredicate oldBP = (BinaryPredicate) oldPredicate; + BinaryPredicate bp = new BinaryPredicate(oldBP.getOp(), leftChild, oldBP.getChild(1)); + bp.analyzeNoThrow(analyzer); + return bp; + } + + if (oldPredicate instanceof InPredicate) { + InPredicate oldIP = (InPredicate) oldPredicate; + InPredicate ip = new InPredicate(leftChild, oldIP.getListChildren(), oldIP.isNotIn()); + ip.analyzeNoThrow(analyzer); + return ip; + } + + return oldPredicate; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 8d125c1204..b8ac8e7d7b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1712,6 +1712,9 @@ public class SingleNodePlanner { break; } if (scanNode instanceof OlapScanNode || scanNode instanceof EsScanNode || scanNode instanceof HiveScanNode) { + if (analyzer.enableInferPredicate()) { + PredicatePushDown.visitScanNode(scanNode, tblRef.getJoinOp(), analyzer); + } scanNode.setSortColumn(tblRef.getSortColumn()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java b/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java index 17be70c6e4..edfae20905 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java @@ -180,6 +180,7 @@ public class InferFiltersRule implements ExprRewriteRule { analyzer.registerOnSlotToLiteralDeDuplication(pair); analyzer.registerOnSlotToLiteralExpr(conjunct); } + analyzer.registerGlobalSlotToLiteralDeDuplication(pair); } } else if (conjunct.getChild(0).unwrapSlotRef() instanceof SlotRef && conjunct.getChild(1).unwrapSlotRef() instanceof SlotRef) { @@ -219,6 +220,7 @@ public class InferFiltersRule implements ExprRewriteRule { analyzer.registerInExpr(conjunct); analyzer.registerInDeDuplication(conjunct.getChild(0).unwrapSlotRef()); } + analyzer.registerGlobalInDeDuplication(conjunct.getChild(0).unwrapSlotRef()); } } } @@ -497,6 +499,9 @@ public class InferFiltersRule implements ExprRewriteRule { analyzer.registerOnSlotToLiteralDeDuplication(pair); analyzer.registerOnSlotToLiteralExpr(newBP); } + if (needAddnewExprWithState) { + analyzer.registerGlobalSlotToLiteralDeDuplication(pair); + } } } } @@ -665,6 +670,9 @@ public class InferFiltersRule implements ExprRewriteRule { 
analyzer.registerInDeDuplication(newIP); analyzer.registerInExpr(newIP); } + if (needAddnewExprWithState) { + analyzer.registerGlobalInDeDuplication(newIP); + } } } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/rewrite/InferFiltersRuleTest.java b/fe/fe-core/src/test/java/org/apache/doris/rewrite/InferFiltersRuleTest.java index 0a402bc2e2..b71b36452f 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/rewrite/InferFiltersRuleTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/rewrite/InferFiltersRuleTest.java @@ -298,7 +298,7 @@ public class InferFiltersRuleTest { } @Test - public void testOnAndWhere2TablesLeftJoin() throws Exception { + public void testOnAndWhere2TablesLeftJoin2ndIsLiteral() throws Exception { SessionVariable sessionVariable = dorisAssert.getSessionVariable(); sessionVariable.setEnableInferPredicate(true); Assert.assertTrue(sessionVariable.isEnableInferPredicate()); @@ -308,7 +308,7 @@ public class InferFiltersRuleTest { } @Test - public void testOnAndWhere2TablesInnerJoin() throws Exception { + public void testOnAndWhere2TablesInnerJoin2ndIsLiteral() throws Exception { SessionVariable sessionVariable = dorisAssert.getSessionVariable(); sessionVariable.setEnableInferPredicate(true); Assert.assertTrue(sessionVariable.isEnableInferPredicate()); @@ -317,4 +317,23 @@ public class InferFiltersRuleTest { Assert.assertTrue(planString.contains("`tb1`.`k1` = 1")); } + @Test + public void testOnAndWhere2TableLeftJoin1stIsLiteral() throws Exception { + SessionVariable sessionVariable = dorisAssert.getSessionVariable(); + sessionVariable.setEnableInferPredicate(true); + Assert.assertTrue(sessionVariable.isEnableInferPredicate()); + String query = "select * from tb1 left join tb2 on tb1.k1 = tb2.k1 where tb1.k1 = 1"; + String planString = dorisAssert.query(query).explainQuery(); + Assert.assertTrue(planString.contains("`tb2`.`k1` = 1")); + } + + @Test + public void testOnAndWhere2TablesInnerJoin1stIsLiteral() throws Exception { + 
SessionVariable sessionVariable = dorisAssert.getSessionVariable(); + sessionVariable.setEnableInferPredicate(true); + Assert.assertTrue(sessionVariable.isEnableInferPredicate()); + String query = "select * from tb1 inner join tb2 on tb1.k1 = tb2.k1 where tb1.k1 = 1"; + String planString = dorisAssert.query(query).explainQuery(); + Assert.assertTrue(planString.contains("`tb2`.`k1` = 1")); + } }