[refactor](Nereids) let topn runtime filter as PhysicalTopN's attr (#22745)

The original implement use MutableMap on PhysicalTopN.
It is easy to lose if we rewrite the plan after this processor.
The new implement use attr to indict whether use topn runtime filter
This commit is contained in:
morrySnow
2023-08-09 12:13:21 +08:00
committed by GitHub
parent 7890e464ee
commit a8d690272f
5 changed files with 71 additions and 42 deletions

View File

@ -1698,7 +1698,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
sortNode.setOffset(topN.getOffset());
sortNode.setLimit(topN.getLimit());
if (topN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()) {
if (topN.isEnableRuntimeFilter()) {
sortNode.setUseTopnOpt(true);
PlanNode child = sortNode.getChild(0);
Preconditions.checkArgument(child instanceof OlapScanNode,

View File

@ -28,6 +28,8 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTop
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ImmutableList;
/**
* topN opt
* refer to:
@ -39,7 +41,29 @@ public class TopNScanOpt extends PlanPostProcessor {
@Override
public PhysicalTopN<? extends Plan> visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
topN.child().accept(this, ctx);
Plan child = topN.child().accept(this, ctx);
topN = rewriteTopN(topN);
if (child != topN.child()) {
topN.withChildren(child);
}
return topN;
}
@Override
public Plan visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN,
CascadesContext context) {
Plan child = topN.child().accept(this, context);
if (child != topN.child()) {
topN = topN.withChildren(ImmutableList.of(child));
}
PhysicalTopN<? extends Plan> rewrittenTopN = rewriteTopN(topN.getPhysicalTopN());
if (topN.getPhysicalTopN() != rewrittenTopN) {
topN = topN.withPhysicalTopN(rewrittenTopN);
}
return topN;
}
private PhysicalTopN<? extends Plan> rewriteTopN(PhysicalTopN<? extends Plan> topN) {
Plan child = topN.child();
if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
return topN;
@ -79,18 +103,12 @@ public class TopNScanOpt extends PlanPostProcessor {
olapScan = (OlapScan) child;
if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
return topN.withEnableRuntimeFilter(true);
}
return topN;
}
@Override
public Plan visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN,
CascadesContext context) {
return topN.withPhysicalTopN(visitPhysicalTopN(topN.getPhysicalTopN(), context));
}
private long getTopNOptLimitThreshold() {
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
return ConnectContext.get().getSessionVariable().topnOptLimitThreshold;

View File

@ -91,13 +91,13 @@ public class PhysicalDeferMaterializeTopN<CHILD_TYPE extends Plan>
return physicalTopN.getLimit();
}
public Plan withPhysicalTopN(PhysicalTopN<? extends Plan> physicalTopN) {
public PhysicalDeferMaterializeTopN<? extends Plan> withPhysicalTopN(PhysicalTopN<? extends Plan> physicalTopN) {
return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot, groupExpression,
getLogicalProperties(), physicalProperties, statistics, physicalTopN.child());
}
@Override
public Plan withChildren(List<Plan> children) {
public PhysicalDeferMaterializeTopN<? extends Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
"PhysicalDeferMaterializeTopN's children size must be 1, but real is %s", children.size());
return new PhysicalDeferMaterializeTopN<>(physicalTopN.withChildren(ImmutableList.of(children.get(0))),
@ -111,13 +111,14 @@ public class PhysicalDeferMaterializeTopN<CHILD_TYPE extends Plan>
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
public PhysicalDeferMaterializeTopN<? extends Plan> withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot,
groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
public PhysicalDeferMaterializeTopN<? extends Plan> withGroupExprLogicalPropChildren(
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
"PhysicalDeferMaterializeTopN's children size must be 1, but real is %s", children.size());
@ -127,7 +128,8 @@ public class PhysicalDeferMaterializeTopN<CHILD_TYPE extends Plan>
}
@Override
public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
public PhysicalDeferMaterializeTopN<? extends Plan> withPhysicalPropertiesAndStats(
PhysicalProperties physicalProperties, Statistics statistics) {
return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot,
groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
}
@ -138,7 +140,7 @@ public class PhysicalDeferMaterializeTopN<CHILD_TYPE extends Plan>
}
@Override
public PhysicalDeferMaterializeTopN<CHILD_TYPE> resetLogicalProperties() {
public PhysicalDeferMaterializeTopN<? extends Plan> resetLogicalProperties() {
return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot,
groupExpression, null, physicalProperties, statistics, child());
}

View File

@ -41,37 +41,38 @@ import java.util.Optional;
*/
public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<CHILD_TYPE> implements TopN {
public static final String TOPN_RUNTIME_FILTER = "topn_runtime_filter";
private final long limit;
private final long offset;
private final boolean enableRuntimeFilter;
public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
SortPhase phase, LogicalProperties logicalProperties, CHILD_TYPE child) {
this(orderKeys, limit, offset, phase, Optional.empty(), logicalProperties, child);
this(orderKeys, limit, offset, phase, false, Optional.empty(), logicalProperties, child);
}
/**
* Constructor of PhysicalHashJoinNode.
*/
public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
SortPhase phase, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
CHILD_TYPE child) {
this(orderKeys, limit, offset, phase, groupExpression, logicalProperties,
null, null, child);
SortPhase phase, boolean enableRuntimeFilter,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) {
this(orderKeys, limit, offset, phase, enableRuntimeFilter,
groupExpression, logicalProperties, null, null, child);
}
/**
* Constructor of PhysicalHashJoinNode.
*/
public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
SortPhase phase, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
SortPhase phase, boolean enableRuntimeFilter,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) {
super(PlanType.PHYSICAL_TOP_N, orderKeys, phase, groupExpression, logicalProperties, physicalProperties,
statistics, child);
Objects.requireNonNull(orderKeys, "orderKeys should not be null in PhysicalTopN.");
this.limit = limit;
this.offset = offset;
this.enableRuntimeFilter = enableRuntimeFilter;
}
public long getLimit() {
@ -82,6 +83,10 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
return offset;
}
public boolean isEnableRuntimeFilter() {
return enableRuntimeFilter;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -94,12 +99,12 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
return false;
}
PhysicalTopN<?> that = (PhysicalTopN<?>) o;
return limit == that.limit && offset == that.offset;
return limit == that.limit && offset == that.offset && enableRuntimeFilter == that.enableRuntimeFilter;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), limit, offset);
return Objects.hash(super.hashCode(), limit, offset, enableRuntimeFilter);
}
@Override
@ -107,33 +112,39 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
return visitor.visitPhysicalTopN(this, context);
}
public PhysicalTopN<Plan> withEnableRuntimeFilter(boolean enableRuntimeFilter) {
return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter,
groupExpression, getLogicalProperties(), child());
}
@Override
public PhysicalTopN<Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
"PhysicalTopN's children size must be 1, but real is %s", children.size());
return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression,
return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, groupExpression,
getLogicalProperties(), physicalProperties, statistics, children.get(0));
}
@Override
public PhysicalTopN<CHILD_TYPE> withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression, getLogicalProperties(), child());
return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter,
groupExpression, getLogicalProperties(), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
public PhysicalTopN<Plan> withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
"PhysicalTopN's children size must be 1, but real is %s", children.size());
return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression, logicalProperties.get(),
children.get(0));
return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter,
groupExpression, logicalProperties.get(), children.get(0));
}
@Override
public PhysicalTopN<CHILD_TYPE> withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
Statistics statistics) {
return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression,
getLogicalProperties(), physicalProperties, statistics, child());
return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter,
groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
}
@Override
@ -142,7 +153,8 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
"limit", limit,
"offset", offset,
"orderKeys", orderKeys,
"phase", phase.toString()
"phase", phase.toString(),
"enableRuntimeFilter", enableRuntimeFilter
);
}
@ -152,8 +164,8 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
}
@Override
public PhysicalTopN<CHILD_TYPE> resetLogicalProperties() {
return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression,
public PhysicalTopN<Plan> resetLogicalProperties() {
return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, groupExpression,
null, physicalProperties, statistics, child());
}

View File

@ -22,7 +22,6 @@ import org.apache.doris.nereids.processor.post.PlanPostProcessors;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.nereids.util.PlanChecker;
import org.junit.jupiter.api.Assertions;
@ -41,12 +40,11 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
.rewrite()
.implement();
PhysicalPlan plan = checker.getPhysicalPlan();
new PlanPostProcessors(checker.getCascadesContext()).process(plan);
plan = new PlanPostProcessors(checker.getCascadesContext()).process(plan);
Assertions.assertTrue(plan.children().get(0).child(0) instanceof PhysicalDeferMaterializeTopN);
PhysicalDeferMaterializeTopN<? extends Plan> localTopN
= (PhysicalDeferMaterializeTopN<? extends Plan>) plan.child(0).child(0);
Assertions.assertTrue(localTopN.getPhysicalTopN()
.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
Assertions.assertTrue(localTopN.getPhysicalTopN().isEnableRuntimeFilter());
}
// topn rf do not apply on string-like and float column
@ -57,11 +55,10 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
.rewrite()
.implement();
PhysicalPlan plan = checker.getPhysicalPlan();
new PlanPostProcessors(checker.getCascadesContext()).process(plan);
plan = new PlanPostProcessors(checker.getCascadesContext()).process(plan);
Assertions.assertTrue(plan.children().get(0).child(0) instanceof PhysicalDeferMaterializeTopN);
PhysicalDeferMaterializeTopN<? extends Plan> localTopN
= (PhysicalDeferMaterializeTopN<? extends Plan>) plan.child(0).child(0);
Assertions.assertFalse(localTopN.getPhysicalTopN()
.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
Assertions.assertFalse(localTopN.getPhysicalTopN().isEnableRuntimeFilter());
}
}