[refactor](planner): improve enforcer job. (#11922)

- handle enforcing distribution when the request also contains a sort.
- calculate stats in the enforcer job.
- refactor stats calculation.
This commit is contained in:
jakevin
2022-08-19 17:55:43 +08:00
committed by GitHub
parent 788114c89c
commit 1a8a889d56
8 changed files with 123 additions and 109 deletions

View File

@ -18,7 +18,6 @@
package org.apache.doris.nereids.jobs.cascades;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.PlanContext;
import org.apache.doris.nereids.cost.CostCalculator;
import org.apache.doris.nereids.jobs.Job;
import org.apache.doris.nereids.jobs.JobContext;
@ -29,6 +28,7 @@ import org.apache.doris.nereids.properties.ChildOutputPropertyDeriver;
import org.apache.doris.nereids.properties.EnforceMissingPropertiesHelper;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.properties.RequestPropertyDeriver;
import org.apache.doris.nereids.stats.StatsCalculator;
import com.google.common.collect.Lists;
@ -162,22 +162,24 @@ public class CostAndEnforcerJob extends Job implements Cloneable {
// No need to prune here because pruning has already been done when we got the
// best expr from the child group.
// TODO: it could update the cost.
PhysicalProperties outputProperty = ChildOutputPropertyDeriver.getProperties(
context.getRequiredProperties(),
childrenOutputProperty, groupExpression);
ChildOutputPropertyDeriver childOutputPropertyDeriver = new ChildOutputPropertyDeriver(
context.getRequiredProperties(), childrenOutputProperty, curTotalCost);
PhysicalProperties outputProperty = childOutputPropertyDeriver.getOutputProperties(groupExpression);
curTotalCost = childOutputPropertyDeriver.getCurTotalCost();
if (curTotalCost > context.getCostUpperBound()) {
break;
}
/* update current group statistics and re-compute costs. */
// update current group statistics and re-compute costs.
if (groupExpression.children().stream().anyMatch(group -> group.getStatistics() == null)) {
return;
}
PlanContext planContext = new PlanContext(groupExpression);
// TODO: calculate stats. ??????
groupExpression.getOwnerGroup().setStatistics(planContext.getStatisticsWithCheck());
StatsCalculator.estimate(groupExpression);
// record map { outputProperty -> outputProperty }, { ANY -> outputProperty },
recordPropertyAndCost(groupExpression, outputProperty, outputProperty, requestChildrenProperty);
recordPropertyAndCost(groupExpression, outputProperty, PhysicalProperties.ANY, requestChildrenProperty);
enforce(outputProperty, requestChildrenProperty);
@ -194,47 +196,38 @@ public class CostAndEnforcerJob extends Job implements Cloneable {
}
}
private void enforce(PhysicalProperties outputProperty, List<PhysicalProperties> childrenInputProperties) {
// groupExpression can satisfy its own output property
putProperty(groupExpression, outputProperty, outputProperty, childrenInputProperties);
// groupExpression can satisfy the ANY type output property
putProperty(groupExpression, outputProperty, PhysicalProperties.ANY, childrenInputProperties);
private void enforce(PhysicalProperties outputProperty, List<PhysicalProperties> requestChildrenProperty) {
EnforceMissingPropertiesHelper enforceMissingPropertiesHelper = new EnforceMissingPropertiesHelper(context,
groupExpression, curTotalCost);
PhysicalProperties requestedProperties = context.getRequiredProperties();
if (!outputProperty.satisfy(requestedProperties)) {
Pair<PhysicalProperties, Double> pair = enforceMissingPropertiesHelper.enforceProperty(outputProperty,
PhysicalProperties addEnforcedProperty = enforceMissingPropertiesHelper.enforceProperty(outputProperty,
requestedProperties);
PhysicalProperties addEnforcedProperty = pair.first;
curTotalCost = pair.second;
curTotalCost = enforceMissingPropertiesHelper.getCurTotalCost();
// enforcedProperty is superset of requiredProperty
if (!addEnforcedProperty.equals(requestedProperties)) {
putProperty(groupExpression.getOwnerGroup().getBestExpression(addEnforcedProperty),
recordPropertyAndCost(groupExpression.getOwnerGroup().getBestPlan(addEnforcedProperty),
requestedProperties, requestedProperties, Lists.newArrayList(outputProperty));
}
} else {
if (!outputProperty.equals(requestedProperties)) {
putProperty(groupExpression, outputProperty, requestedProperties, childrenInputProperties);
recordPropertyAndCost(groupExpression, outputProperty, requestedProperties, requestChildrenProperty);
}
}
}
private void putProperty(GroupExpression groupExpression,
private void recordPropertyAndCost(GroupExpression groupExpression,
PhysicalProperties outputProperty,
PhysicalProperties requiredProperty,
PhysicalProperties requestProperty,
List<PhysicalProperties> inputProperties) {
if (groupExpression.updateLowestCostTable(requiredProperty, inputProperties, curTotalCost)) {
// Each group expression need to record the outputProperty satisfy what requiredProperty,
// because group expression can generate multi outputProperty. eg. Join may have shuffle local
// and shuffle join two types outputProperty.
groupExpression.putOutputPropertiesMap(outputProperty, requiredProperty);
if (groupExpression.updateLowestCostTable(requestProperty, inputProperties, curTotalCost)) {
// Each group expression need to save { outputProperty --> requestProperty }
groupExpression.putOutputPropertiesMap(outputProperty, requestProperty);
}
this.groupExpression.getOwnerGroup().setBestPlan(groupExpression,
curTotalCost, requiredProperty);
curTotalCost, requestProperty);
}

View File

@ -65,8 +65,7 @@ public class DeriveStatsJob extends Job {
}
}
} else {
StatsCalculator statsCalculator = new StatsCalculator(groupExpression);
statsCalculator.estimate();
StatsCalculator.estimate(groupExpression);
}
}
}

View File

@ -47,7 +47,7 @@ public class Group {
// Map of cost lower bounds
// Map required plan props to cost lower bound of corresponding plan
private Map<PhysicalProperties, Pair<Double, GroupExpression>> lowestCostPlans = Maps.newHashMap();
private final Map<PhysicalProperties, Pair<Double, GroupExpression>> lowestCostPlans = Maps.newHashMap();
private double costLowerBound = -1;
private boolean isExplored = false;
private boolean hasCost = false;
@ -157,13 +157,21 @@ public class Group {
}
}
public GroupExpression getBestExpression(PhysicalProperties properties) {
public GroupExpression getBestPlan(PhysicalProperties properties) {
if (lowestCostPlans.containsKey(properties)) {
return lowestCostPlans.get(properties).second;
}
return null;
}
/**
 * Move the best plan recorded for {@code oldProperty} so that it is recorded under {@code newProperty}.
 * <p>
 * The best group expression is first offered an entry for {@code newProperty} in its own lowest cost
 * table, reusing the children input properties it recorded for {@code oldProperty}. The group level
 * entry is then re-keyed as is, i.e. the same (cost, expression) pair is stored under the new key.
 * <p>
 * {@code oldProperty} must already have a best plan recorded in this group.
 *
 * @param oldProperty property the best plan is currently recorded under
 * @param newProperty property the best plan should be recorded under instead
 * @param cost cost handed to the group expression's lowest cost table for {@code newProperty}
 */
public void replaceBestPlan(PhysicalProperties oldProperty, PhysicalProperties newProperty, double cost) {
    Pair<Double, GroupExpression> bestEntry = lowestCostPlans.get(oldProperty);
    GroupExpression bestExpr = bestEntry.second;
    bestExpr.updateLowestCostTable(newProperty, bestExpr.getInputPropertiesList(oldProperty), cost);
    // re-key the group level entry: drop the old key, then store the same pair under the new one
    lowestCostPlans.remove(oldProperty);
    lowestCostPlans.put(newProperty, bestEntry);
}
/**
 * Get the statistics currently stored on this group.
 *
 * @return the stored statistics; NOTE(review): presumably null until statistics have been set — confirm
 */
public StatsDeriveResult getStatistics() {
return statistics;
}

View File

@ -148,12 +148,24 @@ public class GroupExpression {
if (lowestCostTable.get(outputProperties).first > cost) {
lowestCostTable.put(outputProperties, new Pair<>(cost, childrenInputProperties));
return true;
} else {
return false;
}
} else {
lowestCostTable.put(outputProperties, new Pair<>(cost, childrenInputProperties));
return true;
}
return false;
}
/**
 * Get the cost recorded in the lowest cost table of this group expression for the given property.
 * <p>
 * The property must already have an entry in the table; otherwise the state check below fails
 * (NOTE(review): an IllegalStateException if this is Guava's Preconditions — confirm).
 *
 * @param property property that needs to be satisfied
 * @return lowest cost recorded for satisfying that property
 */
public double getCost(PhysicalProperties property) {
// fail fast instead of dereferencing a missing entry
Preconditions.checkState(lowestCostTable.containsKey(property));
return lowestCostTable.get(property).first;
}
public void putOutputPropertiesMap(PhysicalProperties outputPropertySet,

View File

@ -39,28 +39,24 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
* │
* childOutputProperty
*/
PhysicalProperties requestProperty;
List<PhysicalProperties> childrenOutputProperties;
private PhysicalProperties requestProperty;
private List<PhysicalProperties> childrenOutputProperties;
private double curTotalCost;
public ChildOutputPropertyDeriver(PhysicalProperties requestProperty,
List<PhysicalProperties> childrenOutputProperties) {
List<PhysicalProperties> childrenOutputProperties, double curTotalCost) {
this.childrenOutputProperties = childrenOutputProperties;
this.requestProperty = requestProperty;
this.curTotalCost = curTotalCost;
}
public static PhysicalProperties getProperties(
PhysicalProperties requirements,
List<PhysicalProperties> childrenOutputProperties,
GroupExpression groupExpression) {
ChildOutputPropertyDeriver childOutputPropertyDeriver = new ChildOutputPropertyDeriver(requirements,
childrenOutputProperties);
return groupExpression.getPlan().accept(childOutputPropertyDeriver, new PlanContext(groupExpression));
public PhysicalProperties getOutputProperties(GroupExpression groupExpression) {
return groupExpression.getPlan().accept(this, new PlanContext(groupExpression));
}
public PhysicalProperties getRequestProperty() {
return requestProperty;
public double getCurTotalCost() {
return curTotalCost;
}
@Override

View File

@ -18,11 +18,11 @@
package org.apache.doris.nereids.properties;
import org.apache.doris.common.Pair;
import org.apache.doris.nereids.cost.CostCalculator;
import org.apache.doris.nereids.jobs.JobContext;
import org.apache.doris.nereids.memo.GroupExpression;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
@ -42,49 +42,56 @@ public class EnforceMissingPropertiesHelper {
this.curTotalCost = curTotalCost;
}
/**
 * Get the total cost currently held by this helper.
 * <p>
 * The value is initialised from the constructor argument. CostAndEnforcerJob reads it back through
 * this getter after calling {@code enforceProperty}.
 *
 * @return the current total cost
 */
public double getCurTotalCost() {
return curTotalCost;
}
/**
* Enforce missing property.
*/
public Pair<PhysicalProperties, Double> enforceProperty(PhysicalProperties output, PhysicalProperties required) {
boolean isMeetOrder = output.getOrderSpec().satisfy(required.getOrderSpec());
boolean isMeetDistribution = output.getDistributionSpec().satisfy(required.getDistributionSpec());
public PhysicalProperties enforceProperty(PhysicalProperties output, PhysicalProperties request) {
boolean isSatistyOrder = output.getOrderSpec().satisfy(request.getOrderSpec());
boolean isSatistyDistribution = output.getDistributionSpec().satisfy(request.getDistributionSpec());
if (!isMeetDistribution && !isMeetOrder) {
// Both Distribution and Order don't satisfy.
return new Pair<>(enforceSortAndDistribution(output, required), curTotalCost);
} else if (isMeetDistribution && isMeetOrder) {
// Both satisfy.
// TODO: can't reach here.
return new Pair<>(null, curTotalCost);
} else if (!isMeetDistribution) {
// Distribution don't satisfy.
if (required.getOrderSpec().getOrderKeys().isEmpty()) {
return new Pair<>(enforceDistribution(output), curTotalCost);
} else {
// TODO
// It's wrong that SortSpec is empty.
// After redistribute data , original order requirement may be wrong. Need enforce "SortNode" here.
// PhysicalProperties newProperty =
// new PhysicalProperties(new DistributionSpec(), new OrderSpec(Lists.newArrayList()));
// groupExpression.getParent().
// return enforceSortAndDistribution(newProperty, required);
return new Pair<>(enforceDistribution(output), curTotalCost);
if (!isSatistyDistribution && !isSatistyOrder) {
return enforceSortAndDistribution(output, request);
} else if (isSatistyDistribution && isSatistyOrder) {
Preconditions.checkState(false, "can't reach here.");
return null;
} else if (!isSatistyDistribution) {
if (!request.getOrderSpec().getOrderKeys().isEmpty()) {
// After redistributing data, the original order request may no longer be satisfied.
return enforceDistributionButMeetSort(output, request);
}
return enforceDistribution(output);
} else {
// Order don't satisfy.
return new Pair<>(enforceSort(output), curTotalCost);
return enforceSort(output);
}
}
/**
 * Enforce distribution for a request that also contains a sort.
 * <p>
 * When the request property includes a sort, enforcing distribution may break the original sort.
 * <p>
 * Additionally enforcing a sort on top of that may cause an infinite loop.
 * <p>
 * As a workaround, use {[empty order], Any} to eliminate the original property: the best plan the
 * owner group recorded under {@code output} is re-recorded under {@code PhysicalProperties.ANY},
 * and then both sort and distribution are enforced.
 *
 * @param output property the current plan provides
 * @param request property that was requested
 * @return the property obtained after enforcing sort and distribution
 */
private PhysicalProperties enforceDistributionButMeetSort(PhysicalProperties output, PhysicalProperties request) {
    double outputCost = groupExpression.getCost(output);
    groupExpression.getOwnerGroup().replaceBestPlan(output, PhysicalProperties.ANY, outputCost);
    return enforceSortAndDistribution(output, request);
}
private PhysicalProperties enforceSort(PhysicalProperties oldOutputProperty) {
// clone
// keep consistent in DistributionSpec with the oldOutputProperty
PhysicalProperties newOutputProperty = new PhysicalProperties(
oldOutputProperty.getDistributionSpec(),
context.getRequiredProperties().getOrderSpec());
GroupExpression enforcer =
context.getRequiredProperties().getOrderSpec().addEnforcer(groupExpression.getOwnerGroup());
updateCostWithEnforcer(enforcer, oldOutputProperty, newOutputProperty);
addEnforcerUpdateCost(enforcer, oldOutputProperty, newOutputProperty);
return newOutputProperty;
}
@ -96,12 +103,29 @@ public class EnforceMissingPropertiesHelper {
GroupExpression enforcer =
context.getRequiredProperties().getDistributionSpec().addEnforcer(groupExpression.getOwnerGroup());
updateCostWithEnforcer(enforcer, oldOutputProperty, newOutputProperty);
addEnforcerUpdateCost(enforcer, oldOutputProperty, newOutputProperty);
return newOutputProperty;
}
private void updateCostWithEnforcer(GroupExpression enforcer,
/**
 * Enforce both sort and distribution on top of {@code outputProperty}.
 * <p>
 * The order of the two enforce steps depends on the required distribution: if it equals a
 * {@code DistributionSpecGather}, sort is enforced first and distribution second; otherwise
 * distribution is enforced first and sort second.
 *
 * @param outputProperty property the current plan provides
 * @param requiredProperty requested property; only its distribution spec is consulted here
 * @return the property returned by the second enforce step
 */
private PhysicalProperties enforceSortAndDistribution(PhysicalProperties outputProperty,
        PhysicalProperties requiredProperty) {
    boolean gatherRequired = requiredProperty.getDistributionSpec().equals(new DistributionSpecGather());
    if (gatherRequired) {
        // sort first, then enforce distribution on the sorted output
        return enforceDistribution(enforceSort(outputProperty));
    }
    // enforce distribution first, then sort
    return enforceSort(enforceDistribution(outputProperty));
}
/**
* Add enforcer plan, update cost, update property of enforcer, and setBestPlan
*/
private void addEnforcerUpdateCost(GroupExpression enforcer,
PhysicalProperties oldOutputProperty,
PhysicalProperties newOutputProperty) {
context.getPlannerContext().getMemo().addEnforcerPlan(enforcer, groupExpression.getOwnerGroup());
@ -112,19 +136,4 @@ public class EnforceMissingPropertiesHelper {
}
groupExpression.getOwnerGroup().setBestPlan(enforcer, curTotalCost, newOutputProperty);
}
/**
 * Enforce both sort and distribution on top of {@code outputProperty}.
 * <p>
 * If the required distribution equals a {@code DistributionSpecGather}, sort is enforced first and
 * distribution second; otherwise distribution is enforced first and sort second.
 *
 * @param outputProperty property the current plan provides
 * @param requiredProperty requested property; only its distribution spec is consulted here
 * @return the property returned by the second enforce step
 */
private PhysicalProperties enforceSortAndDistribution(PhysicalProperties outputProperty,
PhysicalProperties requiredProperty) {
PhysicalProperties enforcedProperty;
if (requiredProperty.getDistributionSpec()
.equals(new DistributionSpecGather())) {
// gather requested: sort first, then enforce distribution on the sorted output
enforcedProperty = enforceSort(outputProperty);
enforcedProperty = enforceDistribution(enforcedProperty);
} else {
// any other distribution: enforce distribution first, then sort
enforcedProperty = enforceDistribution(outputProperty);
enforcedProperty = enforceSort(enforcedProperty);
}
return enforcedProperty;
}
}

View File

@ -66,25 +66,28 @@ import java.util.Set;
import java.util.stream.Collectors;
/**
* Used to calculate the stats for each operator
* Used to calculate the stats for each plan
*/
public class StatsCalculator extends DefaultPlanVisitor<StatsDeriveResult, Void> {
private final GroupExpression groupExpression;
public StatsCalculator(GroupExpression groupExpression) {
private StatsCalculator(GroupExpression groupExpression) {
this.groupExpression = groupExpression;
}
/**
* Do estimate.
* estimate stats
*/
public void estimate() {
public static void estimate(GroupExpression groupExpression) {
StatsCalculator statsCalculator = new StatsCalculator(groupExpression);
statsCalculator.estimate();
}
private void estimate() {
StatsDeriveResult stats = groupExpression.getPlan().accept(this, null);
groupExpression.getOwnerGroup().setStatistics(stats);
groupExpression.setStatDerived(true);
}
@Override

View File

@ -107,8 +107,7 @@ public class StatsCalculatorTest {
GroupExpression groupExpression = new GroupExpression(logicalAggregate, Arrays.asList(childGroup));
Group ownerGroup = new Group();
groupExpression.setOwnerGroup(ownerGroup);
StatsCalculator statsCalculator = new StatsCalculator(groupExpression);
statsCalculator.estimate();
StatsCalculator.estimate(groupExpression);
Assertions.assertEquals(groupExpression.getOwnerGroup().getStatistics().getRowCount(), 10);
}
@ -151,8 +150,7 @@ public class StatsCalculatorTest {
groupExpression.addChild(childGroup);
Group ownerGroup = new Group();
groupExpression.setOwnerGroup(ownerGroup);
StatsCalculator statsCalculator = new StatsCalculator(groupExpression);
statsCalculator.estimate();
StatsCalculator.estimate(groupExpression);
Assertions.assertEquals((long) (10000 * 0.1 * 0.05), ownerGroup.getStatistics().getRowCount(), 0.001);
LogicalFilter logicalFilterOr = new LogicalFilter(or, groupPlan);
@ -160,8 +158,7 @@ public class StatsCalculatorTest {
groupExpressionOr.addChild(childGroup);
Group ownerGroupOr = new Group();
groupExpressionOr.setOwnerGroup(ownerGroupOr);
StatsCalculator statsCalculator2 = new StatsCalculator(groupExpressionOr);
statsCalculator2.estimate();
StatsCalculator.estimate(groupExpressionOr);
Assertions.assertEquals((long) (10000 * (0.1 + 0.05 - 0.1 * 0.05)),
ownerGroupOr.getStatistics().getRowCount(), 0.001);
}
@ -232,8 +229,7 @@ public class StatsCalculatorTest {
GroupExpression groupExpression = new GroupExpression(logicalOlapScan1, ImmutableList.of(childGroup));
Group ownerGroup = new Group();
groupExpression.setOwnerGroup(ownerGroup);
StatsCalculator statsCalculator = new StatsCalculator(groupExpression);
statsCalculator.estimate();
StatsCalculator.estimate(groupExpression);
StatsDeriveResult stats = ownerGroup.getStatistics();
Assertions.assertEquals(1, stats.getSlotToColumnStats().size());
Assertions.assertNotNull(stats.getSlotToColumnStats().get(slot1));
@ -262,8 +258,7 @@ public class StatsCalculatorTest {
groupExpression.addChild(childGroup);
Group ownerGroup = new Group();
ownerGroup.addGroupExpression(groupExpression);
StatsCalculator statsCalculator = new StatsCalculator(groupExpression);
statsCalculator.estimate();
StatsCalculator.estimate(groupExpression);
StatsDeriveResult limitStats = ownerGroup.getStatistics();
Assertions.assertEquals(1, limitStats.getRowCount());
ColumnStats slot1Stats = limitStats.getSlotToColumnStats().get(slot1);
@ -294,8 +289,7 @@ public class StatsCalculatorTest {
groupExpression.addChild(childGroup);
Group ownerGroup = new Group();
ownerGroup.addGroupExpression(groupExpression);
StatsCalculator statsCalculator = new StatsCalculator(groupExpression);
statsCalculator.estimate();
StatsCalculator.estimate(groupExpression);
StatsDeriveResult topNStats = ownerGroup.getStatistics();
Assertions.assertEquals(1, topNStats.getRowCount());
ColumnStats slot1Stats = topNStats.getSlotToColumnStats().get(slot1);