diff --git a/fe/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/src/main/java/org/apache/doris/analysis/Analyzer.java index eecd48ed7e..5d3ec0d05a 100644 --- a/fe/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -712,12 +712,22 @@ public class Analyzer { globalState.semiJoinedTupleIds.put(tid, rhsRef); } + public void registerConjunct(Expr e, TupleId tupleId) throws AnalysisException { + final List exprs = Lists.newArrayList(); + exprs.add(e); + registerConjuncts(exprs, tupleId); + } - /** - * Register all conjuncts in a list of predicates as Where clause conjuncts. - */ - public void registerConjuncts(List l) throws AnalysisException { - registerConjuncts(l, null); + public void registerConjuncts(List l, TupleId tupleId) throws AnalysisException { + final List tupleIds = Lists.newArrayList(); + tupleIds.add(tupleId); + registerConjuncts(l, tupleIds); + } + + public void registerConjunct(Expr e, List tupleIds) throws AnalysisException { + final List exprs = Lists.newArrayList(); + exprs.add(e); + registerConjuncts(exprs, tupleIds); } // register all conjuncts and handle constant conjuncts with ids diff --git a/fe/src/main/java/org/apache/doris/analysis/Expr.java b/fe/src/main/java/org/apache/doris/analysis/Expr.java index f9771babea..9acc131e19 100644 --- a/fe/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/src/main/java/org/apache/doris/analysis/Expr.java @@ -1084,6 +1084,13 @@ abstract public class Expr extends TreeNode implements ParseNode, Cloneabl return true; } + public boolean isBound(List slotIds) { + final List exprTupleIds = Lists.newArrayList(); + final List exprSlotIds = Lists.newArrayList(); + getIds(exprTupleIds, exprSlotIds); + return !exprSlotIds.retainAll(slotIds); + } + public void getIds(List tupleIds, List slotIds) { for (Expr child : children) { child.getIds(tupleIds, slotIds); diff --git a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 823c0e36bd..0a77a8d461 100644 --- a/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -55,6 +55,7 @@ import org.apache.doris.catalog.MysqlTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.FunctionSet; import org.apache.doris.common.AnalysisException; +import org.apache.doris.catalog.AggregateFunction; import org.apache.doris.common.Pair; import org.apache.doris.common.Reference; import org.apache.doris.common.UserException; @@ -207,6 +208,8 @@ public class SingleNodePlanner { PlanNode root; if (stmt instanceof SelectStmt) { SelectStmt selectStmt = (SelectStmt) stmt; + pushDownPredicates(analyzer, selectStmt); + root = createSelectPlan(selectStmt, analyzer, newDefaultOrderByLimit); // TODO(zc) @@ -257,7 +260,7 @@ public class SingleNodePlanner { } Preconditions.checkState(root.hasValidStats()); root.init(analyzer); - // TODO chenhao16, before merge ValueTransferGraph, force evaluate conjuncts + // TODO chenhao, before merge ValueTransferGraph, force evaluate conjuncts // from SelectStmt outside root = addUnassignedConjuncts(analyzer, root); } else { @@ -337,7 +340,7 @@ public class SingleNodePlanner { if (selectStmt.getTableRefs().size() > 1) { for (int i = 1; i < selectStmt.getTableRefs().size(); ++i) { final JoinOperator joinOperator = selectStmt.getTableRefs().get(i).getJoinOp(); - // TODO chenhao16 , right out join ? + // TODO chenhao , right out join ? if (joinOperator.isRightOuterJoin() || joinOperator.isFullOuterJoin()) { turnOffReason = selectStmt.getTableRefs().get(i) + " joinOp is full outer join or right outer join."; @@ -1003,14 +1006,6 @@ public class SingleNodePlanner { // All conjuncts final List unassignedConjuncts = analyzer.getUnassignedConjuncts(inlineViewRef.getId().asList(), true); - if (!canMigrateConjuncts(inlineViewRef)) { - // mark (fully resolve) slots referenced by unassigned conjuncts as - // materialized - List substUnassigned = Expr.substituteList(unassignedConjuncts, - inlineViewRef.getBaseTblSmap(), analyzer, false); - analyzer.materializeSlots(substUnassigned); - return; - } // Constant conjuncts final List unassignedConstantConjuncts = Lists.newArrayList(); @@ -1041,17 +1036,21 @@ public class SingleNodePlanner { preds.add(e); } } - unassignedConjuncts.removeAll(preds); - // Generate predicates to enforce equivalences among slots of the inline view - // tuple. These predicates are also migrated into the inline view. - // TODO(zc) - // analyzer.createEquivConjuncts(inlineViewRef.getId(), preds); - // create new predicates against the inline view's unresolved result exprs, not - // the resolved result exprs, in order to avoid skipping scopes (and ignoring - // limit clauses on the way) - final List viewPredicates = - Expr.substituteList(preds, inlineViewRef.getSmap(), analyzer, false); + final List pushDownFailedPredicates = Lists.newArrayList(); + final List viewPredicates = getPushDownPredicatesForInlineView( + inlineViewRef, preds, analyzer, pushDownFailedPredicates); + if (viewPredicates.size() <= 0) { + // mark (fully resolve) slots referenced by unassigned conjuncts as + // materialized + List substUnassigned = Expr.substituteList(unassignedConjuncts, + inlineViewRef.getBaseTblSmap(), analyzer, false); + analyzer.materializeSlots(substUnassigned); + return; + } + preds.removeAll(pushDownFailedPredicates); + unassignedConjuncts.remove(preds); + unassignedConjuncts.addAll(pushDownFailedPredicates); // Remove unregistered predicates that reference the same slot on // both sides (e.g. a = a). Such predicates have been generated from slot @@ -1094,6 +1093,11 @@ public class SingleNodePlanner { if (conjuncts.isEmpty()) { return; } + + if (!canMigrateConjuncts(inlineViewRef)) { + return; + } + final List newConjuncts = cloneExprs(conjuncts); final QueryStmt stmt = inlineViewRef.getViewStmt(); final Analyzer viewAnalyzer = inlineViewRef.getAnalyzer(); @@ -1138,11 +1142,87 @@ public class SingleNodePlanner { return clones; } + /** + * Get predicates can be migrated into an inline view. + */ + private List getPushDownPredicatesForInlineView( + InlineViewRef inlineViewRef, List viewPredicates, + Analyzer analyzer, List pushDownFailedPredicates) { + // TODO chenhao, remove evaluateOrderBy when SubQuery's default limit is removed. + final List pushDownPredicates = Lists.newArrayList(); + if (inlineViewRef.getViewStmt().evaluateOrderBy() + || inlineViewRef.getViewStmt().hasLimit() + || inlineViewRef.getViewStmt().hasOffset()) { + return pushDownPredicates; + } + + // UnionNode will handle predicates and assigns predicates to it's children. + final List candicatePredicates = + Expr.substituteList(viewPredicates, inlineViewRef.getSmap(), analyzer, false); + if (inlineViewRef.getViewStmt() instanceof UnionStmt) { + final UnionStmt unionStmt = (UnionStmt)inlineViewRef.getViewStmt(); + for (int i = 0; i < candicatePredicates.size(); i++) { + final Expr predicate = candicatePredicates.get(i); + if (predicate.isBound(unionStmt.getTupleId())) { + pushDownPredicates.add(predicate); + } else { + pushDownFailedPredicates.add(viewPredicates.get(i)); + } + } + return pushDownPredicates; + } + + final SelectStmt selectStmt = (SelectStmt)inlineViewRef.getViewStmt(); + if (selectStmt.hasAnalyticInfo()) { + pushDownPredicates.addAll(getWindowsPushDownPredicates(candicatePredicates, viewPredicates, + selectStmt.getAnalyticInfo(), pushDownFailedPredicates)); + } else { + pushDownPredicates.addAll(candicatePredicates); + } + return pushDownPredicates; + } + + /** + * Get predicates which can be pushed down past Windows. + * @param predicates + * @param viewPredicates + * @param analyticInfo + * @param pushDownFailedPredicates + * @return + */ + private List getWindowsPushDownPredicates( + List predicates, List viewPredicates, + AnalyticInfo analyticInfo, List pushDownFailedPredicates) { + final List pushDownPredicates = Lists.newArrayList(); + final List partitionExprs = analyticInfo.getCommonPartitionExprs(); + final List partitionByIds = Lists.newArrayList(); + for (Expr expr : partitionExprs) { + if (expr instanceof SlotRef) { + final SlotRef slotRef = (SlotRef)expr; + partitionByIds.add(slotRef.getSlotId()); + } + } + + if (partitionByIds.size() <= 0) { + return pushDownPredicates; + } + + for (int i = 0; i < predicates.size(); i++) { + final Expr predicate = predicates.get(i); + if (predicate.isBound(partitionByIds)) { + pushDownPredicates.add(predicate); + } else { + pushDownFailedPredicates.add(viewPredicates.get(i)); + } + } + return pushDownPredicates; + } + /** * Checks if conjuncts can be migrated into an inline view. */ private boolean canMigrateConjuncts(InlineViewRef inlineViewRef) { - // TODO chenhao16, remove 'false' when SubQuery's default limit is removed. + // TODO chenhao, remove evaluateOrderBy when SubQuery's default limit is removed. return inlineViewRef.getViewStmt().evaluateOrderBy() ? false : (!inlineViewRef.getViewStmt().hasLimit() && !inlineViewRef.getViewStmt().hasOffset() @@ -1197,7 +1277,7 @@ public class SingleNodePlanner { } // assignConjuncts(scanNode, analyzer); scanNode.init(analyzer); - // TODO chenhao16 add + // TODO chenhao add // materialize conjuncts in where analyzer.materializeSlots(scanNode.getConjuncts()); @@ -1406,7 +1486,7 @@ public class SingleNodePlanner { // List conjuncts = // analyzer.getUnassignedConjuncts(unionStmt.getTupleId().asList(), false); List conjuncts = analyzer.getUnassignedConjuncts(unionStmt.getTupleId().asList()); - // TODO chenhao16 + // TODO chenhao // Because Conjuncts can't be assigned to UnionNode and Palo's fe can't evaluate conjuncts, // it needs to add SelectNode as UnionNode's parent, when UnionStmt's Ops contains constant // Select. @@ -1563,5 +1643,191 @@ public class SingleNodePlanner { } } + /** + ------------------------------------------------------------------------------ + */ + /** + * Push down predicates rules + */ + + /** + * Entrance for push-down rules, it will execute possible push-down rules from top to down + * and the planner will be responsible for assigning all predicates to PlanNode. + */ + private void pushDownPredicates(Analyzer analyzer, SelectStmt stmt) throws AnalysisException { + // Push down predicates according to the semantic requirements of SQL. + pushDownPredicatesPastSort(analyzer, stmt); + pushDownPredicatesPastWindows(analyzer, stmt); + pushDownPredicatesPastAggregation(analyzer, stmt); + } + + private void pushDownPredicatesPastSort(Analyzer analyzer, SelectStmt stmt) throws AnalysisException { + // TODO chenhao, remove isEvaluateOrderBy when SubQuery's default limit is removed. + if (stmt.evaluateOrderBy() || stmt.getLimit() >= 0 || stmt.getOffset() > 0 || stmt.getSortInfo() == null) { + return; + } + final List predicates = getBoundPredicates(analyzer, stmt.getSortInfo().getSortTupleDescriptor()); + if (predicates.size() <= 0) { + return; + } + final List pushDownPredicates = getPredicatesReplacedSlotWithSourceExpr(predicates, analyzer); + if (pushDownPredicates.size() <= 0) { + return; + } + + // Push down predicates to sort's child until they are assigned successfully. + if (putPredicatesOnWindows(stmt, analyzer, pushDownPredicates)) { + return; + } + if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) { + return; + } + putPredicatesOnFrom(stmt, analyzer, pushDownPredicates); + } + + private void pushDownPredicatesPastWindows(Analyzer analyzer, SelectStmt stmt) throws AnalysisException { + final AnalyticInfo analyticInfo = stmt.getAnalyticInfo(); + if (analyticInfo == null || analyticInfo.getCommonPartitionExprs().size() == 0) { + return; + } + final List predicates = getBoundPredicates(analyzer, analyticInfo.getOutputTupleDesc()); + if (predicates.size() <= 0) { + return; + } + + // Push down predicates to Windows' child until they are assigned successfully. + final List pushDownPredicates = getPredicatesBoundedByGroupbysSourceExpr(predicates, analyzer); + if (pushDownPredicates.size() <= 0) { + return; + } + if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) { + return; + } + putPredicatesOnFrom(stmt, analyzer, pushDownPredicates); + } + + private void pushDownPredicatesPastAggregation(Analyzer analyzer, SelectStmt stmt) throws AnalysisException { + final AggregateInfo aggregateInfo = stmt.getAggInfo(); + if (aggregateInfo == null || aggregateInfo.getGroupingExprs().size() <= 0) { + return; + } + final List predicates = getBoundPredicates(analyzer, aggregateInfo.getOutputTupleDesc()); + if (predicates.size() <= 0) { + return; + } + + // Push down predicates to aggregation's child until they are assigned successfully. + final List pushDownPredicates = getPredicatesBoundedByGroupbysSourceExpr(predicates, analyzer); + if (pushDownPredicates.size() <= 0) { + return; + } + putPredicatesOnFrom(stmt, analyzer, pushDownPredicates); + } + + private List getPredicatesBoundedByGroupbysSourceExpr(List predicates, Analyzer analyzer) { + final List predicatesCanPushDown = Lists.newArrayList(); + for (Expr predicate : predicates) { + if (predicate.isConstant()) { + // Constant predicates can't be pushed down past Groupby. + continue; + } + + final List tupleIds = Lists.newArrayList(); + final List slotIds = Lists.newArrayList(); + predicate.getIds(tupleIds, slotIds); + + boolean isAllSlotReferingGroupBys = true; + for (SlotId slotId : slotIds) { + final SlotDescriptor slotDesc = analyzer.getDescTbl().getSlotDesc(slotId); + Expr sourceExpr = slotDesc.getSourceExprs().get(0); + if (sourceExpr.getFn() instanceof AggregateFunction) { + isAllSlotReferingGroupBys = false; + } + } + + if (isAllSlotReferingGroupBys) { + predicatesCanPushDown.add(predicate); + } + } + return getPredicatesReplacedSlotWithSourceExpr(predicatesCanPushDown, analyzer); + } + + private List getPredicatesReplacedSlotWithSourceExpr(List predicates, Analyzer analyzer) { + final List predicatesCanPushDown = Lists.newArrayList(); + analyzer.markConjunctsAssigned(predicates); + for (Expr predicate : predicates) { + final Expr newPredicate = predicate.clone(); + replacePredicateSlotRefWithSource(newPredicate, analyzer); + predicatesCanPushDown.add(newPredicate); + } + return predicatesCanPushDown; + } + + private void replacePredicateSlotRefWithSource(Expr predicate, Analyzer analyzer) { + replacePredicateSlotRefWithSource(null, predicate, -1, analyzer); + } + + private void replacePredicateSlotRefWithSource(Expr parent, Expr predicate, int childIndex, Analyzer analyzer) { + if (predicate instanceof SlotRef) { + final SlotRef slotRef = (SlotRef)predicate; + if (parent != null && childIndex >= 0) { + final Expr newReplacedExpr = slotRef.getDesc().getSourceExprs().get(0).clone(); + parent.setChild(childIndex, newReplacedExpr); + } + } + + for (int i = 0; i < predicate.getChildren().size(); i++) { + final Expr child = predicate.getChild(i); + replacePredicateSlotRefWithSource(predicate, child, i, analyzer); + } + } + + // Register predicates with Aggregation's output tuple id. + private boolean putPredicatesOnAggregation(SelectStmt stmt, Analyzer analyzer, + List predicates) throws AnalysisException { + final AggregateInfo aggregateInfo = stmt.getAggInfo(); + if (aggregateInfo != null) { + analyzer.registerConjuncts(predicates, aggregateInfo.getOutputTupleId()); + return true; + } + return false; + } + + // Register predicates with Windows's tuple id. + private boolean putPredicatesOnWindows(SelectStmt stmt, Analyzer analyzer, + List predicates) throws AnalysisException { + final AnalyticInfo analyticInfo = stmt.getAnalyticInfo(); + if (analyticInfo != null) { + analyzer.registerConjuncts(predicates, analyticInfo.getOutputTupleId()); + return true; + } + return false; + } + + // Register predicates with TableRef's tuple id. + private void putPredicatesOnFrom(SelectStmt stmt, Analyzer analyzer, List predicates) throws AnalysisException { + final List tableTupleIds = Lists.newArrayList(); + for (TableRef tableRef : stmt.getTableRefs()) { + tableTupleIds.add(tableRef.getId()); + } + + for (Expr predicate : predicates) { + Preconditions.checkArgument(predicate.isBoundByTupleIds(tableTupleIds), + "Predicate:" + predicate.toSql() + " can't be assigned to some PlanNode."); + final List predicateTupleIds = Lists.newArrayList(); + predicate.getIds(predicateTupleIds, null); + analyzer.registerConjunct(predicate, predicateTupleIds); + } + } + + /** + ------------------------------------------------------------------------------ + */ + + private List getBoundPredicates(Analyzer analyzer, TupleDescriptor tupleDesc) { + final List tupleIds = Lists.newArrayList(); + tupleIds.add(tupleDesc.getId()); + return analyzer.getUnassignedConjuncts(tupleIds); + } }