[fix](nereids) fix cte bucket shuffle path (#22311)
This commit is contained in:
@ -1432,7 +1432,45 @@ public class Coordinator {
|
||||
}
|
||||
|
||||
List<TPlanFragmentDestination> destinations = multiSink.getDestinations().get(i);
|
||||
if (enablePipelineEngine && enableShareHashTableForBroadcastJoin
|
||||
if (sink.getOutputPartition() != null
|
||||
&& sink.getOutputPartition().isBucketShuffleHashPartition()) {
|
||||
// the destFragment must be bucket shuffle
|
||||
Preconditions.checkState(bucketShuffleJoinController
|
||||
.isBucketShuffleJoin(destFragment.getFragmentId().asInt()), "Sink is"
|
||||
+ "Bucket Shuffle Partition, The destFragment must have bucket shuffle join node ");
|
||||
|
||||
int bucketSeq = 0;
|
||||
int bucketNum = bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId());
|
||||
|
||||
// when left table is empty, it's bucketset is empty.
|
||||
// set right table destination address to the address of left table
|
||||
if (destParams.instanceExecParams.size() == 1 && (bucketNum == 0
|
||||
|| destParams.instanceExecParams.get(0).bucketSeqSet.isEmpty())) {
|
||||
bucketNum = 1;
|
||||
destParams.instanceExecParams.get(0).bucketSeqSet.add(0);
|
||||
}
|
||||
// process bucket shuffle join on fragment without scan node
|
||||
TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);
|
||||
while (bucketSeq < bucketNum) {
|
||||
TPlanFragmentDestination dest = new TPlanFragmentDestination();
|
||||
|
||||
dest.fragment_instance_id = new TUniqueId(-1, -1);
|
||||
dest.server = dummyServer;
|
||||
dest.setBrpcServer(dummyServer);
|
||||
|
||||
for (FInstanceExecParam instanceExecParams : destParams.instanceExecParams) {
|
||||
if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
|
||||
dest.fragment_instance_id = instanceExecParams.instanceId;
|
||||
dest.server = toRpcHost(instanceExecParams.host);
|
||||
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
bucketSeq++;
|
||||
destinations.add(dest);
|
||||
}
|
||||
} else if (enablePipelineEngine && enableShareHashTableForBroadcastJoin
|
||||
&& ((ExchangeNode) exchNode).isRightChildOfBroadcastHashJoin()) {
|
||||
// here choose the first instance to build hash table.
|
||||
Map<TNetworkAddress, FInstanceExecParam> destHosts = new HashMap<>();
|
||||
|
||||
Reference in New Issue
Block a user