From 1a8a889d56d3a6a8ed41dfc03879466555af3609 Mon Sep 17 00:00:00 2001 From: jakevin Date: Fri, 19 Aug 2022 17:55:43 +0800 Subject: [PATCH] [refactor](planner): improve enfocer job. (#11922) - handle enforce distribution when meet sort. - calculate stats in enforcer job - refactor calculate stats. --- .../jobs/cascades/CostAndEnforcerJob.java | 51 +++++----- .../nereids/jobs/cascades/DeriveStatsJob.java | 3 +- .../org/apache/doris/nereids/memo/Group.java | 12 ++- .../doris/nereids/memo/GroupExpression.java | 14 ++- .../ChildOutputPropertyDeriver.java | 24 ++--- .../EnforceMissingPropertiesHelper.java | 97 ++++++++++--------- .../doris/nereids/stats/StatsCalculator.java | 13 ++- .../nereids/stats/StatsCalculatorTest.java | 18 ++-- 8 files changed, 123 insertions(+), 109 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java index 5cd0096a09..061f13c25c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java @@ -18,7 +18,6 @@ package org.apache.doris.nereids.jobs.cascades; import org.apache.doris.common.Pair; -import org.apache.doris.nereids.PlanContext; import org.apache.doris.nereids.cost.CostCalculator; import org.apache.doris.nereids.jobs.Job; import org.apache.doris.nereids.jobs.JobContext; @@ -29,6 +28,7 @@ import org.apache.doris.nereids.properties.ChildOutputPropertyDeriver; import org.apache.doris.nereids.properties.EnforceMissingPropertiesHelper; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.properties.RequestPropertyDeriver; +import org.apache.doris.nereids.stats.StatsCalculator; import com.google.common.collect.Lists; @@ -162,22 +162,24 @@ public class CostAndEnforcerJob extends Job implements Cloneable { // Not need to do 
pruning here because it has been done when we get the // best expr from the child group - // TODO: it could update the cost. - PhysicalProperties outputProperty = ChildOutputPropertyDeriver.getProperties( - context.getRequiredProperties(), - childrenOutputProperty, groupExpression); + ChildOutputPropertyDeriver childOutputPropertyDeriver = new ChildOutputPropertyDeriver( + context.getRequiredProperties(), childrenOutputProperty, curTotalCost); + PhysicalProperties outputProperty = childOutputPropertyDeriver.getOutputProperties(groupExpression); + curTotalCost = childOutputPropertyDeriver.getCurTotalCost(); if (curTotalCost > context.getCostUpperBound()) { break; } - /* update current group statistics and re-compute costs. */ + // update current group statistics and re-compute costs. if (groupExpression.children().stream().anyMatch(group -> group.getStatistics() == null)) { return; } - PlanContext planContext = new PlanContext(groupExpression); - // TODO: calculate stats. ?????? - groupExpression.getOwnerGroup().setStatistics(planContext.getStatisticsWithCheck()); + StatsCalculator.estimate(groupExpression); + + // record map { outputProperty -> outputProperty }, { ANY -> outputProperty }, + recordPropertyAndCost(groupExpression, outputProperty, outputProperty, requestChildrenProperty); + recordPropertyAndCost(groupExpression, outputProperty, PhysicalProperties.ANY, requestChildrenProperty); enforce(outputProperty, requestChildrenProperty); @@ -194,47 +196,38 @@ public class CostAndEnforcerJob extends Job implements Cloneable { } } - private void enforce(PhysicalProperties outputProperty, List childrenInputProperties) { - - // groupExpression can satisfy its own output property - putProperty(groupExpression, outputProperty, outputProperty, childrenInputProperties); - // groupExpression can satisfy the ANY type output property - putProperty(groupExpression, outputProperty, PhysicalProperties.ANY, childrenInputProperties); - + private void enforce(PhysicalProperties 
outputProperty, List requestChildrenProperty) { EnforceMissingPropertiesHelper enforceMissingPropertiesHelper = new EnforceMissingPropertiesHelper(context, groupExpression, curTotalCost); PhysicalProperties requestedProperties = context.getRequiredProperties(); if (!outputProperty.satisfy(requestedProperties)) { - Pair pair = enforceMissingPropertiesHelper.enforceProperty(outputProperty, + PhysicalProperties addEnforcedProperty = enforceMissingPropertiesHelper.enforceProperty(outputProperty, requestedProperties); - PhysicalProperties addEnforcedProperty = pair.first; - curTotalCost = pair.second; + curTotalCost = enforceMissingPropertiesHelper.getCurTotalCost(); // enforcedProperty is superset of requiredProperty if (!addEnforcedProperty.equals(requestedProperties)) { - putProperty(groupExpression.getOwnerGroup().getBestExpression(addEnforcedProperty), + recordPropertyAndCost(groupExpression.getOwnerGroup().getBestPlan(addEnforcedProperty), requestedProperties, requestedProperties, Lists.newArrayList(outputProperty)); } } else { if (!outputProperty.equals(requestedProperties)) { - putProperty(groupExpression, outputProperty, requestedProperties, childrenInputProperties); + recordPropertyAndCost(groupExpression, outputProperty, requestedProperties, requestChildrenProperty); } } } - private void putProperty(GroupExpression groupExpression, + private void recordPropertyAndCost(GroupExpression groupExpression, PhysicalProperties outputProperty, - PhysicalProperties requiredProperty, + PhysicalProperties requestProperty, List inputProperties) { - if (groupExpression.updateLowestCostTable(requiredProperty, inputProperties, curTotalCost)) { - // Each group expression need to record the outputProperty satisfy what requiredProperty, - // because group expression can generate multi outputProperty. eg. Join may have shuffle local - // and shuffle join two types outputProperty. 
- groupExpression.putOutputPropertiesMap(outputProperty, requiredProperty); + if (groupExpression.updateLowestCostTable(requestProperty, inputProperties, curTotalCost)) { + // Each group expression need to save { outputProperty --> requestProperty } + groupExpression.putOutputPropertiesMap(outputProperty, requestProperty); } this.groupExpression.getOwnerGroup().setBestPlan(groupExpression, - curTotalCost, requiredProperty); + curTotalCost, requestProperty); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/DeriveStatsJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/DeriveStatsJob.java index 389c54c347..7cd3a3e9c6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/DeriveStatsJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/DeriveStatsJob.java @@ -65,8 +65,7 @@ public class DeriveStatsJob extends Job { } } } else { - StatsCalculator statsCalculator = new StatsCalculator(groupExpression); - statsCalculator.estimate(); + StatsCalculator.estimate(groupExpression); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java index 8e093f0506..93d23cc7ec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/Group.java @@ -47,7 +47,7 @@ public class Group { // Map of cost lower bounds // Map required plan props to cost lower bound of corresponding plan - private Map> lowestCostPlans = Maps.newHashMap(); + private final Map> lowestCostPlans = Maps.newHashMap(); private double costLowerBound = -1; private boolean isExplored = false; private boolean hasCost = false; @@ -157,13 +157,21 @@ public class Group { } } - public GroupExpression getBestExpression(PhysicalProperties properties) { + public GroupExpression getBestPlan(PhysicalProperties properties) { if 
(lowestCostPlans.containsKey(properties)) { return lowestCostPlans.get(properties).second; } return null; } + public void replaceBestPlan(PhysicalProperties oldProperty, PhysicalProperties newProperty, double cost) { + Pair pair = lowestCostPlans.get(oldProperty); + GroupExpression lowestGroupExpr = pair.second; + lowestGroupExpr.updateLowestCostTable(newProperty, lowestGroupExpr.getInputPropertiesList(oldProperty), cost); + lowestCostPlans.remove(oldProperty); + lowestCostPlans.put(newProperty, pair); + } + public StatsDeriveResult getStatistics() { return statistics; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java index 587051d618..dc9e343b0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/memo/GroupExpression.java @@ -148,12 +148,24 @@ public class GroupExpression { if (lowestCostTable.get(outputProperties).first > cost) { lowestCostTable.put(outputProperties, new Pair<>(cost, childrenInputProperties)); return true; + } else { + return false; } } else { lowestCostTable.put(outputProperties, new Pair<>(cost, childrenInputProperties)); return true; } - return false; + } + + /** + * get the lowest cost when satisfy property + * + * @param property property that needs to be satisfied + * @return Lowest cost to satisfy that property + */ + public double getCost(PhysicalProperties property) { + Preconditions.checkState(lowestCostTable.containsKey(property)); + return lowestCostTable.get(property).first; } public void putOutputPropertiesMap(PhysicalProperties outputPropertySet, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index 1902411c27..72029e54f6 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -39,28 +39,24 @@ public class ChildOutputPropertyDeriver extends PlanVisitor childrenOutputProperties; + private PhysicalProperties requestProperty; + private List childrenOutputProperties; + private double curTotalCost; + public ChildOutputPropertyDeriver(PhysicalProperties requestProperty, - List childrenOutputProperties) { + List childrenOutputProperties, double curTotalCost) { this.childrenOutputProperties = childrenOutputProperties; this.requestProperty = requestProperty; + this.curTotalCost = curTotalCost; } - public static PhysicalProperties getProperties( - PhysicalProperties requirements, - List childrenOutputProperties, - GroupExpression groupExpression) { - - ChildOutputPropertyDeriver childOutputPropertyDeriver = new ChildOutputPropertyDeriver(requirements, - childrenOutputProperties); - - return groupExpression.getPlan().accept(childOutputPropertyDeriver, new PlanContext(groupExpression)); + public PhysicalProperties getOutputProperties(GroupExpression groupExpression) { + return groupExpression.getPlan().accept(this, new PlanContext(groupExpression)); } - public PhysicalProperties getRequestProperty() { - return requestProperty; + public double getCurTotalCost() { + return curTotalCost; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java index aa7a94161e..7f946055b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/EnforceMissingPropertiesHelper.java @@ -18,11 +18,11 @@ package org.apache.doris.nereids.properties; -import org.apache.doris.common.Pair; import 
org.apache.doris.nereids.cost.CostCalculator; import org.apache.doris.nereids.jobs.JobContext; import org.apache.doris.nereids.memo.GroupExpression; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; /** @@ -42,49 +42,56 @@ public class EnforceMissingPropertiesHelper { this.curTotalCost = curTotalCost; } + public double getCurTotalCost() { + return curTotalCost; + } + /** * Enforce missing property. */ - public Pair enforceProperty(PhysicalProperties output, PhysicalProperties required) { - boolean isMeetOrder = output.getOrderSpec().satisfy(required.getOrderSpec()); - boolean isMeetDistribution = output.getDistributionSpec().satisfy(required.getDistributionSpec()); + public PhysicalProperties enforceProperty(PhysicalProperties output, PhysicalProperties request) { + boolean isSatistyOrder = output.getOrderSpec().satisfy(request.getOrderSpec()); + boolean isSatistyDistribution = output.getDistributionSpec().satisfy(request.getDistributionSpec()); - if (!isMeetDistribution && !isMeetOrder) { - // Both Distribution and Order don't satisfy. - return new Pair<>(enforceSortAndDistribution(output, required), curTotalCost); - } else if (isMeetDistribution && isMeetOrder) { - // Both satisfy. - // TODO: can't reach here. - return new Pair<>(null, curTotalCost); - } else if (!isMeetDistribution) { - // Distribution don't satisfy. - if (required.getOrderSpec().getOrderKeys().isEmpty()) { - return new Pair<>(enforceDistribution(output), curTotalCost); - } else { - // TODO - // It's wrong that SortSpec is empty. - // After redistribute data , original order requirement may be wrong. Need enforce "SortNode" here. - // PhysicalProperties newProperty = - // new PhysicalProperties(new DistributionSpec(), new OrderSpec(Lists.newArrayList())); - // groupExpression.getParent(). 
- // return enforceSortAndDistribution(newProperty, required); - return new Pair<>(enforceDistribution(output), curTotalCost); + if (!isSatistyDistribution && !isSatistyOrder) { + return enforceSortAndDistribution(output, request); + } else if (isSatistyDistribution && isSatistyOrder) { + Preconditions.checkState(false, "can't reach here."); + return null; + } else if (!isSatistyDistribution) { + if (!request.getOrderSpec().getOrderKeys().isEmpty()) { + // After redistribute data , original order request may be wrong. + return enforceDistributionButMeetSort(output, request); } + return enforceDistribution(output); } else { // Order don't satisfy. - return new Pair<>(enforceSort(output), curTotalCost); + return enforceSort(output); } } + /** + * When requestProperty include sort, enforce distribution may break the original sort. + *

+ * But if we additionally enforce sort, it may cause an infinite loop. + *

+ * hackly, use {[empty order], Any} to eliminate the original property. + */ + private PhysicalProperties enforceDistributionButMeetSort(PhysicalProperties output, PhysicalProperties request) { + groupExpression.getOwnerGroup() + .replaceBestPlan(output, PhysicalProperties.ANY, groupExpression.getCost(output)); + return enforceSortAndDistribution(output, request); + } + private PhysicalProperties enforceSort(PhysicalProperties oldOutputProperty) { - // clone + // keep consistent in DistributionSpec with the oldOutputProperty PhysicalProperties newOutputProperty = new PhysicalProperties( oldOutputProperty.getDistributionSpec(), context.getRequiredProperties().getOrderSpec()); GroupExpression enforcer = context.getRequiredProperties().getOrderSpec().addEnforcer(groupExpression.getOwnerGroup()); - updateCostWithEnforcer(enforcer, oldOutputProperty, newOutputProperty); + addEnforcerUpdateCost(enforcer, oldOutputProperty, newOutputProperty); return newOutputProperty; } @@ -96,12 +103,29 @@ public class EnforceMissingPropertiesHelper { GroupExpression enforcer = context.getRequiredProperties().getDistributionSpec().addEnforcer(groupExpression.getOwnerGroup()); - updateCostWithEnforcer(enforcer, oldOutputProperty, newOutputProperty); + addEnforcerUpdateCost(enforcer, oldOutputProperty, newOutputProperty); return newOutputProperty; } - private void updateCostWithEnforcer(GroupExpression enforcer, + private PhysicalProperties enforceSortAndDistribution(PhysicalProperties outputProperty, + PhysicalProperties requiredProperty) { + PhysicalProperties enforcedProperty; + if (requiredProperty.getDistributionSpec().equals(new DistributionSpecGather())) { + enforcedProperty = enforceSort(outputProperty); + enforcedProperty = enforceDistribution(enforcedProperty); + } else { + enforcedProperty = enforceDistribution(outputProperty); + enforcedProperty = enforceSort(enforcedProperty); + } + + return enforcedProperty; + } + + /** + * Add enforcer plan, update cost, update property of 
enforcer, and setBestPlan + */ + private void addEnforcerUpdateCost(GroupExpression enforcer, PhysicalProperties oldOutputProperty, PhysicalProperties newOutputProperty) { context.getPlannerContext().getMemo().addEnforcerPlan(enforcer, groupExpression.getOwnerGroup()); @@ -112,19 +136,4 @@ public class EnforceMissingPropertiesHelper { } groupExpression.getOwnerGroup().setBestPlan(enforcer, curTotalCost, newOutputProperty); } - - private PhysicalProperties enforceSortAndDistribution(PhysicalProperties outputProperty, - PhysicalProperties requiredProperty) { - PhysicalProperties enforcedProperty; - if (requiredProperty.getDistributionSpec() - .equals(new DistributionSpecGather())) { - enforcedProperty = enforceSort(outputProperty); - enforcedProperty = enforceDistribution(enforcedProperty); - } else { - enforcedProperty = enforceDistribution(outputProperty); - enforcedProperty = enforceSort(enforcedProperty); - } - - return enforcedProperty; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java index a5fee61259..62fb2d0e3c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java @@ -66,25 +66,28 @@ import java.util.Set; import java.util.stream.Collectors; /** - * Used to calculate the stats for each operator + * Used to calculate the stats for each plan */ public class StatsCalculator extends DefaultPlanVisitor { private final GroupExpression groupExpression; - public StatsCalculator(GroupExpression groupExpression) { + private StatsCalculator(GroupExpression groupExpression) { this.groupExpression = groupExpression; } /** - * Do estimate. 
+ * estimate stats */ - public void estimate() { + public static void estimate(GroupExpression groupExpression) { + StatsCalculator statsCalculator = new StatsCalculator(groupExpression); + statsCalculator.estimate(); + } + private void estimate() { StatsDeriveResult stats = groupExpression.getPlan().accept(this, null); groupExpression.getOwnerGroup().setStatistics(stats); groupExpression.setStatDerived(true); - } @Override diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java index 628161c971..290cbd7d32 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java @@ -107,8 +107,7 @@ public class StatsCalculatorTest { GroupExpression groupExpression = new GroupExpression(logicalAggregate, Arrays.asList(childGroup)); Group ownerGroup = new Group(); groupExpression.setOwnerGroup(ownerGroup); - StatsCalculator statsCalculator = new StatsCalculator(groupExpression); - statsCalculator.estimate(); + StatsCalculator.estimate(groupExpression); Assertions.assertEquals(groupExpression.getOwnerGroup().getStatistics().getRowCount(), 10); } @@ -151,8 +150,7 @@ public class StatsCalculatorTest { groupExpression.addChild(childGroup); Group ownerGroup = new Group(); groupExpression.setOwnerGroup(ownerGroup); - StatsCalculator statsCalculator = new StatsCalculator(groupExpression); - statsCalculator.estimate(); + StatsCalculator.estimate(groupExpression); Assertions.assertEquals((long) (10000 * 0.1 * 0.05), ownerGroup.getStatistics().getRowCount(), 0.001); LogicalFilter logicalFilterOr = new LogicalFilter(or, groupPlan); @@ -160,8 +158,7 @@ public class StatsCalculatorTest { groupExpressionOr.addChild(childGroup); Group ownerGroupOr = new Group(); groupExpressionOr.setOwnerGroup(ownerGroupOr); - StatsCalculator statsCalculator2 = new 
StatsCalculator(groupExpressionOr); - statsCalculator2.estimate(); + StatsCalculator.estimate(groupExpressionOr); Assertions.assertEquals((long) (10000 * (0.1 + 0.05 - 0.1 * 0.05)), ownerGroupOr.getStatistics().getRowCount(), 0.001); } @@ -232,8 +229,7 @@ public class StatsCalculatorTest { GroupExpression groupExpression = new GroupExpression(logicalOlapScan1, ImmutableList.of(childGroup)); Group ownerGroup = new Group(); groupExpression.setOwnerGroup(ownerGroup); - StatsCalculator statsCalculator = new StatsCalculator(groupExpression); - statsCalculator.estimate(); + StatsCalculator.estimate(groupExpression); StatsDeriveResult stats = ownerGroup.getStatistics(); Assertions.assertEquals(1, stats.getSlotToColumnStats().size()); Assertions.assertNotNull(stats.getSlotToColumnStats().get(slot1)); @@ -262,8 +258,7 @@ public class StatsCalculatorTest { groupExpression.addChild(childGroup); Group ownerGroup = new Group(); ownerGroup.addGroupExpression(groupExpression); - StatsCalculator statsCalculator = new StatsCalculator(groupExpression); - statsCalculator.estimate(); + StatsCalculator.estimate(groupExpression); StatsDeriveResult limitStats = ownerGroup.getStatistics(); Assertions.assertEquals(1, limitStats.getRowCount()); ColumnStats slot1Stats = limitStats.getSlotToColumnStats().get(slot1); @@ -294,8 +289,7 @@ public class StatsCalculatorTest { groupExpression.addChild(childGroup); Group ownerGroup = new Group(); ownerGroup.addGroupExpression(groupExpression); - StatsCalculator statsCalculator = new StatsCalculator(groupExpression); - statsCalculator.estimate(); + StatsCalculator.estimate(groupExpression); StatsDeriveResult topNStats = ownerGroup.getStatistics(); Assertions.assertEquals(1, topNStats.getRowCount()); ColumnStats slot1Stats = topNStats.getSlotToColumnStats().get(slot1);