[fix](nereids) fix cte as bc right side hang bug (#21897)

During original computeMultiCastFragmentParams process, we don't handle the scenario the cte as the broadcast right side, which will lead the missing setting of the buildHashTableForBroadcastJoin flag true and finally the sql hang.
This commit is contained in:
xzj7019
2023-07-19 09:43:31 +08:00
committed by GitHub
parent 5b043a980e
commit c993663827

View File

@ -1418,12 +1418,37 @@ public class Coordinator {
params.instanceExecParams.size() + destParams.perExchNumSenders.get(exchId.asInt()));
}
for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
dest.brpc_server = toBrpcHost(destParams.instanceExecParams.get(j).host);
multiSink.getDestinations().get(i).add(dest);
List<TPlanFragmentDestination> destinations = multiSink.getDestinations().get(i);
if (enablePipelineEngine && enableShareHashTableForBroadcastJoin
&& params.fragment.isRightChildOfBroadcastHashJoin()) {
// here choose the first instance to build hash table.
Map<TNetworkAddress, FInstanceExecParam> destHosts = new HashMap<>();
destParams.instanceExecParams.forEach(param -> {
if (destHosts.containsKey(param.host)) {
destHosts.get(param.host).instancesSharingHashTable.add(param.instanceId);
} else {
destHosts.put(param.host, param);
param.buildHashTableForBroadcastJoin = true;
TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = param.instanceId;
try {
dest.server = toRpcHost(param.host);
dest.setBrpcServer(toBrpcHost(param.host));
} catch (Exception e) {
throw new RuntimeException(e);
}
destinations.add(dest);
}
});
} else {
for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
dest.brpc_server = toBrpcHost(destParams.instanceExecParams.get(j).host);
destinations.add(dest);
}
}
}
}