[opt](Nereids) bucket shuffle downgrade expansion (#34088)

Expand the bucket shuffle downgrade condition. Originally, the downgrade required one side of the join to be a base table with a single partition after pruning and a bucket number smaller than the parallel instance number. With this change, the option can be used to disable bucket shuffle more generally, without the above restrictions.

Co-authored-by: zhongjian.xzj <zhongjian.xzj@zhongjianxzjdeMacBook-Pro.local>
This commit is contained in:
xzj7019
2024-04-26 18:27:51 +08:00
committed by yiguolei
parent 5e9eb417ad
commit c125148deb
25 changed files with 2173 additions and 35 deletions

View File

@ -30,7 +30,6 @@ import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
@ -40,7 +39,6 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
@ -209,38 +207,12 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
return true;
}
private boolean isBucketShuffleDownGrade(Plan oneSidePlan, DistributionSpecHash otherSideSpec) {
// improper to do bucket shuffle join:
// oneSide:
// 1. base table
// 2. single partition after pruning
// 3. tablets' number is small enough (< paraInstanceNum)
// otherSide: ShuffleType.EXECUTION_BUCKETED
private boolean isBucketShuffleDownGrade(DistributionSpecHash srcSideSpec) {
boolean isBucketShuffleDownGrade = ConnectContext.get().getSessionVariable().isEnableBucketShuffleDownGrade();
if (!isBucketShuffleDownGrade) {
return false;
} else if (otherSideSpec.getShuffleType() != ShuffleType.EXECUTION_BUCKETED) {
return false;
} else {
int paraNum = Math.max(1, ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
if (((GroupPlan) oneSidePlan).getGroup().getPhysicalExpressions().isEmpty()) {
return false;
} else {
Plan plan = ((GroupPlan) oneSidePlan).getGroup().getPhysicalExpressions().get(0).getPlan();
while ((plan instanceof PhysicalProject || plan instanceof PhysicalFilter)
&& !((GroupPlan) plan.child(0)).getGroup().getPhysicalExpressions().isEmpty()) {
plan = ((GroupPlan) plan.child(0)).getGroup().getPhysicalExpressions().get(0).getPlan();
}
if (plan != null && plan instanceof PhysicalOlapScan
&& ((PhysicalOlapScan) plan).getSelectedPartitionIds().size() <= 1
&& ((PhysicalOlapScan) plan).getTable() != null
&& ((PhysicalOlapScan) plan).getTable().getDefaultDistributionInfo() != null
&& ((PhysicalOlapScan) plan).getTable().getDefaultDistributionInfo().getBucketNum() < paraNum) {
return true;
} else {
return false;
}
}
return srcSideSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED;
}
}
@ -262,9 +234,6 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
DistributionSpec leftDistributionSpec = childrenProperties.get(0).getDistributionSpec();
DistributionSpec rightDistributionSpec = childrenProperties.get(1).getDistributionSpec();
Plan leftChild = hashJoin.child(0);
Plan rightChild = hashJoin.child(1);
// broadcast do not need regular
if (rightDistributionSpec instanceof DistributionSpecReplicated) {
return true;
@ -296,7 +265,7 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (isBucketShuffleDownGrade(leftChild, rightHashSpec)) {
} else if (isBucketShuffleDownGrade(rightHashSpec)) {
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
@ -305,7 +274,7 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (isBucketShuffleDownGrade(rightChild, leftHashSpec)) {
} else if (isBucketShuffleDownGrade(leftHashSpec)) {
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, rightHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),