[enhancement](nereids)support topN opt in nereids (#17741)
1. support topN opt in nereids 2. pushdown limit->proj->sort
This commit is contained in:
@ -877,6 +877,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
|
||||
sortNode.setOffset(topN.getOffset());
|
||||
sortNode.setLimit(topN.getLimit());
|
||||
if (topN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()) {
|
||||
sortNode.setUseTopnOpt(true);
|
||||
PlanNode child = sortNode.getChild(0);
|
||||
Preconditions.checkArgument(child instanceof OlapScanNode,
|
||||
"topN opt expect OlapScanNode, but we get " + child);
|
||||
((OlapScanNode) child).setUseTopnOpt(true);
|
||||
}
|
||||
addPlanRoot(currentFragment, sortNode, topN);
|
||||
} else {
|
||||
// For mergeSort, we need to push sortInfo to exchangeNode
|
||||
|
||||
@ -173,10 +173,8 @@ public class PlanTranslatorContext {
|
||||
@Nullable TableIf table) {
|
||||
SlotDescriptor slotDescriptor = this.addSlotDesc(tupleDesc);
|
||||
// Only the SlotDesc that in the tuple generated for scan node would have corresponding column.
|
||||
if (table != null) {
|
||||
Optional<Column> column = slotReference.getColumn();
|
||||
column.ifPresent(slotDescriptor::setColumn);
|
||||
}
|
||||
Optional<Column> column = slotReference.getColumn();
|
||||
column.ifPresent(slotDescriptor::setColumn);
|
||||
slotDescriptor.setType(slotReference.getDataType().toCatalogDataType());
|
||||
slotDescriptor.setIsMaterialized(true);
|
||||
SlotRef slotRef;
|
||||
|
||||
@ -68,6 +68,7 @@ public class PlanPostProcessors {
|
||||
}
|
||||
}
|
||||
builder.add(new Validator());
|
||||
builder.add(new TopNScanOpt());
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,80 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.processor.post;
|
||||
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.SlotReference;
|
||||
import org.apache.doris.nereids.trees.plans.Plan;
|
||||
import org.apache.doris.nereids.trees.plans.SortPhase;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.Filter;
|
||||
import org.apache.doris.nereids.trees.plans.algebra.Project;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
/**
|
||||
* topN opt
|
||||
* refer to:
|
||||
* https://github.com/apache/doris/pull/15558
|
||||
* https://github.com/apache/doris/pull/15663
|
||||
*/
|
||||
|
||||
public class TopNScanOpt extends PlanPostProcessor {
|
||||
@Override
|
||||
public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
|
||||
topN.child().accept(this, ctx);
|
||||
Plan child = topN.child();
|
||||
if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
|
||||
return topN;
|
||||
}
|
||||
long threshold = getTopNOptLimitThreshold();
|
||||
if (threshold == -1 || topN.getLimit() > threshold) {
|
||||
return topN;
|
||||
}
|
||||
if (topN.getOrderKeys().isEmpty()) {
|
||||
return topN;
|
||||
}
|
||||
Expression firstKey = topN.getOrderKeys().get(0).getExpr();
|
||||
if (!(firstKey instanceof SlotReference)) {
|
||||
return topN;
|
||||
}
|
||||
if (firstKey.getDataType().isStringLikeType()
|
||||
|| firstKey.getDataType().isFloatType()
|
||||
|| firstKey.getDataType().isDoubleType()) {
|
||||
return topN;
|
||||
}
|
||||
while (child != null && (child instanceof Project || child instanceof Filter)) {
|
||||
child = child.child(0);
|
||||
}
|
||||
if (child instanceof PhysicalOlapScan) {
|
||||
PhysicalOlapScan scan = (PhysicalOlapScan) child;
|
||||
if (scan.getTable().isDupKeysOrMergeOnWrite()) {
|
||||
topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
|
||||
}
|
||||
}
|
||||
return topN;
|
||||
}
|
||||
|
||||
private long getTopNOptLimitThreshold() {
|
||||
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
|
||||
return ConnectContext.get().getSessionVariable().topnOptLimitThreshold;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
@ -33,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@ -94,6 +95,17 @@ public class PushdownLimit implements RewriteRuleFactory {
|
||||
sort.child(0));
|
||||
return topN;
|
||||
}).toRule(RuleType.PUSH_LIMIT_INTO_SORT),
|
||||
//limit->proj->sort ==> proj->topN
|
||||
logicalLimit(logicalProject(logicalSort()))
|
||||
.then(limit -> {
|
||||
LogicalProject project = limit.child();
|
||||
LogicalSort sort = limit.child().child();
|
||||
LogicalTopN topN = new LogicalTopN(sort.getOrderKeys(),
|
||||
limit.getLimit(),
|
||||
limit.getOffset(),
|
||||
sort.child(0));
|
||||
return project.withChildren(Lists.newArrayList(topN));
|
||||
}).toRule(RuleType.PUSH_LIMIT_INTO_SORT),
|
||||
logicalLimit(logicalOneRowRelation())
|
||||
.then(limit -> limit.getLimit() > 0 && limit.getOffset() == 0
|
||||
? limit.child() : new LogicalEmptyRelation(limit.child().getOutput()))
|
||||
|
||||
@ -54,13 +54,8 @@ public class PhysicalOlapScan extends PhysicalRelation implements OlapScan {
|
||||
public PhysicalOlapScan(ObjectId id, OlapTable olapTable, List<String> qualifier, long selectedIndexId,
|
||||
List<Long> selectedTabletIds, List<Long> selectedPartitionIds, DistributionSpec distributionSpec,
|
||||
PreAggStatus preAggStatus, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties) {
|
||||
super(id, PlanType.PHYSICAL_OLAP_SCAN, qualifier, groupExpression, logicalProperties);
|
||||
this.olapTable = olapTable;
|
||||
this.selectedIndexId = selectedIndexId;
|
||||
this.selectedTabletIds = ImmutableList.copyOf(selectedTabletIds);
|
||||
this.selectedPartitionIds = ImmutableList.copyOf(selectedPartitionIds);
|
||||
this.distributionSpec = distributionSpec;
|
||||
this.preAggStatus = preAggStatus;
|
||||
this(id, olapTable, qualifier, selectedIndexId, selectedTabletIds, selectedPartitionIds, distributionSpec,
|
||||
preAggStatus, groupExpression, logicalProperties, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -39,7 +39,7 @@ import java.util.Optional;
|
||||
* Physical top-N plan.
|
||||
*/
|
||||
public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<CHILD_TYPE> implements TopN {
|
||||
|
||||
public static String TOPN_RUNTIME_FILTER = "topn_runtime_filter";
|
||||
private final long limit;
|
||||
private final long offset;
|
||||
|
||||
@ -54,10 +54,8 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
|
||||
public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
|
||||
SortPhase phase, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
|
||||
CHILD_TYPE child) {
|
||||
super(PlanType.PHYSICAL_TOP_N, orderKeys, phase, groupExpression, logicalProperties, child);
|
||||
Objects.requireNonNull(orderKeys, "orderKeys should not be null in PhysicalTopN.");
|
||||
this.limit = limit;
|
||||
this.offset = offset;
|
||||
this(orderKeys, limit, offset, phase, groupExpression, logicalProperties,
|
||||
null, null, child);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -0,0 +1,61 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.nereids.postprocess;
|
||||
|
||||
import org.apache.doris.nereids.datasets.ssb.SSBTestBase;
|
||||
import org.apache.doris.nereids.processor.post.PlanPostProcessors;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
|
||||
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
|
||||
import org.apache.doris.nereids.util.PlanChecker;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
public class TopNRuntimeFilterTest extends SSBTestBase {
|
||||
@Override
|
||||
public void runBeforeAll() throws Exception {
|
||||
super.runBeforeAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUseTopNRf() {
|
||||
String sql = "select * from customer order by c_custkey limit 5";
|
||||
PlanChecker checker = PlanChecker.from(connectContext).analyze(sql)
|
||||
.rewrite()
|
||||
.implement();
|
||||
PhysicalPlan plan = checker.getPhysicalPlan();
|
||||
new PlanPostProcessors(checker.getCascadesContext()).process(plan);
|
||||
Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
|
||||
PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
|
||||
Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
|
||||
}
|
||||
|
||||
// topn rf do not apply on string-like and float column
|
||||
@Test
|
||||
public void testNotUseTopNRf() {
|
||||
String sql = "select * from customer order by c_name limit 5";
|
||||
PlanChecker checker = PlanChecker.from(connectContext).analyze(sql)
|
||||
.rewrite()
|
||||
.implement();
|
||||
PhysicalPlan plan = checker.getPhysicalPlan();
|
||||
new PlanPostProcessors(checker.getCascadesContext()).process(plan);
|
||||
Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
|
||||
PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
|
||||
Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user