[Bug] Fix the bug of bucket shuffle join cause error plan (#6172)
This commit is contained in:
@ -23,7 +23,6 @@ import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.InsertStmt;
|
||||
import org.apache.doris.analysis.JoinOperator;
|
||||
import org.apache.doris.analysis.QueryStmt;
|
||||
import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.SlotRef;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.ColocateTableIndex;
|
||||
@ -49,6 +48,7 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import avro.shaded.com.google.common.collect.Maps;
|
||||
|
||||
@ -329,7 +329,7 @@ public class DistributedPlanner {
|
||||
// bucket shuffle join is better than broadcast and shuffle join
|
||||
// it can reduce the network cost of join, so doris chose it first
|
||||
List<Expr> rhsPartitionxprs = Lists.newArrayList();
|
||||
if (canBucketShuffleJoin(node, leftChildFragment, rightChildFragment, rhsPartitionxprs)) {
|
||||
if (canBucketShuffleJoin(node, leftChildFragment, rhsPartitionxprs)) {
|
||||
node.setDistributionMode(HashJoinNode.DistributionMode.BUCKET_SHUFFLE);
|
||||
DataPartition rhsJoinPartition =
|
||||
new DataPartition(TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED, rhsPartitionxprs);
|
||||
@ -633,7 +633,7 @@ public class DistributedPlanner {
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment, PlanFragment rightChildFragment,
|
||||
private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment leftChildFragment,
|
||||
List<Expr> rhsHashExprs) {
|
||||
if (!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
|
||||
return false;
|
||||
@ -649,14 +649,10 @@ public class DistributedPlanner {
|
||||
return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
|
||||
}
|
||||
|
||||
// 2.leftRoot be hashjoin node and not shuffle join
|
||||
// 2.leftRoot be hashjoin node
|
||||
if (leftRoot instanceof HashJoinNode) {
|
||||
while (leftRoot instanceof HashJoinNode) {
|
||||
if (!((HashJoinNode)leftRoot).isShuffleJoin()) {
|
||||
leftRoot = leftRoot.getChild(0);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
leftRoot = leftRoot.getChild(0);
|
||||
}
|
||||
if (leftRoot instanceof OlapScanNode) {
|
||||
return canBucketShuffleJoin(node, leftRoot, rhsHashExprs);
|
||||
@ -683,9 +679,12 @@ public class DistributedPlanner {
|
||||
DistributionInfo leftDistribution = leftScanNode.getOlapTable().getDefaultDistributionInfo();
|
||||
|
||||
if (leftDistribution instanceof HashDistributionInfo) {
|
||||
// use the table_name + '-' + column_name as check condition
|
||||
List<Column> leftDistributeColumns = ((HashDistributionInfo) leftDistribution).getDistributionColumns();
|
||||
List<String> leftDistributeColumnNames = leftDistributeColumns.stream().
|
||||
map(col -> leftTable.getName() + "." + col.getName()).collect(Collectors.toList());
|
||||
|
||||
List<Column> leftJoinColumns = new ArrayList<>();
|
||||
List<String> leftJoinColumnNames = new ArrayList<>();
|
||||
List<Expr> rightExprs = new ArrayList<>();
|
||||
List<BinaryPredicate> eqJoinConjuncts = node.getEqJoinConjuncts();
|
||||
|
||||
@ -696,21 +695,32 @@ public class DistributedPlanner {
|
||||
continue;
|
||||
}
|
||||
|
||||
SlotDescriptor leftSlot = lhsJoinExpr.unwrapSlotRef().getDesc();
|
||||
|
||||
leftJoinColumns.add(leftSlot.getColumn());
|
||||
rightExprs.add(rhsJoinExpr);
|
||||
SlotRef leftSlot = lhsJoinExpr.unwrapSlotRef();
|
||||
if (leftSlot.getTable() instanceof OlapTable) {
|
||||
// table name in SlotRef is not the really name. `select * from test as t`
|
||||
// table name in SlotRef is `t`, but here we need is `test`.
|
||||
leftJoinColumnNames.add(leftSlot.getTable().getName() + "." + leftSlot.getColumnName());
|
||||
rightExprs.add(rhsJoinExpr);
|
||||
}
|
||||
}
|
||||
|
||||
//2 the join columns should contains all left table distribute columns to enable bucket shuffle join
|
||||
for (Column distributeColumn : leftDistributeColumns) {
|
||||
int loc = leftJoinColumns.indexOf(distributeColumn);
|
||||
// TODO: now support bucket shuffle join when distribute column type different with
|
||||
// right expr type
|
||||
if (loc == -1 || !rightExprs.get(loc).getType().equals(distributeColumn.getType())) {
|
||||
return false;
|
||||
for (int i = 0; i < leftDistributeColumnNames.size(); i++) {
|
||||
String distributeColumnName = leftDistributeColumnNames.get(i);
|
||||
boolean findRhsExprs = false;
|
||||
// check the join column name is same as distribute column name and
|
||||
// check the rhs join expr type is same as distribute column
|
||||
for (int j = 0; j < leftJoinColumnNames.size(); j++) {
|
||||
if (leftJoinColumnNames.get(j).equals(distributeColumnName)) {
|
||||
if (rightExprs.get(j).getType().equals(leftDistributeColumns.get(i).getType())) {
|
||||
rhsJoinExprs.add(rightExprs.get(j));
|
||||
findRhsExprs = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
rhsJoinExprs.add(rightExprs.get(loc));
|
||||
|
||||
if (!findRhsExprs) return false;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
|
||||
@ -1121,6 +1121,21 @@ public class QueryPlanTest {
|
||||
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
|
||||
Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
|
||||
Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t3`.`k1`, `t3`.`k2`"));
|
||||
|
||||
// support recurse of bucket shuffle because t4 join t2 and join column name is same as t2 distribute column name
|
||||
queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3 " +
|
||||
"on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t2.k1 and t4.k1 = t2.k2";
|
||||
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
|
||||
Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
|
||||
Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`"));
|
||||
|
||||
// some column name in join expr t3 join t4 and t1 distribute column name, so should not be bucket shuffle join
|
||||
queryStr = "explain select * from test.jointest t1 join test.bucket_shuffle1 t2 on t1.k1 = t2.k1 and t1.k1 = t2.k2 join test.colocate1 t3 " +
|
||||
"on t2.k1 = t3.k1 join test.jointest t4 on t4.k1 = t3.k1 and t4.k2 = t3.k2";
|
||||
explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
|
||||
Assert.assertTrue(explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t1`.`k1`, `t1`.`k1`"));
|
||||
Assert.assertTrue(!explainString.contains("BUCKET_SHFFULE_HASH_PARTITIONED: `t4`.`k1`, `t4`.`k1`"));
|
||||
|
||||
// disable bucket shuffle join again
|
||||
Deencapsulation.setField(connectContext.getSessionVariable(), "enableBucketShuffleJoin", false);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user