Planner support push down predicates past agg, win and sort (#1471)

This commit is contained in:
chenhao
2019-09-08 09:30:46 +08:00
committed by Mingyu Chen
parent 2f52ae7988
commit f23ac0eadd
3 changed files with 311 additions and 28 deletions

View File

@ -712,12 +712,22 @@ public class Analyzer {
globalState.semiJoinedTupleIds.put(tid, rhsRef);
}
public void registerConjunct(Expr e, TupleId tupleId) throws AnalysisException {
final List<Expr> exprs = Lists.newArrayList();
exprs.add(e);
registerConjuncts(exprs, tupleId);
}
/**
* Register all conjuncts in a list of predicates as Where clause conjuncts.
*/
public void registerConjuncts(List<Expr> l) throws AnalysisException {
registerConjuncts(l, null);
public void registerConjuncts(List<Expr> l, TupleId tupleId) throws AnalysisException {
final List<TupleId> tupleIds = Lists.newArrayList();
tupleIds.add(tupleId);
registerConjuncts(l, tupleIds);
}
public void registerConjunct(Expr e, List<TupleId> tupleIds) throws AnalysisException {
final List<Expr> exprs = Lists.newArrayList();
exprs.add(e);
registerConjuncts(exprs, tupleIds);
}
// register all conjuncts and handle constant conjuncts with ids

View File

@ -1084,6 +1084,13 @@ abstract public class Expr extends TreeNode<Expr> implements ParseNode, Cloneabl
return true;
}
public boolean isBound(List<SlotId> slotIds) {
final List<TupleId> exprTupleIds = Lists.newArrayList();
final List<SlotId> exprSlotIds = Lists.newArrayList();
getIds(exprTupleIds, exprSlotIds);
return !exprSlotIds.retainAll(slotIds);
}
public void getIds(List<TupleId> tupleIds, List<SlotId> slotIds) {
for (Expr child : children) {
child.getIds(tupleIds, slotIds);

View File

@ -55,6 +55,7 @@ import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.catalog.AggregateFunction;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Reference;
import org.apache.doris.common.UserException;
@ -207,6 +208,8 @@ public class SingleNodePlanner {
PlanNode root;
if (stmt instanceof SelectStmt) {
SelectStmt selectStmt = (SelectStmt) stmt;
pushDownPredicates(analyzer, selectStmt);
root = createSelectPlan(selectStmt, analyzer, newDefaultOrderByLimit);
// TODO(zc)
@ -257,7 +260,7 @@ public class SingleNodePlanner {
}
Preconditions.checkState(root.hasValidStats());
root.init(analyzer);
// TODO chenhao16, before merge ValueTransferGraph, force evaluate conjuncts
// TODO chenhao, before merge ValueTransferGraph, force evaluate conjuncts
// from SelectStmt outside
root = addUnassignedConjuncts(analyzer, root);
} else {
@ -337,7 +340,7 @@ public class SingleNodePlanner {
if (selectStmt.getTableRefs().size() > 1) {
for (int i = 1; i < selectStmt.getTableRefs().size(); ++i) {
final JoinOperator joinOperator = selectStmt.getTableRefs().get(i).getJoinOp();
// TODO chenhao16 , right out join ?
// TODO chenhao , right out join ?
if (joinOperator.isRightOuterJoin() || joinOperator.isFullOuterJoin()) {
turnOffReason = selectStmt.getTableRefs().get(i) +
" joinOp is full outer join or right outer join.";
@ -1003,14 +1006,6 @@ public class SingleNodePlanner {
// All conjuncts
final List<Expr> unassignedConjuncts =
analyzer.getUnassignedConjuncts(inlineViewRef.getId().asList(), true);
if (!canMigrateConjuncts(inlineViewRef)) {
// mark (fully resolve) slots referenced by unassigned conjuncts as
// materialized
List<Expr> substUnassigned = Expr.substituteList(unassignedConjuncts,
inlineViewRef.getBaseTblSmap(), analyzer, false);
analyzer.materializeSlots(substUnassigned);
return;
}
// Constant conjuncts
final List<Expr> unassignedConstantConjuncts = Lists.newArrayList();
@ -1041,17 +1036,21 @@ public class SingleNodePlanner {
preds.add(e);
}
}
unassignedConjuncts.removeAll(preds);
// Generate predicates to enforce equivalences among slots of the inline view
// tuple. These predicates are also migrated into the inline view.
// TODO(zc)
// analyzer.createEquivConjuncts(inlineViewRef.getId(), preds);
// create new predicates against the inline view's unresolved result exprs, not
// the resolved result exprs, in order to avoid skipping scopes (and ignoring
// limit clauses on the way)
final List<Expr> viewPredicates =
Expr.substituteList(preds, inlineViewRef.getSmap(), analyzer, false);
final List<Expr> pushDownFailedPredicates = Lists.newArrayList();
final List<Expr> viewPredicates = getPushDownPredicatesForInlineView(
inlineViewRef, preds, analyzer, pushDownFailedPredicates);
if (viewPredicates.size() <= 0) {
// mark (fully resolve) slots referenced by unassigned conjuncts as
// materialized
List<Expr> substUnassigned = Expr.substituteList(unassignedConjuncts,
inlineViewRef.getBaseTblSmap(), analyzer, false);
analyzer.materializeSlots(substUnassigned);
return;
}
preds.removeAll(pushDownFailedPredicates);
unassignedConjuncts.remove(preds);
unassignedConjuncts.addAll(pushDownFailedPredicates);
// Remove unregistered predicates that reference the same slot on
// both sides (e.g. a = a). Such predicates have been generated from slot
@ -1094,6 +1093,11 @@ public class SingleNodePlanner {
if (conjuncts.isEmpty()) {
return;
}
if (!canMigrateConjuncts(inlineViewRef)) {
return;
}
final List<Expr> newConjuncts = cloneExprs(conjuncts);
final QueryStmt stmt = inlineViewRef.getViewStmt();
final Analyzer viewAnalyzer = inlineViewRef.getAnalyzer();
@ -1138,11 +1142,87 @@ public class SingleNodePlanner {
return clones;
}
/**
* Get predicates can be migrated into an inline view.
*/
private List<Expr> getPushDownPredicatesForInlineView(
InlineViewRef inlineViewRef, List<Expr> viewPredicates,
Analyzer analyzer, List<Expr> pushDownFailedPredicates) {
// TODO chenhao, remove evaluateOrderBy when SubQuery's default limit is removed.
final List<Expr> pushDownPredicates = Lists.newArrayList();
if (inlineViewRef.getViewStmt().evaluateOrderBy()
|| inlineViewRef.getViewStmt().hasLimit()
|| inlineViewRef.getViewStmt().hasOffset()) {
return pushDownPredicates;
}
// UnionNode will handle predicates and assigns predicates to it's children.
final List<Expr> candicatePredicates =
Expr.substituteList(viewPredicates, inlineViewRef.getSmap(), analyzer, false);
if (inlineViewRef.getViewStmt() instanceof UnionStmt) {
final UnionStmt unionStmt = (UnionStmt)inlineViewRef.getViewStmt();
for (int i = 0; i < candicatePredicates.size(); i++) {
final Expr predicate = candicatePredicates.get(i);
if (predicate.isBound(unionStmt.getTupleId())) {
pushDownPredicates.add(predicate);
} else {
pushDownFailedPredicates.add(viewPredicates.get(i));
}
}
return pushDownPredicates;
}
final SelectStmt selectStmt = (SelectStmt)inlineViewRef.getViewStmt();
if (selectStmt.hasAnalyticInfo()) {
pushDownPredicates.addAll(getWindowsPushDownPredicates(candicatePredicates, viewPredicates,
selectStmt.getAnalyticInfo(), pushDownFailedPredicates));
} else {
pushDownPredicates.addAll(candicatePredicates);
}
return pushDownPredicates;
}
/**
* Get predicates which can be pushed down past Windows.
* @param predicates
* @param viewPredicates
* @param analyticInfo
* @param pushDownFailedPredicates
* @return
*/
private List<Expr> getWindowsPushDownPredicates(
List<Expr> predicates, List<Expr> viewPredicates,
AnalyticInfo analyticInfo, List<Expr> pushDownFailedPredicates) {
final List<Expr> pushDownPredicates = Lists.newArrayList();
final List<Expr> partitionExprs = analyticInfo.getCommonPartitionExprs();
final List<SlotId> partitionByIds = Lists.newArrayList();
for (Expr expr : partitionExprs) {
if (expr instanceof SlotRef) {
final SlotRef slotRef = (SlotRef)expr;
partitionByIds.add(slotRef.getSlotId());
}
}
if (partitionByIds.size() <= 0) {
return pushDownPredicates;
}
for (int i = 0; i < predicates.size(); i++) {
final Expr predicate = predicates.get(i);
if (predicate.isBound(partitionByIds)) {
pushDownPredicates.add(predicate);
} else {
pushDownFailedPredicates.add(viewPredicates.get(i));
}
}
return pushDownPredicates;
}
/**
* Checks if conjuncts can be migrated into an inline view.
*/
private boolean canMigrateConjuncts(InlineViewRef inlineViewRef) {
// TODO chenhao16, remove 'false' when SubQuery's default limit is removed.
// TODO chenhao, remove evaluateOrderBy when SubQuery's default limit is removed.
return inlineViewRef.getViewStmt().evaluateOrderBy() ? false :
(!inlineViewRef.getViewStmt().hasLimit()
&& !inlineViewRef.getViewStmt().hasOffset()
@ -1197,7 +1277,7 @@ public class SingleNodePlanner {
}
// assignConjuncts(scanNode, analyzer);
scanNode.init(analyzer);
// TODO chenhao16 add
// TODO chenhao add
// materialize conjuncts in where
analyzer.materializeSlots(scanNode.getConjuncts());
@ -1406,7 +1486,7 @@ public class SingleNodePlanner {
// List<Expr> conjuncts =
// analyzer.getUnassignedConjuncts(unionStmt.getTupleId().asList(), false);
List<Expr> conjuncts = analyzer.getUnassignedConjuncts(unionStmt.getTupleId().asList());
// TODO chenhao16
// TODO chenhao
// Because Conjuncts can't be assigned to UnionNode and Palo's fe can't evaluate conjuncts,
// it needs to add SelectNode as UnionNode's parent, when UnionStmt's Ops contains constant
// Select.
@ -1563,5 +1643,191 @@ public class SingleNodePlanner {
}
}
/**
------------------------------------------------------------------------------
*/
/**
* Push down predicates rules
*/
/**
* Entrance for push-down rules, it will execute possible push-down rules from top to down
* and the planner will be responsible for assigning all predicates to PlanNode.
*/
private void pushDownPredicates(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
// Push down predicates according to the semantic requirements of SQL.
pushDownPredicatesPastSort(analyzer, stmt);
pushDownPredicatesPastWindows(analyzer, stmt);
pushDownPredicatesPastAggregation(analyzer, stmt);
}
private void pushDownPredicatesPastSort(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
// TODO chenhao, remove isEvaluateOrderBy when SubQuery's default limit is removed.
if (stmt.evaluateOrderBy() || stmt.getLimit() >= 0 || stmt.getOffset() > 0 || stmt.getSortInfo() == null) {
return;
}
final List<Expr> predicates = getBoundPredicates(analyzer, stmt.getSortInfo().getSortTupleDescriptor());
if (predicates.size() <= 0) {
return;
}
final List<Expr> pushDownPredicates = getPredicatesReplacedSlotWithSourceExpr(predicates, analyzer);
if (pushDownPredicates.size() <= 0) {
return;
}
// Push down predicates to sort's child until they are assigned successfully.
if (putPredicatesOnWindows(stmt, analyzer, pushDownPredicates)) {
return;
}
if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) {
return;
}
putPredicatesOnFrom(stmt, analyzer, pushDownPredicates);
}
private void pushDownPredicatesPastWindows(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
final AnalyticInfo analyticInfo = stmt.getAnalyticInfo();
if (analyticInfo == null || analyticInfo.getCommonPartitionExprs().size() == 0) {
return;
}
final List<Expr> predicates = getBoundPredicates(analyzer, analyticInfo.getOutputTupleDesc());
if (predicates.size() <= 0) {
return;
}
// Push down predicates to Windows' child until they are assigned successfully.
final List<Expr> pushDownPredicates = getPredicatesBoundedByGroupbysSourceExpr(predicates, analyzer);
if (pushDownPredicates.size() <= 0) {
return;
}
if (putPredicatesOnAggregation(stmt, analyzer, pushDownPredicates)) {
return;
}
putPredicatesOnFrom(stmt, analyzer, pushDownPredicates);
}
private void pushDownPredicatesPastAggregation(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
final AggregateInfo aggregateInfo = stmt.getAggInfo();
if (aggregateInfo == null || aggregateInfo.getGroupingExprs().size() <= 0) {
return;
}
final List<Expr> predicates = getBoundPredicates(analyzer, aggregateInfo.getOutputTupleDesc());
if (predicates.size() <= 0) {
return;
}
// Push down predicates to aggregation's child until they are assigned successfully.
final List<Expr> pushDownPredicates = getPredicatesBoundedByGroupbysSourceExpr(predicates, analyzer);
if (pushDownPredicates.size() <= 0) {
return;
}
putPredicatesOnFrom(stmt, analyzer, pushDownPredicates);
}
private List<Expr> getPredicatesBoundedByGroupbysSourceExpr(List<Expr> predicates, Analyzer analyzer) {
final List<Expr> predicatesCanPushDown = Lists.newArrayList();
for (Expr predicate : predicates) {
if (predicate.isConstant()) {
// Constant predicates can't be pushed down past Groupby.
continue;
}
final List<TupleId> tupleIds = Lists.newArrayList();
final List<SlotId> slotIds = Lists.newArrayList();
predicate.getIds(tupleIds, slotIds);
boolean isAllSlotReferingGroupBys = true;
for (SlotId slotId : slotIds) {
final SlotDescriptor slotDesc = analyzer.getDescTbl().getSlotDesc(slotId);
Expr sourceExpr = slotDesc.getSourceExprs().get(0);
if (sourceExpr.getFn() instanceof AggregateFunction) {
isAllSlotReferingGroupBys = false;
}
}
if (isAllSlotReferingGroupBys) {
predicatesCanPushDown.add(predicate);
}
}
return getPredicatesReplacedSlotWithSourceExpr(predicatesCanPushDown, analyzer);
}
private List<Expr> getPredicatesReplacedSlotWithSourceExpr(List<Expr> predicates, Analyzer analyzer) {
final List<Expr> predicatesCanPushDown = Lists.newArrayList();
analyzer.markConjunctsAssigned(predicates);
for (Expr predicate : predicates) {
final Expr newPredicate = predicate.clone();
replacePredicateSlotRefWithSource(newPredicate, analyzer);
predicatesCanPushDown.add(newPredicate);
}
return predicatesCanPushDown;
}
private void replacePredicateSlotRefWithSource(Expr predicate, Analyzer analyzer) {
replacePredicateSlotRefWithSource(null, predicate, -1, analyzer);
}
private void replacePredicateSlotRefWithSource(Expr parent, Expr predicate, int childIndex, Analyzer analyzer) {
if (predicate instanceof SlotRef) {
final SlotRef slotRef = (SlotRef)predicate;
if (parent != null && childIndex >= 0) {
final Expr newReplacedExpr = slotRef.getDesc().getSourceExprs().get(0).clone();
parent.setChild(childIndex, newReplacedExpr);
}
}
for (int i = 0; i < predicate.getChildren().size(); i++) {
final Expr child = predicate.getChild(i);
replacePredicateSlotRefWithSource(predicate, child, i, analyzer);
}
}
// Register predicates with Aggregation's output tuple id.
private boolean putPredicatesOnAggregation(SelectStmt stmt, Analyzer analyzer,
List<Expr> predicates) throws AnalysisException {
final AggregateInfo aggregateInfo = stmt.getAggInfo();
if (aggregateInfo != null) {
analyzer.registerConjuncts(predicates, aggregateInfo.getOutputTupleId());
return true;
}
return false;
}
// Register predicates with Windows's tuple id.
private boolean putPredicatesOnWindows(SelectStmt stmt, Analyzer analyzer,
List<Expr> predicates) throws AnalysisException {
final AnalyticInfo analyticInfo = stmt.getAnalyticInfo();
if (analyticInfo != null) {
analyzer.registerConjuncts(predicates, analyticInfo.getOutputTupleId());
return true;
}
return false;
}
// Register predicates with TableRef's tuple id.
private void putPredicatesOnFrom(SelectStmt stmt, Analyzer analyzer, List<Expr> predicates) throws AnalysisException {
final List<TupleId> tableTupleIds = Lists.newArrayList();
for (TableRef tableRef : stmt.getTableRefs()) {
tableTupleIds.add(tableRef.getId());
}
for (Expr predicate : predicates) {
Preconditions.checkArgument(predicate.isBoundByTupleIds(tableTupleIds),
"Predicate:" + predicate.toSql() + " can't be assigned to some PlanNode.");
final List<TupleId> predicateTupleIds = Lists.newArrayList();
predicate.getIds(predicateTupleIds, null);
analyzer.registerConjunct(predicate, predicateTupleIds);
}
}
/**
------------------------------------------------------------------------------
*/
private List<Expr> getBoundPredicates(Analyzer analyzer, TupleDescriptor tupleDesc) {
final List<TupleId> tupleIds = Lists.newArrayList();
tupleIds.add(tupleDesc.getId());
return analyzer.getUnassignedConjuncts(tupleIds);
}
}