[pipelineX](local shuffle) Support parallel execution regardless of tablet number (#28266)

Gabriel
2023-12-14 12:53:54 +08:00
committed by GitHub
parent b443db6e3e
commit c00dca70e6
35 changed files with 480 additions and 205 deletions
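
At a high level, the change lets the pipelineX engine choose its parallelism from the session's parallel_exec_instance_num and fan scanned rows out to those parallel tasks through a local shuffle, instead of capping parallelism at the tablet (scan range) count. A minimal standalone sketch of that decision, using a hypothetical helper that is not part of this commit:

public class ParallelismSketch {
    // When scan distribution can be ignored, parallelism follows the session
    // setting; otherwise the tablet count remains the upper bound.
    static int numInstances(int parallelExecInstanceNum, int tabletNum, boolean ignoreScanDistribution) {
        return ignoreScanDistribution ? parallelExecInstanceNum : Math.min(tabletNum, parallelExecInstanceNum);
    }

    public static void main(String[] args) {
        System.out.println(numInstances(8, 1, false)); // 1: capped by a single tablet
        System.out.println(numInstances(8, 1, true));  // 8: local shuffle removes the cap
    }
}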

PhysicalPlanTranslator.java

@ -863,6 +863,10 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
// Set colocate info in agg node. This is a hint for local shuffling to decide which type of
// local exchanger will be used.
aggregationNode.setColocate(true);
if (aggregate.getAggMode().isFinalPhase) {
inputPlanFragment.setHasColocateFinalizeAggNode(true);
}
}
setPlanRoot(inputPlanFragment, aggregationNode, aggregate);
if (aggregate.getStats() != null) {
@ -1134,6 +1138,10 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
null, null, null, hashJoin.isMarkJoin());
PlanFragment currentFragment = connectJoinNode(hashJoinNode, leftFragment, rightFragment, context, hashJoin);
if (joinType == JoinType.NULL_AWARE_LEFT_ANTI_JOIN) {
currentFragment.setHasNullAwareLeftAntiJoin(true);
}
if (JoinUtils.shouldColocateJoin(physicalHashJoin)) {
// TODO: add reason
hashJoinNode.setColocate(true, "");
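
Both flags recorded in this file mark operators that depend on rows staying on their original bucket: a finalize-phase colocate aggregation and a null-aware left anti join. ScanNode.ignoreScanDistribution (later in this diff) consults them to veto local shuffle. A hypothetical condensed predicate, not commit code:

// Hypothetical summary of the veto: a fragment may only ignore scan
// distribution when neither bucket-sensitive operator is present.
static boolean fragmentAllowsIgnoringScanDistribution(
        boolean hasColocateFinalizeAggNode, boolean hasNullAwareLeftAntiJoin) {
    return !hasColocateFinalizeAggNode && !hasNullAwareLeftAntiJoin;
}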

OlapScanNode.java

@ -1277,9 +1277,14 @@ public class OlapScanNode extends ScanNode {
// In the pipeline exec engine, the instance num equals be_num * parallel_instance_num,
// so here we need to count the distinct BEs to get the right instance number.
if (ConnectContext.get().getSessionVariable().getEnablePipelineEngine()
&& !ConnectContext.get().getSessionVariable().getEnablePipelineXEngine()
&& ConnectContext.get().getSessionVariable().getEnableSharedScan()) {
return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
}
if (ConnectContext.get().getSessionVariable().getEnablePipelineXEngine()
&& ConnectContext.get().getSessionVariable().isIgnoreScanDistribution()) {
return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
}
return scanRangeLocations.size();
}
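
For intuition, assume a hypothetical OLAP table with 3 tablets on one backend and parallel_exec_instance_num = 8: the pre-existing branches return 3 (one instance per scan range location), while the new pipelineX branch returns 8. A standalone sketch of the branch order above (all names are illustrative):

static int numInstancesSketch(boolean pipeline, boolean pipelineX, boolean sharedScan,
        boolean ignoreScanDistribution, int parallelExecInstanceNum, int scanRangeLocationCount) {
    if (pipeline && !pipelineX && sharedScan) {
        return parallelExecInstanceNum; // shared scan on the original pipeline engine
    }
    if (pipelineX && ignoreScanDistribution) {
        return parallelExecInstanceNum; // new: pipelineX is no longer capped by tablets
    }
    return scanRangeLocationCount; // default: one instance per scan range location
}
// e.g. numInstancesSketch(true, true, false, true, 8, 3) == 8; the default path gives 3.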

PlanFragment.java

@ -149,6 +149,10 @@ public class PlanFragment extends TreeNode<PlanFragment> {
// has colocate plan node
private boolean hasColocatePlanNode = false;
private boolean hasColocateFinalizeAggNode = false;
private boolean hasNullAwareLeftAntiJoin = false;
private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL;
/**
@ -470,4 +474,20 @@ public class PlanFragment extends TreeNode<PlanFragment> {
public void setBucketNum(int bucketNum) {
this.bucketNum = bucketNum;
}
public boolean isHasColocateFinalizeAggNode() {
return hasColocateFinalizeAggNode;
}
public void setHasColocateFinalizeAggNode(boolean hasColocateFinalizeAggNode) {
this.hasColocateFinalizeAggNode = hasColocateFinalizeAggNode;
}
public boolean isHasNullAwareLeftAntiJoin() {
return hasNullAwareLeftAntiJoin;
}
public void setHasNullAwareLeftAntiJoin(boolean hasNullAwareLeftAntiJoin) {
this.hasNullAwareLeftAntiJoin = hasNullAwareLeftAntiJoin;
}
}

ScanNode.java

@ -43,6 +43,7 @@ import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.external.FederationBackendPolicy;
import org.apache.doris.planner.external.FileScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@ -711,7 +712,20 @@ public abstract class ScanNode extends PlanNode {
// Shared scan is disabled when any of the following holds:
// 1. this is a key search, or there is no connect context
// 2. the session variable enable_shared_scan is off
// 3. the pipeline engine is off, or the pipelineX engine is on
// 4. this is a FileScanNode or a colocate scan
public boolean shouldDisableSharedScan(ConnectContext context) {
    return isKeySearch() || context == null
            || !context.getSessionVariable().getEnableSharedScan()
            || !context.getSessionVariable().getEnablePipelineEngine()
            || context.getSessionVariable().getEnablePipelineXEngine()
            || this instanceof FileScanNode
            || getShouldColoScan();
}

public boolean ignoreScanDistribution(ConnectContext context) {
    return !isKeySearch() && context != null
            && context.getSessionVariable().isIgnoreScanDistribution()
            && context.getSessionVariable().getEnablePipelineXEngine()
            && !fragment.isHasColocateFinalizeAggNode()
            && !fragment.isHasNullAwareLeftAntiJoin();
}
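
The coordinator later requires every scan node of a fragment to agree before the fragment ignores the data distribution (see assignScanRanges below). A minimal sketch of that composition, with hypothetical names:

import java.util.List;

public class LocalShuffleGateSketch {
    // Local shuffle is only applied when the planner is Nereids and every
    // scan node in the fragment reports ignoreScanDistribution == true.
    static boolean fragmentIgnoresScanDistribution(List<Boolean> perScanNodeDecision, boolean useNereids) {
        return useNereids && perScanNodeDecision.stream().allMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        System.out.println(fragmentIgnoresScanDistribution(List.of(true, true), true));  // true
        System.out.println(fragmentIgnoresScanDistribution(List.of(true, false), true)); // false
    }
}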
public boolean haveLimitAndConjunts() {

Coordinator.java

@ -34,6 +34,7 @@ import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.loadv2.LoadJob;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.nereids.NereidsPlanner;
import org.apache.doris.nereids.stats.StatsErrorEstimator;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSink;
@ -45,7 +46,6 @@ import org.apache.doris.planner.IntersectNode;
import org.apache.doris.planner.MultiCastDataSink;
import org.apache.doris.planner.MultiCastPlanFragment;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OriginalPlanner;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNode;
@ -60,7 +60,6 @@ import org.apache.doris.planner.SetOperationNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.planner.external.ExternalScanNode;
import org.apache.doris.planner.external.FileQueryScanNode;
import org.apache.doris.planner.external.FileScanNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
import org.apache.doris.proto.InternalService.PExecPlanFragmentStartRequest;
@ -247,6 +246,7 @@ public class Coordinator implements CoordInterface {
private boolean enablePipelineEngine = false;
private boolean enablePipelineXEngine = false;
private boolean useNereids = false;
// Runtime filter merge instance address and ID
public TNetworkAddress runtimeFilterMergeAddr;
@ -307,7 +307,8 @@ public class Coordinator implements CoordInterface {
&& (fragments.size() > 0);
initQueryOptions(context);
useNereids = planner instanceof NereidsPlanner;
if (!useNereids) {
// Enable local shuffle on the pipelineX engine only if the Nereids planner is applied.
queryOptions.setEnableLocalShuffle(false);
}
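
The effect of this gate is that the query option enable_local_shuffle reaches the backends as false for legacy (non-Nereids) plans, so no local exchanger is built for them. A condensed sketch with assumed names, not commit code:

// Assumed one-line summary: local shuffle is a pipelineX + Nereids feature;
// legacy plans switch it off explicitly before dispatch.
static boolean localShuffleEnabled(boolean useNereids, boolean sessionEnableLocalShuffle) {
    return useNereids && sessionEnableLocalShuffle;
}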
@ -1584,7 +1585,10 @@ public class Coordinator implements CoordInterface {
dest.server = dummyServer;
dest.setBrpcServer(dummyServer);
int parallelTasksNum = destParams.ignoreDataDistribution
        ? destParams.parallelTasksNum : destParams.instanceExecParams.size();
for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
@ -1692,7 +1696,10 @@ public class Coordinator implements CoordInterface {
dest.server = dummyServer;
dest.setBrpcServer(dummyServer);
int parallelTasksNum = destParams.ignoreDataDistribution
        ? destParams.parallelTasksNum : destParams.instanceExecParams.size();
for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) {
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
dest.fragment_instance_id = instanceExecParams.instanceId;
dest.server = toRpcHost(instanceExecParams.host);
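
When a fragment ignores the data distribution, only the first parallelTasksNum instances carry bucket assignments, so the destination search must not walk the extra local-shuffle instances, which own no bucketSeq. For example, with 8 instances and parallelTasksNum == 1, only instance 0 can be a bucket's destination. A sketch of the bound, with illustrative names:

// Number of instance slots eligible as exchange destinations for a bucket.
static int destinationCandidateCount(boolean ignoreDataDistribution, int parallelTasksNum, int instanceCount) {
    return ignoreDataDistribution ? parallelTasksNum : instanceCount;
}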
@ -1989,7 +1996,6 @@ public class Coordinator implements CoordInterface {
for (Integer planNodeId : value.keySet()) {
List<TScanRangeParams> perNodeScanRanges = value.get(planNodeId);
List<List<TScanRangeParams>> perInstanceScanRanges = Lists.newArrayList();
Optional<ScanNode> node = scanNodes.stream().filter(scanNode -> {
return scanNode.getId().asInt() == planNodeId;
@ -2000,9 +2006,20 @@ public class Coordinator implements CoordInterface {
// Use one shared scan-range list per instance when the scan node allows shared
// scan, or when it can ignore the scan distribution under the Nereids planner.
boolean sharedScan = true;
if (node.isPresent() && (!node.get().shouldDisableSharedScan(context)
|| (node.get().ignoreScanDistribution(context) && useNereids))) {
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
expectedInstanceNum = Math.max(expectedInstanceNum, 1);
// if there are a limit and conjuncts, only one instance is needed to save CPU and
// memory resources
if (node.get().haveLimitAndConjunts()) {
expectedInstanceNum = 1;
}
perInstanceScanRanges = Collections.nCopies(expectedInstanceNum, perNodeScanRanges);
} else {
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
// the scan instance num should not be larger than the tablet num
@ -2010,38 +2027,27 @@ public class Coordinator implements CoordInterface {
}
// if there are a limit and conjuncts, only one instance is needed to save CPU and
// memory resources
if (node.get().haveLimitAndConjunts()) {
expectedInstanceNum = 1;
}
perInstanceScanRanges = ListUtil.splitBySize(perNodeScanRanges,
expectedInstanceNum);
sharedScan = false;
}
LOG.debug("scan range number per instance is: {}", perInstanceScanRanges.size());
for (int j = 0; j < perInstanceScanRanges.size(); j++) {
List<TScanRangeParams> scanRangeParams = perInstanceScanRanges.get(j);
FInstanceExecParam instanceParam = new FInstanceExecParam(null, key, 0, params);
instanceParam.perNodeScanRanges.put(planNodeId, scanRangeParams);
instanceParam.perNodeSharedScans.put(planNodeId, sharedScan);
params.instanceExecParams.add(instanceParam);
}
params.ignoreDataDistribution = sharedScan;
params.parallelTasksNum = sharedScan ? 1 : params.instanceExecParams.size();
}
}
}
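
The two branches above differ in how scan ranges reach instances: the shared branch hands every instance the complete per-node scan range list and lets the backend pull from one shared pool at runtime, while the fallback statically partitions the list. A runnable contrast under assumed data (Doris's ListUtil.splitBySize is stood in for by a plain observation):

import java.util.Collections;
import java.util.List;

public class ScanAssignSketch {
    public static void main(String[] args) {
        List<Integer> perNodeScanRanges = List.of(1, 2, 3, 4, 5, 6); // six tablets, illustrative
        int expectedInstanceNum = 3;
        // Shared mode: every instance references ALL ranges (what nCopies does above);
        // the actual division of work happens at runtime on the backend.
        List<List<Integer>> shared = Collections.nCopies(expectedInstanceNum, perNodeScanRanges);
        System.out.println(shared.get(0).size()); // 6: each instance sees all six ranges
        // Static mode would instead give each instance a disjoint slice of roughly
        // perNodeScanRanges.size() / expectedInstanceNum ranges.
    }
}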
@ -2156,74 +2162,8 @@ public class Coordinator implements CoordInterface {
private void computeColocateJoinInstanceParam(PlanFragmentId fragmentId,
int parallelExecInstanceNum, FragmentExecParams params) {
Map<Integer, TNetworkAddress> bucketSeqToAddress = fragmentIdToSeqToAddressMap.get(fragmentId);
BucketSeqToScanRange bucketSeqToScanRange = fragmentIdTobucketSeqToScanRangeMap.get(fragmentId);
Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
// 1. count how many tablets each node in one fragment should scan, and gather them in one list
Map<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges
= Maps.newHashMap();
for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges : bucketSeqToScanRange.entrySet()) {
TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey());
Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue();
// We only care about the node scan ranges of scan nodes which belong to this fragment
Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges = Maps.newHashMap();
for (Integer scanNodeId : nodeScanRanges.keySet()) {
if (scanNodeIds.contains(scanNodeId)) {
filteredNodeScanRanges.put(scanNodeId, nodeScanRanges.get(scanNodeId));
}
}
// set the bucket for each scanRange; the pair is <bucket_num, map<scanNode_id, list<scanRange>>>
// we should make sure that
// 1. the same bucket stays on the same BE address
// 2. each scanNode id only scans the scanRanges belonging to that scanNode id
// 3. when deciding how many scanRanges one instance scans, the same bucket is never split across instances
Pair<Integer, Map<Integer, List<TScanRangeParams>>> filteredScanRanges
= Pair.of(scanRanges.getKey(), filteredNodeScanRanges);
if (!addressToScanRanges.containsKey(address)) {
addressToScanRanges.put(address, Lists.newArrayList());
}
addressToScanRanges.get(address).add(filteredScanRanges);
}
FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange
: addressToScanRanges.entrySet()) {
List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue();
Map<Integer, List<TScanRangeParams>> range
= findOrInsert(assignment, addressScanRange.getKey(), new HashMap<>());
int expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
// the scan instance num should not be larger than the tablet num
expectedInstanceNum = Math.min(scanRange.size(), parallelExecInstanceNum);
}
// 2. split the scan ranges among the instances
List<List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges
= ListUtil.splitBySize(scanRange, expectedInstanceNum);
// 3. construct an instanceExecParam and add the scan ranges that the instance should scan
for (List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> perInstanceScanRange
: perInstanceScanRanges) {
FInstanceExecParam instanceParam = new FInstanceExecParam(null, addressScanRange.getKey(), 0, params);
for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : perInstanceScanRange) {
instanceParam.bucketSeqSet.add(nodeScanRangeMap.first);
for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange
: nodeScanRangeMap.second.entrySet()) {
if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
range.put(nodeScanRange.getKey(), Lists.newArrayList());
instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList());
}
range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
instanceParam.perNodeScanRanges.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
}
}
params.instanceExecParams.add(instanceParam);
}
}
assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdTobucketSeqToScanRangeMap,
fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds);
}
private Map<TNetworkAddress, Long> getReplicaNumPerHostForOlapTable() {
@ -2882,44 +2822,81 @@ public class Coordinator implements CoordInterface {
private void computeInstanceParam(PlanFragmentId fragmentId,
        int parallelExecInstanceNum, FragmentExecParams params) {
    assignScanRanges(fragmentId, parallelExecInstanceNum, params, fragmentIdBucketSeqToScanRangeMap,
            fragmentIdToSeqToAddressMap, fragmentIdToScanNodeIds);
}

private void assignScanRanges(PlanFragmentId fragmentId, int parallelExecInstanceNum, FragmentExecParams params,
        Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdBucketSeqToScanRangeMap,
        Map<PlanFragmentId, Map<Integer, TNetworkAddress>> curFragmentIdToSeqToAddressMap,
        Map<PlanFragmentId, Set<Integer>> fragmentIdToScanNodeIds) {
    Map<Integer, TNetworkAddress> bucketSeqToAddress = curFragmentIdToSeqToAddressMap.get(fragmentId);
    BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
    Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
    boolean ignoreScanDistribution = scanNodes.stream().filter(scanNode -> {
        return scanNodeIds.contains(scanNode.getId().asInt());
    }).allMatch(node -> node.ignoreScanDistribution(context)) && useNereids;
    // 1. count how many tablets each node in one fragment should scan, and gather them in one list
    Map<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges
            = Maps.newHashMap();
    for (Map.Entry<Integer, Map<Integer, List<TScanRangeParams>>> scanRanges
            : bucketSeqToScanRange.entrySet()) {
        TNetworkAddress address = bucketSeqToAddress.get(scanRanges.getKey());
        Map<Integer, List<TScanRangeParams>> nodeScanRanges = scanRanges.getValue();
        // We only care about the node scan ranges of scan nodes which belong to this fragment
        Map<Integer, List<TScanRangeParams>> filteredNodeScanRanges = Maps.newHashMap();
        for (Integer scanNodeId : nodeScanRanges.keySet()) {
            if (scanNodeIds.contains(scanNodeId)) {
                filteredNodeScanRanges.put(scanNodeId, nodeScanRanges.get(scanNodeId));
            }
        }
        Pair<Integer, Map<Integer, List<TScanRangeParams>>> filteredScanRanges
                = Pair.of(scanRanges.getKey(), filteredNodeScanRanges);
        if (!addressToScanRanges.containsKey(address)) {
            addressToScanRanges.put(address, Lists.newArrayList());
        }
        addressToScanRanges.get(address).add(filteredScanRanges);
    }
    FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
    for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange
            : addressToScanRanges.entrySet()) {
        List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>> scanRange = addressScanRange.getValue();
        Map<Integer, List<TScanRangeParams>> range
                = findOrInsert(assignment, addressScanRange.getKey(), new HashMap<>());
        if (ignoreScanDistribution) {
            FInstanceExecParam instanceParam = new FInstanceExecParam(
                    null, addressScanRange.getKey(), 0, params);
            for (Pair<Integer, Map<Integer, List<TScanRangeParams>>> nodeScanRangeMap : scanRange) {
                instanceParam.addBucketSeq(nodeScanRangeMap.first);
                for (Map.Entry<Integer, List<TScanRangeParams>> nodeScanRange
                        : nodeScanRangeMap.second.entrySet()) {
                    if (!instanceParam.perNodeScanRanges.containsKey(nodeScanRange.getKey())) {
                        range.put(nodeScanRange.getKey(), Lists.newArrayList());
                        instanceParam.perNodeScanRanges.put(nodeScanRange.getKey(), Lists.newArrayList());
                        instanceParam.perNodeSharedScans.put(nodeScanRange.getKey(), true);
                    }
                    range.get(nodeScanRange.getKey()).addAll(nodeScanRange.getValue());
                    instanceParam.perNodeScanRanges.get(nodeScanRange.getKey())
                            .addAll(nodeScanRange.getValue());
                }
            }
            params.instanceExecParams.add(instanceParam);
            for (int i = 1; i < parallelExecInstanceNum; i++) {
                params.instanceExecParams.add(new FInstanceExecParam(
                        null, addressScanRange.getKey(), 0, params));
            }
        } else {
            int expectedInstanceNum = 1;
            if (parallelExecInstanceNum > 1) {
                // the scan instance num should not be larger than the tablet num
                expectedInstanceNum = Math.min(scanRange.size(), parallelExecInstanceNum);
            }
            // 2. split the scan ranges among the instances
            List<List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> perInstanceScanRanges
                    = ListUtil.splitBySize(scanRange, expectedInstanceNum);
@ -2947,6 +2924,8 @@ public class Coordinator implements CoordInterface {
}
}
}
params.parallelTasksNum = ignoreScanDistribution ? 1 : params.instanceExecParams.size();
params.ignoreDataDistribution = ignoreScanDistribution;
}
private final Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdTobucketSeqToScanRangeMap = Maps.newHashMap();
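
Per backend host, the ignore-distribution branch of assignScanRanges thus produces one instance that owns every bucket's scan ranges (with perNodeSharedScans set) plus parallelExecInstanceNum - 1 empty instances that receive rows purely via local shuffle. A standalone sketch of the resulting layout, with simplified types and an assumed parallelism of 4:

import java.util.ArrayList;
import java.util.List;

public class InstanceLayoutSketch {
    record Instance(List<String> scanRanges, boolean sharedScan) {}

    public static void main(String[] args) {
        int parallelExecInstanceNum = 4;
        List<String> allRangesOnHost = List.of("bucket0", "bucket1", "bucket2");
        List<Instance> instances = new ArrayList<>();
        // The first instance owns every scan range on the host, marked shared.
        instances.add(new Instance(allRangesOnHost, true));
        // The remaining instances start empty; the local shuffle feeds them rows.
        for (int i = 1; i < parallelExecInstanceNum; i++) {
            instances.add(new Instance(List.of(), false));
        }
        System.out.println(instances.size()); // 4 instances, only one of which scans
    }
}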
@ -3561,6 +3540,8 @@ public class Coordinator implements CoordInterface {
// used to assemble TPlanFragmentExecParams
protected class FragmentExecParams {
public PlanFragment fragment;
public int parallelTasksNum = 0;
public boolean ignoreDataDistribution = false;
public List<TPlanFragmentDestination> destinations = Lists.newArrayList();
public Map<Integer, Integer> perExchNumSenders = Maps.newHashMap();
@ -3705,6 +3686,9 @@ public class Coordinator implements CoordInterface {
params.setFileScanParams(fileScanRangeParamsMap);
params.setNumBuckets(fragment.getBucketNum());
params.setPerNodeSharedScans(perNodeSharedScans);
if (ignoreDataDistribution) {
params.setParallelInstances(parallelTasksNum);
}
res.put(instanceExecParam.host, params);
res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>());
instanceIdx.put(instanceExecParam.host, 0);
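
The parallel-instances hint is therefore only attached to the per-host fragment params when data distribution is ignored; in that mode the instance list alone no longer tells the backend how many pipeline tasks to run. A sketch of the propagation with a hypothetical parameter holder standing in for the generated thrift struct:

import java.util.HashMap;
import java.util.Map;

public class ThriftParamSketch {
    // Mirrors the conditional setParallelInstances(...) above, using a plain map.
    static void fillParallelism(boolean ignoreDataDistribution, int parallelTasksNum, Map<String, Integer> params) {
        if (ignoreDataDistribution) {
            params.put("parallel_instances", parallelTasksNum);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> params = new HashMap<>();
        fillParallelism(true, 8, params);
        System.out.println(params); // {parallel_instances=8}
    }
}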

SessionVariable.java

@ -219,6 +219,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_PIPELINE_X_ENGINE = "enable_pipeline_x_engine";
public static final String ENABLE_SHARED_SCAN = "enable_shared_scan";
public static final String IGNORE_SCAN_DISTRIBUTION = "ignore_scan_distribution";
public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle";
@ -783,6 +784,10 @@ public class SessionVariable implements Serializable, Writable {
needForward = true)
private boolean enableSharedScan = false;
@VariableMgr.VarAttr(name = IGNORE_SCAN_DISTRIBUTION, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
needForward = true)
private boolean ignoreScanDistribution = false;
@VariableMgr.VarAttr(
name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
description = {"是否在pipelineX引擎上开启local shuffle优化",
@ -3143,4 +3148,12 @@ public class SessionVariable implements Serializable, Writable {
public boolean isMaterializedViewRewriteEnableContainForeignTable() {
return materializedViewRewriteEnableContainForeignTable;
}
public boolean isIgnoreScanDistribution() {
return ignoreScanDistribution && getEnablePipelineXEngine() && enableLocalShuffle;
}
public void setIgnoreScanDistribution(boolean ignoreScanDistribution) {
this.ignoreScanDistribution = ignoreScanDistribution;
}
}
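
Note that the raw ignore_scan_distribution value only takes effect when both the pipelineX engine and local shuffle are enabled, which is why the getter re-derives it instead of returning the field. A condensed standalone sketch of the effective flag, with assumed field values:

// Assumed standalone equivalent of isIgnoreScanDistribution() above.
static boolean effectiveIgnoreScanDistribution(
        boolean ignoreScanDistribution, boolean enablePipelineXEngine, boolean enableLocalShuffle) {
    return ignoreScanDistribution && enablePipelineXEngine && enableLocalShuffle;
}
// e.g. setting only ignore_scan_distribution=true changes nothing until
// enable_pipeline_x_engine and enable_local_shuffle are also true.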