From ce3ae764e5a2f61e4fcfd809ed8cfd1f59bf9de7 Mon Sep 17 00:00:00 2001 From: HappenLee Date: Thu, 27 May 2021 09:05:15 -0500 Subject: [PATCH] [Bug] Bucket Shuffle Join may cause:Failed to send brpc batch, Not connected to 0.0.0.0:0 (#5901) --- .../main/java/org/apache/doris/planner/DataPartition.java | 4 ++++ .../src/main/java/org/apache/doris/qe/Coordinator.java | 7 +++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java index ec474562a7..a8ee29a244 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java @@ -95,6 +95,10 @@ public class DataPartition { return type != TPartitionType.UNPARTITIONED; } + public boolean isBucketShuffleHashPartition() { + return type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED; + } + public TPartitionType getType() { return type; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index c891a0378c..1f8c38fd34 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -828,7 +828,11 @@ public class Coordinator { params.instanceExecParams.size() + destParams.perExchNumSenders.get(exchId.asInt())); } - if (bucketShuffleJoinController.isBucketShuffleJoin(destFragment.getFragmentId().asInt())) { + if (sink.getOutputPartition().isBucketShuffleHashPartition()) { + // the destFragment must be bucket shuffle + Preconditions.checkState(bucketShuffleJoinController. + isBucketShuffleJoin(destFragment.getFragmentId().asInt())); + int bucketSeq = 0; int bucketNum = bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId()); TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0); @@ -1544,7 +1548,6 @@ public class Coordinator { return true; } - // One fragment could only have one HashJoinNode if (node instanceof HashJoinNode) { HashJoinNode joinNode = (HashJoinNode) node; if (joinNode.isBucketShuffle()) {