[enhancement](Nereids) eliminate all unnecessary cross joins in TPC-H benchmark (#12651)
To eliminate all unnecessary cross joins in the TPC-H benchmark, this PR: 1. pushes all predicates that can be pushed down through joins before applying the ReorderJoin rule. This lets us eliminate every cross join that the ReorderJoin rule is able to eliminate, since that rule requires matching a LogicalFilter as its root pattern. (Q2, Q15, Q16, Q17, Q18) 2. enables an expression optimization rule — extract common expression. (Q19) 3. fixes a cast translation failure. (Q19)
This commit is contained in:
@ -109,6 +109,19 @@ public class CastExpr extends Expr {
|
||||
analysisDone();
|
||||
}
|
||||
|
||||
/**
|
||||
* Just use for nereids, put analyze() in finalizeImplForNereids
|
||||
*/
|
||||
public CastExpr(Type targetType, Expr e, Void v) {
|
||||
super();
|
||||
Preconditions.checkArgument(targetType.isValid());
|
||||
Preconditions.checkNotNull(e);
|
||||
type = targetType;
|
||||
targetTypeDef = null;
|
||||
isImplicit = true;
|
||||
children.add(e);
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy c'tor used in clone().
|
||||
*/
|
||||
@ -531,6 +544,13 @@ public class CastExpr extends Expr {
|
||||
|
||||
@Override
|
||||
public void finalizeImplForNereids() throws AnalysisException {
|
||||
try {
|
||||
analyze();
|
||||
} catch (AnalysisException ex) {
|
||||
LOG.warn("Implicit casts fail", ex);
|
||||
Preconditions.checkState(false,
|
||||
"Implicit casts should never throw analysis exception.");
|
||||
}
|
||||
FunctionName fnName = new FunctionName(getFnName(type));
|
||||
Function searchDesc = new Function(fnName, Arrays.asList(collectChildReturnTypes()), Type.INVALID, false);
|
||||
if (type.isScalarType()) {
|
||||
|
||||
@ -232,7 +232,7 @@ public class ExpressionTranslator extends DefaultExpressionVisitor<Expr, PlanTra
|
||||
public Expr visitCast(Cast cast, PlanTranslatorContext context) {
|
||||
// left child of cast is expression, right child of cast is target type
|
||||
return new CastExpr(cast.getDataType().toCatalogDataType(),
|
||||
cast.child().accept(this, context));
|
||||
cast.child().accept(this, context), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -74,6 +74,9 @@ public class RuntimeFilterTranslator {
|
||||
public void createLegacyRuntimeFilter(RuntimeFilter filter, HashJoinNode node, PlanTranslatorContext ctx) {
|
||||
SlotRef src = ctx.findSlotRef(filter.getSrcExpr().getExprId());
|
||||
SlotRef target = context.getExprIdToOlapScanNodeSlotRef().get(filter.getTargetExpr().getExprId());
|
||||
if (target == null) {
|
||||
return;
|
||||
}
|
||||
org.apache.doris.planner.RuntimeFilter origFilter
|
||||
= org.apache.doris.planner.RuntimeFilter.fromNereidsRuntimeFilter(
|
||||
filter.getId(), node, src, filter.getExprOrder(), target,
|
||||
|
||||
@ -32,10 +32,18 @@ import java.util.stream.Collectors;
|
||||
public abstract class Job {
|
||||
protected JobType type;
|
||||
protected JobContext context;
|
||||
protected boolean once;
|
||||
|
||||
public Job(JobType type, JobContext context) {
|
||||
this.type = type;
|
||||
this.context = context;
|
||||
this.once = true;
|
||||
}
|
||||
|
||||
public Job(JobType type, JobContext context, boolean once) {
|
||||
this.type = type;
|
||||
this.context = context;
|
||||
this.once = once;
|
||||
}
|
||||
|
||||
public void pushJob(Job job) {
|
||||
@ -46,6 +54,10 @@ public abstract class Job {
|
||||
return context.getCascadesContext().getRuleSet();
|
||||
}
|
||||
|
||||
public boolean isOnce() {
|
||||
return once;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the rule set of this job. Filter out already applied rules and rules that are not matched on root node.
|
||||
*
|
||||
|
||||
@ -32,6 +32,7 @@ public class JobContext {
|
||||
protected final CascadesContext cascadesContext;
|
||||
protected final PhysicalProperties requiredProperties;
|
||||
protected double costUpperBound;
|
||||
protected boolean rewritten = false;
|
||||
|
||||
protected Map<RuleType, Integer> ruleInvokeTimes = Maps.newLinkedHashMap();
|
||||
|
||||
@ -57,6 +58,14 @@ public class JobContext {
|
||||
this.costUpperBound = costUpperBound;
|
||||
}
|
||||
|
||||
public boolean isRewritten() {
|
||||
return rewritten;
|
||||
}
|
||||
|
||||
public void setRewritten(boolean rewritten) {
|
||||
this.rewritten = rewritten;
|
||||
}
|
||||
|
||||
public void onInvokeRule(RuleType ruleType) {
|
||||
addRuleInvokeTimes(ruleType);
|
||||
}
|
||||
|
||||
@ -58,22 +58,35 @@ public abstract class BatchRulesJob {
|
||||
for (RuleFactory ruleFactory : ruleFactories) {
|
||||
rules.addAll(ruleFactory.buildRules());
|
||||
}
|
||||
return new RewriteTopDownJob(
|
||||
cascadesContext.getMemo().getRoot(),
|
||||
rules,
|
||||
return new RewriteTopDownJob(cascadesContext.getMemo().getRoot(), rules,
|
||||
cascadesContext.getCurrentJobContext());
|
||||
}
|
||||
|
||||
protected Job topDownBatch(List<RuleFactory> ruleFactories, boolean once) {
|
||||
List<Rule> rules = new ArrayList<>();
|
||||
for (RuleFactory ruleFactory : ruleFactories) {
|
||||
rules.addAll(ruleFactory.buildRules());
|
||||
}
|
||||
return new RewriteTopDownJob(cascadesContext.getMemo().getRoot(), rules,
|
||||
cascadesContext.getCurrentJobContext(), once);
|
||||
}
|
||||
|
||||
protected Job optimize() {
|
||||
return new OptimizeGroupJob(
|
||||
cascadesContext.getMemo().getRoot(),
|
||||
cascadesContext.getCurrentJobContext());
|
||||
}
|
||||
|
||||
/**
|
||||
* execute.
|
||||
*/
|
||||
public void execute() {
|
||||
for (Job job : rulesJob) {
|
||||
cascadesContext.pushJob(job);
|
||||
cascadesContext.getJobScheduler().executeJobPool(cascadesContext);
|
||||
do {
|
||||
cascadesContext.getCurrentJobContext().setRewritten(false);
|
||||
cascadesContext.pushJob(job);
|
||||
cascadesContext.getJobScheduler().executeJobPool(cascadesContext);
|
||||
} while (!job.isOnce() && cascadesContext.getCurrentJobContext().isRewritten());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,35 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.jobs.batch;
|
||||
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.rules.rewrite.logical.FindHashConditionForJoin;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
/**
|
||||
* FindHashConditionForJoinJob
|
||||
*/
|
||||
public class FindHashConditionForJoinJob extends BatchRulesJob {
|
||||
public FindHashConditionForJoinJob(CascadesContext cascadesContext) {
|
||||
super(cascadesContext);
|
||||
rulesJob.addAll(ImmutableList.of(
|
||||
topDownBatch(ImmutableList.of(new FindHashConditionForJoin()))
|
||||
));
|
||||
}
|
||||
}
|
||||
@ -21,6 +21,7 @@ import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.jobs.Job;
|
||||
import org.apache.doris.nereids.rules.RuleSet;
|
||||
import org.apache.doris.nereids.rules.expression.rewrite.ExpressionNormalization;
|
||||
import org.apache.doris.nereids.rules.expression.rewrite.ExpressionOptimization;
|
||||
import org.apache.doris.nereids.rules.mv.SelectRollup;
|
||||
import org.apache.doris.nereids.rules.rewrite.AggregateDisassemble;
|
||||
import org.apache.doris.nereids.rules.rewrite.logical.ColumnPruning;
|
||||
@ -57,10 +58,12 @@ public class NereidsRewriteJobExecutor extends BatchRulesJob {
|
||||
.addAll(new AdjustApplyFromCorrelatToUnCorrelatJob(cascadesContext).rulesJob)
|
||||
.addAll(new ConvertApplyToJoinJob(cascadesContext).rulesJob)
|
||||
.add(topDownBatch(ImmutableList.of(new ExpressionNormalization())))
|
||||
.add(topDownBatch(ImmutableList.of(new ExpressionOptimization())))
|
||||
.add(topDownBatch(ImmutableList.of(new NormalizeAggregate())))
|
||||
.add(topDownBatch(RuleSet.PUSH_DOWN_JOIN_CONDITION_RULES, false))
|
||||
.add(topDownBatch(ImmutableList.of(new ReorderJoin())))
|
||||
.add(topDownBatch(ImmutableList.of(new ColumnPruning())))
|
||||
.add(topDownBatch(RuleSet.PUSH_DOWN_JOIN_CONDITION_RULES))
|
||||
.add(topDownBatch(RuleSet.PUSH_DOWN_JOIN_CONDITION_RULES, false))
|
||||
.add(topDownBatch(ImmutableList.of(new FindHashConditionForJoin())))
|
||||
.add(topDownBatch(ImmutableList.of(new AggregateDisassemble())))
|
||||
.add(topDownBatch(ImmutableList.of(new LimitPushDown())))
|
||||
|
||||
@ -44,7 +44,11 @@ public class RewriteTopDownJob extends Job {
|
||||
public RewriteTopDownJob(Group group, JobContext context, List<RuleFactory> factories) {
|
||||
this(group, factories.stream()
|
||||
.flatMap(factory -> factory.buildRules().stream())
|
||||
.collect(Collectors.toList()), context);
|
||||
.collect(Collectors.toList()), context, true);
|
||||
}
|
||||
|
||||
public RewriteTopDownJob(Group group, List<Rule> rules, JobContext context) {
|
||||
this(group, rules, context, true);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -54,8 +58,8 @@ public class RewriteTopDownJob extends Job {
|
||||
* @param rules rewrite rules
|
||||
* @param context planner context
|
||||
*/
|
||||
public RewriteTopDownJob(Group group, List<Rule> rules, JobContext context) {
|
||||
super(JobType.TOP_DOWN_REWRITE, context);
|
||||
public RewriteTopDownJob(Group group, List<Rule> rules, JobContext context, boolean once) {
|
||||
super(JobType.TOP_DOWN_REWRITE, context, once);
|
||||
this.group = Objects.requireNonNull(group, "group cannot be null");
|
||||
this.rules = Objects.requireNonNull(rules, "rules cannot be null");
|
||||
}
|
||||
@ -84,6 +88,7 @@ public class RewriteTopDownJob extends Job {
|
||||
if (result.generateNewExpression) {
|
||||
// new group-expr replaced the origin group-expr in `group`,
|
||||
// run this rule against this `group` again.
|
||||
context.setRewritten(true);
|
||||
pushJob(new RewriteTopDownJob(group, rules, context));
|
||||
return;
|
||||
}
|
||||
|
||||
@ -22,6 +22,7 @@ import org.apache.doris.nereids.trees.expressions.EqualTo;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.Slot;
|
||||
import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.plans.GroupPlan;
|
||||
import org.apache.doris.nereids.trees.plans.JoinType;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
|
||||
@ -188,10 +189,18 @@ public class MultiJoin extends PlanVisitor<Void, Void> {
|
||||
conjunctsForAllHashJoins.addAll(ExpressionUtils.extractConjunction(join.getOtherJoinCondition().get()));
|
||||
}
|
||||
|
||||
if (!(join.left() instanceof LogicalJoin)) {
|
||||
Plan leftChild = join.left();
|
||||
if (join.left() instanceof LogicalFilter) {
|
||||
leftChild = join.left().child(0);
|
||||
}
|
||||
if (leftChild instanceof GroupPlan) {
|
||||
joinInputs.add(join.left());
|
||||
}
|
||||
if (!(join.right() instanceof LogicalJoin)) {
|
||||
Plan rightChild = join.right();
|
||||
if (join.right() instanceof LogicalFilter) {
|
||||
rightChild = join.right().child(0);
|
||||
}
|
||||
if (rightChild instanceof GroupPlan) {
|
||||
joinInputs.add(join.right());
|
||||
}
|
||||
return null;
|
||||
|
||||
Reference in New Issue
Block a user