[pipelineX](fix) Fix correctness problem due to local hash shuffle (#29881)

This commit is contained in:
Gabriel
2024-01-11 22:28:00 +08:00
committed by yiguolei
parent acda8d2129
commit 1718341051
6 changed files with 59 additions and 21 deletions

View File

@ -1611,6 +1611,7 @@ public class Coordinator implements CoordInterface {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
instanceExecParams.recvrId = params.destinations.size();
break;
}
}
@ -1630,6 +1631,7 @@ public class Coordinator implements CoordInterface {
destHosts.put(param.host, param);
param.buildHashTableForBroadcastJoin = true;
TPlanFragmentDestination dest = new TPlanFragmentDestination();
param.recvrId = params.destinations.size();
dest.fragment_instance_id = param.instanceId;
try {
dest.server = toRpcHost(param.host);
@ -1653,6 +1655,7 @@ public class Coordinator implements CoordInterface {
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
dest.setBrpcServer(toBrpcHost(destParams.instanceExecParams.get(j).host));
destParams.instanceExecParams.get(j).recvrId = params.destinations.size();
params.destinations.add(dest);
}
}
@ -1732,6 +1735,7 @@ public class Coordinator implements CoordInterface {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
dest.setBrpcServer(toBrpcHost(instanceExecParams.host));
instanceExecParams.recvrId = params.destinations.size();
break;
}
}
@ -1752,6 +1756,7 @@ public class Coordinator implements CoordInterface {
param.buildHashTableForBroadcastJoin = true;
TPlanFragmentDestination dest = new TPlanFragmentDestination();
dest.fragment_instance_id = param.instanceId;
param.recvrId = params.destinations.size();
try {
dest.server = toRpcHost(param.host);
dest.setBrpcServer(toBrpcHost(param.host));
@ -1773,6 +1778,7 @@ public class Coordinator implements CoordInterface {
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
dest.brpc_server = toBrpcHost(destParams.instanceExecParams.get(j).host);
destParams.instanceExecParams.get(j).recvrId = params.destinations.size();
destinations.add(dest);
}
}
@ -3755,22 +3761,26 @@ public class Coordinator implements CoordInterface {
params.setFileScanParams(fileScanRangeParamsMap);
params.setNumBuckets(fragment.getBucketNum());
params.setPerNodeSharedScans(perNodeSharedScans);
params.setTotalInstances(instanceExecParams.size());
if (ignoreDataDistribution) {
params.setParallelInstances(parallelTasksNum);
}
res.put(instanceExecParam.host, params);
res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>());
res.get(instanceExecParam.host).setShuffleIdxToInstanceIdx(new HashMap<Integer, Integer>());
instanceIdx.put(instanceExecParam.host, 0);
}
// Set each bucket belongs to which instance on this BE.
// This is used for LocalExchange(BUCKET_HASH_SHUFFLE).
int instanceId = instanceIdx.get(instanceExecParam.host);
for (int bucket : instanceExecParam.bucketSeqSet) {
res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, instanceId);
}
instanceIdx.replace(instanceExecParam.host, ++instanceId);
TPipelineFragmentParams params = res.get(instanceExecParam.host);
res.get(instanceExecParam.host).getShuffleIdxToInstanceIdx().put(instanceExecParam.recvrId,
params.getLocalParams().size());
TPipelineInstanceParams localParams = new TPipelineInstanceParams();
localParams.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin);
@ -3919,6 +3929,8 @@ public class Coordinator implements CoordInterface {
boolean buildHashTableForBroadcastJoin = false;
int recvrId = -1;
List<TUniqueId> instancesSharingHashTable = Lists.newArrayList();
public void addBucketSeq(int bucketSeq) {