[feature](Nereids): cost and enforcer job in cascades. (#10657)
Issue Number: close #9640. Add an enforcer job for cascades, inspired by the *NoisePage enforcer job* and the *ORCA paper*. During this phase, we derive physical properties for the plan tree and prune the plan according to the cost.
This commit is contained in:
@ -40,14 +40,14 @@ public class CostCalculator {
|
||||
/**
|
||||
* Calculate the cost of the given group expression.
|
||||
*/
|
||||
public double calculateCost(GroupExpression groupExpression) {
|
||||
public static double calculateCost(GroupExpression groupExpression) {
|
||||
PlanContext planContext = new PlanContext(groupExpression);
|
||||
CostEstimator costCalculator = new CostEstimator();
|
||||
CostEstimate costEstimate = groupExpression.getOperator().accept(costCalculator, planContext);
|
||||
return costFormula(costEstimate);
|
||||
}
|
||||
|
||||
private double costFormula(CostEstimate costEstimate) {
|
||||
private static double costFormula(CostEstimate costEstimate) {
|
||||
double cpuCostWeight = 1;
|
||||
double memoryCostWeight = 1;
|
||||
double networkCostWeight = 1;
|
||||
|
||||
@ -26,7 +26,7 @@ import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
public class JobContext {
|
||||
private final PlannerContext plannerContext;
|
||||
private final PhysicalProperties requiredProperties;
|
||||
private final double costUpperBound;
|
||||
private double costUpperBound;
|
||||
|
||||
public JobContext(PlannerContext plannerContext, PhysicalProperties requiredProperties, double costUpperBound) {
|
||||
this.plannerContext = plannerContext;
|
||||
@ -45,4 +45,8 @@ public class JobContext {
|
||||
// Returns the cost upper bound currently held by this job context.
public double getCostUpperBound() {
|
||||
return costUpperBound;
|
||||
}
|
||||
|
||||
// Replaces the cost upper bound held by this job context with the given value.
public void setCostUpperBound(double costUpperBound) {
|
||||
this.costUpperBound = costUpperBound;
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,18 +17,49 @@
|
||||
|
||||
package org.apache.doris.nereids.jobs.cascades;
|
||||
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.nereids.PlanContext;
|
||||
import org.apache.doris.nereids.cost.CostCalculator;
|
||||
import org.apache.doris.nereids.jobs.Job;
|
||||
import org.apache.doris.nereids.jobs.JobContext;
|
||||
import org.apache.doris.nereids.jobs.JobType;
|
||||
import org.apache.doris.nereids.memo.Group;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.properties.ChildrenOutputPropertyDeriver;
|
||||
import org.apache.doris.nereids.properties.EnforceMissingPropertiesHelper;
|
||||
import org.apache.doris.nereids.properties.ParentRequiredPropertyDeriver;
|
||||
import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Job to compute cost and add enforcer.
|
||||
*/
|
||||
public class CostAndEnforcerJob extends Job<Plan> {
|
||||
// GroupExpression to optimize
|
||||
private final GroupExpression groupExpression;
|
||||
// Current total cost
|
||||
private double curTotalCost;
|
||||
|
||||
// Properties from parent plan node.
|
||||
// Like: Physical Hash Join
|
||||
// [ [Properties ["", ANY], Properties ["", BROADCAST]],
|
||||
// [Properties ["", SHUFFLE_JOIN], Properties ["", SHUFFLE_JOIN]] ]
|
||||
private List<List<PhysicalProperties>> propertiesListList;
|
||||
|
||||
private List<GroupExpression> childrenBestGroupExprList;
|
||||
private List<PhysicalProperties> childrenOutputProperties = Lists.newArrayList();
|
||||
|
||||
// Current stage of enumeration through child groups
|
||||
private int curChildIndex = -1;
|
||||
// Indicator of last child group that we waited for optimization
|
||||
private int prevChildIndex = -1;
|
||||
// Current stage of enumeration through outputInputProperties
|
||||
private int curPropertyPairIndex = 0;
|
||||
|
||||
public CostAndEnforcerJob(GroupExpression groupExpression, JobContext context) {
|
||||
super(JobType.OPTIMIZE_CHILDREN, context);
|
||||
@ -47,4 +78,162 @@ public class CostAndEnforcerJob extends Job<Plan> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* execute.
|
||||
*/
|
||||
public void execute1() {
|
||||
// Do init logic of root operator/groupExpr of `subplan`, only run once per task.
|
||||
if (curChildIndex != -1) {
|
||||
curTotalCost = 0;
|
||||
|
||||
// Get property from groupExpression operator (it's root of subplan).
|
||||
ParentRequiredPropertyDeriver parentRequiredPropertyDeriver = new ParentRequiredPropertyDeriver(context);
|
||||
propertiesListList = parentRequiredPropertyDeriver.getRequiredPropertyListList(groupExpression);
|
||||
|
||||
curChildIndex = 0;
|
||||
}
|
||||
|
||||
for (; curPropertyPairIndex < propertiesListList.size(); curPropertyPairIndex++) {
|
||||
// children input properties
|
||||
List<PhysicalProperties> childrenInputProperties = propertiesListList.get(curPropertyPairIndex);
|
||||
|
||||
// Calculate cost of groupExpression and update total cost
|
||||
if (curChildIndex == 0 && prevChildIndex == -1) {
|
||||
curTotalCost += CostCalculator.calculateCost(groupExpression);
|
||||
}
|
||||
|
||||
for (; curChildIndex < groupExpression.arity(); curChildIndex++) {
|
||||
PhysicalProperties childInputProperties = childrenInputProperties.get(curChildIndex);
|
||||
Group childGroup = groupExpression.child(curChildIndex);
|
||||
|
||||
// Whether the child group was optimized for this childInputProperties according to
|
||||
// the result of returning.
|
||||
Optional<Pair<Double, GroupExpression>> lowestCostPlanOpt = childGroup.getLowestCostPlan(
|
||||
childInputProperties);
|
||||
|
||||
if (!lowestCostPlanOpt.isPresent()) {
|
||||
// The child should be pruned due to cost prune.
|
||||
if (prevChildIndex >= curChildIndex) {
|
||||
break;
|
||||
}
|
||||
|
||||
// This child isn't optimized, create new tasks to optimize it.
|
||||
// Meaning that optimize recursively by derive tasks.
|
||||
prevChildIndex = curChildIndex;
|
||||
pushTask((CostAndEnforcerJob) clone());
|
||||
double newCostUpperBound = context.getCostUpperBound() - curTotalCost;
|
||||
JobContext jobContext = new JobContext(context.getPlannerContext(), childInputProperties,
|
||||
newCostUpperBound);
|
||||
pushTask(new OptimizeGroupJob(childGroup, jobContext));
|
||||
return;
|
||||
}
|
||||
|
||||
GroupExpression lowestCostExpr = lowestCostPlanOpt.get().second;
|
||||
|
||||
PhysicalProperties childOutputProperty = lowestCostExpr.getPropertyFromMap(childInputProperties);
|
||||
// TODO: maybe need to record children lowestCostExpr
|
||||
childrenInputProperties.set(curChildIndex, childOutputProperty);
|
||||
|
||||
// todo: check whether split agg broadcast row count limit.
|
||||
|
||||
curTotalCost += lowestCostExpr.getLowestCostTable().get(childInputProperties).first;
|
||||
if (curTotalCost > context.getCostUpperBound()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// When we successfully optimize all child group, it's last child.
|
||||
if (curChildIndex == groupExpression.arity()) {
|
||||
// Not need to do pruning here because it has been done when we get the
|
||||
// best expr from the child group
|
||||
|
||||
// TODO: it could update the cost.
|
||||
PhysicalProperties outputProperty = ChildrenOutputPropertyDeriver.getProperties(
|
||||
context.getRequiredProperties(),
|
||||
childrenOutputProperties, groupExpression);
|
||||
|
||||
if (curTotalCost > context.getCostUpperBound()) {
|
||||
break;
|
||||
}
|
||||
|
||||
/* update current group statistics and re-compute costs. */
|
||||
if (groupExpression.children().stream().anyMatch(group -> group.getStatistics() != null)) {
|
||||
return;
|
||||
}
|
||||
PlanContext planContext = new PlanContext(groupExpression);
|
||||
// TODO: calculate stats.
|
||||
groupExpression.getParent().setStatistics(planContext.getStatistics());
|
||||
|
||||
enforce(outputProperty, childrenInputProperties);
|
||||
}
|
||||
|
||||
// Reset child idx and total cost
|
||||
prevChildIndex = -1;
|
||||
curChildIndex = 0;
|
||||
curTotalCost = 0;
|
||||
}
|
||||
}
|
||||
|
||||
private void enforce(PhysicalProperties outputProperty, List<PhysicalProperties> inputProperties) {
|
||||
|
||||
// groupExpression can satisfy its own output property
|
||||
putProperty(groupExpression, outputProperty, outputProperty, inputProperties);
|
||||
// groupExpression can satisfy the ANY type output property
|
||||
putProperty(groupExpression, outputProperty, new PhysicalProperties(), inputProperties);
|
||||
|
||||
EnforceMissingPropertiesHelper enforceMissingPropertiesHelper = new EnforceMissingPropertiesHelper(context,
|
||||
groupExpression, curTotalCost);
|
||||
|
||||
PhysicalProperties requiredProperties = context.getRequiredProperties();
|
||||
if (outputProperty.meet(requiredProperties)) {
|
||||
Pair<PhysicalProperties, Double> pair = enforceMissingPropertiesHelper.enforceProperty(outputProperty,
|
||||
requiredProperties);
|
||||
PhysicalProperties addEnforcedProperty = pair.first;
|
||||
curTotalCost = pair.second;
|
||||
|
||||
// enforcedProperty is superset of requiredProperty
|
||||
if (!addEnforcedProperty.equals(requiredProperties)) {
|
||||
putProperty(groupExpression.getParent().getBestExpression(addEnforcedProperty),
|
||||
requiredProperties, requiredProperties, Lists.newArrayList(outputProperty));
|
||||
}
|
||||
} else {
|
||||
if (!outputProperty.equals(requiredProperties)) {
|
||||
putProperty(groupExpression, outputProperty, requiredProperties, inputProperties);
|
||||
}
|
||||
}
|
||||
|
||||
if (curTotalCost < context.getCostUpperBound()) {
|
||||
context.setCostUpperBound(curTotalCost);
|
||||
}
|
||||
}
|
||||
|
||||
private void putProperty(GroupExpression groupExpression,
|
||||
PhysicalProperties outputProperty,
|
||||
PhysicalProperties requiredProperty,
|
||||
List<PhysicalProperties> inputProperties) {
|
||||
if (groupExpression.updateLowestCostTable(requiredProperty, inputProperties, curTotalCost)) {
|
||||
// Each group expression need to record the outputProperty satisfy what requiredProperty,
|
||||
// because group expression can generate multi outputProperty. eg. Join may have shuffle local
|
||||
// and shuffle join two types outputProperty.
|
||||
groupExpression.putOutputPropertiesMap(outputProperty, requiredProperty);
|
||||
}
|
||||
this.groupExpression.getParent().setBestPlan(groupExpression,
|
||||
curTotalCost, requiredProperty);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Shallow clone (ignore clone propertiesListList and groupExpression).
|
||||
*/
|
||||
@Override
|
||||
public Object clone() {
|
||||
CostAndEnforcerJob task;
|
||||
try {
|
||||
task = (CostAndEnforcerJob) super.clone();
|
||||
} catch (CloneNotSupportedException ignored) {
|
||||
return null;
|
||||
}
|
||||
return task;
|
||||
}
|
||||
}
|
||||
|
||||
@ -24,6 +24,7 @@ import org.apache.doris.nereids.properties.LogicalProperties;
|
||||
import org.apache.doris.nereids.properties.PhysicalProperties;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
|
||||
import org.apache.doris.statistics.StatsDeriveResult;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
@ -50,6 +51,7 @@ public class Group {
|
||||
private double costLowerBound = -1;
|
||||
private boolean isExplored = false;
|
||||
private boolean hasCost = false;
|
||||
private StatsDeriveResult statistics;
|
||||
|
||||
/**
|
||||
* Constructor for Group.
|
||||
@ -135,6 +137,35 @@ public class Group {
|
||||
this.costLowerBound = costLowerBound;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set or update lowestCostPlans: properties --> new Pair<>(cost, expression)
|
||||
*/
|
||||
public void setBestPlan(GroupExpression expression, double cost, PhysicalProperties properties) {
|
||||
if (lowestCostPlans.containsKey(properties)) {
|
||||
if (lowestCostPlans.get(properties).first > cost) {
|
||||
lowestCostPlans.put(properties, new Pair<>(cost, expression));
|
||||
}
|
||||
} else {
|
||||
lowestCostPlans.put(properties, new Pair<>(cost, expression));
|
||||
}
|
||||
}
|
||||
|
||||
public GroupExpression getBestExpression(PhysicalProperties properties) {
|
||||
if (lowestCostPlans.containsKey(properties)) {
|
||||
return lowestCostPlans.get(properties).second;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
// Returns the statistics stored on this group; null until setStatistics has been called with a value.
public StatsDeriveResult getStatistics() {
|
||||
return statistics;
|
||||
}
|
||||
|
||||
// Stores the given statistics on this group, replacing any previous value.
public void setStatistics(StatsDeriveResult statistics) {
|
||||
this.statistics = statistics;
|
||||
}
|
||||
|
||||
|
||||
// Returns the group's logicalExpressions list itself (no copy is made here).
public List<GroupExpression> getLogicalExpressions() {
|
||||
return logicalExpressions;
|
||||
}
|
||||
|
||||
@ -44,6 +44,8 @@ public class GroupExpression {
|
||||
|
||||
// Mapping from output properties to the corresponding best cost, statistics, and child properties.
|
||||
private final Map<PhysicalProperties, Pair<Double, List<PhysicalProperties>>> lowestCostTable;
|
||||
// Each physical group expression maintains mapping incoming requests to the corresponding child requests.
|
||||
private final Map<PhysicalProperties, PhysicalProperties> requestPropertiesMap;
|
||||
|
||||
public GroupExpression(Operator op) {
|
||||
this(op, Lists.newArrayList());
|
||||
@ -61,6 +63,14 @@ public class GroupExpression {
|
||||
this.ruleMasks = new BitSet(RuleType.SENTINEL.ordinal());
|
||||
this.statDerived = false;
|
||||
this.lowestCostTable = Maps.newHashMap();
|
||||
this.requestPropertiesMap = Maps.newHashMap();
|
||||
}
|
||||
|
||||
// TODO: rename
|
||||
public PhysicalProperties getPropertyFromMap(PhysicalProperties requiredPropertySet) {
|
||||
PhysicalProperties outputProperty = requestPropertiesMap.get(requiredPropertySet);
|
||||
Preconditions.checkState(outputProperty != null);
|
||||
return outputProperty;
|
||||
}
|
||||
|
||||
public int arity() {
|
||||
@ -124,6 +134,30 @@ public class GroupExpression {
|
||||
return lowestCostTable.get(require).second;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a (parentOutputProperties) -> (cost, childrenInputProperties) in lowestCostTable.
|
||||
*/
|
||||
public boolean updateLowestCostTable(
|
||||
PhysicalProperties parentOutputProperties,
|
||||
List<PhysicalProperties> childrenInputProperties,
|
||||
double cost) {
|
||||
if (lowestCostTable.containsKey(parentOutputProperties)) {
|
||||
if (lowestCostTable.get(parentOutputProperties).first > cost) {
|
||||
lowestCostTable.put(parentOutputProperties, new Pair<>(cost, childrenInputProperties));
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
lowestCostTable.put(parentOutputProperties, new Pair<>(cost, childrenInputProperties));
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Records in requestPropertiesMap that requiredPropertySet (the key) is answered by outputPropertySet (the value).
// Note the argument order: output first, required second, while the map is keyed by the required property.
public void putOutputPropertiesMap(PhysicalProperties outputPropertySet,
|
||||
PhysicalProperties requiredPropertySet) {
|
||||
this.requestPropertiesMap.put(requiredPropertySet, outputPropertySet);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
|
||||
@ -193,4 +193,11 @@ public class Memo {
|
||||
}
|
||||
groups.remove(source);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add enforcer expression into the target group.
|
||||
*/
|
||||
public void addEnforcerPlan(GroupExpression groupExpression, Group group) {
|
||||
// Only the parent link of the expression is set here; nothing else is registered in this method.
groupExpression.setParent(group);
|
||||
}
|
||||
}
|
||||
|
||||
@ -23,9 +23,9 @@ package org.apache.doris.nereids.operators;
|
||||
* 1. ANY: match any operator
|
||||
* 2. MULTI: match multiple operators
|
||||
* 3. FIXED: the leaf node of pattern tree, which can be matched by a single operator
|
||||
* but this operator cannot be used in rules
|
||||
* but this operator cannot be used in rules
|
||||
* 4. MULTI_FIXED: the leaf node of pattern tree, which can be matched by multiple operators,
|
||||
* but these operators cannot be used in rules
|
||||
* but these operators cannot be used in rules
|
||||
*/
|
||||
public enum OperatorType {
|
||||
UNKNOWN,
|
||||
@ -48,5 +48,6 @@ public enum OperatorType {
|
||||
PHYSICAL_AGGREGATION,
|
||||
PHYSICAL_SORT,
|
||||
PHYSICAL_HASH_JOIN,
|
||||
PHYSICAL_EXCHANGE;
|
||||
PHYSICAL_EXCHANGE,
|
||||
PHYSICAL_DISTRIBUTION;
|
||||
}
|
||||
|
||||
@ -0,0 +1,42 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.operators.plans.physical;
|
||||
|
||||
import org.apache.doris.nereids.operators.OperatorType;
|
||||
import org.apache.doris.nereids.properties.DistributionSpec;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Enforcer operator.
|
||||
*/
|
||||
public class PhysicalDistribution extends PhysicalUnaryOperator {
|
||||
|
||||
protected DistributionSpec distributionSpec;
|
||||
|
||||
|
||||
public PhysicalDistribution(DistributionSpec spec) {
|
||||
super(OperatorType.PHYSICAL_DISTRIBUTION);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Expression> getExpressions() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,74 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.properties;
|
||||
|
||||
import org.apache.doris.nereids.PlanContext;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.operators.Operator;
|
||||
import org.apache.doris.nereids.operators.OperatorVisitor;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Used for property drive.
|
||||
*/
|
||||
public class ChildrenOutputPropertyDeriver extends OperatorVisitor<PhysicalProperties, PlanContext> {
|
||||
PhysicalProperties requirements;
|
||||
List<PhysicalProperties> childrenOutputProperties;
|
||||
|
||||
public ChildrenOutputPropertyDeriver(PhysicalProperties requirements,
|
||||
List<PhysicalProperties> childrenOutputProperties) {
|
||||
this.childrenOutputProperties = childrenOutputProperties;
|
||||
this.requirements = requirements;
|
||||
}
|
||||
|
||||
public static PhysicalProperties getProperties(
|
||||
PhysicalProperties requirements,
|
||||
List<PhysicalProperties> childrenOutputProperties,
|
||||
GroupExpression groupExpression) {
|
||||
|
||||
ChildrenOutputPropertyDeriver childrenOutputPropertyDeriver = new ChildrenOutputPropertyDeriver(requirements,
|
||||
childrenOutputProperties);
|
||||
|
||||
return groupExpression.getOperator().accept(childrenOutputPropertyDeriver, new PlanContext(groupExpression));
|
||||
}
|
||||
|
||||
public PhysicalProperties getRequirements() {
|
||||
return requirements;
|
||||
}
|
||||
|
||||
// public List<List<PhysicalProperties>> getProperties(GroupExpression groupExpression) {
|
||||
// properties = Lists.newArrayList();
|
||||
// groupExpression.getOperator().accept(this, new PlanContext(groupExpression));
|
||||
// return properties;
|
||||
// }
|
||||
|
||||
// @Override
|
||||
// public Void visitOperator(Operator operator, PlanContext context) {
|
||||
// List<PhysicalProperties> props = Lists.newArrayList();
|
||||
// for (int childIndex = 0; childIndex < context.getGroupExpression().arity(); ++childIndex) {
|
||||
// props.add(new PhysicalProperties());
|
||||
// }
|
||||
// properties.add(props);
|
||||
// return null;
|
||||
// }
|
||||
@Override
|
||||
public PhysicalProperties visitOperator(Operator node, PlanContext context) {
|
||||
return new PhysicalProperties();
|
||||
}
|
||||
}
|
||||
@ -17,15 +17,21 @@
|
||||
|
||||
package org.apache.doris.nereids.properties;
|
||||
|
||||
import org.apache.doris.nereids.memo.Group;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.operators.plans.physical.PhysicalDistribution;
|
||||
import org.apache.doris.planner.DataPartition;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* Base class for data distribution.
|
||||
* Spec of data distribution.
|
||||
*/
|
||||
public class DistributionSpec {
|
||||
|
||||
private DataPartition dataPartition;
|
||||
|
||||
// TODO: why exist?
|
||||
public DistributionSpec() {
|
||||
}
|
||||
|
||||
@ -33,6 +39,16 @@ public class DistributionSpec {
|
||||
this.dataPartition = dataPartition;
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO: need read ORCA.
|
||||
* Whether the current `DistributionSpec` satisfies the other `DistributionSpec`.
|
||||
*
|
||||
* @param other another DistributionSpec.
|
||||
*/
|
||||
public boolean meet(DistributionSpec other) {
|
||||
// Placeholder: always answers false, regardless of this spec or the argument.
return false;
|
||||
}
|
||||
|
||||
public DataPartition getDataPartition() {
|
||||
return dataPartition;
|
||||
}
|
||||
@ -40,4 +56,15 @@ public class DistributionSpec {
|
||||
public void setDataPartition(DataPartition dataPartition) {
|
||||
this.dataPartition = dataPartition;
|
||||
}
|
||||
|
||||
public GroupExpression addEnforcer(Group child) {
|
||||
return new GroupExpression(new PhysicalDistribution(new DistributionSpec(dataPartition)),
|
||||
Lists.newArrayList(child));
|
||||
}
|
||||
|
||||
// TODO
|
||||
// Delegates straight to the superclass implementation, so no field of this class (e.g. dataPartition)
// takes part in the comparison. No matching hashCode override is visible next to this method.
@Override
|
||||
public boolean equals(Object obj) {
|
||||
return super.equals(obj);
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,125 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
|
||||
package org.apache.doris.nereids.properties;
|
||||
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.nereids.cost.CostCalculator;
|
||||
import org.apache.doris.nereids.jobs.JobContext;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* When parent node request some properties but children don't have.
|
||||
* Enforce add missing properties for child.
|
||||
*/
|
||||
public class EnforceMissingPropertiesHelper {
|
||||
|
||||
private JobContext context;
|
||||
private GroupExpression groupExpression;
|
||||
private double curTotalCost;
|
||||
|
||||
public EnforceMissingPropertiesHelper(JobContext context, GroupExpression groupExpression,
|
||||
double curTotalCost) {
|
||||
this.context = context;
|
||||
this.groupExpression = groupExpression;
|
||||
this.curTotalCost = curTotalCost;
|
||||
}
|
||||
|
||||
/**
|
||||
* Enforce missing property.
|
||||
*/
|
||||
public Pair<PhysicalProperties, Double> enforceProperty(PhysicalProperties output, PhysicalProperties required) {
|
||||
boolean isMeetOrder = output.getOrderSpec().meet(required.getOrderSpec());
|
||||
boolean isMeetDistribution = output.getDistributionSpec().meet(required.getDistributionSpec());
|
||||
|
||||
if (!isMeetDistribution && !isMeetOrder) {
|
||||
return new Pair<>(enforceSortAndDistribution(output, required), curTotalCost);
|
||||
} else if (isMeetDistribution && isMeetOrder) {
|
||||
return new Pair<>(null, curTotalCost);
|
||||
} else if (!isMeetDistribution) {
|
||||
if (required.getOrderSpec().getOrderKeys().isEmpty()) {
|
||||
return new Pair<>(enforceDistribution(output), curTotalCost);
|
||||
} else {
|
||||
// TODO
|
||||
// It's wrong that SortSpec is empty.
|
||||
// After redistribute data , original order requirement may be wrong. Need enforce "SortNode" here.
|
||||
// PhysicalProperties newProperty =
|
||||
// new PhysicalProperties(new DistributionSpec(), new OrderSpec(Lists.newArrayList()));
|
||||
// groupExpression.getParent().
|
||||
// return enforceSortAndDistribution(newProperty, required);
|
||||
return new Pair<>(enforceDistribution(output), curTotalCost);
|
||||
}
|
||||
} else {
|
||||
return new Pair<>(enforceSort(output), curTotalCost);
|
||||
}
|
||||
}
|
||||
|
||||
private PhysicalProperties enforceSort(PhysicalProperties oldOutputProperty) {
|
||||
// clone
|
||||
PhysicalProperties newOutputProperty = new PhysicalProperties(oldOutputProperty.getDistributionSpec(),
|
||||
oldOutputProperty.getOrderSpec());
|
||||
newOutputProperty.setOrderSpec(context.getRequiredProperties().getOrderSpec());
|
||||
GroupExpression enforcer =
|
||||
context.getRequiredProperties().getOrderSpec().addEnforcer(groupExpression.getParent());
|
||||
|
||||
updateCostWithEnforcer(enforcer, oldOutputProperty, newOutputProperty);
|
||||
|
||||
return newOutputProperty;
|
||||
}
|
||||
|
||||
private PhysicalProperties enforceDistribution(PhysicalProperties oldOutputProperty) {
|
||||
PhysicalProperties newOutputProperty = new PhysicalProperties(oldOutputProperty.getDistributionSpec(),
|
||||
oldOutputProperty.getOrderSpec());
|
||||
newOutputProperty.setDistributionSpec(context.getRequiredProperties().getDistributionSpec());
|
||||
GroupExpression enforcer =
|
||||
context.getRequiredProperties().getDistributionSpec().addEnforcer(groupExpression.getParent());
|
||||
|
||||
updateCostWithEnforcer(enforcer, oldOutputProperty, newOutputProperty);
|
||||
|
||||
return newOutputProperty;
|
||||
}
|
||||
|
||||
private void updateCostWithEnforcer(GroupExpression enforcer,
|
||||
PhysicalProperties oldOutputProperty,
|
||||
PhysicalProperties newOutputProperty) {
|
||||
context.getPlannerContext().getMemo().addEnforcerPlan(enforcer, groupExpression.getParent());
|
||||
curTotalCost += CostCalculator.calculateCost(enforcer);
|
||||
|
||||
if (enforcer.updateLowestCostTable(newOutputProperty, Lists.newArrayList(oldOutputProperty), curTotalCost)) {
|
||||
enforcer.putOutputPropertiesMap(newOutputProperty, newOutputProperty);
|
||||
}
|
||||
groupExpression.getParent().setBestPlan(enforcer, curTotalCost, newOutputProperty);
|
||||
}
|
||||
|
||||
private PhysicalProperties enforceSortAndDistribution(PhysicalProperties outputProperty,
|
||||
PhysicalProperties requiredProperty) {
|
||||
PhysicalProperties enforcedProperty;
|
||||
if (requiredProperty.getDistributionSpec()
|
||||
.equals(new GatherDistributionSpec())) {
|
||||
enforcedProperty = enforceSort(outputProperty);
|
||||
enforcedProperty = enforceDistribution(enforcedProperty);
|
||||
} else {
|
||||
enforcedProperty = enforceDistribution(outputProperty);
|
||||
enforcedProperty = enforceSort(enforcedProperty);
|
||||
}
|
||||
|
||||
return enforcedProperty;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,28 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.properties;
|
||||
|
||||
/**
|
||||
* Re-shuffle.
|
||||
*/
|
||||
public class GatherDistributionSpec extends DistributionSpec {
|
||||
|
||||
public GatherDistributionSpec() {
|
||||
super();
|
||||
}
|
||||
}
|
||||
@ -42,6 +42,15 @@ public class OrderKey {
|
||||
this.nullFirst = nullFirst;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether other `OrderKey` is satisfied the current `OrderKey`.
|
||||
*
|
||||
* @param other another OrderKey.
|
||||
*/
|
||||
public boolean matches(OrderKey other) {
|
||||
return expr.equals(other.expr) && isAsc == other.isAsc && nullFirst == other.nullFirst;
|
||||
}
|
||||
|
||||
// Returns the expression this order key sorts by.
public Expression getExpr() {
|
||||
return expr;
|
||||
}
|
||||
|
||||
@ -0,0 +1,65 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.properties;
|
||||
|
||||
import org.apache.doris.nereids.memo.Group;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.operators.plans.physical.PhysicalHeapSort;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Spec of sort order.
|
||||
*/
|
||||
public class OrderSpec {
|
||||
private final List<OrderKey> orderKeys;
|
||||
|
||||
public OrderSpec(List<OrderKey> orderKeys) {
|
||||
this.orderKeys = orderKeys;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether other `OrderSpec` is satisfied the current `OrderSpec`.
|
||||
*
|
||||
* @param other another OrderSpec.
|
||||
*/
|
||||
public boolean meet(OrderSpec other) {
|
||||
if (this.orderKeys.size() < other.getOrderKeys().size()) {
|
||||
return false;
|
||||
}
|
||||
for (int i = 0; i < other.getOrderKeys().size(); ++i) {
|
||||
if (!this.orderKeys.get(i).matches(other.getOrderKeys().get(i))) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public GroupExpression addEnforcer(Group child) {
|
||||
return new GroupExpression(
|
||||
new PhysicalHeapSort(orderKeys, -1, 0),
|
||||
Lists.newArrayList(child)
|
||||
);
|
||||
}
|
||||
|
||||
public List<OrderKey> getOrderKeys() {
|
||||
return orderKeys;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,58 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.properties;
|
||||
|
||||
import org.apache.doris.nereids.PlanContext;
|
||||
import org.apache.doris.nereids.jobs.JobContext;
|
||||
import org.apache.doris.nereids.memo.GroupExpression;
|
||||
import org.apache.doris.nereids.operators.Operator;
|
||||
import org.apache.doris.nereids.operators.OperatorVisitor;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Used to derive the physical properties a parent operator requires from its children.
|
||||
*/
|
||||
public class ParentRequiredPropertyDeriver extends OperatorVisitor<Void, PlanContext> {
|
||||
|
||||
PhysicalProperties requestPropertyFromParent;
|
||||
List<List<PhysicalProperties>> requiredPropertyListList;
|
||||
|
||||
public ParentRequiredPropertyDeriver(JobContext context) {
|
||||
this.requestPropertyFromParent = context.getRequiredProperties();
|
||||
}
|
||||
|
||||
public List<List<PhysicalProperties>> getRequiredPropertyListList(GroupExpression groupExpression) {
|
||||
requiredPropertyListList = Lists.newArrayList();
|
||||
groupExpression.getOperator().accept(this, new PlanContext(groupExpression));
|
||||
return requiredPropertyListList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void visitOperator(Operator operator, PlanContext context) {
|
||||
List<PhysicalProperties> requiredPropertyList = Lists.newArrayList();
|
||||
for (int i = 0; i < context.getGroupExpression().arity(); i++) {
|
||||
requiredPropertyList.add(new PhysicalProperties());
|
||||
}
|
||||
requiredPropertyListList.add(requiredPropertyList);
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
@ -19,17 +19,40 @@ package org.apache.doris.nereids.properties;
|
||||
|
||||
/**
|
||||
* Physical properties used in cascades.
|
||||
* TODO(wj): Do we need a `PhysicalPropertySpec` interface like NoisePage?
|
||||
*/
|
||||
public class PhysicalProperties {
|
||||
private DistributionSpec distributionDesc;
|
||||
private OrderSpec orderSpec;
|
||||
|
||||
public PhysicalProperties() {}
|
||||
private DistributionSpec distributionSpec;
|
||||
|
||||
public DistributionSpec getDistributionDesc() {
|
||||
return distributionDesc;
|
||||
public PhysicalProperties() {
|
||||
}
|
||||
|
||||
public void setDistributionDesc(DistributionSpec distributionDesc) {
|
||||
this.distributionDesc = distributionDesc;
|
||||
public PhysicalProperties(DistributionSpec distributionSpec, OrderSpec orderSpec) {
|
||||
this.distributionSpec = distributionSpec;
|
||||
this.orderSpec = orderSpec;
|
||||
}
|
||||
|
||||
public boolean meet(PhysicalProperties other) {
|
||||
// TODO: handle distributionSpec meet()
|
||||
return orderSpec.meet(other.orderSpec) && distributionSpec.meet(other.distributionSpec);
|
||||
}
|
||||
|
||||
|
||||
public OrderSpec getOrderSpec() {
|
||||
return orderSpec;
|
||||
}
|
||||
|
||||
public void setOrderSpec(OrderSpec orderSpec) {
|
||||
this.orderSpec = orderSpec;
|
||||
}
|
||||
|
||||
public DistributionSpec getDistributionSpec() {
|
||||
return distributionSpec;
|
||||
}
|
||||
|
||||
public void setDistributionSpec(DistributionSpec distributionSpec) {
|
||||
this.distributionSpec = distributionSpec;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user