[feature-wip](nereids) Adjust plan execution flow and fix physical bugs (#10481)
Organize the plan process, improve the batch execution of rules and the way to add jobs. Fix the problem that the condition in PhysicalHashJoin is empty.
This commit is contained in:
@ -0,0 +1,38 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids;
|
||||
|
||||
import org.apache.doris.nereids.rules.analysis.BindRelation;
|
||||
import org.apache.doris.nereids.rules.analysis.BindSlotReference;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
/**
|
||||
* Execute the analyze job.
|
||||
*/
|
||||
public class AnalyzeRulesJob extends BatchRulesJob {
|
||||
|
||||
AnalyzeRulesJob(PlannerContext plannerContext) {
|
||||
super(plannerContext);
|
||||
rulesJob.addAll(ImmutableList.of(
|
||||
bottomUpBatch(ImmutableList.of(
|
||||
new BindRelation(),
|
||||
new BindSlotReference())
|
||||
)));
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,82 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids;
|
||||
|
||||
import org.apache.doris.nereids.jobs.Job;
|
||||
import org.apache.doris.nereids.jobs.cascades.OptimizeGroupJob;
|
||||
import org.apache.doris.nereids.jobs.rewrite.RewriteBottomUpJob;
|
||||
import org.apache.doris.nereids.jobs.rewrite.RewriteTopDownJob;
|
||||
import org.apache.doris.nereids.rules.Rule;
|
||||
import org.apache.doris.nereids.rules.RuleFactory;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Base class for executing all jobs.
|
||||
*
|
||||
* Each batch of rules will be uniformly executed.
|
||||
*/
|
||||
public class BatchRulesJob {
|
||||
protected PlannerContext plannerContext;
|
||||
protected List<Job<Plan>> rulesJob = new ArrayList<>();
|
||||
|
||||
BatchRulesJob(PlannerContext plannerContext) {
|
||||
this.plannerContext = Objects.requireNonNull(plannerContext, "plannerContext can not null");
|
||||
}
|
||||
|
||||
protected Job<Plan> bottomUpBatch(List<RuleFactory> ruleFactories) {
|
||||
List<Rule<Plan>> rules = new ArrayList<>();
|
||||
for (RuleFactory ruleFactory : ruleFactories) {
|
||||
rules.add((Rule<Plan>) ruleFactory.buildRules());
|
||||
}
|
||||
Collections.reverse(rules);
|
||||
return new RewriteBottomUpJob(
|
||||
plannerContext.getOptimizerContext().getMemo().getRoot(),
|
||||
rules,
|
||||
plannerContext);
|
||||
}
|
||||
|
||||
protected Job<Plan> topDownBatch(List<RuleFactory> ruleFactories) {
|
||||
List<Rule<Plan>> rules = new ArrayList<>();
|
||||
for (RuleFactory ruleFactory : ruleFactories) {
|
||||
rules.add((Rule<Plan>) ruleFactory.buildRules());
|
||||
}
|
||||
Collections.reverse(rules);
|
||||
return new RewriteTopDownJob(
|
||||
plannerContext.getOptimizerContext().getMemo().getRoot(),
|
||||
rules,
|
||||
plannerContext);
|
||||
}
|
||||
|
||||
protected Job<Plan> optimize() {
|
||||
return new OptimizeGroupJob(
|
||||
plannerContext.getOptimizerContext().getMemo().getRoot(),
|
||||
plannerContext);
|
||||
}
|
||||
|
||||
public void execute() {
|
||||
for (Job job : rulesJob) {
|
||||
plannerContext.getOptimizerContext().pushJob(job);
|
||||
plannerContext.getOptimizerContext().getJobScheduler().executeJobPool(plannerContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -21,8 +21,6 @@ import org.apache.doris.analysis.StatementBase;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.VectorizedUtil;
|
||||
import org.apache.doris.nereids.jobs.cascades.OptimizeGroupJob;
|
||||
import org.apache.doris.nereids.jobs.rewrite.RewriteBottomUpJob;
|
||||
import org.apache.doris.nereids.memo.Group;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.memo.Memo;
|
||||
@ -94,13 +92,21 @@ public class NereidsPlanner extends Planner {
|
||||
OptimizerContext optimizerContext = new OptimizerContext(memo);
|
||||
plannerContext = new PlannerContext(optimizerContext, connectContext, outputProperties);
|
||||
|
||||
plannerContext.getOptimizerContext().pushJob(
|
||||
new RewriteBottomUpJob(getRoot(), optimizerContext.getRuleSet().getAnalysisRules(), plannerContext));
|
||||
|
||||
plannerContext.getOptimizerContext().pushJob(new OptimizeGroupJob(getRoot(), plannerContext));
|
||||
plannerContext.getOptimizerContext().getJobScheduler().executeJobPool(plannerContext);
|
||||
|
||||
// Get plan directly. Just for SSB.
|
||||
return doPlan();
|
||||
}
|
||||
|
||||
/**
|
||||
* The actual execution of the plan, including the generation and execution of the job.
|
||||
* @return PhysicalPlan.
|
||||
*/
|
||||
private PhysicalPlan doPlan() {
|
||||
AnalyzeRulesJob analyzeRulesJob = new AnalyzeRulesJob(plannerContext);
|
||||
analyzeRulesJob.execute();
|
||||
|
||||
OptimizeRulesJob optimizeRulesJob = new OptimizeRulesJob(plannerContext);
|
||||
optimizeRulesJob.execute();
|
||||
|
||||
return getRoot().extractPlan();
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,32 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
/**
|
||||
* cascade optimizer added.
|
||||
*/
|
||||
public class OptimizeRulesJob extends BatchRulesJob {
|
||||
OptimizeRulesJob(PlannerContext plannerContext) {
|
||||
super(plannerContext);
|
||||
rulesJob.addAll(ImmutableList.of(
|
||||
optimize()
|
||||
));
|
||||
}
|
||||
}
|
||||
@ -27,6 +27,8 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalBinaryPlan;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Physical hash join plan operator.
|
||||
@ -35,7 +37,7 @@ public class PhysicalHashJoin extends PhysicalBinaryOperator {
|
||||
|
||||
private final JoinType joinType;
|
||||
|
||||
private final Expression condition;
|
||||
private final Optional<Expression> condition;
|
||||
|
||||
/**
|
||||
* Constructor of PhysicalHashJoinNode.
|
||||
@ -43,17 +45,17 @@ public class PhysicalHashJoin extends PhysicalBinaryOperator {
|
||||
* @param joinType Which join type, left semi join, inner join...
|
||||
* @param predicate join condition.
|
||||
*/
|
||||
public PhysicalHashJoin(JoinType joinType, Expression predicate) {
|
||||
public PhysicalHashJoin(JoinType joinType, Optional<Expression> predicate) {
|
||||
super(OperatorType.PHYSICAL_HASH_JOIN);
|
||||
this.joinType = joinType;
|
||||
this.condition = predicate;
|
||||
this.joinType = Objects.requireNonNull(joinType, "joinType can not be null");
|
||||
this.condition = Objects.requireNonNull(predicate, "predicate can not be null");
|
||||
}
|
||||
|
||||
public JoinType getJoinType() {
|
||||
return joinType;
|
||||
}
|
||||
|
||||
public Expression getCondition() {
|
||||
public Optional<Expression> getCondition() {
|
||||
return condition;
|
||||
}
|
||||
|
||||
@ -64,6 +66,6 @@ public class PhysicalHashJoin extends PhysicalBinaryOperator {
|
||||
|
||||
@Override
|
||||
public List<Expression> getExpressions() {
|
||||
return ImmutableList.of(condition);
|
||||
return condition.<List<Expression>>map(ImmutableList::of).orElseGet(ImmutableList::of);
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
package org.apache.doris.nereids.rules;
|
||||
|
||||
import org.apache.doris.nereids.rules.analysis.BindRelation;
|
||||
import org.apache.doris.nereids.rules.exploration.join.JoinCommutative;
|
||||
import org.apache.doris.nereids.rules.exploration.join.JoinLeftAssociative;
|
||||
import org.apache.doris.nereids.rules.implementation.LogicalFilterToPhysicalFilter;
|
||||
@ -35,10 +34,6 @@ import java.util.List;
|
||||
* Containers for set of different type rules.
|
||||
*/
|
||||
public class RuleSet {
|
||||
public static final List<Rule<Plan>> ANALYSIS_RULES = planRuleFactories()
|
||||
.add(new BindRelation())
|
||||
.build();
|
||||
|
||||
public static final List<Rule<Plan>> EXPLORATION_RULES = planRuleFactories()
|
||||
.add(new JoinCommutative(false))
|
||||
.add(new JoinLeftAssociative())
|
||||
@ -50,10 +45,6 @@ public class RuleSet {
|
||||
.add(new LogicalFilterToPhysicalFilter())
|
||||
.build();
|
||||
|
||||
public List<Rule<Plan>> getAnalysisRules() {
|
||||
return ANALYSIS_RULES;
|
||||
}
|
||||
|
||||
public List<Rule<Plan>> getExplorationRules() {
|
||||
return EXPLORATION_RULES;
|
||||
}
|
||||
@ -62,11 +53,14 @@ public class RuleSet {
|
||||
return IMPLEMENTATION_RULES;
|
||||
}
|
||||
|
||||
private static RuleFactories<Plan> planRuleFactories() {
|
||||
public static RuleFactories<Plan> planRuleFactories() {
|
||||
return new RuleFactories();
|
||||
}
|
||||
|
||||
private static class RuleFactories<TYPE extends TreeNode<TYPE>> {
|
||||
/**
|
||||
* generate rule factories.
|
||||
*/
|
||||
public static class RuleFactories<TYPE extends TreeNode<TYPE>> {
|
||||
final Builder<Rule<TYPE>> rules = ImmutableList.builder();
|
||||
|
||||
public RuleFactories<TYPE> add(RuleFactory<TYPE> ruleFactory) {
|
||||
|
||||
@ -29,7 +29,7 @@ public class LogicalJoinToHashJoin extends OneImplementationRuleFactory {
|
||||
@Override
|
||||
public Rule<Plan> build() {
|
||||
return logicalJoin().then(join -> plan(
|
||||
new PhysicalHashJoin(join.operator.getJoinType(), join.operator.getCondition().get()),
|
||||
new PhysicalHashJoin(join.operator.getJoinType(), join.operator.getCondition()),
|
||||
join.getLogicalProperties(),
|
||||
join.left(), join.right()
|
||||
)).toRule(RuleType.LOGICAL_JOIN_TO_HASH_JOIN_RULE);
|
||||
|
||||
@ -207,7 +207,7 @@ public class PhysicalPlanTranslator extends PlanOperatorVisitor<PlanFragment, Pl
|
||||
PlanFragment leftFragment = visit(hashJoin.child(0), context);
|
||||
PlanFragment rightFragment = visit(hashJoin.child(0), context);
|
||||
PhysicalHashJoin physicalHashJoin = hashJoin.getOperator();
|
||||
Expression predicateExpr = physicalHashJoin.getCondition();
|
||||
Expression predicateExpr = physicalHashJoin.getCondition().get();
|
||||
List<Expression> eqExprList = Utils.getEqConjuncts(hashJoin.child(0).getOutput(),
|
||||
hashJoin.child(1).getOutput(), predicateExpr);
|
||||
JoinType joinType = physicalHashJoin.getJoinType();
|
||||
|
||||
Reference in New Issue
Block a user