diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index a2877e7538..cd498ccf9b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -3596,6 +3596,7 @@ public class Coordinator implements CoordInterface { } Map res = new HashMap(); + Map instanceIdx = new HashMap(); for (int i = 0; i < instanceExecParams.size(); ++i) { final FInstanceExecParam instanceExecParam = instanceExecParams.get(i); if (!res.containsKey(instanceExecParam.host)) { @@ -3625,10 +3626,16 @@ public class Coordinator implements CoordInterface { params.setNumBuckets(fragment.getBucketNum()); res.put(instanceExecParam.host, params); res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap()); + instanceIdx.put(instanceExecParam.host, 0); } + // Set each bucket belongs to which instance on this BE. + // This is used for LocalExchange(BUCKET_HASH_SHUFFLE). + int instanceId = instanceIdx.get(instanceExecParam.host); for (int bucket : instanceExecParam.bucketSeqSet) { - res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, i); + res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, instanceId); + } + instanceIdx.replace(instanceExecParam.host, ++instanceId); TPipelineFragmentParams params = res.get(instanceExecParam.host); TPipelineInstanceParams localParams = new TPipelineInstanceParams();