[pipelineX](improvement) Support global runtime filter (#28692)
This commit is contained in:
@ -289,6 +289,11 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
return join;
|
||||
}
|
||||
RuntimeFilterContext ctx = context.getRuntimeFilterContext();
|
||||
if (ctx.getSessionVariable().isIgnoreStorageDataDistribution()) {
|
||||
// Merging BITMAP filters is not supported, so we disable this kind of runtime filter
|
||||
// if IgnoreStorageDataDistribution is enabled.
|
||||
return join;
|
||||
}
|
||||
|
||||
if ((ctx.getSessionVariable().getRuntimeFilterType() & TRuntimeFilterType.BITMAP.getValue()) != 0) {
|
||||
generateBitMapRuntimeFilterForNLJ(join, ctx);
|
||||
@ -363,6 +368,11 @@ public class RuntimeFilterGenerator extends PlanPostProcessor {
|
||||
List<TRuntimeFilterType> legalTypes = Arrays.stream(TRuntimeFilterType.values())
|
||||
.filter(type -> (type.getValue() & ctx.getSessionVariable().getRuntimeFilterType()) > 0)
|
||||
.collect(Collectors.toList());
|
||||
if (ctx.getSessionVariable().isIgnoreStorageDataDistribution()) {
|
||||
// If storage data distribution is ignored, only the BLOOM filter type is used.
|
||||
legalTypes.clear();
|
||||
legalTypes.add(TRuntimeFilterType.BLOOM);
|
||||
}
|
||||
List<EqualTo> hashJoinConjuncts = join.getEqualToConjuncts();
|
||||
for (int i = 0; i < hashJoinConjuncts.size(); i++) {
|
||||
EqualTo equalTo = ((EqualTo) JoinUtils.swapEqualToForChildrenOrder(
|
||||
|
||||
@ -1297,7 +1297,7 @@ public class OlapScanNode extends ScanNode {
|
||||
return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
|
||||
}
|
||||
if (ConnectContext.get().getSessionVariable().getEnablePipelineXEngine()
|
||||
&& ConnectContext.get().getSessionVariable().isIgnoreScanDistribution()) {
|
||||
&& ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) {
|
||||
return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
|
||||
}
|
||||
return scanRangeLocations.size();
|
||||
|
||||
@ -720,9 +720,9 @@ public abstract class ScanNode extends PlanNode {
|
||||
|| getShouldColoScan();
|
||||
}
|
||||
|
||||
public boolean ignoreScanDistribution(ConnectContext context) {
|
||||
public boolean ignoreStorageDataDistribution(ConnectContext context) {
|
||||
return !isKeySearch() && context != null
|
||||
&& context.getSessionVariable().isIgnoreScanDistribution()
|
||||
&& context.getSessionVariable().isIgnoreStorageDataDistribution()
|
||||
&& context.getSessionVariable().getEnablePipelineXEngine()
|
||||
&& !fragment.isHasColocateFinalizeAggNode()
|
||||
&& !fragment.isHasNullAwareLeftAntiJoin();
|
||||
|
||||
@ -1624,7 +1624,9 @@ public class Coordinator implements CoordInterface {
|
||||
});
|
||||
} else {
|
||||
// add destination host to this fragment's destination
|
||||
for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
|
||||
int parallelTasksNum = destParams.ignoreDataDistribution
|
||||
? destParams.parallelTasksNum : destParams.instanceExecParams.size();
|
||||
for (int j = 0; j < parallelTasksNum; ++j) {
|
||||
TPlanFragmentDestination dest = new TPlanFragmentDestination();
|
||||
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
|
||||
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
|
||||
@ -2008,7 +2010,7 @@ public class Coordinator implements CoordInterface {
|
||||
// 4. Disable shared scan optimization by session variable
|
||||
boolean sharedScan = true;
|
||||
if (node.isPresent() && (!node.get().shouldDisableSharedScan(context)
|
||||
|| (node.get().ignoreScanDistribution(context) && useNereids))) {
|
||||
|| (node.get().ignoreStorageDataDistribution(context) && useNereids))) {
|
||||
int expectedInstanceNum = Math.min(parallelExecInstanceNum,
|
||||
leftMostNode.getNumInstances());
|
||||
expectedInstanceNum = Math.max(expectedInstanceNum, 1);
|
||||
@ -2835,9 +2837,9 @@ public class Coordinator implements CoordInterface {
|
||||
BucketSeqToScanRange bucketSeqToScanRange = fragmentIdBucketSeqToScanRangeMap.get(fragmentId);
|
||||
Set<Integer> scanNodeIds = fragmentIdToScanNodeIds.get(fragmentId);
|
||||
|
||||
boolean ignoreScanDistribution = scanNodes.stream().filter(scanNode -> {
|
||||
boolean ignoreStorageDataDistribution = scanNodes.stream().filter(scanNode -> {
|
||||
return scanNodeIds.contains(scanNode.getId().asInt());
|
||||
}).allMatch(node -> node.ignoreScanDistribution(context)) && useNereids;
|
||||
}).allMatch(node -> node.ignoreStorageDataDistribution(context)) && useNereids;
|
||||
|
||||
// 1. count how many tablets each node in one fragment should scan, and gather them in one list
|
||||
Map<TNetworkAddress, List<Pair<Integer, Map<Integer, List<TScanRangeParams>>>>> addressToScanRanges
|
||||
@ -2868,7 +2870,7 @@ public class Coordinator implements CoordInterface {
|
||||
Map<Integer, List<TScanRangeParams>> range
|
||||
= findOrInsert(assignment, addressScanRange.getKey(), new HashMap<>());
|
||||
|
||||
if (ignoreScanDistribution) {
|
||||
if (ignoreStorageDataDistribution) {
|
||||
FInstanceExecParam instanceParam = new FInstanceExecParam(
|
||||
null, addressScanRange.getKey(), 0, params);
|
||||
|
||||
@ -2924,8 +2926,8 @@ public class Coordinator implements CoordInterface {
|
||||
}
|
||||
}
|
||||
}
|
||||
params.parallelTasksNum = ignoreScanDistribution ? 1 : params.instanceExecParams.size();
|
||||
params.ignoreDataDistribution = ignoreScanDistribution;
|
||||
params.parallelTasksNum = ignoreStorageDataDistribution ? 1 : params.instanceExecParams.size();
|
||||
params.ignoreDataDistribution = ignoreStorageDataDistribution;
|
||||
}
|
||||
|
||||
private final Map<PlanFragmentId, BucketSeqToScanRange> fragmentIdTobucketSeqToScanRangeMap = Maps.newHashMap();
|
||||
|
||||
@ -219,7 +219,8 @@ public class SessionVariable implements Serializable, Writable {
|
||||
public static final String ENABLE_PIPELINE_X_ENGINE = "enable_pipeline_x_engine";
|
||||
|
||||
public static final String ENABLE_SHARED_SCAN = "enable_shared_scan";
|
||||
public static final String IGNORE_SCAN_DISTRIBUTION = "ignore_scan_distribution";
|
||||
|
||||
public static final String IGNORE_STORAGE_DATA_DISTRIBUTION = "ignore_storage_data_distribution";
|
||||
|
||||
public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle";
|
||||
|
||||
@ -785,9 +786,9 @@ public class SessionVariable implements Serializable, Writable {
|
||||
needForward = true)
|
||||
private boolean enableSharedScan = false;
|
||||
|
||||
@VariableMgr.VarAttr(name = IGNORE_SCAN_DISTRIBUTION, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
|
||||
needForward = true)
|
||||
private boolean ignoreScanDistribution = false;
|
||||
@VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy = false,
|
||||
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
|
||||
private boolean ignoreStorageDataDistribution = false;
|
||||
|
||||
@VariableMgr.VarAttr(
|
||||
name = ENABLE_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
|
||||
@ -3173,11 +3174,11 @@ public class SessionVariable implements Serializable, Writable {
|
||||
return materializedViewRewriteEnableContainForeignTable;
|
||||
}
|
||||
|
||||
public boolean isIgnoreScanDistribution() {
|
||||
return ignoreScanDistribution && getEnablePipelineXEngine() && enableLocalShuffle;
|
||||
public boolean isIgnoreStorageDataDistribution() {
|
||||
return ignoreStorageDataDistribution && getEnablePipelineXEngine() && enableLocalShuffle;
|
||||
}
|
||||
|
||||
public void setIgnoreScanDistribution(boolean ignoreScanDistribution) {
|
||||
this.ignoreScanDistribution = ignoreScanDistribution;
|
||||
public void setIgnoreStorageDataDistribution(boolean ignoreStorageDataDistribution) {
|
||||
this.ignoreStorageDataDistribution = ignoreStorageDataDistribution;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user