diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index a81bd1984b..5ccbbcaa1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1418,12 +1418,37 @@ public class Coordinator { params.instanceExecParams.size() + destParams.perExchNumSenders.get(exchId.asInt())); } - for (int j = 0; j < destParams.instanceExecParams.size(); ++j) { - TPlanFragmentDestination dest = new TPlanFragmentDestination(); - dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId; - dest.server = toRpcHost(destParams.instanceExecParams.get(j).host); - dest.brpc_server = toBrpcHost(destParams.instanceExecParams.get(j).host); - multiSink.getDestinations().get(i).add(dest); + List destinations = multiSink.getDestinations().get(i); + if (enablePipelineEngine && enableShareHashTableForBroadcastJoin + && params.fragment.isRightChildOfBroadcastHashJoin()) { + // here choose the first instance to build hash table. + Map destHosts = new HashMap<>(); + + destParams.instanceExecParams.forEach(param -> { + if (destHosts.containsKey(param.host)) { + destHosts.get(param.host).instancesSharingHashTable.add(param.instanceId); + } else { + destHosts.put(param.host, param); + param.buildHashTableForBroadcastJoin = true; + TPlanFragmentDestination dest = new TPlanFragmentDestination(); + dest.fragment_instance_id = param.instanceId; + try { + dest.server = toRpcHost(param.host); + dest.setBrpcServer(toBrpcHost(param.host)); + } catch (Exception e) { + throw new RuntimeException(e); + } + destinations.add(dest); + } + }); + } else { + for (int j = 0; j < destParams.instanceExecParams.size(); ++j) { + TPlanFragmentDestination dest = new TPlanFragmentDestination(); + dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId; + dest.server = toRpcHost(destParams.instanceExecParams.get(j).host); + dest.brpc_server = toBrpcHost(destParams.instanceExecParams.get(j).host); + destinations.add(dest); + } } } }