diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index f03a5e2c94..677bde529e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -23,7 +23,6 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.InsertStmt;
 import org.apache.doris.analysis.JoinOperator;
 import org.apache.doris.analysis.QueryStmt;
-import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.ColocateTableIndex;
@@ -49,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import avro.shaded.com.google.common.collect.Maps;
 
@@ -329,7 +329,7 @@ public class DistributedPlanner {
         // bucket shuffle join is better than broadcast and shuffle join
         // it can reduce the network cost of join, so doris chose it first
         List<Expr> rhsPartitionxprs = Lists.newArrayList();
-        if (canBucketShuffleJoin(node, leftChildFragment, rightChildFragment, rhsPartitionxprs)) {
+        if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionxprs)) {
             node.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE);
             DataPartition rhsJoinPartition =
                     new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, rhsPartitionxprs);
@@ -633,7 +633,7 @@ public class DistributedPlanner {
         return false;
     }
 
-    private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment, PlanFragment rightChildFragment,
+    private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment,
                                          List<Expr> rhsHashExprs) {
         if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
             return false;
         }
@@ -649,14 +649,10 @@
             return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
         }
 
-        // 2.leftRoot be hashjoin node and not shuffle join
+        // 2. leftRoot is a hash join node
         if (leftRoot instanceof HashJoinNode) {
             while (leftRoot instanceof HashJoinNode) {
-                if (!((HashJoinNode)leftRoot).isShuffleJoin()) {
-                    leftRoot = leftRoot.getChild(0);
-                } else {
-                    return false;
-                }
+                leftRoot = leftRoot.getChild(0);
             }
             if (leftRoot instanceof OlapScanNode) {
                 return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
@@ -683,9 +679,12 @@
         DistributionInfo leftDistribution = leftScanNode.getOlapTable().getDefaultDistributionInfo();
 
         if (leftDistribution instanceof HashDistributionInfo) {
+            // use table_name + "." + column_name as the check condition
             List<Column> leftDistributeColumns = ((HashDistributionInfo) leftDistribution).getDistributionColumns();
+            List<String> leftDistributeColumnNames = leftDistributeColumns.stream()
+                    .map(col -> leftTable.getName() + "." + col.getName()).collect(Collectors.toList());
 
-            List<Column> leftJoinColumns = new ArrayList<>();
+            List<String> leftJoinColumnNames = new ArrayList<>();
             List<Expr> rightExprs = new ArrayList<>();
             List<BinaryPredicate> eqJoinConjuncts = node.getEqJoinConjuncts();
 
@@ -696,21 +695,32 @@
                     continue;
                 }
 
-                SlotDescriptor leftSlot = lhsJoinExpr.unwrapSlotRef().getDesc();
-
-                leftJoinColumns.add(leftSlot.getColumn());
-                rightExprs.add(rhsJoinExpr);
+                SlotRef leftSlot = lhsJoinExpr.unwrapSlotRef();
+                if (leftSlot.getTable() instanceof OlapTable) {
+                    // the table name in a SlotRef is not necessarily the real table name: for `select * from test as t`
+                    // the table name in the SlotRef is `t`, but what we need here is `test`.
+                    leftJoinColumnNames.add(leftSlot.getTable().getName() + "." + leftSlot.getColumnName());
+                    rightExprs.add(rhsJoinExpr);
+                }
             }
 
             //2 the join columns should contains all left table distribute columns to enable bucket shuffle join
-            for (Column distributeColumn : leftDistributeColumns) {
-                int loc = leftJoinColumns.indexOf(distributeColumn);
-                // TODO: now support bucket shuffle join when distribute column type different with
-                // right expr type
-                if (loc == -1 || !rightExprs.get(loc).getType().equals(distributeColumn.getType())) {
-                    return false;
+            for (int i = 0; i < leftDistributeColumnNames.size(); i++) {
+                String distributeColumnName = leftDistributeColumnNames.get(i);
+                boolean findRhsExprs = false;
+                // check that the join column name is the same as the distribute column name and
+                // that the rhs join expr type is the same as the distribute column type
+                for (int j = 0; j < leftJoinColumnNames.size(); j++) {
+                    if (leftJoinColumnNames.get(j).equals(distributeColumnName)) {
+                        if (rightExprs.get(j).getType().equals(leftDistributeColumns.get(i).getType())) {
+                            rhsJoinExprs.add(rightExprs.get(j));
+                            findRhsExprs = true;
+                            break;
+                        }
+                    }
                 }
-                rhsJoinExprs.add(rightExprs.get(loc));
+
+                if (!findRhsExprs) return false;
             }
         } else {
             return false;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
index 53fede1e79..4aac01242c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java
@@ -1121,6 +1121,21 @@
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
         Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
         Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t3`.`k1`, `t3`.`k2`"));
+
+        // support recursion of bucket shuffle join: t4 joins t2 and the join column names are the same as t2's distribute column names
+        queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3 " +
+                "on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t2.k1 and t4.k1 = t2.k2";
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
+        Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`"));
+
+        // some join column names of t3 join t4 differ from t1's distribute column names, so this should not be a bucket shuffle join
+        queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3 " +
+                "on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t3.k1 and t4.k2 = t3.k2";
+        explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
+        Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
+        Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`"));
+
         // disable bucket shuffle join again
         Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false);
     }