[Pipeline](shared_scan_opt) Support shared scan opt in pipeline exec engine

This commit is contained in:
HappenLee
2023-03-13 10:33:57 +08:00
committed by GitHub
parent edb2d90852
commit 39b5682d59
35 changed files with 476 additions and 288 deletions

View File

@ -44,6 +44,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggrega
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@ -218,7 +219,9 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
@Override
public PhysicalProperties visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanContext context) {
// TODO: find a better way to handle both tablet num == 1 and colocate table together in future
if (!olapScan.getTable().isColocateTable() && olapScan.getScanTabletNum() == 1) {
if (!olapScan.getTable().isColocateTable() && olapScan.getScanTabletNum() == 1
&& (!ConnectContext.get().getSessionVariable().enablePipelineEngine()
|| ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() == 1)) {
return PhysicalProperties.GATHER;
} else if (olapScan.getDistributionSpec() instanceof DistributionSpecHash) {
return PhysicalProperties.createHash((DistributionSpecHash) olapScan.getDistributionSpec());

View File

@ -1118,6 +1118,9 @@ public class OlapScanNode extends ScanNode {
@Override
public int getNumInstances() {
if (ConnectContext.get().getSessionVariable().enablePipelineEngine()) {
return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
}
return result.size();
}

View File

@ -1622,6 +1622,7 @@ public class Coordinator {
bucketShuffleJoinController.computeInstanceParam(fragment.getFragmentId(),
parallelExecInstanceNum, params);
} else {
params.sharedScanOpt = true;
// case A
for (Entry<TNetworkAddress, Map<Integer, List<TScanRangeParams>>> entry : fragmentExecParamsMap.get(
fragment.getFragmentId()).scanRangeAssignment.entrySet()) {
@ -1630,13 +1631,22 @@ public class Coordinator {
for (Integer planNodeId : value.keySet()) {
List<TScanRangeParams> perNodeScanRanges = value.get(planNodeId);
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
//the scan instance num should not larger than the tablets num
expectedInstanceNum = Math.min(perNodeScanRanges.size(), parallelExecInstanceNum);
List<List<TScanRangeParams>> perInstanceScanRanges = Lists.newArrayList();
if (!enablePipelineEngine) {
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
//the scan instance num should not larger than the tablets num
expectedInstanceNum = Math.min(perNodeScanRanges.size(), parallelExecInstanceNum);
}
perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges,
expectedInstanceNum);
} else {
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
for (int j = 0; j < Math.max(expectedInstanceNum, 1); j++) {
perInstanceScanRanges.add(perNodeScanRanges);
}
}
List<List<TScanRangeParams>> perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges,
expectedInstanceNum);
LOG.debug("scan range number per instance is: {}", perInstanceScanRanges.size());
@ -3034,6 +3044,8 @@ public class Coordinator {
public List<FInstanceExecParam> instanceExecParams = Lists.newArrayList();
public FragmentScanRangeAssignment scanRangeAssignment = new FragmentScanRangeAssignment();
public boolean sharedScanOpt = false;
public FragmentExecParams(PlanFragment fragment) {
this.fragment = fragment;
}
@ -3125,6 +3137,7 @@ public class Coordinator {
fragment.isTransferQueryStatisticsWithEveryBatch());
params.setFragment(fragment.toThrift());
params.setLocalParams(Lists.newArrayList());
params.setSharedScanOpt(sharedScanOpt);
res.put(instanceExecParam.host, params);
}
TPipelineFragmentParams params = res.get(instanceExecParam.host);

View File

@ -1593,6 +1593,7 @@ public class SessionVariable implements Serializable, Writable {
tResult.setCodegenLevel(codegenLevel);
tResult.setBeExecVersion(Config.be_exec_version);
tResult.setEnablePipelineEngine(enablePipelineEngine);
tResult.setParallelInstance(parallelExecInstanceNum);
tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary);
tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery);
tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin);