[feature](nereids)prune runtime filter (tpch part) #19312

A runtime filter (RF) is effective if it can filter out data on its target.
In this PR, an RF is considered effective if any one of the following conditions is satisfied:

1. A filter is applied on the RF's src, e.g. T.A = 1.
2. An effective RF is applied on this RF's src.
3. Denote X as the intersection range of src and target; src.ndv with respect to X is smaller than target.ndv.
Explanation of condition 2:
Supplier join Nation on s_nationkey = n_nationkey
join Region on n_regionkey = r_regionkey
RF(nation->supplier) is effective because nation is filtered by an effective rf: RF(region->nation)
This commit is contained in:
minghong
2023-09-13 20:12:08 +08:00
committed by GitHub
parent db9ed626da
commit dad671af8e
23 changed files with 1758 additions and 16 deletions

View File

@ -56,6 +56,7 @@ import java.util.Set;
* runtime filter context used at post process and translation.
*/
public class RuntimeFilterContext {
public List<RuntimeFilter> prunedRF = Lists.newArrayList();
private final IdGenerator<RuntimeFilterId> generator = RuntimeFilterId.createGenerator();
@ -153,8 +154,11 @@ public class RuntimeFilterContext {
if (filters != null) {
Iterator<RuntimeFilter> iter = filters.iterator();
while (iter.hasNext()) {
if (iter.next().getBuilderNode().equals(builderNode)) {
RuntimeFilter rf = iter.next();
if (rf.getBuilderNode().equals(builderNode)) {
builderNode.getRuntimeFilters().remove(rf);
iter.remove();
prunedRF.add(rf);
}
}
}

View File

@ -27,7 +27,6 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
@ -54,17 +53,6 @@ import java.util.Set;
*/
public class RuntimeFilterPruner extends PlanPostProcessor {
// *******************************
// Physical plans
// *******************************
/**
 * Visits a physical hash aggregate: first descends into its child, then passes the
 * aggregate itself to {@code addEffectiveSrcNode} on the runtime filter context.
 *
 * @param agg the hash aggregate node being visited
 * @param context the cascades context that supplies the runtime filter context
 * @return the same {@code agg} instance that was passed in
 */
@Override
public PhysicalHashAggregate visitPhysicalHashAggregate(
PhysicalHashAggregate<? extends Plan> agg, CascadesContext context) {
// Process the subtree before registering this node.
agg.child().accept(this, context);
// NOTE(review): presumably marks the aggregate as an effective runtime-filter source;
// confirm against RuntimeFilterContext#addEffectiveSrcNode.
context.getRuntimeFilterContext().addEffectiveSrcNode(agg);
return agg;
}
@Override
public PhysicalQuickSort visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort, CascadesContext context) {
sort.child().accept(this, context);
@ -165,7 +153,9 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
/**
* consider L join R on L.a=R.b
* runtime-filter: L.a<-R.b is effective,
* if R.b.selectivity<1 or b is partly covered by a
* if the RF can reduce the tuples of L, i.e.
* 1. some distinct values of L.a are not covered by R.b, or
* 2. there is an effective RF applied on R
*
* TODO: min-max
* @param equalTo join condition
@ -199,7 +189,7 @@ public class RuntimeFilterPruner extends PlanPostProcessor {
if (probeColumnStat.isUnKnown || buildColumnStat.isUnKnown) {
return true;
}
return probeColumnStat.notEnclosed(buildColumnStat)
|| buildColumnStat.ndv < probeColumnStat.ndv * 0.95;
double buildNdvInProbeRange = buildColumnStat.ndvIntersection(probeColumnStat);
return probeColumnStat.ndv > buildNdvInProbeRange * (1 + ColumnStatistic.STATS_ERROR);
}
}

View File

@ -38,6 +38,8 @@ import java.util.Set;
public class ColumnStatistic {
public static final double STATS_ERROR = 0.1D;
public static final StatsType NDV = StatsType.NDV;
public static final StatsType AVG_SIZE = StatsType.AVG_SIZE;
public static final StatsType MAX_SIZE = StatsType.MAX_SIZE;