[pipelineX](scan) ignore storage data distribution by default (#29192)

This commit is contained in:
Gabriel
2023-12-28 14:54:09 +08:00
committed by GitHub
parent fe93a8f1d0
commit 29a7c0d677
68 changed files with 6630 additions and 29 deletions

View File

@ -865,10 +865,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
// Set colocate info in agg node. This is a hint for local shuffling to decide which type of
// local exchanger will be used.
aggregationNode.setColocate(true);
if (aggregate.getAggMode().isFinalPhase) {
inputPlanFragment.setHasColocateFinalizeAggNode(true);
}
}
setPlanRoot(inputPlanFragment, aggregationNode, aggregate);
if (aggregate.getStats() != null) {

View File

@ -149,8 +149,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
// has colocate plan node
private boolean hasColocatePlanNode = false;
private boolean hasColocateFinalizeAggNode = false;
private boolean hasNullAwareLeftAntiJoin = false;
private TResultSinkType resultSinkType = TResultSinkType.MYSQL_PROTOCAL;
@ -475,14 +473,6 @@ public class PlanFragment extends TreeNode<PlanFragment> {
this.bucketNum = bucketNum;
}
/**
 * Returns the current value of the {@code hasColocateFinalizeAggNode} flag.
 *
 * @return the flag value stored on this fragment
 */
public boolean isHasColocateFinalizeAggNode() {
return hasColocateFinalizeAggNode;
}
/**
 * Stores a new value for the {@code hasColocateFinalizeAggNode} flag.
 *
 * @param hasColocateFinalizeAggNode the value to store on this fragment
 */
public void setHasColocateFinalizeAggNode(boolean hasColocateFinalizeAggNode) {
this.hasColocateFinalizeAggNode = hasColocateFinalizeAggNode;
}
/**
 * Returns the current value of the {@code hasNullAwareLeftAntiJoin} flag.
 *
 * @return the flag value stored on this fragment
 */
public boolean isHasNullAwareLeftAntiJoin() {
return hasNullAwareLeftAntiJoin;
}

View File

@ -724,7 +724,6 @@ public abstract class ScanNode extends PlanNode {
return !isKeySearch() && context != null
&& context.getSessionVariable().isIgnoreStorageDataDistribution()
&& context.getSessionVariable().getEnablePipelineXEngine()
&& !fragment.isHasColocateFinalizeAggNode()
&& !fragment.isHasNullAwareLeftAntiJoin();
}

View File

@ -2021,16 +2021,18 @@ public class Coordinator implements CoordInterface {
return scanNode.getId().asInt() == planNodeId;
}).findFirst();
// disable shared scan optimization if one of conditions below is met:
// 1. Use non-pipeline or pipelineX engine
// 2. This fragment has a colocated scan node
// 3. This fragment has a FileScanNode
// 4. Disable shared scan optimization by session variable
/**
* Ignore storage data distribution iff:
* 1. Current fragment is not forced to use data distribution.
* 2. `parallelExecInstanceNum` is larger than the number of scan ranges.
* 3. Use Nereids planner.
*/
boolean sharedScan = true;
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
if (node.isPresent() && (!node.get().shouldDisableSharedScan(context)
|| (node.get().ignoreStorageDataDistribution(context) && useNereids))) {
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
leftMostNode.getNumInstances());
|| (node.get().ignoreStorageDataDistribution(context)
&& expectedInstanceNum > perNodeScanRanges.size() && useNereids))) {
expectedInstanceNum = Math.max(expectedInstanceNum, 1);
// if there is a limit and conjuncts, only 1 instance is needed to save cpu and
// mem resource
@ -2040,7 +2042,7 @@ public class Coordinator implements CoordInterface {
perInstanceScanRanges = Collections.nCopies(expectedInstanceNum, perNodeScanRanges);
} else {
int expectedInstanceNum = 1;
expectedInstanceNum = 1;
if (parallelExecInstanceNum > 1) {
// the scan instance num should not be larger than the tablets num
expectedInstanceNum = Math.min(perNodeScanRanges.size(), parallelExecInstanceNum);
@ -2859,10 +2861,6 @@ public class Coordinator implements CoordInterface {
BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
boolean ignoreStorageDataDistribution = scanNodes.stream().filter(scanNode -> {
return scanNodeIds.contains(scanNode.getId().asInt());
}).allMatch(node -> node.ignoreStorageDataDistribution(context)) && useNereids;
// 1. count each node in one fragment should scan how many tablet, gather them in one list
Map<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges
= Maps.newHashMap();
@ -2885,6 +2883,20 @@ public class Coordinator implements CoordInterface {
}
addressToScanRanges.get(address).add(filteredScanRanges);
}
/**
* Ignore storage data distribution iff:
* 1. Current fragment is not forced to use data distribution.
* 2. `parallelExecInstanceNum` is larger than the number of scan ranges.
* 3. Use Nereids planner.
*/
boolean ignoreStorageDataDistribution = scanNodes.stream().filter(scanNode -> {
return scanNodeIds.contains(scanNode.getId().asInt());
}).allMatch(node -> node.ignoreStorageDataDistribution(context))
&& addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
return addressScanRange.getValue().size() < parallelExecInstanceNum;
}) && useNereids;
FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressScanRange
: addressToScanRanges.entrySet()) {

View File

@ -808,7 +808,7 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
private boolean ignoreStorageDataDistribution = false;
private boolean ignoreStorageDataDistribution = true;
@VariableMgr.VarAttr(
name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,