[enhancement](Nereids) avoiding broadcast join heuristically and pruning more in CostAndEnforceJob (#25137)

When the rowCount exceeds a certain threshold, refrain from generating a broadcast join.
Only enforce the best expression in CostAndEnforcerJob, rather than enforcing every expression.
Remove lower bound group pruning.
This commit is contained in:
谢健
2023-10-10 13:38:10 +08:00
committed by GitHub
parent 181c58c691
commit 7276665f1e
5 changed files with 55 additions and 15 deletions

View File

@ -207,15 +207,6 @@ class CostModelV1 extends PlanVisitor<Cost, PlanContext> {
// replicate
if (spec instanceof DistributionSpecReplicated) {
double dataSize = childStatistics.computeSize();
double memLimit = ConnectContext.get().getSessionVariable().getMaxExecMemByte();
// if the build side is big, avoid using broadcast join
double rowsLimit = ConnectContext.get().getSessionVariable().getBroadcastRowCountLimit();
double brMemlimit = ConnectContext.get().getSessionVariable().getBroadcastHashtableMemLimitPercentage();
if (dataSize > memLimit * brMemlimit
|| childStatistics.getRowCount() > rowsLimit) {
return CostV1.of(Double.MAX_VALUE, Double.MAX_VALUE, Double.MAX_VALUE);
}
// estimate broadcast cost by an empirical formula: beNumber^0.5 * rowCount
// - sender number and receiver number are not available at RBO stage now, so we use beNumber
// - senders and receivers work in parallel, that's why we use the square root of beNumber

View File

@ -188,9 +188,17 @@ public class CostAndEnforcerJob extends Job implements Cloneable {
curNodeCost,
lowestCostExpr.getCostValueByProperties(requestChildProperty),
curChildIndex);
if (curTotalCost.getValue() > context.getCostUpperBound()) {
curTotalCost = Cost.infinite();
}
// Not performing lower bound group pruning here is to avoid redundant optimization of children.
// For example:
// Group1 : betterExpr, currentExpr(child: Group2), otherExpr(child: Group2)
// steps
// 1. CostAndEnforce(currentExpr) with upperBound betterExpr.cost
// 2. OptimizeGroup(Group2) with upperBound betterExpr.cost - currentExpr.nodeCost
// 3. CostAndEnforce(Expr in Group2) trigger here and exit
// ...
// n. CostAndEnforce(otherExpr) can trigger optimization of Group2 again for the same requireProp
// the request child properties will be covered by the output properties
// that correspond to the request properties. So if we run a costAndEnforceJob of the same
// group expression, its request child properties will be different from these.
@ -275,6 +283,23 @@ public class CostAndEnforcerJob extends Job implements Cloneable {
}
return;
}
if (context.getRequiredProperties().isDistributionOnlyProperties()) {
// For properties without an orderSpec, enforceMissingPropertiesHelper always adds a distributor
// above this group expression. The cost of the enforced plan is equal to the cost of the groupExpression
// plus the cost of the distributor. The distributor remains unchanged for different groupExpressions.
// Therefore, if there is a better groupExpr, it is preferable to enforce the better groupExpr.
// Consequently, we can avoid this enforcement.
Optional<Pair<Cost, GroupExpression>> bestExpr = groupExpression.getOwnerGroup()
.getLowestCostPlan(context.getRequiredProperties());
double bestCost = bestExpr
.map(costGroupExpressionPair -> costGroupExpressionPair.first.getValue())
.orElse(Double.POSITIVE_INFINITY);
if (curTotalCost.getValue() > bestCost) {
return;
}
}
EnforceMissingPropertiesHelper enforceMissingPropertiesHelper
= new EnforceMissingPropertiesHelper(context, groupExpression, curTotalCost);
PhysicalProperties addEnforcedProperty = enforceMissingPropertiesHelper

View File

@ -106,6 +106,10 @@ public class PhysicalProperties {
return distributionSpec;
}
/**
 * Reports whether these properties carry no ordering requirement.
 *
 * @return {@code true} when the order spec holds no order keys, {@code false} otherwise
 */
public boolean isDistributionOnlyProperties() {
    // With no order keys present, only the distribution part of the properties remains.
    final boolean hasNoOrderKeys = orderSpec.getOrderKeys().isEmpty();
    return hasNoOrderKeys;
}
@Override
public boolean equals(Object o) {
if (this == o) {

View File

@ -165,8 +165,14 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
if (JoinUtils.couldShuffle(hashJoin)) {
addShuffleJoinRequestProperty(hashJoin);
}
// for broadcast join
if (JoinUtils.couldBroadcast(hashJoin)) {
double memLimit = ConnectContext.get().getSessionVariable().getMaxExecMemByte();
double rowsLimit = ConnectContext.get().getSessionVariable().getBroadcastRowCountLimit();
double brMemlimit = ConnectContext.get().getSessionVariable().getBroadcastHashtableMemLimitPercentage();
double datasize = hashJoin.getGroupExpression().get().child(1).getStatistics().computeSize();
double rowCount = hashJoin.getGroupExpression().get().child(1).getStatistics().getRowCount();
if (JoinUtils.couldBroadcast(hashJoin) && rowCount <= rowsLimit && datasize <= memLimit * brMemlimit) {
addBroadcastJoinRequestProperty();
}
return null;