[pipelineX](improvement) Support multiple instances execution on single tablet (#28178)

This commit is contained in:
Gabriel
2023-12-10 20:18:41 +08:00
committed by GitHub
parent 485d7db516
commit 320ddf4987
10 changed files with 106 additions and 54 deletions

View File

@ -1995,8 +1995,7 @@ public class Coordinator implements CoordInterface {
// 4. Disable shared scan optimization by session variable
if (!enablePipelineEngine || (node.isPresent() && node.get().getShouldColoScan())
|| (node.isPresent() && node.get() instanceof FileScanNode)
|| (node.isPresent() && node.get().shouldDisableSharedScan(context))
|| enablePipelineXEngine) {
|| (node.isPresent() && node.get().shouldDisableSharedScan(context))) {
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
//the scan instance num should not larger than the tablets num
@ -3599,6 +3598,12 @@ public class Coordinator implements CoordInterface {
Map<TNetworkAddress, Integer> instanceIdx = new HashMap();
for (int i = 0; i < instanceExecParams.size(); ++i) {
final FInstanceExecParam instanceExecParam = instanceExecParams.get(i);
Map<Integer, List<TScanRangeParams>> scanRanges = instanceExecParam.perNodeScanRanges;
Map<Integer, Boolean> perNodeSharedScans = instanceExecParam.perNodeSharedScans;
if (scanRanges == null) {
scanRanges = Maps.newHashMap();
perNodeSharedScans = Maps.newHashMap();
}
if (!res.containsKey(instanceExecParam.host)) {
TPipelineFragmentParams params = new TPipelineFragmentParams();
@ -3624,6 +3629,7 @@ public class Coordinator implements CoordInterface {
params.setFileScanParams(fileScanRangeParamsMap);
params.setNumBuckets(fragment.getBucketNum());
params.setPerNodeSharedScans(perNodeSharedScans);
res.put(instanceExecParam.host, params);
res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>());
instanceIdx.put(instanceExecParam.host, 0);
@ -3641,12 +3647,6 @@ public class Coordinator implements CoordInterface {
localParams.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin);
localParams.setFragmentInstanceId(instanceExecParam.instanceId);
Map<Integer, List<TScanRangeParams>> scanRanges = instanceExecParam.perNodeScanRanges;
Map<Integer, Boolean> perNodeSharedScans = instanceExecParam.perNodeSharedScans;
if (scanRanges == null) {
scanRanges = Maps.newHashMap();
perNodeSharedScans = Maps.newHashMap();
}
localParams.setPerNodeScanRanges(scanRanges);
localParams.setPerNodeSharedScans(perNodeSharedScans);
localParams.setSenderId(i);