diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 39c94320c7..f4a03f9c07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1432,7 +1432,45 @@ public class Coordinator { } List destinations = multiSink.getDestinations().get(i); - if (enablePipelineEngine && enableShareHashTableForBroadcastJoin + if (sink.getOutputPartition() != null + && sink.getOutputPartition().isBucketShuffleHashPartition()) { + // the destFragment must be bucket shuffle + Preconditions.checkState(bucketShuffleJoinController + .isBucketShuffleJoin(destFragment.getFragmentId().asInt()), "Sink is" + + "Bucket Shuffle Partition, The destFragment must have bucket shuffle join node "); + + int bucketSeq = 0; + int bucketNum = bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId()); + + // when left table is empty, it's bucketset is empty. + // set right table destination address to the address of left table + if (destParams.instanceExecParams.size() == 1 && (bucketNum == 0 + || destParams.instanceExecParams.get(0).bucketSeqSet.isEmpty())) { + bucketNum = 1; + destParams.instanceExecParams.get(0).bucketSeqSet.add(0); + } + // process bucket shuffle join on fragment without scan node + TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0); + while (bucketSeq < bucketNum) { + TPlanFragmentDestination dest = new TPlanFragmentDestination(); + + dest.fragment_instance_id = new TUniqueId(-1, -1); + dest.server = dummyServer; + dest.setBrpcServer(dummyServer); + + for (FInstanceExecParam instanceExecParams : destParams.instanceExecParams) { + if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) { + dest.fragment_instance_id = instanceExecParams.instanceId; + dest.server = toRpcHost(instanceExecParams.host); + dest.setBrpcServer(toBrpcHost(instanceExecParams.host)); + break; + } + } + + bucketSeq++; + destinations.add(dest); + } + } else if (enablePipelineEngine && enableShareHashTableForBroadcastJoin && ((ExchangeNode) exchNode).isRightChildOfBroadcastHashJoin()) { // here choose the first instance to build hash table. Map destHosts = new HashMap<>();