diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java index ec474562a7..a8ee29a244 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java @@ -95,6 +95,10 @@ public class DataPartition { return type != TPartitionType.UNPARTITIONED; } + public boolean isBucketShuffleHashPartition() { + return type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED; + } + public TPartitionType getType() { return type; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index c891a0378c..1f8c38fd34 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -828,7 +828,11 @@ public class Coordinator { params.instanceExecParams.size() + destParams.perExchNumSenders.get(exchId.asInt())); } - if (bucketShuffleJoinController.isBucketShuffleJoin(destFragment.getFragmentId().asInt())) { + if (sink.getOutputPartition().isBucketShuffleHashPartition()) { + // the destFragment must be bucket shuffle + Preconditions.checkState(bucketShuffleJoinController. + isBucketShuffleJoin(destFragment.getFragmentId().asInt())); + int bucketSeq = 0; int bucketNum = bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId()); TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0); @@ -1544,7 +1548,6 @@ public class Coordinator { return true; } - // One fragment could only have one HashJoinNode if (node instanceof HashJoinNode) { HashJoinNode joinNode = (HashJoinNode) node; if (joinNode.isBucketShuffle()) {