[enhancement](Nereids) two phase read for topn (#18829)
add two phase read topn opt; the corresponding legacy planner PRs are:
- #15642
- #16460
- #16848

TODO: we currently forbid limit(sort(project(scan))) because the BE cores when the plan has a project on top of the scan. We need to remove this restriction after the BE bug is fixed.
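For illustration, here is a minimal sketch of the query shapes this optimization targets, using a hypothetical table `tbl1` with key column `k1` and value columns `v1`, `v2` (assuming a duplicate-key or merge-on-write table with light schema change enabled, a non-string/non-float first order key, and a limit below the session threshold). The first phase sorts on the order-by column plus an injected row id, and the remaining columns are fetched in a second phase for only the final LIMIT rows:

```sql
-- Expected to qualify: the topn sits directly on the scan and the first
-- order key is a plain column of tbl1.
SELECT k1, v1, v2 FROM tbl1 ORDER BY k1 LIMIT 10;

-- A query like this typically plans as limit(sort(project(scan)));
-- that shape is still forbidden here (see the TODO above).
SELECT k1, v1 + v2 AS s FROM tbl1 ORDER BY k1 LIMIT 10;
```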
@@ -68,7 +68,7 @@ public class SlotDescriptor {
    private boolean isAgg;
    private boolean isMultiRef;
    // If set to false, then such slots will be ignored during
    // materialize them.Used to optmize to read less data and less memory usage
    // materialize them.Used to optimize to read less data and less memory usage
    private boolean needMaterialize = true;

    public SlotDescriptor(SlotId id, TupleDescriptor parent) {

@@ -176,11 +176,18 @@ import java.util.stream.Stream;
 * </STRONG>
 */
public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, PlanTranslatorContext> {

    private static final Logger LOG = LogManager.getLogger(PhysicalPlanTranslator.class);
    protected StatsErrorEstimator statsErrorEstimator;
    PlanTranslatorContext context;

    private final StatsErrorEstimator statsErrorEstimator;
    private final PlanTranslatorContext context;

    public PhysicalPlanTranslator() {
        this(null, null);
    }

    public PhysicalPlanTranslator(PlanTranslatorContext context) {
        this(context, null);
    }

    public PhysicalPlanTranslator(PlanTranslatorContext context, StatsErrorEstimator statsErrorEstimator) {
@@ -188,11 +195,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
        this.statsErrorEstimator = statsErrorEstimator;
    }

    public PlanFragment translatePlan(PhysicalPlan physicalPlan, PlanTranslatorContext context) {
        this.context = context;
        return translatePlan(physicalPlan);
    }

    /**
     * Translate Nereids Physical Plan tree to Stale Planner PlanFragment tree.
     *
@@ -497,8 +499,16 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                .addAll(olapScan.getOutput())
                .addAll(filterSlotsOfSelectedIndex(olapScan.getNonUserVisibleOutput(), olapScan))
                .build();
        Set<ExprId> deferredMaterializedExprIds = Collections.emptySet();
        if (olapScan.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent()) {
            deferredMaterializedExprIds = (Set<ExprId>) (olapScan
                    .getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).get());
        }
        OlapTable olapTable = olapScan.getTable();
        TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, olapTable, context);
        TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, olapTable, deferredMaterializedExprIds, context);
        if (olapScan.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent()) {
            injectRowIdColumnSlot(tupleDescriptor);
        }

        // Use column with the same name in selected materialized index meta for slot desc,
        // to get the correct col unique id.
@@ -513,7 +523,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                }
            });
        }

        tupleDescriptor.setTable(olapTable);

        OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), tupleDescriptor, "OlapScanNode");
@@ -875,7 +884,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
        SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
        sortNode.setOffset(topN.getOffset());
        sortNode.setLimit(topN.getLimit());
        if (topN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()) {
        if (topN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent()) {
            sortNode.setUseTopnOpt(true);
            PlanNode child = sortNode.getChild(0);
            Preconditions.checkArgument(child instanceof OlapScanNode,
@@ -929,6 +938,15 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
        SortInfo sortInfo = new SortInfo(newOrderingExprList, ascOrderList, nullsFirstParamList, tupleDesc);
        SortNode sortNode = new SortNode(context.nextPlanNodeId(), childNode, sortInfo, true);
        sortNode.finalizeForNereids(tupleDesc, sortTupleOutputList, oldOrderingExprList);
        if (sort.getMutableState(PhysicalTopN.TWO_PHASE_READ_OPT).isPresent()) {
            sortNode.setUseTwoPhaseReadOpt(true);
            sortNode.getSortInfo().setUseTwoPhaseRead();
            injectRowIdColumnSlot(sortNode.getSortInfo().getSortTupleDescriptor());
            TupleDescriptor childTuple = childNode.getOutputTupleDesc() != null
                    ? childNode.getOutputTupleDesc() : context.getTupleDesc(childNode.getTupleIds().get(0));
            SlotDescriptor childRowIdDesc = childTuple.getSlots().get(childTuple.getSlots().size() - 1);
            sortNode.getResolvedTupleExprs().add(new SlotRef(childRowIdDesc));
        }
        if (sort.getStats() != null) {
            sortNode.setCardinality((long) sort.getStats().getRowCount());
        }
@@ -1093,7 +1111,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                    continue;
                }
                SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
                SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
                SlotDescriptor sd;
                if (sf == null && leftSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
                    // TODO: temporary code for two phase read, should remove it after refactor
                    sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, leftSlotDescriptor);
                } else {
                    sd = context.createSlotDesc(intermediateDescriptor, sf);
                }
                leftIntermediateSlotDescriptor.add(sd);
            }
        } else if (hashJoin.getOtherJoinConjuncts().isEmpty()
@@ -1103,7 +1127,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                    continue;
                }
                SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
                SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
                SlotDescriptor sd;
                if (sf == null && rightSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
                    // TODO: temporary code for two phase read, should remove it after refactor
                    sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, rightSlotDescriptor);
                } else {
                    sd = context.createSlotDesc(intermediateDescriptor, sf);
                }
                rightIntermediateSlotDescriptor.add(sd);
            }
        } else {
@@ -1112,9 +1142,15 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                    continue;
                }
                SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
                SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
                if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
                    hashJoinNode.addSlotIdToHashOutputSlotIds(leftSlotDescriptor.getId());
                SlotDescriptor sd;
                if (sf == null && leftSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
                    // TODO: temporary code for two phase read, should remove it after refactor
                    sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, leftSlotDescriptor);
                } else {
                    sd = context.createSlotDesc(intermediateDescriptor, sf);
                    if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
                        hashJoinNode.addSlotIdToHashOutputSlotIds(leftSlotDescriptor.getId());
                    }
                }
                leftIntermediateSlotDescriptor.add(sd);
            }
@@ -1123,9 +1159,15 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                    continue;
                }
                SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
                SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
                if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
                    hashJoinNode.addSlotIdToHashOutputSlotIds(rightSlotDescriptor.getId());
                SlotDescriptor sd;
                if (sf == null && rightSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
                    // TODO: temporary code for two phase read, should remove it after refactor
                    sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, rightSlotDescriptor);
                } else {
                    sd = context.createSlotDesc(intermediateDescriptor, sf);
                    if (hashOutputSlotReferenceMap.get(sf.getExprId()) != null) {
                        hashJoinNode.addSlotIdToHashOutputSlotIds(rightSlotDescriptor.getId());
                    }
                }
                rightIntermediateSlotDescriptor.add(sd);
            }
@@ -1280,7 +1322,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                continue;
            }
            SlotReference sf = leftChildOutputMap.get(context.findExprId(leftSlotDescriptor.getId()));
            SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
            SlotDescriptor sd;
            if (sf == null && leftSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
                // TODO: temporary code for two phase read, should remove it after refactor
                sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, leftSlotDescriptor);
            } else {
                sd = context.createSlotDesc(intermediateDescriptor, sf);
            }
            leftIntermediateSlotDescriptor.add(sd);
        }
        for (SlotDescriptor rightSlotDescriptor : rightSlotDescriptors) {
@@ -1288,7 +1336,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
                continue;
            }
            SlotReference sf = rightChildOutputMap.get(context.findExprId(rightSlotDescriptor.getId()));
            SlotDescriptor sd = context.createSlotDesc(intermediateDescriptor, sf);
            SlotDescriptor sd;
            if (sf == null && rightSlotDescriptor.getColumn().getName().equals(Column.ROWID_COL)) {
                // TODO: temporary code for two phase read, should remove it after refactor
                sd = context.getDescTable().copySlotDescriptor(intermediateDescriptor, rightSlotDescriptor);
            } else {
                sd = context.createSlotDesc(intermediateDescriptor, sf);
            }
            rightIntermediateSlotDescriptor.add(sd);
        }

@@ -1405,6 +1459,17 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
        TupleDescriptor tupleDescriptor = generateTupleDesc(slotList, null, context);
        inputPlanNode.setProjectList(execExprList);
        inputPlanNode.setOutputTupleDesc(tupleDescriptor);
        // TODO: this is a temporary scheme to support two phase read when has project.
        // we need to refactor all topn opt into rbo stage.
        if (inputPlanNode instanceof OlapScanNode) {
            ArrayList<SlotDescriptor> slots = context.getTupleDesc(inputPlanNode.getTupleIds().get(0)).getSlots();
            SlotDescriptor lastSlot = slots.get(slots.size() - 1);
            if (lastSlot.getColumn() != null && lastSlot.getColumn().getName().equals(Column.ROWID_COL)) {
                inputPlanNode.getProjectList().add(new SlotRef(lastSlot));
                injectRowIdColumnSlot(tupleDescriptor);
                requiredSlotIdSet.add(lastSlot.getId());
            }
        }

        if (inputPlanNode instanceof ScanNode) {
            updateChildSlotsMaterialization(inputPlanNode, requiredSlotIdSet, requiredByProjectSlotIdSet, context);
@@ -1739,6 +1804,19 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
        }
    }

    private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table,
            Set<ExprId> deferredMaterializedExprIds, PlanTranslatorContext context) {
        TupleDescriptor tupleDescriptor = context.generateTupleDesc();
        tupleDescriptor.setTable(table);
        for (Slot slot : slotList) {
            SlotDescriptor slotDescriptor = context.createSlotDesc(tupleDescriptor, (SlotReference) slot, table);
            if (deferredMaterializedExprIds.contains(slot.getExprId())) {
                slotDescriptor.setNeedMaterialize(false);
            }
        }
        return tupleDescriptor;
    }

    private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table, PlanTranslatorContext context) {
        TupleDescriptor tupleDescriptor = context.generateTupleDesc();
        tupleDescriptor.setTable(table);
@@ -2153,4 +2231,16 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
            statsErrorEstimator.updateLegacyPlanIdToPhysicalPlan(planNode, physicalPlan);
        }
    }

    private SlotDescriptor injectRowIdColumnSlot(TupleDescriptor tupleDesc) {
        SlotDescriptor slotDesc = context.addSlotDesc(tupleDesc);
        LOG.debug("inject slot {}", slotDesc);
        String name = Column.ROWID_COL;
        Column col = new Column(name, Type.STRING, false, null, false, "", "rowid column");
        slotDesc.setType(Type.STRING);
        slotDesc.setColumn(col);
        slotDesc.setIsNullable(false);
        slotDesc.setIsMaterialized(true);
        return slotDesc;
    }
}

@@ -159,10 +159,6 @@ public class PlanTranslatorContext {
        return bufferedTupleForWindow;
    }

    public void setBufferedTupleForWindow(TupleDescriptor bufferedTupleForWindow) {
        this.bufferedTupleForWindow = bufferedTupleForWindow;
    }

    /**
     * Create SlotDesc and add it to the mappings from expression to the stales expr.
     */

@@ -68,6 +68,7 @@ public class PlanPostProcessors {
        }
        builder.add(new Validator());
        builder.add(new TopNScanOpt());
        builder.add(new TwoPhaseReadOpt());
        return builder.build();
    }
}

@@ -19,7 +19,6 @@ package org.apache.doris.nereids.processor.post;

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.algebra.Filter;
@@ -36,6 +35,7 @@ import org.apache.doris.qe.ConnectContext;
 */

public class TopNScanOpt extends PlanPostProcessor {

    @Override
    public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
        topN.child().accept(this, ctx);
@@ -43,22 +43,23 @@ public class TopNScanOpt extends PlanPostProcessor {
        if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
            return topN;
        }
        long threshold = getTopNOptLimitThreshold();
        if (threshold == -1 || topN.getLimit() > threshold) {
            return topN;
        }
        if (topN.getOrderKeys().isEmpty()) {
            return topN;
        }
        Expression firstKey = topN.getOrderKeys().get(0).getExpr();

        // topn opt
        long topNOptLimitThreshold = getTopNOptLimitThreshold();
        if (topNOptLimitThreshold == -1 || topN.getLimit() > topNOptLimitThreshold) {
            return topN;
        }
        // if firstKey's column is not present, it means the firstKey is not a original column from scan node
        // for example: "select cast(k1 as INT) as id from tbl1 order by id limit 2;" the firstKey "id" is
        // a cast expr which is not from tbl1 and its column is not present.
        // On the other hand "select k1 as id from tbl1 order by id limit 2;" the firstKey "id" is just an alias of k1
        // so its column is present which is valid for topN optimize
        // see Alias::toSlot() method to get how column info is passed around by alias of slotReference
        if (!(firstKey instanceof SlotReference) || !((SlotReference) firstKey).getColumn().isPresent()) {
        Expression firstKey = topN.getOrderKeys().get(0).getExpr();
        if (!firstKey.isColumnFromTable()) {
            return topN;
        }
        if (firstKey.getDataType().isStringLikeType()
@@ -66,15 +67,20 @@ public class TopNScanOpt extends PlanPostProcessor {
                || firstKey.getDataType().isDoubleType()) {
            return topN;
        }
        while (child != null && (child instanceof Project || child instanceof Filter)) {

        PhysicalOlapScan olapScan;
        while (child instanceof Project || child instanceof Filter) {
            child = child.child(0);
        }
        if (child instanceof PhysicalOlapScan) {
            PhysicalOlapScan scan = (PhysicalOlapScan) child;
            if (scan.getTable().isDupKeysOrMergeOnWrite()) {
                topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
            }
        if (!(child instanceof PhysicalOlapScan)) {
            return topN;
        }
        olapScan = (PhysicalOlapScan) child;

        if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
            topN.setMutableState(PhysicalTopN.TOPN_OPT, true);
        }

        return topN;
    }

@@ -0,0 +1,144 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.processor.post;

import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.algebra.Filter;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
 * two phase read opt
 * refer to:
 * https://github.com/apache/doris/pull/15642
 * https://github.com/apache/doris/pull/16460
 * https://github.com/apache/doris/pull/16848
 */

public class TwoPhaseReadOpt extends PlanPostProcessor {

    @Override
    public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
        topN.child().accept(this, ctx);
        Plan child = topN.child();
        if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
            return topN;
        }
        if (topN.getOrderKeys().isEmpty()) {
            return topN;
        }

        // topn opt
        long topNOptLimitThreshold = getTopNOptLimitThreshold();
        if (topNOptLimitThreshold < 0 || topN.getLimit() > topNOptLimitThreshold) {
            return topN;
        }
        if (!topN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) {
            return topN;
        }

        PhysicalOlapScan olapScan;
        PhysicalProject<Plan> project = null;
        PhysicalFilter<Plan> filter = null;
        while (child instanceof Project || child instanceof Filter) {
            if (child instanceof Filter) {
                filter = (PhysicalFilter<Plan>) child;
            }
            if (child instanceof Project) {
                project = (PhysicalProject<Plan>) child;
                // TODO: remove this after fix two phase read on project core
                return topN;
            }
            child = child.child(0);
        }
        if (!(child instanceof PhysicalOlapScan)) {
            return topN;
        }
        olapScan = (PhysicalOlapScan) child;

        // all order key must column from table
        if (!olapScan.getTable().getEnableLightSchemaChange()) {
            return topN;
        }

        Map<ExprId, ExprId> projectRevertedMap = Maps.newHashMap();
        if (project != null) {
            for (Expression e : project.getProjects()) {
                if (e.isSlot()) {
                    Slot slot = (Slot) e;
                    projectRevertedMap.put(slot.getExprId(), slot.getExprId());
                } else if (e instanceof Alias) {
                    Alias alias = (Alias) e;
                    if (alias.child().isSlot()) {
                        Slot slot = (Slot) alias.child();
                        projectRevertedMap.put(alias.getExprId(), slot.getExprId());
                    }
                }
            }
        }
        Set<ExprId> deferredMaterializedExprIds = Sets.newHashSet(olapScan.getOutputExprIdSet());
        if (filter != null) {
            filter.getConjuncts().forEach(e -> deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds()));
        }
        topN.getOrderKeys().stream()
                .map(OrderKey::getExpr)
                .map(Slot.class::cast)
                .map(NamedExpression::getExprId)
                .map(projectRevertedMap::get)
                .filter(Objects::nonNull)
                .forEach(deferredMaterializedExprIds::remove);
        topN.getOrderKeys().stream()
                .map(OrderKey::getExpr)
                .map(Slot.class::cast)
                .map(NamedExpression::getExprId)
                .forEach(deferredMaterializedExprIds::remove);
        topN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
        olapScan.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, deferredMaterializedExprIds);

        return topN;
    }

    private long getTopNOptLimitThreshold() {
        if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
            if (!ConnectContext.get().getSessionVariable().enableTwoPhaseReadOpt) {
                return -1;
            }
            return ConnectContext.get().getSessionVariable().topnOptLimitThreshold;
        }
        return -1;
    }
}
@@ -178,6 +178,10 @@ public abstract class Expression extends AbstractTreeNode<Expression> implements
        return this instanceof Slot;
    }

    public boolean isColumnFromTable() {
        return (this instanceof SlotReference) && ((SlotReference) this).getColumn().isPresent();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {

@@ -41,6 +41,8 @@ import java.util.Optional;
 */
public class PhysicalOlapScan extends PhysicalRelation implements OlapScan {

    public static final String DEFERRED_MATERIALIZED_SLOTS = "deferred_materialized_slots";

    private final OlapTable olapTable;
    private final DistributionSpec distributionSpec;
    private final long selectedIndexId;

@@ -39,7 +39,10 @@ import java.util.Optional;
 * Physical top-N plan.
 */
public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<CHILD_TYPE> implements TopN {
    public static String TOPN_RUNTIME_FILTER = "topn_runtime_filter";

    public static final String TOPN_OPT = "topn_opt";
    public static final String TWO_PHASE_READ_OPT = "two_phase_read_opt";

    private final long limit;
    private final long offset;

@@ -240,7 +240,7 @@ public class RuntimeFilterTest extends SSBTestBase {
        PhysicalPlan plan = checker.getPhysicalPlan();
        new PlanPostProcessors(checker.getCascadesContext()).process(plan);
        System.out.println(plan.treeString());
        new PhysicalPlanTranslator().translatePlan(plan, new PlanTranslatorContext(checker.getCascadesContext()));
        new PhysicalPlanTranslator(new PlanTranslatorContext(checker.getCascadesContext())).translatePlan(plan);
        RuntimeFilterContext context = checker.getCascadesContext().getRuntimeFilterContext();
        List<RuntimeFilter> filters = context.getNereidsRuntimeFilter();
        Assertions.assertEquals(filters.size(), context.getLegacyFilters().size() + context.getTargetNullCount());

@@ -42,7 +42,7 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
        new PlanPostProcessors(checker.getCascadesContext()).process(plan);
        Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
        PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
        Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
        Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent());
    }

    // topn rf do not apply on string-like and float column
@@ -56,6 +56,6 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
        new PlanPostProcessors(checker.getCascadesContext()).process(plan);
        Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
        PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
        Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
        Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent());
    }
}

@@ -96,7 +96,7 @@ public class AnalyzeSubQueryTest extends TestWithFeService implements MemoPatter
                PhysicalProperties.ANY
        );
        // Just to check whether translate will throw exception
        new PhysicalPlanTranslator().translatePlan(plan, new PlanTranslatorContext(planner.getCascadesContext()));
        new PhysicalPlanTranslator(new PlanTranslatorContext(planner.getCascadesContext())).translatePlan(plan);
    }
}

@@ -141,7 +141,7 @@ public class AnalyzeWhereSubqueryTest extends TestWithFeService implements MemoP
                PhysicalProperties.ANY
        );
        // Just to check whether translate will throw exception
        new PhysicalPlanTranslator().translatePlan(plan, new PlanTranslatorContext());
        new PhysicalPlanTranslator(new PlanTranslatorContext()).translatePlan(plan);
    } catch (Throwable t) {
        throw new IllegalStateException("Test sql failed: " + t.getMessage() + ", sql:\n" + sql, t);
    }

@@ -133,7 +133,7 @@ public class RegisterCTETest extends TestWithFeService implements MemoPatternMat
                PhysicalProperties.ANY
        );
        // Just to check whether translate will throw exception
        new PhysicalPlanTranslator().translatePlan(plan, new PlanTranslatorContext());
        new PhysicalPlanTranslator(new PlanTranslatorContext()).translatePlan(plan);
    }
}

@@ -105,7 +105,7 @@ public class ViewTest extends TestWithFeService implements MemoPatternMatchSuppo
                PhysicalProperties.ANY
        );
        // Just to check whether translate will throw exception
        new PhysicalPlanTranslator().translatePlan(plan, new PlanTranslatorContext(planner.getCascadesContext()));
        new PhysicalPlanTranslator(new PlanTranslatorContext(planner.getCascadesContext())).translatePlan(plan);
    }
}