From 241a2fc25c3c8f22624b58eda51b2ad4d2c28680 Mon Sep 17 00:00:00 2001 From: xzj7019 <131111794+xzj7019@users.noreply.github.com> Date: Thu, 22 Feb 2024 17:20:19 +0800 Subject: [PATCH] [nereids] downgrade bucket shuffle if tablet num < instance num (#31222) --- .../ChildrenPropertiesRegulator.java | 58 +++++++++++++++++++ .../org/apache/doris/qe/SessionVariable.java | 9 +++ 2 files changed, 67 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index 3dfa2615af..a70d72c565 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction; import org.apache.doris.nereids.trees.plans.AggMode; +import org.apache.doris.nereids.trees.plans.GroupPlan; import org.apache.doris.nereids.trees.plans.JoinType; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.SortPhase; @@ -39,6 +40,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate; import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation; @@ -189,6 +191,41 @@ public class ChildrenPropertiesRegulator extends PlanVisitor { return true; } + private boolean isBucketShuffleDownGrade(Plan oneSidePlan, DistributionSpecHash otherSideSpec) { + // improper to do bucket shuffle join: + // oneSide: + // 1. base table + // 2. single partition after pruning + // 3. tablets' number is small enough (< paraInstanceNum) + // otherSide: ShuffleType.EXECUTION_BUCKETED + boolean isBucketShuffleDownGrade = ConnectContext.get().getSessionVariable().isEnableBucketShuffleDownGrade(); + if (!isBucketShuffleDownGrade) { + return false; + } else if (otherSideSpec.getShuffleType() != ShuffleType.EXECUTION_BUCKETED) { + return false; + } else { + int paraNum = Math.max(1, ConnectContext.get().getSessionVariable().getParallelExecInstanceNum()); + if (((GroupPlan) oneSidePlan).getGroup().getPhysicalExpressions().isEmpty()) { + return false; + } else { + Plan plan = ((GroupPlan) oneSidePlan).getGroup().getPhysicalExpressions().get(0).getPlan(); + while ((plan instanceof PhysicalProject || plan instanceof PhysicalFilter) + && !((GroupPlan) plan.child(0)).getGroup().getPhysicalExpressions().isEmpty()) { + plan = ((GroupPlan) plan.child(0)).getGroup().getPhysicalExpressions().get(0).getPlan(); + } + if (plan != null && plan instanceof PhysicalOlapScan + && ((PhysicalOlapScan) plan).getSelectedPartitionIds().size() <= 1 + && ((PhysicalOlapScan) plan).getTable() != null + && ((PhysicalOlapScan) plan).getTable().getDefaultDistributionInfo() != null + && ((PhysicalOlapScan) plan).getTable().getDefaultDistributionInfo().getBucketNum() < paraNum) { + return true; + } else { + return false; + } + } + } + } + private boolean couldNotRightBucketShuffleJoin(JoinType joinType) { return joinType == JoinType.RIGHT_ANTI_JOIN || joinType == JoinType.RIGHT_OUTER_JOIN @@ -207,6 +244,9 @@ public class ChildrenPropertiesRegulator extends PlanVisitor { DistributionSpec leftDistributionSpec = childrenProperties.get(0).getDistributionSpec(); DistributionSpec rightDistributionSpec = childrenProperties.get(1).getDistributionSpec(); + Plan leftChild = hashJoin.child(0); + Plan rightChild = hashJoin.child(1); + // broadcast do not need regular if (rightDistributionSpec instanceof DistributionSpecReplicated) { return true; @@ -238,6 +278,24 @@ public class ChildrenPropertiesRegulator extends PlanVisitor { ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec, (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec())); + } else if (isBucketShuffleDownGrade(leftChild, rightHashSpec)) { + updatedForLeft = Optional.of(calAnotherSideRequired( + ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec, + (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), + (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec())); + updatedForRight = Optional.of(calAnotherSideRequired( + ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec, + (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(), + (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec())); + } else if (isBucketShuffleDownGrade(rightChild, leftHashSpec)) { + updatedForLeft = Optional.of(calAnotherSideRequired( + ShuffleType.EXECUTION_BUCKETED, rightHashSpec, leftHashSpec, + (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(), + (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec())); + updatedForRight = Optional.of(calAnotherSideRequired( + ShuffleType.EXECUTION_BUCKETED, rightHashSpec, rightHashSpec, + (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(), + (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec())); } else if ((leftHashSpec.getShuffleType() == ShuffleType.NATURAL && rightHashSpec.getShuffleType() == ShuffleType.NATURAL)) { updatedForRight = Optional.of(calAnotherSideRequired( diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 4b4d108650..0124efc429 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -242,6 +242,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_AGG_STATE = "enable_agg_state"; + public static final String ENABLE_BUCKET_SHUFFLE_DOWNGRADE = "enable_bucket_shuffle_downgrade"; + public static final String ENABLE_RPC_OPT_FOR_PIPELINE = "enable_rpc_opt_for_pipeline"; public static final String ENABLE_SINGLE_DISTINCT_COLUMN_OPT = "enable_single_distinct_column_opt"; @@ -733,6 +735,9 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_JOIN, varType = VariableAnnotation.EXPERIMENTAL_ONLINE) public boolean enableBucketShuffleJoin = true; + @VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_DOWNGRADE, needForward = true) + public boolean enableBucketShuffleDownGrade = false; + @VariableMgr.VarAttr(name = PREFER_JOIN_METHOD) public String preferJoinMethod = "broadcast"; @@ -2096,6 +2101,10 @@ public class SessionVariable implements Serializable, Writable { return enableBucketShuffleJoin; } + public boolean isEnableBucketShuffleDownGrade() { + return enableBucketShuffleDownGrade; + } + public boolean isEnableOdbcTransaction() { return enableOdbcTransaction; }