[fix](Nereids) two phase read for topn only support simple case (#18955)

1. topn must has merge node
2. topn must the top node of plan
This commit is contained in:
morrySnow
2023-04-23 21:32:23 +08:00
committed by GitHub
parent 45d0f53529
commit 1e7ef35741
7 changed files with 49 additions and 27 deletions

View File

@ -884,7 +884,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
SortNode sortNode = translateSortNode(topN, inputFragment.getPlanRoot(), context);
sortNode.setOffset(topN.getOffset());
sortNode.setLimit(topN.getLimit());
if (topN.getMutableState(PhysicalTopN.TOPN_OPT).isPresent()) {
if (topN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent()) {
sortNode.setUseTopnOpt(true);
PlanNode child = sortNode.getChild(0);
Preconditions.checkArgument(child instanceof OlapScanNode,
@ -928,9 +928,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
});
List<Expr> sortTupleOutputList = new ArrayList<>();
List<Slot> outputList = sort.getOutput();
outputList.forEach(k -> {
sortTupleOutputList.add(ExpressionTranslator.translate(k, context));
});
outputList.forEach(k -> sortTupleOutputList.add(ExpressionTranslator.translate(k, context)));
// 2. Generate new Tuple and get current slotRef for newOrderingExprList
List<Expr> newOrderingExprList = Lists.newArrayList();
TupleDescriptor tupleDesc = generateTupleDesc(outputList, orderKeyList, newOrderingExprList, context, null);

View File

@ -18,10 +18,15 @@
package org.apache.doris.nereids.processor.post;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
/**
* PlanPostprocessor: a PlanVisitor to rewrite PhysicalPlan to new PhysicalPlan.
*/
public class PlanPostProcessor extends DefaultPlanRewriter<CascadesContext> {
public Plan processRoot(Plan plan, CascadesContext ctx) {
return plan.accept(this, ctx);
}
}

View File

@ -47,7 +47,7 @@ public class PlanPostProcessors {
public PhysicalPlan process(PhysicalPlan physicalPlan) {
PhysicalPlan resultPlan = physicalPlan;
for (PlanPostProcessor processor : getProcessors()) {
resultPlan = (PhysicalPlan) resultPlan.accept(processor, cascadesContext);
resultPlan = (PhysicalPlan) processor.processRoot(resultPlan, cascadesContext);
}
return resultPlan;
}

View File

@ -78,7 +78,7 @@ public class TopNScanOpt extends PlanPostProcessor {
olapScan = (PhysicalOlapScan) child;
if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
topN.setMutableState(PhysicalTopN.TOPN_OPT, true);
topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
}
return topN;

View File

@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.algebra.Filter;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
@ -52,23 +53,40 @@ import java.util.Set;
public class TwoPhaseReadOpt extends PlanPostProcessor {
@Override
public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
topN.child().accept(this, ctx);
Plan child = topN.child();
if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
return topN;
public Plan processRoot(Plan plan, CascadesContext ctx) {
if (plan instanceof PhysicalTopN) {
PhysicalTopN<Plan> physicalTopN = (PhysicalTopN<Plan>) plan;
if (physicalTopN.getSortPhase() == SortPhase.MERGE_SORT) {
return plan.accept(this, ctx);
}
}
if (topN.getOrderKeys().isEmpty()) {
return topN;
return plan;
}
@Override
public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> mergeTopN, CascadesContext ctx) {
mergeTopN.child().accept(this, ctx);
Plan child = mergeTopN.child();
if (mergeTopN.getSortPhase() != SortPhase.MERGE_SORT || !(mergeTopN.child() instanceof PhysicalDistribute)) {
return mergeTopN;
}
PhysicalDistribute<Plan> distribute = (PhysicalDistribute<Plan>) mergeTopN.child();
if (!(distribute.child() instanceof PhysicalTopN)) {
return mergeTopN;
}
PhysicalTopN<Plan> localTopN = (PhysicalTopN<Plan>) distribute.child();
if (localTopN.getOrderKeys().isEmpty()) {
return mergeTopN;
}
// topn opt
long topNOptLimitThreshold = getTopNOptLimitThreshold();
if (topNOptLimitThreshold < 0 || topN.getLimit() > topNOptLimitThreshold) {
return topN;
if (topNOptLimitThreshold < 0 || mergeTopN.getLimit() > topNOptLimitThreshold) {
return mergeTopN;
}
if (!topN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) {
return topN;
if (!localTopN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) {
return mergeTopN;
}
PhysicalOlapScan olapScan;
@ -81,18 +99,18 @@ public class TwoPhaseReadOpt extends PlanPostProcessor {
if (child instanceof Project) {
project = (PhysicalProject<Plan>) child;
// TODO: remove this after fix two phase read on project core
return topN;
return mergeTopN;
}
child = child.child(0);
}
if (!(child instanceof PhysicalOlapScan)) {
return topN;
return mergeTopN;
}
olapScan = (PhysicalOlapScan) child;
// all order key must column from table
if (!olapScan.getTable().getEnableLightSchemaChange()) {
return topN;
return mergeTopN;
}
Map<ExprId, ExprId> projectRevertedMap = Maps.newHashMap();
@ -114,22 +132,23 @@ public class TwoPhaseReadOpt extends PlanPostProcessor {
if (filter != null) {
filter.getConjuncts().forEach(e -> deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds()));
}
topN.getOrderKeys().stream()
localTopN.getOrderKeys().stream()
.map(OrderKey::getExpr)
.map(Slot.class::cast)
.map(NamedExpression::getExprId)
.map(projectRevertedMap::get)
.filter(Objects::nonNull)
.forEach(deferredMaterializedExprIds::remove);
topN.getOrderKeys().stream()
localTopN.getOrderKeys().stream()
.map(OrderKey::getExpr)
.map(Slot.class::cast)
.map(NamedExpression::getExprId)
.forEach(deferredMaterializedExprIds::remove);
topN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
localTopN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
mergeTopN.setMutableState(PhysicalTopN.TWO_PHASE_READ_OPT, true);
olapScan.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, deferredMaterializedExprIds);
return topN;
return mergeTopN;
}
private long getTopNOptLimitThreshold() {

View File

@ -40,7 +40,7 @@ import java.util.Optional;
*/
public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<CHILD_TYPE> implements TopN {
public static final String TOPN_OPT = "topn_opt";
public static final String TOPN_RUNTIME_FILTER = "topn_runtime_filter";
public static final String TWO_PHASE_READ_OPT = "two_phase_read_opt";
private final long limit;