From c993663827df871949c959bd99210c3be5cbf8a3 Mon Sep 17 00:00:00 2001 From: xzj7019 <131111794+xzj7019@users.noreply.github.com> Date: Wed, 19 Jul 2023 09:43:31 +0800 Subject: [PATCH] [fix](nereids) fix cte as bc right side hang bug (#21897) During original computeMultiCastFragmentParams process, we don't handle the scenario the cte as the broadcast right side, which will lead the missing setting of the buildHashTableForBroadcastJoin flag true and finally the sql hang. --- .../java/org/apache/doris/qe/Coordinator.java | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index a81bd1984b..5ccbbcaa1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -1418,12 +1418,37 @@ public class Coordinator { params.instanceExecParams.size() + destParams.perExchNumSenders.get(exchId.asInt())); } - for (int j = 0; j < destParams.instanceExecParams.size(); ++j) { - TPlanFragmentDestination dest = new TPlanFragmentDestination(); - dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId; - dest.server = toRpcHost(destParams.instanceExecParams.get(j).host); - dest.brpc_server = toBrpcHost(destParams.instanceExecParams.get(j).host); - multiSink.getDestinations().get(i).add(dest); + List destinations = multiSink.getDestinations().get(i); + if (enablePipelineEngine && enableShareHashTableForBroadcastJoin + && params.fragment.isRightChildOfBroadcastHashJoin()) { + // here choose the first instance to build hash table. + Map destHosts = new HashMap<>(); + + destParams.instanceExecParams.forEach(param -> { + if (destHosts.containsKey(param.host)) { + destHosts.get(param.host).instancesSharingHashTable.add(param.instanceId); + } else { + destHosts.put(param.host, param); + param.buildHashTableForBroadcastJoin = true; + TPlanFragmentDestination dest = new TPlanFragmentDestination(); + dest.fragment_instance_id = param.instanceId; + try { + dest.server = toRpcHost(param.host); + dest.setBrpcServer(toBrpcHost(param.host)); + } catch (Exception e) { + throw new RuntimeException(e); + } + destinations.add(dest); + } + }); + } else { + for (int j = 0; j < destParams.instanceExecParams.size(); ++j) { + TPlanFragmentDestination dest = new TPlanFragmentDestination(); + dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId; + dest.server = toRpcHost(destParams.instanceExecParams.get(j).host); + dest.brpc_server = toBrpcHost(destParams.instanceExecParams.get(j).host); + destinations.add(dest); + } } } }