[Opt](pipeline) opt pipeline shared scan (#18715)

This commit is contained in:
HappenLee
2023-04-17 13:06:39 +08:00
committed by GitHub
parent 9e960f4c4f
commit eb128753ac
9 changed files with 54 additions and 20 deletions

View File

@ -1222,6 +1222,11 @@ public class OlapScanNode extends ScanNode {
shouldColoScan = true;
}
@Override
public boolean getShouldColoScan() {
return shouldColoScan;
}
@Override
protected void toThrift(TPlanNode msg) {
List<String> keyColumnNames = new ArrayList<String>();

View File

@ -850,6 +850,10 @@ public abstract class PlanNode extends TreeNode<PlanNode> implements PlanStats {
public void setShouldColoScan() {}
public boolean getShouldColoScan() {
return false;
}
public void setNumInstances(int numInstances) {
this.numInstances = numInstances;
}

View File

@ -139,6 +139,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@ -1631,7 +1632,6 @@ public class Coordinator {
bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(),
parallelExecInstanceNum, params);
} else {
params.sharedScanOpt = true;
// case A
for (Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> entry : fragmentExecParamsMap.get(
fragment.getFragmentId()).scanRangeAssignment.entrySet()) {
@ -1641,7 +1641,14 @@ public class Coordinator {
for (Integer planNodeId : value.keySet()) {
List<TScanRangeParams> perNodeScanRanges = value.get(planNodeId);
List<List<TScanRangeParams>> perInstanceScanRanges = Lists.newArrayList();
if (!enablePipelineEngine) {
List<Boolean> sharedScanOpts = Lists.newArrayList();
Optional<ScanNode> node = scanNodes.stream().filter(scanNode -> {
return scanNode.getId().asInt() == planNodeId;
}).findFirst();
if (!enablePipelineEngine || perNodeScanRanges.size() > parallelExecInstanceNum
|| (node.isPresent() && node.get().getShouldColoScan())) {
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
//the scan instance num should not larger than the tablets num
@ -1649,19 +1656,24 @@ public class Coordinator {
}
perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges,
expectedInstanceNum);
sharedScanOpts = Collections.nCopies(perInstanceScanRanges.size(), false);
} else {
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
for (int j = 0; j < Math.max(expectedInstanceNum, 1); j++) {
perInstanceScanRanges.add(perNodeScanRanges);
}
expectedInstanceNum = Math.max(expectedInstanceNum, 1);
perInstanceScanRanges = Collections.nCopies(expectedInstanceNum, perNodeScanRanges);
sharedScanOpts = Collections.nCopies(perInstanceScanRanges.size(), true);
}
LOG.debug("scan range number per instance is: {}", perInstanceScanRanges.size());
for (List<TScanRangeParams> scanRangeParams : perInstanceScanRanges) {
for (int j = 0; j < perInstanceScanRanges.size(); j++) {
List<TScanRangeParams> scanRangeParams = perInstanceScanRanges.get(j);
boolean sharedScan = sharedScanOpts.get(j);
FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params);
instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams);
instanceParam.perNodeSharedScans.put(planNodeId, sharedScan);
params.instanceExecParams.add(instanceParam);
}
}
@ -3059,8 +3071,6 @@ public class Coordinator {
public List<FInstanceExecParam> instanceExecParams = Lists.newArrayList();
public FragmentScanRangeAssignment scanRangeAssignment = new FragmentScanRangeAssignment();
public boolean sharedScanOpt = false;
public FragmentExecParams(PlanFragment fragment) {
this.fragment = fragment;
}
@ -3152,7 +3162,6 @@ public class Coordinator {
fragment.isTransferQueryStatisticsWithEveryBatch());
params.setFragment(fragment.toThrift());
params.setLocalParams(Lists.newArrayList());
params.setSharedScanOpt(sharedScanOpt);
if (tResourceGroups != null) {
params.setResourceGroups(tResourceGroups);
}
@ -3164,10 +3173,13 @@ public class Coordinator {
localParams.setBuildHashTableForBroadcastJoin(instanceExecParam.buildHashTableForBroadcastJoin);
localParams.setFragmentInstanceId(instanceExecParam.instanceId);
Map<Integer, List<TScanRangeParams>> scanRanges = instanceExecParam.perNodeScanRanges;
Map<Integer, Boolean> perNodeSharedScans = instanceExecParam.perNodeSharedScans;
if (scanRanges == null) {
scanRanges = Maps.newHashMap();
perNodeSharedScans = Maps.newHashMap();
}
localParams.setPerNodeScanRanges(scanRanges);
localParams.setPerNodeSharedScans(perNodeSharedScans);
localParams.setSenderId(i);
localParams.setBackendNum(backendNum++);
localParams.setRuntimeFilterParams(new TRuntimeFilterParams());
@ -3265,6 +3277,7 @@ public class Coordinator {
TUniqueId instanceId;
TNetworkAddress host;
Map<Integer, List<TScanRangeParams>> perNodeScanRanges = Maps.newHashMap();
Map<Integer, Boolean> perNodeSharedScans = Maps.newHashMap();
int perFragmentInstanceIdx;