[nereids] downgrade bucket shuffle if tablet num < instance num (#31222)

This commit is contained in:
xzj7019
2024-02-22 17:20:19 +08:00
committed by yiguolei
parent 8e24b42d7a
commit 241a2fc25c
2 changed files with 67 additions and 0 deletions

View File

@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
@ -39,6 +40,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
@ -189,6 +191,41 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
return true;
}
/**
 * Decides whether a bucket shuffle join should be downgraded because one side is too small
 * to keep all parallel instances busy.
 *
 * <p>The downgrade applies only when all of the following hold:
 * <ul>
 *   <li>the session variable {@code enable_bucket_shuffle_downgrade} is on;</li>
 *   <li>{@code otherSideSpec} has shuffle type {@code ShuffleType.EXECUTION_BUCKETED};</li>
 *   <li>{@code oneSidePlan}, after skipping any chain of project/filter nodes, is an olap scan
 *       with at most one selected partition;</li>
 *   <li>the bucket number of that table's default distribution info is smaller than
 *       {@code max(1, parallelExecInstanceNum)}.</li>
 * </ul>
 *
 * <p>Only the first physical expression of each group is inspected.
 *
 * @param oneSidePlan the join child to inspect; expected to be a {@code GroupPlan}, any other
 *                    plan type yields {@code false}
 * @param otherSideSpec hash distribution spec of the opposite join child
 * @return {@code true} if the bucket shuffle should be downgraded, {@code false} otherwise
 *         (including when {@code ConnectContext.get()} returns {@code null})
 */
private boolean isBucketShuffleDownGrade(Plan oneSidePlan, DistributionSpecHash otherSideSpec) {
    // Look the context up once; without a context the feature is treated as disabled.
    ConnectContext context = ConnectContext.get();
    if (context == null || !context.getSessionVariable().isEnableBucketShuffleDownGrade()) {
        return false;
    }
    if (otherSideSpec.getShuffleType() != ShuffleType.EXECUTION_BUCKETED) {
        return false;
    }

    // Descend through project/filter nodes until something else (ideally the scan) is reached.
    Plan plan = firstPhysicalPlanOf(oneSidePlan);
    while (plan instanceof PhysicalProject || plan instanceof PhysicalFilter) {
        Plan next = firstPhysicalPlanOf(plan.child(0));
        if (next == null) {
            // The child group has no physical expression yet: stop here, plan is not a scan.
            break;
        }
        plan = next;
    }

    // instanceof is false for null, so no separate null check is required.
    if (!(plan instanceof PhysicalOlapScan)) {
        return false;
    }
    PhysicalOlapScan scan = (PhysicalOlapScan) plan;
    if (scan.getSelectedPartitionIds().size() > 1
            || scan.getTable() == null
            || scan.getTable().getDefaultDistributionInfo() == null) {
        return false;
    }

    // Never compare against a parallelism below one.
    int paraNum = Math.max(1, context.getSessionVariable().getParallelExecInstanceNum());
    return scan.getTable().getDefaultDistributionInfo().getBucketNum() < paraNum;
}

/**
 * Returns the plan of the first physical expression in the group wrapped by {@code plan},
 * or {@code null} if {@code plan} is not a {@code GroupPlan} or its group has no physical
 * expression.
 */
private Plan firstPhysicalPlanOf(Plan plan) {
    if (!(plan instanceof GroupPlan)) {
        return null;
    }
    GroupPlan groupPlan = (GroupPlan) plan;
    if (groupPlan.getGroup().getPhysicalExpressions().isEmpty()) {
        return null;
    }
    return groupPlan.getGroup().getPhysicalExpressions().get(0).getPlan();
}
private boolean couldNotRightBucketShuffleJoin(JoinType joinType) {
return joinType == JoinType.RIGHT_ANTI_JOIN
|| joinType == JoinType.RIGHT_OUTER_JOIN
@ -207,6 +244,9 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
DistributionSpec leftDistributionSpec = childrenProperties.get(0).getDistributionSpec();
DistributionSpec rightDistributionSpec = childrenProperties.get(1).getDistributionSpec();
Plan leftChild = hashJoin.child(0);
Plan rightChild = hashJoin.child(1);
// broadcast join does not need to be regulated
if (rightDistributionSpec instanceof DistributionSpecReplicated) {
return true;
@ -238,6 +278,24 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (isBucketShuffleDownGrade(leftChild, rightHashSpec)) {
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if (isBucketShuffleDownGrade(rightChild, leftHashSpec)) {
updatedForLeft = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, rightHashSpec, leftHashSpec,
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
updatedForRight = Optional.of(calAnotherSideRequired(
ShuffleType.EXECUTION_BUCKETED, rightHashSpec, rightHashSpec,
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
(DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
} else if ((leftHashSpec.getShuffleType() == ShuffleType.NATURAL
&& rightHashSpec.getShuffleType() == ShuffleType.NATURAL)) {
updatedForRight = Optional.of(calAnotherSideRequired(

View File

@ -242,6 +242,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_AGG_STATE = "enable_agg_state";
// Name of the session variable backing the enableBucketShuffleDownGrade field.
public static final String ENABLE_BUCKET_SHUFFLE_DOWNGRADE = "enable_bucket_shuffle_downgrade";
public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline";
public static final String ENABLE_SINGLE_DISTINCT_COLUMN_OPT = "enable_single_distinct_column_opt";
@ -733,6 +735,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_JOIN, varType = VariableAnnotation.EXPERIMENTAL_ONLINE)
public boolean enableBucketShuffleJoin = true;
// Switch for the Nereids bucket shuffle downgrade check; disabled by default.
// When enabled, the planner may downgrade a bucket shuffle join whose scanned base table
// has fewer buckets than the parallel instance number.
@VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_DOWNGRADE, needForward = true)
public boolean enableBucketShuffleDownGrade = false;
@VariableMgr.VarAttr(name = PREFER_JOIN_METHOD)
public String preferJoinMethod = "broadcast";
@ -2096,6 +2101,10 @@ public class SessionVariable implements Serializable, Writable {
return enableBucketShuffleJoin;
}
/**
 * Reports whether the {@code enable_bucket_shuffle_downgrade} session variable is turned on.
 *
 * @return the current value of {@code enableBucketShuffleDownGrade}
 */
public boolean isEnableBucketShuffleDownGrade() {
    return this.enableBucketShuffleDownGrade;
}
/**
 * Accessor for the ODBC transaction session flag.
 *
 * @return the current value of {@code enableOdbcTransaction}
 */
public boolean isEnableOdbcTransaction() {
    return this.enableOdbcTransaction;
}