[opt](nereids) recover adoptive bucket shuffle (#39598)

## Proposed changes

pick from https://github.com/apache/doris/pull/36784

Co-authored-by: xiongzhongjian <xiongzhongjian@selectdb.com>
This commit is contained in:
xzj7019
2024-08-21 09:26:53 +08:00
committed by GitHub
parent 6df6f1dc97
commit 8a562aeb77
39 changed files with 112 additions and 83 deletions

View File

@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
@ -39,6 +40,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
@ -201,12 +203,53 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
return true;
}
private boolean isBucketShuffleDownGrade(DistributionSpecHash srcSideSpec) {
boolean isBucketShuffleDownGrade = ConnectContext.get().getSessionVariable().isEnableBucketShuffleDownGrade();
if (!isBucketShuffleDownGrade) {
private boolean isBucketShuffleDownGrade(Plan oneSidePlan, DistributionSpecHash otherSideSpec) {
// improper to do bucket shuffle join:
// oneSide:
// - base table and tablets' number is small enough (< paraInstanceNum)
// otherSide:
// - ShuffleType.EXECUTION_BUCKETED
boolean isEnableBucketShuffleJoin = ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin();
if (!isEnableBucketShuffleJoin) {
return true;
} else if (otherSideSpec.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
|| !(oneSidePlan instanceof GroupPlan)) {
return false;
} else {
return srcSideSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED;
PhysicalOlapScan candidate = findDownGradeBucketShuffleCandidate((GroupPlan) oneSidePlan);
if (candidate == null || candidate.getTable() == null
|| candidate.getTable().getDefaultDistributionInfo() == null) {
return false;
} else {
int prunedPartNum = candidate.getSelectedPartitionIds().size();
int bucketNum = candidate.getTable().getDefaultDistributionInfo().getBucketNum();
int totalBucketNum = prunedPartNum * bucketNum;
int backEndNum = Math.max(1, ConnectContext.get().getEnv().getClusterInfo()
.getBackendsNumber(true));
int paraNum = Math.max(1, ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
int totalParaNum = Math.min(10, backEndNum * paraNum);
return totalBucketNum < totalParaNum;
}
}
}
private PhysicalOlapScan findDownGradeBucketShuffleCandidate(GroupPlan groupPlan) {
if (groupPlan == null || groupPlan.getGroup() == null
|| groupPlan.getGroup().getPhysicalExpressions().isEmpty()) {
return null;
} else {
Plan targetPlan = groupPlan.getGroup().getPhysicalExpressions().get(0).getPlan();
while (targetPlan != null
&& (targetPlan instanceof PhysicalProject || targetPlan instanceof PhysicalFilter)
&& !((GroupPlan) targetPlan.child(0)).getGroup().getPhysicalExpressions().isEmpty()) {
targetPlan = ((GroupPlan) targetPlan.child(0)).getGroup()
.getPhysicalExpressions().get(0).getPlan();
}
if (targetPlan == null || !(targetPlan instanceof PhysicalOlapScan)) {
return null;
} else {
return (PhysicalOlapScan) targetPlan;
}
}
}
@ -243,6 +286,9 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
throw new RuntimeException("should not come here, two children of shuffle join should all be shuffle");
}
Plan leftChild = hashJoin.child(0);
Plan rightChild = hashJoin.child(1);
DistributionSpecHash leftHashSpec = (DistributionSpecHash) leftDistributionSpec;
DistributionSpecHash rightHashSpec = (DistributionSpecHash) rightDistributionSpec;
@ -263,7 +309,7 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (isBucketShuffleDownGrade(rightHashSpec)) {
} else if (isBucketShuffleDownGrade(leftChild, rightHashSpec)) {
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
@ -272,7 +318,7 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (isBucketShuffleDownGrade(leftHashSpec)) {
} else if (isBucketShuffleDownGrade(rightChild, leftHashSpec)) {
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, rightHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),

View File

@ -260,8 +260,6 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_AGG_STATE = "enable_agg_state";
public static final String ENABLE_BUCKET_SHUFFLE_DOWNGRADE = "enable_bucket_shuffle_downgrade";
public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline";
public static final String ENABLE_SINGLE_DISTINCT_COLUMN_OPT = "enable_single_distinct_column_opt";
@ -850,9 +848,6 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_JOIN, varType = VariableAnnotation.EXPERIMENTAL_ONLINE)
public boolean enableBucketShuffleJoin = true;
@VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_DOWNGRADE, needForward = true)
public boolean enableBucketShuffleDownGrade = false;
/**
* explode function row count enlarge factor.
*/
@ -2552,10 +2547,6 @@ public class SessionVariable implements Serializable, Writable {
return enableBucketShuffleJoin;
}
public boolean isEnableBucketShuffleDownGrade() {
return enableBucketShuffleDownGrade;
}
public boolean isEnableOdbcTransaction() {
return enableOdbcTransaction;
}