[nereids](topn-filter) support multi-topn filter (FE part) (#31485)

support multi-topn-filter
This commit is contained in:
minghong
2024-03-11 10:43:30 +08:00
committed by yiguolei
parent b0b7161ad0
commit ab21d85e8c
13 changed files with 401 additions and 119 deletions

View File

@ -42,6 +42,7 @@ import org.apache.doris.nereids.jobs.scheduler.SimpleJobScheduler;
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.memo.Memo;
import org.apache.doris.nereids.processor.post.RuntimeFilterContext;
import org.apache.doris.nereids.processor.post.TopnFilterContext;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.RuleFactory;
import org.apache.doris.nereids.rules.RuleSet;
@ -107,6 +108,7 @@ public class CascadesContext implements ScheduleContext {
// subqueryExprIsAnalyzed: whether the subquery has been analyzed.
private final Map<SubqueryExpr, Boolean> subqueryExprIsAnalyzed;
private final RuntimeFilterContext runtimeFilterContext;
private final TopnFilterContext topnFilterContext = new TopnFilterContext();
private Optional<Scope> outerScope = Optional.empty();
private Map<Long, TableIf> tables = null;
@ -280,6 +282,10 @@ public class CascadesContext implements ScheduleContext {
return runtimeFilterContext;
}
public TopnFilterContext getTopnFilterContext() {
return topnFilterContext;
}
public void setCurrentJobContext(JobContext currentJobContext) {
this.currentJobContext = currentJobContext;
}

View File

@ -207,6 +207,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
@ -724,6 +725,10 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
)
);
olapScanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(olapScan.getRelationId()));
if (context.getTopnFilterContext().isTopnFilterTarget(olapScan)) {
olapScanNode.setUseTopnOpt(true);
context.getTopnFilterContext().addLegacyTarget(olapScan, olapScanNode);
}
// TODO: we need to remove all finalizeForNereids
olapScanNode.finalizeForNereids();
// Create PlanFragment
@ -747,6 +752,10 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, PlanTranslatorContext context) {
PlanFragment planFragment = visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context);
OlapScanNode olapScanNode = (OlapScanNode) planFragment.getPlanRoot();
if (context.getTopnFilterContext().isTopnFilterTarget(deferMaterializeOlapScan)) {
olapScanNode.setUseTopnOpt(true);
context.getTopnFilterContext().addLegacyTarget(deferMaterializeOlapScan, olapScanNode);
}
TupleDescriptor tupleDescriptor = context.getTupleDesc(olapScanNode.getTupleId());
for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) {
if (deferMaterializeOlapScan.getDeferMaterializeSlotIds()
@ -2009,20 +2018,23 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTranslatorContext context) {
PlanFragment inputFragment = topN.child(0).accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(topN.child(0));
// 2. According to the type of sort, generate physical plan
if (!topN.getSortPhase().isMerge()) {
// For localSort or Gather->Sort, we just need to add TopNNode
SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
sortNode.setOffset(topN.getOffset());
sortNode.setLimit(topN.getLimit());
if (topN.isEnableRuntimeFilter()) {
if (context.getTopnFilterContext().isTopnFilterSource(topN)) {
sortNode.setUseTopnOpt(true);
PlanNode child = sortNode.getChild(0);
Preconditions.checkArgument(child instanceof OlapScanNode,
"topN opt expect OlapScanNode, but we get " + child);
OlapScanNode scanNode = ((OlapScanNode) child);
scanNode.setUseTopnOpt(true);
context.getTopnFilterContext().getTargets(topN).forEach(
olapScan -> {
Optional<OlapScanNode> legacyScan =
context.getTopnFilterContext().getLegacyScanNode(olapScan);
Preconditions.checkState(legacyScan.isPresent(),
"cannot find OlapScanNode for topn filter");
legacyScan.get().addTopnFilterSortNode(sortNode);
}
);
}
// push sort to scan opt
if (sortNode.getChild(0) instanceof OlapScanNode) {
@ -2067,12 +2079,23 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
@Override
public PlanFragment visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN,
PlanTranslatorContext context) {
PlanFragment planFragment = visitPhysicalTopN(topN.getPhysicalTopN(), context);
if (planFragment.getPlanRoot() instanceof SortNode) {
SortNode sortNode = (SortNode) planFragment.getPlanRoot();
sortNode.setUseTwoPhaseReadOpt(true);
sortNode.getSortInfo().setUseTwoPhaseRead();
if (context.getTopnFilterContext().isTopnFilterSource(topN)) {
sortNode.setUseTopnOpt(true);
context.getTopnFilterContext().getTargets(topN).forEach(
olapScan -> {
Optional<OlapScanNode> legacyScan =
context.getTopnFilterContext().getLegacyScanNode(olapScan);
Preconditions.checkState(legacyScan.isPresent(),
"cannot find OlapScanNode for topn filter");
legacyScan.get().addTopnFilterSortNode(sortNode);
}
);
}
TupleDescriptor tupleDescriptor = sortNode.getSortInfo().getSortTupleDescriptor();
for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) {
if (topN.getDeferMaterializeSlotIds()

View File

@ -29,6 +29,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.processor.post.TopnFilterContext;
import org.apache.doris.nereids.trees.expressions.CTEId;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.SlotReference;
@ -70,7 +71,7 @@ public class PlanTranslatorContext {
private final DescriptorTable descTable = new DescriptorTable();
private final RuntimeFilterTranslator translator;
private final TopnFilterContext topnFilterContext;
/**
* index from Nereids' slot to legacy slot.
*/
@ -115,12 +116,14 @@ public class PlanTranslatorContext {
public PlanTranslatorContext(CascadesContext ctx) {
this.connectContext = ctx.getConnectContext();
this.translator = new RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
this.topnFilterContext = ctx.getTopnFilterContext();
}
@VisibleForTesting
public PlanTranslatorContext() {
this.connectContext = null;
this.translator = null;
this.topnFilterContext = new TopnFilterContext();
}
/**
@ -187,6 +190,10 @@ public class PlanTranslatorContext {
return Optional.ofNullable(translator);
}
public TopnFilterContext getTopnFilterContext() {
return topnFilterContext;
}
public PlanFragmentId nextFragmentId() {
return fragmentIdGenerator.getNextId();
}

View File

@ -19,103 +19,57 @@ package org.apache.doris.nereids.processor.post;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.algebra.Filter;
import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.algebra.TopN;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
import org.apache.doris.qe.ConnectContext;
import java.util.Optional;
/**
* topN opt
* refer to:
* <a href="https://github.com/apache/doris/pull/15558">...</a>
* <a href="https://github.com/apache/doris/pull/15663">...</a>
*
* // only support simple case: select ... from tbl [where ...] order by ... limit ...
* // [deprecated] only support simple case: select ... from tbl [where ...] order by ... limit ...
*/
public class TopNScanOpt extends PlanPostProcessor {
@Override
public Plan visit(Plan plan, CascadesContext context) {
return plan;
}
@Override
public Plan visitPhysicalSink(PhysicalSink<? extends Plan> physicalSink, CascadesContext context) {
if (physicalSink.child() instanceof TopN) {
return super.visit(physicalSink, context);
} else if (physicalSink.child() instanceof Project && physicalSink.child().child(0) instanceof TopN) {
PhysicalTopN<?> oldTopN = (PhysicalTopN<?>) physicalSink.child().child(0);
PhysicalTopN<?> newTopN = (PhysicalTopN<?>) oldTopN.accept(this, context);
if (newTopN == oldTopN) {
return physicalSink;
} else {
return physicalSink.withChildren(physicalSink.child().withChildren(newTopN));
}
}
return physicalSink;
}
@Override
public Plan visitPhysicalDistribute(PhysicalDistribute<? extends Plan> distribute, CascadesContext context) {
if (distribute.child() instanceof TopN && distribute.child() instanceof AbstractPhysicalSort
&& ((AbstractPhysicalSort<?>) distribute.child()).getSortPhase() == SortPhase.LOCAL_SORT) {
return super.visit(distribute, context);
}
return distribute;
}
@Override
public PhysicalTopN<? extends Plan> visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
if (topN.getSortPhase() == SortPhase.LOCAL_SORT) {
Plan child = topN.child();
topN = rewriteTopN(topN);
if (child != topN.child()) {
topN = ((PhysicalTopN<? extends Plan>) topN.withChildren(child)).copyStatsAndGroupIdFrom(topN);
}
return topN;
} else if (topN.getSortPhase() == SortPhase.MERGE_SORT) {
return (PhysicalTopN<? extends Plan>) super.visit(topN, ctx);
}
Optional<OlapScan> scanOpt = findScanForTopnFilter(topN);
scanOpt.ifPresent(scan -> ctx.getTopnFilterContext().addTopnFilter(topN, scan));
topN.child().accept(this, ctx);
return topN;
}
@Override
public Plan visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN,
CascadesContext context) {
if (topN.getSortPhase() == SortPhase.LOCAL_SORT) {
PhysicalTopN<? extends Plan> rewrittenTopN = rewriteTopN(topN.getPhysicalTopN());
if (topN.getPhysicalTopN() != rewrittenTopN) {
topN = topN.withPhysicalTopN(rewrittenTopN).copyStatsAndGroupIdFrom(topN);
}
return topN;
} else if (topN.getSortPhase() == SortPhase.MERGE_SORT) {
return super.visit(topN, context);
}
Optional<OlapScan> scanOpt = findScanForTopnFilter(topN.getPhysicalTopN());
scanOpt.ifPresent(scan -> context.getTopnFilterContext().addTopnFilter(topN, scan));
topN.child().accept(this, context);
return topN;
}
private PhysicalTopN<? extends Plan> rewriteTopN(PhysicalTopN<? extends Plan> topN) {
Plan child = topN.child();
private Optional<OlapScan> findScanForTopnFilter(PhysicalTopN<? extends Plan> topN) {
if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
return topN;
return Optional.empty();
}
if (topN.getOrderKeys().isEmpty()) {
return topN;
return Optional.empty();
}
// topn opt
long topNOptLimitThreshold = getTopNOptLimitThreshold();
if (topNOptLimitThreshold == -1 || topN.getLimit() > topNOptLimitThreshold) {
return topN;
return Optional.empty();
}
// if firstKey's column is not present, it means the firstKey is not an original column from scan node
// for example: "select cast(k1 as INT) as id from tbl1 order by id limit 2;" the firstKey "id" is
@ -125,27 +79,44 @@ public class TopNScanOpt extends PlanPostProcessor {
// see Alias::toSlot() method to get how column info is passed around by alias of slotReference
Expression firstKey = topN.getOrderKeys().get(0).getExpr();
if (!firstKey.isColumnFromTable()) {
return topN;
return Optional.empty();
}
if (firstKey.getDataType().isFloatType()
|| firstKey.getDataType().isDoubleType()) {
return topN;
return Optional.empty();
}
OlapScan olapScan;
while (child instanceof Project || child instanceof Filter) {
child = child.child(0);
if (! (firstKey instanceof SlotReference)) {
return Optional.empty();
}
if (!(child instanceof OlapScan)) {
return topN;
}
olapScan = (OlapScan) child;
if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
return topN.withEnableRuntimeFilter(true).copyStatsAndGroupIdFrom(topN);
OlapScan olapScan = findScanNodeBySlotReference(topN, (SlotReference) firstKey);
if (olapScan != null
&& olapScan.getTable().isDupKeysOrMergeOnWrite()
&& olapScan instanceof PhysicalCatalogRelation) {
return Optional.of(olapScan);
}
return topN;
return Optional.empty();
}
private OlapScan findScanNodeBySlotReference(Plan root, SlotReference slot) {
OlapScan target = null;
if (root instanceof OlapScan && root.getOutputSet().contains(slot)) {
return (OlapScan) root;
} else {
if (! root.children().isEmpty()) {
// for join and intersect, push topn-filter to their left child.
// TODO for union, topn-filter can be pushed down to all of its children.
Plan child = root.child(0);
if (!(child instanceof PhysicalWindow) && child.getOutputSet().contains(slot)) {
target = findScanNodeBySlotReference(child, slot);
if (target != null) {
return target;
}
}
}
}
return target;
}
private long getTopNOptLimitThreshold() {

View File

@ -0,0 +1,93 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.processor.post;
import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
import org.apache.doris.nereids.trees.plans.algebra.TopN;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.SortNode;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
* topN runtime filter context
*/
public class TopnFilterContext {
private final Map<TopN, List<OlapScan>> filters = Maps.newHashMap();
private final Set<TopN> sources = Sets.newHashSet();
private final Set<OlapScan> targets = Sets.newHashSet();
private final Map<OlapScan, OlapScanNode> legacyTargetsMap = Maps.newHashMap();
private final Map<TopN, SortNode> legacySourceMap = Maps.newHashMap();
/**
* add topN filter
*/
public void addTopnFilter(TopN topn, OlapScan scan) {
targets.add(scan);
sources.add(topn);
List<OlapScan> targets = filters.get(topn);
if (targets == null) {
filters.put(topn, Lists.newArrayList(scan));
} else {
targets.add(scan);
}
}
/**
* find the corresponding sortNode for topn filter
*/
public Optional<OlapScanNode> getLegacyScanNode(OlapScan scan) {
return legacyTargetsMap.keySet().contains(scan)
? Optional.of(legacyTargetsMap.get(scan))
: Optional.empty();
}
public Optional<SortNode> getLegacySortNode(TopN topn) {
return legacyTargetsMap.keySet().contains(topn)
? Optional.of(legacySourceMap.get(topn))
: Optional.empty();
}
public boolean isTopnFilterSource(TopN topn) {
return sources.contains(topn);
}
public boolean isTopnFilterTarget(OlapScan scan) {
return targets.contains(scan);
}
public void addLegacySource(TopN topn, SortNode sort) {
legacySourceMap.put(topn, sort);
}
public void addLegacyTarget(OlapScan olapScan, OlapScanNode legacy) {
legacyTargetsMap.put(olapScan, legacy);
}
public List<OlapScan> getTargets(TopN topn) {
return filters.get(topn);
}
}

View File

@ -43,36 +43,33 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
private final long limit;
private final long offset;
private final boolean enableRuntimeFilter;
public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
SortPhase phase, LogicalProperties logicalProperties, CHILD_TYPE child) {
this(orderKeys, limit, offset, phase, false, Optional.empty(), logicalProperties, child);
this(orderKeys, limit, offset, phase, Optional.empty(), logicalProperties, child);
}
/**
* Constructor of PhysicalHashJoinNode.
*/
public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
SortPhase phase, boolean enableRuntimeFilter,
SortPhase phase,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) {
this(orderKeys, limit, offset, phase, enableRuntimeFilter,
groupExpression, logicalProperties, null, null, child);
this(orderKeys, limit, offset, phase, groupExpression,
logicalProperties, null, null, child);
}
/**
* Constructor of PhysicalHashJoinNode.
*/
public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
SortPhase phase, boolean enableRuntimeFilter,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
SortPhase phase, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) {
super(PlanType.PHYSICAL_TOP_N, orderKeys, phase, groupExpression, logicalProperties, physicalProperties,
statistics, child);
Objects.requireNonNull(orderKeys, "orderKeys should not be null in PhysicalTopN.");
this.limit = limit;
this.offset = offset;
this.enableRuntimeFilter = enableRuntimeFilter;
}
public long getLimit() {
@ -83,10 +80,6 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
return offset;
}
public boolean isEnableRuntimeFilter() {
return enableRuntimeFilter;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -99,12 +92,12 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
return false;
}
PhysicalTopN<?> that = (PhysicalTopN<?>) o;
return limit == that.limit && offset == that.offset && enableRuntimeFilter == that.enableRuntimeFilter;
return limit == that.limit && offset == that.offset;
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), limit, offset, enableRuntimeFilter);
return Objects.hash(super.hashCode(), limit, offset);
}
@Override
@ -112,22 +105,17 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
return visitor.visitPhysicalTopN(this, context);
}
public PhysicalTopN<Plan> withEnableRuntimeFilter(boolean enableRuntimeFilter) {
return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter,
groupExpression, getLogicalProperties(), child());
}
@Override
public PhysicalTopN<Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
"PhysicalTopN's children size must be 1, but real is %s", children.size());
return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, groupExpression,
return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression,
getLogicalProperties(), physicalProperties, statistics, children.get(0));
}
@Override
public PhysicalTopN<CHILD_TYPE> withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter,
return new PhysicalTopN<>(orderKeys, limit, offset, phase,
groupExpression, getLogicalProperties(), child());
}
@ -136,14 +124,14 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
"PhysicalTopN's children size must be 1, but real is %s", children.size());
return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter,
return new PhysicalTopN<>(orderKeys, limit, offset, phase,
groupExpression, logicalProperties.get(), children.get(0));
}
@Override
public PhysicalTopN<CHILD_TYPE> withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
Statistics statistics) {
return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter,
return new PhysicalTopN<>(orderKeys, limit, offset, phase,
groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
}
@ -158,8 +146,7 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
"limit", limit,
"offset", offset,
"orderKeys", orderKeys,
"phase", phase.toString(),
"enableRuntimeFilter", enableRuntimeFilter
"phase", phase.toString()
);
}
@ -170,7 +157,7 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
@Override
public PhysicalTopN<Plan> resetLogicalProperties() {
return new PhysicalTopN<>(orderKeys, limit, offset, phase, enableRuntimeFilter, groupExpression,
return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression,
null, physicalProperties, statistics, child());
}

View File

@ -174,8 +174,11 @@ public class OlapScanNode extends ScanNode {
// It's limit for scanner instead of scanNode so we add a new limit.
private long sortLimit = -1;
// useTopnOpt is equivalent to !topnFilterSortNodes.isEmpty().
// keep this flag for compatibility.
private boolean useTopnOpt = false;
// support multi topn filter
private final List<SortNode> topnFilterSortNodes = Lists.newArrayList();
// List of tablets will be scanned by current olap_scan_node
private ArrayList<Long> scanTabletIds = Lists.newArrayList();
@ -1304,7 +1307,10 @@ public class OlapScanNode extends ScanNode {
output.append(prefix).append("SORT LIMIT: ").append(sortLimit).append("\n");
}
if (useTopnOpt) {
output.append(prefix).append("TOPN OPT\n");
String topnFilterSources = String.join(",",
topnFilterSortNodes.stream()
.map(node -> node.getId().asInt() + "").collect(Collectors.toList()));
output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n");
}
if (!conjuncts.isEmpty()) {
@ -1478,6 +1484,13 @@ public class OlapScanNode extends ScanNode {
msg.olap_scan_node.setSortLimit(sortLimit);
}
msg.olap_scan_node.setUseTopnOpt(useTopnOpt);
List<Integer> topnFilterSourceNodeIds = getTopnFilterSortNodes()
.stream()
.map(sortNode -> sortNode.getId().asInt())
.collect(Collectors.toList());
if (!topnFilterSourceNodeIds.isEmpty()) {
msg.olap_scan_node.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds);
}
msg.olap_scan_node.setKeyType(olapTable.getKeysType().toThrift());
msg.olap_scan_node.setTableName(olapTable.getName());
msg.olap_scan_node.setEnableUniqueKeyMergeOnWrite(olapTable.getEnableUniqueKeyMergeOnWrite());
@ -1750,4 +1763,12 @@ public class OlapScanNode extends ScanNode {
public int getScanRangeNum() {
return getScanTabletIds().size();
}
public void addTopnFilterSortNode(SortNode sortNode) {
topnFilterSortNodes.add(sortNode);
}
public List<SortNode> getTopnFilterSortNodes() {
return topnFilterSortNodes;
}
}

View File

@ -61,7 +61,7 @@ public class SortNode extends PlanNode {
List<Expr> resolvedTupleExprs;
private final SortInfo info;
private final boolean useTopN;
private boolean useTopnOpt;
private boolean useTopnOpt = false;
private boolean useTwoPhaseReadOpt;
// If mergeByexchange is set to true, the sort information is pushed to the

View File

@ -3702,7 +3702,10 @@ public class Coordinator implements CoordInterface {
int rate = Math.min(Config.query_colocate_join_memory_limit_penalty_factor, instanceExecParams.size());
memLimit = queryOptions.getMemLimit() / rate;
}
Set<Integer> topnFilterSources = scanNodes.stream()
.filter(scanNode -> scanNode instanceof OlapScanNode)
.flatMap(scanNode -> ((OlapScanNode) scanNode).getTopnFilterSortNodes().stream())
.map(sort -> sort.getId().asInt()).collect(Collectors.toSet());
Map<TNetworkAddress, TPipelineFragmentParams> res = new HashMap();
Map<TNetworkAddress, Integer> instanceIdx = new HashMap();
TPlanFragment fragmentThrift = fragment.toThrift();
@ -3771,6 +3774,12 @@ public class Coordinator implements CoordInterface {
localParams.setBackendNum(backendNum++);
localParams.setRuntimeFilterParams(new TRuntimeFilterParams());
localParams.runtime_filter_params.setRuntimeFilterMergeAddr(runtimeFilterMergeAddr);
if (!topnFilterSources.isEmpty()) {
// topn_filter_source_node_ids is used by nereids not by legacy planner.
// if there is no topnFilterSources, do not set it.
// topn_filter_source_node_ids=null means legacy planner
localParams.topn_filter_source_node_ids = Lists.newArrayList(topnFilterSources);
}
if (instanceExecParam.instanceId.equals(runtimeFilterMergeInstanceId)) {
Set<Integer> broadCastRf = assignedRuntimeFilters.stream().filter(RuntimeFilter::isBroadcast)
.map(r -> r.getFilterId().asInt()).collect(Collectors.toSet());