[refactor](Nereids) mv top-n two phase read rule from post processor to rewriter (#22487)

Use three new plan nodes to represent the deferred materialization of TopN.
Example:

```
-- SQL
select * from t1 order by c1 limit 10;

-- PLAN
+------------------------------------------+
| Explain String                           |
+------------------------------------------+
| PhysicalDeferMaterializeResultSink       |
| --PhysicalDeferMaterializeTopN           |
| ----PhysicalDistribute                   |
| ------PhysicalDeferMaterializeTopN       |
| --------PhysicalDeferMaterializeOlapScan |
+------------------------------------------+
```
This commit is contained in:
morrySnow
2023-08-03 14:28:13 +08:00
committed by GitHub
parent 4f9969ce1e
commit 3961b8df76
40 changed files with 1559 additions and 390 deletions

View File

@ -57,8 +57,11 @@ import org.apache.doris.statistics.MVAnalysisTask;
import org.apache.doris.statistics.OlapAnalysisTask;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TCompressionType;
import org.apache.doris.thrift.TFetchOption;
import org.apache.doris.thrift.TOlapTable;
import org.apache.doris.thrift.TPrimitiveType;
import org.apache.doris.thrift.TSortType;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
@ -2234,4 +2237,43 @@ public class OlapTable extends Table {
public AutoIncrementGenerator getAutoIncrementGenerator() {
return autoIncrementGenerator;
}
/**
 * Build the fetch option used for two phase read from this olap table.
 *
 * <p>The returned option always has two phase fetch enabled, records the value of
 * {@code storeRowColumn()} and carries the result of {@code SystemInfoService.createAliveNodesInfo()}.
 * Column descriptions of the selected index are attached only when {@code storeRowColumn()} is false.
 *
 * @param selectedIndexId the index want to scan
 * @return the populated fetch option
 */
public TFetchOption generateTwoPhaseReadOption(long selectedIndexId) {
    TFetchOption option = new TFetchOption();
    option.setFetchRowStore(this.storeRowColumn());
    option.setUseTwoPhaseFetch(true);
    option.setNodesInfo(SystemInfoService.createAliveNodesInfo());
    if (this.storeRowColumn()) {
        // no column descriptions are attached in this case
        return option;
    }
    List<TColumn> indexColumns = Lists.newArrayList();
    // only the column descriptions are wanted here, so the key name / key type collectors are null
    getColumnDesc(selectedIndexId, indexColumns, null, null);
    option.setColumnDesc(indexColumns);
    return option;
}
/**
 * Collect thrift column descriptions, and optionally key column names and types, of one index.
 *
 * <p>Nothing is collected when {@code selectedIndexId} is -1. Every output list may be
 * {@code null}; a null list simply means that piece of information is not collected.
 *
 * @param selectedIndexId id of the index whose schema is walked
 * @param columnsDesc receives the thrift form of every column of the index, may be null
 * @param keyColumnNames receives the names of key columns that pass the visibility check, may be null
 * @param keyColumnTypes receives the thrift data types of those key columns, may be null
 */
public void getColumnDesc(long selectedIndexId, List<TColumn> columnsDesc, List<String> keyColumnNames,
        List<TPrimitiveType> keyColumnTypes) {
    if (selectedIndexId == -1) {
        return;
    }
    for (Column col : this.getSchemaByIndexId(selectedIndexId, true)) {
        TColumn tColumn = col.toThrift();
        col.setIndexFlag(tColumn, this);
        if (columnsDesc != null) {
            columnsDesc.add(tColumn);
        }
        // A key column is reported when hidden columns are shown or the column itself is visible.
        // (The former "A || (!A && B)" form is logically the same as "A || B".)
        if ((Util.showHiddenColumns() || col.isVisible()) && col.isKey()) {
            if (keyColumnNames != null) {
                keyColumnNames.add(col.getName());
            }
            if (keyColumnTypes != null) {
                keyColumnTypes.add(col.getDataType().toThrift());
            }
        }
    }
}
}

View File

@ -24,6 +24,8 @@ import org.apache.doris.nereids.properties.DistributionSpecHash;
import org.apache.doris.nereids.properties.DistributionSpecReplicated;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
@ -99,6 +101,12 @@ class CostModelV1 extends PlanVisitor<Cost, PlanContext> {
return CostV1.ofCpu(statistics.getRowCount());
}
/**
 * Cost of a defer-materialize olap scan: delegates to the cost of the olap scan it wraps.
 */
@Override
public Cost visitPhysicalDeferMaterializeOlapScan(PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan,
        PlanContext context) {
    PhysicalOlapScan wrappedScan = deferMaterializeOlapScan.getPhysicalOlapScan();
    return visitPhysicalOlapScan(wrappedScan, context);
}
public Cost visitPhysicalSchemaScan(PhysicalSchemaScan physicalSchemaScan, PlanContext context) {
Statistics statistics = context.getStatisticsWithCheck();
return CostV1.ofCpu(statistics.getRowCount());
@ -167,6 +175,12 @@ class CostModelV1 extends PlanVisitor<Cost, PlanContext> {
childStatistics.getRowCount());
}
/**
 * Cost of a defer-materialize TopN: delegates to the cost of the TopN it wraps.
 */
@Override
public Cost visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN,
        PlanContext context) {
    PhysicalTopN<? extends Plan> wrappedTopN = topN.getPhysicalTopN();
    return visitPhysicalTopN(wrappedTopN, context);
}
@Override
public Cost visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends Plan> partitionTopN, PlanContext context) {
Statistics statistics = context.getStatisticsWithCheck();

View File

@ -91,6 +91,9 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
@ -165,9 +168,7 @@ import org.apache.doris.planner.external.iceberg.IcebergScanNode;
import org.apache.doris.planner.external.jdbc.JdbcScanNode;
import org.apache.doris.planner.external.paimon.PaimonScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.tablefunction.TableValuedFunctionIf;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TFetchOption;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.thrift.TPushAggOp;
@ -228,19 +229,10 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
*/
public PlanFragment translatePlan(PhysicalPlan physicalPlan) {
PlanFragment rootFragment = physicalPlan.accept(this, context);
// TODO: why we need if? we should always set output expr?
// OlapSink? maybe OlapSink should not set output exprs by it self
if (rootFragment.getOutputExprs() == null) {
List<Expr> outputExprs = Lists.newArrayList();
physicalPlan.getOutput().stream().map(Slot::getExprId)
.forEach(exprId -> outputExprs.add(context.findSlotRef(exprId)));
rootFragment.setOutputExprs(outputExprs);
}
for (PlanFragment fragment : context.getPlanFragments()) {
fragment.finalize(null);
}
setResultSinkFetchOptionIfNeed();
List<Expr> outputExprs = Lists.newArrayList();
physicalPlan.getOutput().stream().map(Slot::getExprId)
.forEach(exprId -> outputExprs.add(context.findSlotRef(exprId)));
rootFragment.setOutputExprs(outputExprs);
Collections.reverse(context.getPlanFragments());
// TODO: maybe we need to trans nullable directly? and then we could remove call computeMemLayout
context.getDescTable().computeMemLayout();
@ -311,6 +303,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
} else {
inputFragment.setDestination(exchangeNode);
inputFragment.setOutputPartition(dataPartition);
DataStreamSink streamSink = new DataStreamSink(exchangeNode.getId());
streamSink.setOutputPartition(dataPartition);
inputFragment.setSink(streamSink);
}
context.addPlanFragment(parentFragment);
@ -324,13 +319,26 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
@Override
public PlanFragment visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink,
PlanTranslatorContext context) {
return physicalResultSink.child().accept(this, context);
PlanFragment planFragment = physicalResultSink.child().accept(this, context);
planFragment.setSink(new ResultSink(planFragment.getPlanRoot().getId()));
return planFragment;
}
/**
 * Translate a defer-materialize result sink.
 *
 * <p>The wrapped result sink is translated first; the two phase read fetch option generated by the
 * sink's olap table for the selected index is then set on the fragment's {@code ResultSink}.
 */
@Override
public PlanFragment visitPhysicalDeferMaterializeResultSink(
        PhysicalDeferMaterializeResultSink<? extends Plan> sink,
        PlanTranslatorContext context) {
    PlanFragment fragment = visitPhysicalResultSink(sink.getPhysicalResultSink(), context);
    OlapTable fetchTable = sink.getOlapTable();
    TFetchOption twoPhaseOption = fetchTable.generateTwoPhaseReadOption(sink.getSelectedIndexId());
    // the fragment's sink is cast to ResultSink so the fetch option can be set on it
    ResultSink resultSink = (ResultSink) fragment.getSink();
    resultSink.setFetchOption(twoPhaseOption);
    return fragment;
}
@Override
public PlanFragment visitPhysicalOlapTableSink(PhysicalOlapTableSink<? extends Plan> olapTableSink,
PlanTranslatorContext context) {
PlanFragment rootFragment = olapTableSink.child().accept(this, context);
rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
TupleDescriptor olapTuple = context.generateTupleDesc();
List<Column> targetTableColumns = olapTableSink.getTargetTable().getFullSchema();
@ -341,26 +349,21 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
slotDesc.setColumn(column);
slotDesc.setIsNullable(column.isAllowNull());
}
OlapTableSink sink = new OlapTableSink(
olapTableSink.getTargetTable(),
olapTuple,
olapTableSink.getPartitionIds().isEmpty() ? null : olapTableSink.getPartitionIds(),
olapTableSink.isSingleReplicaLoad()
);
if (olapTableSink.isPartialUpdate()) {
HashSet<String> partialUpdateCols = new HashSet<String>();
HashSet<String> partialUpdateCols = new HashSet<>();
for (Column col : olapTableSink.getCols()) {
partialUpdateCols.add(col.getName());
}
sink.setPartialUpdateInputColumns(true, partialUpdateCols);
}
rootFragment.setSink(sink);
rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
return rootFragment;
}
@ -379,6 +382,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
.forEach(exprId -> outputExprs.add(context.findSlotRef(exprId)));
rootFragment.setOutputExprs(outputExprs);
// TODO: should not call legacy planner analyze in Nereids
try {
outFile.analyze(null, outputExprs,
fileSink.getOutput().stream().map(NamedExpression::getName).collect(Collectors.toList()));
@ -515,24 +519,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
@Override
public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTranslatorContext context) {
// deferred materialized slots used for topn opt.
Set<ExprId> deferredMaterializedExprIds = olapScan
.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS)
.map(s -> (Set<ExprId>) s)
.orElse(Collections.emptySet());
List<Slot> slots = olapScan.getOutput();
OlapTable olapTable = olapScan.getTable();
// generate real output tuple
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, olapTable, deferredMaterializedExprIds, context);
TupleDescriptor tupleDescriptor = generateTupleDesc(slots, olapTable, context);
// generate base index tuple because this fragment partitioned expr relay on slots of based index
if (olapScan.getSelectedIndexId() != olapScan.getTable().getBaseIndexId()) {
generateTupleDesc(olapScan.getBaseOutputs(), olapTable, deferredMaterializedExprIds, context);
}
// TODO: remove this, we should add this column in Nereids
if (olapScan.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent()) {
injectRowIdColumnSlot(tupleDescriptor);
generateTupleDesc(olapScan.getBaseOutputs(), olapTable, context);
}
OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), tupleDescriptor, "OlapScanNode");
@ -590,6 +583,22 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
return planFragment;
}
/**
 * Translate a defer-materialize olap scan.
 *
 * <p>The wrapped olap scan is translated as usual. Afterwards every slot of the scan tuple whose
 * expr id is listed in the defer-materialize slot ids is marked as not needing materialization,
 * and a slot for the column-id slot of the plan node is added to the same tuple.
 */
@Override
public PlanFragment visitPhysicalDeferMaterializeOlapScan(
        PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, PlanTranslatorContext context) {
    PlanFragment fragment = visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context);
    OlapScanNode scanNode = (OlapScanNode) fragment.getPlanRoot();
    TupleDescriptor scanTuple = context.getTupleDesc(scanNode.getTupleId());
    scanTuple.getSlots().stream()
            .filter(slot -> deferMaterializeOlapScan.getDeferMaterializeSlotIds()
                    .contains(context.findExprId(slot.getId())))
            .forEach(slot -> slot.setNeedMaterialize(false));
    // added after the deferred slots have been flagged, same as the order of the original statements
    context.createSlotDesc(scanTuple, deferMaterializeOlapScan.getColumnIdSlot());
    return fragment;
}
@Override
public PlanFragment visitPhysicalOneRowRelation(PhysicalOneRowRelation oneRowRelation,
PlanTranslatorContext context) {
@ -1734,6 +1743,26 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
return inputFragment;
}
/**
 * Translate a defer-materialize TopN.
 *
 * <p>The wrapped TopN is translated first. When the root of the resulting fragment is a
 * {@code SortNode}, two phase read is switched on for the node and its sort info, and every slot
 * of the sort tuple whose expr id is listed in the defer-materialize slot ids is marked as not
 * needing materialization. Otherwise the fragment is returned untouched.
 */
@Override
public PlanFragment visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN,
        PlanTranslatorContext context) {
    PlanFragment fragment = visitPhysicalTopN(topN.getPhysicalTopN(), context);
    if (!(fragment.getPlanRoot() instanceof SortNode)) {
        return fragment;
    }
    SortNode sortNode = (SortNode) fragment.getPlanRoot();
    sortNode.setUseTwoPhaseReadOpt(true);
    sortNode.getSortInfo().setUseTwoPhaseRead();
    TupleDescriptor sortTuple = sortNode.getSortInfo().getSortTupleDescriptor();
    for (SlotDescriptor slot : sortTuple.getSlots()) {
        if (!topN.getDeferMaterializeSlotIds().contains(context.findExprId(slot.getId()))) {
            continue;
        }
        slot.setNeedMaterialize(false);
    }
    return fragment;
}
@Override
public PlanFragment visitPhysicalRepeat(PhysicalRepeat<? extends Plan> repeat, PlanTranslatorContext context) {
PlanFragment inputPlanFragment = repeat.child(0).accept(this, context);
@ -1907,12 +1936,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
private SortNode translateSortNode(AbstractPhysicalSort<? extends Plan> sort, PlanNode childNode,
PlanTranslatorContext context) {
Set<ExprId> deferredMaterializedExprIds = sort
.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS)
.map(s -> (Set<ExprId>) s)
.orElse(Collections.emptySet());
TupleDescriptor sortTuple = generateTupleDesc(sort.child().getOutput(),
null, deferredMaterializedExprIds, context);
TupleDescriptor sortTuple = generateTupleDesc(sort.child().getOutput(), null, context);
List<Expr> orderingExprs = Lists.newArrayList();
List<Boolean> ascOrders = Lists.newArrayList();
List<Boolean> nullsFirstParams = Lists.newArrayList();
@ -1924,11 +1948,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
});
SortInfo sortInfo = new SortInfo(orderingExprs, ascOrders, nullsFirstParams, sortTuple);
SortNode sortNode = new SortNode(context.nextPlanNodeId(), childNode, sortInfo, sort instanceof PhysicalTopN);
if (sort.getMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS).isPresent()) {
sortNode.setUseTwoPhaseReadOpt(true);
sortNode.getSortInfo().setUseTwoPhaseRead();
injectRowIdColumnSlot(sortNode.getSortInfo().getSortTupleDescriptor());
}
if (sort.getStats() != null) {
sortNode.setCardinality((long) sort.getStats().getRowCount());
}
@ -1974,19 +1993,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
updateLegacyPlanIdToPhysicalPlan(planNode, filter);
}
private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table,
Set<ExprId> deferredMaterializedExprIds, PlanTranslatorContext context) {
TupleDescriptor tupleDescriptor = context.generateTupleDesc();
tupleDescriptor.setTable(table);
for (Slot slot : slotList) {
SlotDescriptor slotDescriptor = context.createSlotDesc(tupleDescriptor, (SlotReference) slot, table);
if (deferredMaterializedExprIds.contains(slot.getExprId())) {
slotDescriptor.setNeedMaterialize(false);
}
}
return tupleDescriptor;
}
private TupleDescriptor generateTupleDesc(List<Slot> slotList, TableIf table, PlanTranslatorContext context) {
TupleDescriptor tupleDescriptor = context.generateTupleDesc();
tupleDescriptor.setTable(table);
@ -2172,71 +2178,19 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
slotDesc.setIsMaterialized(true);
}
/**
* We use two phase read to optimize sql like: select * from tbl [where xxx = ???] [order by column1] [limit n]
* in the first phase, we add an extra column `RowId` to Block, and sort blocks in TopN nodes
* in the second phase, we have n rows, we do a fetch rpc to get all rowids data for the n rows
* and reconstruct the final block
*/
private void setResultSinkFetchOptionIfNeed() {
boolean needFetch = false;
// Only single olap table should be fetched
OlapTable fetchOlapTable = null;
OlapScanNode scanNode = null;
for (PlanFragment fragment : context.getPlanFragments()) {
PlanNode node = fragment.getPlanRoot();
PlanNode parent = null;
// OlapScanNode is the last node.
// So, just get the last two node and check if they are SortNode and OlapScan.
while (node.getChildren().size() != 0) {
parent = node;
node = node.getChildren().get(0);
}
// case1: general topn optimized query
if ((node instanceof OlapScanNode) && (parent instanceof SortNode)) {
SortNode sortNode = (SortNode) parent;
scanNode = (OlapScanNode) node;
if (sortNode.getUseTwoPhaseReadOpt()) {
needFetch = true;
fetchOlapTable = scanNode.getOlapTable();
break;
}
}
}
for (PlanFragment fragment : context.getPlanFragments()) {
if (needFetch && fragment.getSink() instanceof ResultSink) {
TFetchOption fetchOption = new TFetchOption();
fetchOption.setFetchRowStore(fetchOlapTable.storeRowColumn());
fetchOption.setUseTwoPhaseFetch(true);
fetchOption.setNodesInfo(SystemInfoService.createAliveNodesInfo());
if (!fetchOlapTable.storeRowColumn()) {
// Set column desc for each column
List<TColumn> columnsDesc = new ArrayList<>();
scanNode.getColumnDesc(columnsDesc, null, null);
fetchOption.setColumnDesc(columnsDesc);
}
((ResultSink) fragment.getSink()).setFetchOption(fetchOption);
break;
}
}
}
/**
* topN opt: using storage data ordering to accelerate topn operation.
* refer pr: optimize topn query if order by columns is prefix of sort keys of table (#10694)
*/
private boolean checkPushSort(SortNode sortNode, OlapTable olapTable) {
// Ensure limit is less then threshold
// Ensure limit is less than threshold
if (sortNode.getLimit() <= 0
|| sortNode.getLimit() > ConnectContext.get().getSessionVariable().topnOptLimitThreshold) {
return false;
}
// Ensure all isAscOrder is same, ande length != 0.
// Can't be zorder.
if (sortNode.getSortInfo().getIsAscOrder().stream().distinct().count() != 1
|| olapTable.isZOrderSort()) {
// Ensure all isAscOrder values are the same and the list is not empty. Can't be z-order.
if (sortNode.getSortInfo().getIsAscOrder().stream().distinct().count() != 1 || olapTable.isZOrderSort()) {
return false;
}

View File

@ -46,6 +46,7 @@ import org.apache.doris.nereids.rules.rewrite.CollectProjectAboveConsumer;
import org.apache.doris.nereids.rules.rewrite.ColumnPruning;
import org.apache.doris.nereids.rules.rewrite.ConvertInnerOrCrossJoin;
import org.apache.doris.nereids.rules.rewrite.CountDistinctRewrite;
import org.apache.doris.nereids.rules.rewrite.DeferMaterializeTopNResult;
import org.apache.doris.nereids.rules.rewrite.EliminateAggregate;
import org.apache.doris.nereids.rules.rewrite.EliminateDedupJoinCondition;
import org.apache.doris.nereids.rules.rewrite.EliminateFilter;
@ -283,6 +284,9 @@ public class Rewriter extends AbstractBatchJobExecutor {
bottomUp(RuleSet.PUSH_DOWN_FILTERS),
custom(RuleType.ELIMINATE_UNNECESSARY_PROJECT, EliminateUnnecessaryProject::new)
),
topic("topn optimize",
topDown(new DeferMaterializeTopNResult())
),
// this rule batch must keep at the end of rewrite to do some plan check
topic("Final rewrite and check",
custom(RuleType.ENSURE_PROJECT_ON_TOP_JOIN, EnsureProjectOnTopJoin::new),

View File

@ -71,7 +71,6 @@ public class PlanPostProcessors {
builder.add(new Validator());
builder.add(new RecomputeLogicalPropertiesProcessor());
builder.add(new TopNScanOpt());
builder.add(new TwoPhaseReadOpt());
return builder.build();
}
}

View File

@ -22,22 +22,23 @@ import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.algebra.Filter;
import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.qe.ConnectContext;
/**
* topN opt
* refer to:
* https://github.com/apache/doris/pull/15558
* https://github.com/apache/doris/pull/15663
* <a href="https://github.com/apache/doris/pull/15558">...</a>
* <a href="https://github.com/apache/doris/pull/15663">...</a>
*/
public class TopNScanOpt extends PlanPostProcessor {
@Override
public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
public PhysicalTopN<? extends Plan> visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) {
topN.child().accept(this, ctx);
Plan child = topN.child();
if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
@ -52,7 +53,7 @@ public class TopNScanOpt extends PlanPostProcessor {
if (topNOptLimitThreshold == -1 || topN.getLimit() > topNOptLimitThreshold) {
return topN;
}
// if firstKey's column is not present, it means the firstKey is not a original column from scan node
// if firstKey's column is not present, it means the firstKey is not an original column from scan node
// for example: "select cast(k1 as INT) as id from tbl1 order by id limit 2;" the firstKey "id" is
// a cast expr which is not from tbl1 and its column is not present.
// On the other hand "select k1 as id from tbl1 order by id limit 2;" the firstKey "id" is just an alias of k1
@ -68,14 +69,14 @@ public class TopNScanOpt extends PlanPostProcessor {
return topN;
}
PhysicalOlapScan olapScan;
OlapScan olapScan;
while (child instanceof Project || child instanceof Filter) {
child = child.child(0);
}
if (!(child instanceof PhysicalOlapScan)) {
if (!(child instanceof OlapScan)) {
return topN;
}
olapScan = (PhysicalOlapScan) child;
olapScan = (OlapScan) child;
if (olapScan.getTable().isDupKeysOrMergeOnWrite()) {
topN.setMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER, true);
@ -84,6 +85,12 @@ public class TopNScanOpt extends PlanPostProcessor {
return topN;
}
/**
 * Run the TopN handling of this processor on the TopN wrapped by a defer-materialize TopN and
 * return the defer-materialize node rebuilt around the result.
 */
@Override
public Plan visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN,
        CascadesContext context) {
    PhysicalTopN<? extends Plan> processedTopN = visitPhysicalTopN(topN.getPhysicalTopN(), context);
    return topN.withPhysicalTopN(processedTopN);
}
private long getTopNOptLimitThreshold() {
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
return ConnectContext.get().getSessionVariable().topnOptLimitThreshold;

View File

@ -1,163 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.processor.post;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.algebra.Filter;
import org.apache.doris.nereids.trees.plans.algebra.Project;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* two phase read opt
* refer to:
* https://github.com/apache/doris/pull/15642
* https://github.com/apache/doris/pull/16460
* https://github.com/apache/doris/pull/16848
*/
public class TwoPhaseReadOpt extends PlanPostProcessor {
@Override
public Plan processRoot(Plan plan, CascadesContext ctx) {
if (plan instanceof PhysicalTopN) {
PhysicalTopN<Plan> physicalTopN = (PhysicalTopN<Plan>) plan;
if (physicalTopN.getSortPhase() == SortPhase.MERGE_SORT) {
return plan.accept(this, ctx);
}
}
return plan;
}
@Override
public PhysicalTopN visitPhysicalTopN(PhysicalTopN<? extends Plan> mergeTopN, CascadesContext ctx) {
mergeTopN.child().accept(this, ctx);
if (mergeTopN.getSortPhase() != SortPhase.MERGE_SORT || !(mergeTopN.child() instanceof PhysicalDistribute)) {
return mergeTopN;
}
PhysicalDistribute<Plan> distribute = (PhysicalDistribute<Plan>) mergeTopN.child();
if (!(distribute.child() instanceof PhysicalTopN)) {
return mergeTopN;
}
PhysicalTopN<Plan> localTopN = (PhysicalTopN<Plan>) distribute.child();
if (localTopN.getOrderKeys().isEmpty()) {
return mergeTopN;
}
// topn opt
long topNOptLimitThreshold = getTopNOptLimitThreshold();
if (topNOptLimitThreshold < 0 || mergeTopN.getLimit() > topNOptLimitThreshold) {
return mergeTopN;
}
if (!localTopN.getOrderKeys().stream().map(OrderKey::getExpr).allMatch(Expression::isColumnFromTable)) {
return mergeTopN;
}
PhysicalOlapScan olapScan;
PhysicalProject<Plan> project = null;
PhysicalFilter<Plan> filter = null;
Plan child = localTopN.child();
while (child instanceof Project || child instanceof Filter) {
if (child instanceof Filter) {
filter = (PhysicalFilter<Plan>) child;
}
if (child instanceof Project) {
project = (PhysicalProject<Plan>) child;
// TODO: remove this after fix two phase read on project core
return mergeTopN;
}
child = child.child(0);
}
if (!(child instanceof PhysicalOlapScan)) {
return mergeTopN;
}
olapScan = (PhysicalOlapScan) child;
// all order key must column from table
if (!olapScan.getTable().getEnableLightSchemaChange()) {
return mergeTopN;
}
Map<ExprId, ExprId> projectRevertedMap = Maps.newHashMap();
if (project != null) {
for (Expression e : project.getProjects()) {
if (e.isSlot()) {
Slot slot = (Slot) e;
projectRevertedMap.put(slot.getExprId(), slot.getExprId());
} else if (e instanceof Alias) {
Alias alias = (Alias) e;
if (alias.child().isSlot()) {
Slot slot = (Slot) alias.child();
projectRevertedMap.put(alias.getExprId(), slot.getExprId());
}
}
}
}
Set<ExprId> deferredMaterializedExprIds = Sets.newHashSet(olapScan.getOutputExprIdSet());
if (filter != null) {
filter.getConjuncts().forEach(e -> deferredMaterializedExprIds.removeAll(e.getInputSlotExprIds()));
}
localTopN.getOrderKeys().stream()
.map(OrderKey::getExpr)
.map(Slot.class::cast)
.map(NamedExpression::getExprId)
.map(projectRevertedMap::get)
.filter(Objects::nonNull)
.forEach(deferredMaterializedExprIds::remove);
localTopN.getOrderKeys().stream()
.map(OrderKey::getExpr)
.map(Slot.class::cast)
.map(NamedExpression::getExprId)
.forEach(deferredMaterializedExprIds::remove);
olapScan.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, deferredMaterializedExprIds);
localTopN.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, deferredMaterializedExprIds);
mergeTopN.setMutableState(PhysicalOlapScan.DEFERRED_MATERIALIZED_SLOTS, deferredMaterializedExprIds);
return mergeTopN;
}
private long getTopNOptLimitThreshold() {
if (ConnectContext.get() != null && ConnectContext.get().getSessionVariable() != null) {
if (!ConnectContext.get().getSessionVariable().enableTwoPhaseReadOpt) {
return -1;
}
return ConnectContext.get().getSessionVariable().topnOptLimitThreshold;
}
return -1;
}
}

View File

@ -34,6 +34,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
@ -141,6 +142,12 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties,
return new PhysicalProperties(olapScan.getDistributionSpec());
}
/**
 * Output properties of a defer-materialize olap scan: delegates to the olap scan it wraps.
 */
@Override
public PhysicalProperties visitPhysicalDeferMaterializeOlapScan(
        PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, PlanContext context) {
    PhysicalOlapScan wrappedScan = deferMaterializeOlapScan.getPhysicalOlapScan();
    return visitPhysicalOlapScan(wrappedScan, context);
}
@Override
public PhysicalProperties visitPhysicalOneRowRelation(PhysicalOneRowRelation oneRowRelation, PlanContext context) {
return PhysicalProperties.GATHER;

View File

@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
@ -109,6 +110,14 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> {
return null;
}
/**
 * Request {@code PhysicalProperties.GATHER} from the children of a defer-materialize result sink.
 *
 * @return always {@code null}
 */
@Override
public Void visitPhysicalDeferMaterializeResultSink(
        PhysicalDeferMaterializeResultSink<? extends Plan> sink,
        PlanContext context) {
    PhysicalProperties requiredFromChild = PhysicalProperties.GATHER;
    addRequestPropertyToChildren(requiredFromChild);
    return null;
}
/* ********************************************************************************************
* Other Node, in lexicographical order
* ******************************************************************************************** */

View File

@ -45,6 +45,9 @@ import org.apache.doris.nereids.rules.implementation.LogicalAssertNumRowsToPhysi
import org.apache.doris.nereids.rules.implementation.LogicalCTEAnchorToPhysicalCTEAnchor;
import org.apache.doris.nereids.rules.implementation.LogicalCTEConsumerToPhysicalCTEConsumer;
import org.apache.doris.nereids.rules.implementation.LogicalCTEProducerToPhysicalCTEProducer;
import org.apache.doris.nereids.rules.implementation.LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.rules.implementation.LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink;
import org.apache.doris.nereids.rules.implementation.LogicalDeferMaterializeTopNToPhysicalDeferMaterializeTopN;
import org.apache.doris.nereids.rules.implementation.LogicalEmptyRelationToPhysicalEmptyRelation;
import org.apache.doris.nereids.rules.implementation.LogicalEsScanToPhysicalEsScan;
import org.apache.doris.nereids.rules.implementation.LogicalExceptToPhysicalExcept;
@ -143,6 +146,7 @@ public class RuleSet {
.add(new LogicalJoinToHashJoin())
.add(new LogicalJoinToNestedLoopJoin())
.add(new LogicalOlapScanToPhysicalOlapScan())
.add(new LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan())
.add(new LogicalSchemaScanToPhysicalSchemaScan())
.add(new LogicalFileScanToPhysicalFileScan())
.add(new LogicalJdbcScanToPhysicalJdbcScan())
@ -152,6 +156,7 @@ public class RuleSet {
.add(new LogicalWindowToPhysicalWindow())
.add(new LogicalSortToPhysicalQuickSort())
.add(new LogicalTopNToPhysicalTopN())
.add(new LogicalDeferMaterializeTopNToPhysicalDeferMaterializeTopN())
.add(new LogicalPartitionTopNToPhysicalPartitionTopN())
.add(new LogicalAssertNumRowsToPhysicalAssertNumRows())
.add(new LogicalOneRowRelationToPhysicalOneRowRelation())
@ -165,6 +170,7 @@ public class RuleSet {
.add(new LogicalOlapTableSinkToPhysicalOlapTableSink())
.add(new LogicalFileSinkToPhysicalFileSink())
.add(new LogicalResultSinkToPhysicalResultSink())
.add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink())
.build();
public static final List<Rule> ZIG_ZAG_TREE_JOIN_REORDER = planRuleFactories()

View File

@ -248,6 +248,9 @@ public enum RuleType {
COLLECT_PROJECT_ABOVE_FILTER_CONSUMER(RuleTypeClass.REWRITE),
REWRITE_SENTINEL(RuleTypeClass.REWRITE),
// topn opts
DEFER_MATERIALIZE_TOP_N_RESULT(RuleTypeClass.REWRITE),
// exploration rules
TEST_EXPLORATION(RuleTypeClass.EXPLORATION),
OR_EXPANSION(RuleTypeClass.EXPLORATION),
@ -295,16 +298,19 @@ public enum RuleType {
LOGICAL_CTE_ANCHOR_TO_PHYSICAL_CTE_ANCHOR_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_SORT_TO_PHYSICAL_QUICK_SORT_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_TOP_N_TO_PHYSICAL_TOP_N_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_DEFER_MATERIALIZE_TOP_N_TO_PHYSICAL_DEFER_MATERIALIZE_TOP_N_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_PARTITION_TOP_N_TO_PHYSICAL_PARTITION_TOP_N_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_EMPTY_RELATION_TO_PHYSICAL_EMPTY_RELATION_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_LIMIT_TO_PHYSICAL_LIMIT_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_OLAP_SCAN_TO_PHYSICAL_OLAP_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_DEFER_MATERIALIZE_OLAP_SCAN_TO_PHYSICAL_DEFER_MATERIALIZE_OLAP_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_SCHEMA_SCAN_TO_PHYSICAL_SCHEMA_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SCAN_TO_PHYSICAL_FILE_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_JDBC_SCAN_TO_PHYSICAL_JDBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ASSERT_NUM_ROWS_TO_PHYSICAL_ASSERT_NUM_ROWS(RuleTypeClass.IMPLEMENTATION),
STORAGE_LAYER_AGGREGATE_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION),

View File

@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.implementation;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import java.util.Optional;
/**
 * Implementation rule that turns a {@link LogicalDeferMaterializeOlapScan} into a
 * {@link PhysicalDeferMaterializeOlapScan}.
 *
 * <p>The wrapped logical olap scan is implemented by delegating to
 * {@link LogicalOlapScanToPhysicalOlapScan}; the defer-materialize slot ids, the column id slot
 * and the logical properties are carried over from the logical node.
 */
public class LogicalDeferMaterializeOlapScanToPhysicalDeferMaterializeOlapScan extends OneImplementationRuleFactory {

    @Override
    public Rule build() {
        return logicalDeferMaterializeOlapScan()
                .thenApply(ctx -> {
                    LogicalDeferMaterializeOlapScan deferScan = ctx.root;
                    // Reuse the plain olap scan rule for the wrapped scan and take its first result.
                    Rule olapScanRule = new LogicalOlapScanToPhysicalOlapScan().build();
                    PhysicalOlapScan wrappedPhysicalScan = (PhysicalOlapScan) olapScanRule
                            .transform(deferScan.getLogicalOlapScan(), ctx.cascadesContext)
                            .get(0);
                    return new PhysicalDeferMaterializeOlapScan(
                            wrappedPhysicalScan,
                            deferScan.getDeferMaterializeSlotIds(),
                            deferScan.getColumnIdSlot(),
                            Optional.empty(),
                            deferScan.getLogicalProperties());
                })
                .toRule(RuleType.LOGICAL_DEFER_MATERIALIZE_OLAP_SCAN_TO_PHYSICAL_DEFER_MATERIALIZE_OLAP_SCAN_RULE);
    }
}

View File

@ -0,0 +1,48 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.implementation;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
import java.util.Optional;
/**
 * Implementation rule that turns a {@link LogicalDeferMaterializeResultSink} into a
 * {@link PhysicalDeferMaterializeResultSink}.
 *
 * <p>The wrapped logical result sink is implemented by delegating to
 * {@link LogicalResultSinkToPhysicalResultSink}; the olap table, selected index id, logical
 * properties and child are carried over from the logical node.
 */
public class LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink
        extends OneImplementationRuleFactory {

    @Override
    public Rule build() {
        return logicalDeferMaterializeResultSink()
                .thenApply(ctx -> {
                    LogicalDeferMaterializeResultSink<? extends Plan> deferSink = ctx.root;
                    // Reuse the plain result sink rule for the wrapped sink and take its first result.
                    Rule resultSinkRule = new LogicalResultSinkToPhysicalResultSink().build();
                    PhysicalResultSink<? extends Plan> wrappedPhysicalSink
                            = (PhysicalResultSink<? extends Plan>) resultSinkRule
                                    .transform(deferSink.getLogicalResultSink(), ctx.cascadesContext)
                                    .get(0);
                    return new PhysicalDeferMaterializeResultSink<>(
                            wrappedPhysicalSink,
                            deferSink.getOlapTable(),
                            deferSink.getSelectedIndexId(),
                            Optional.empty(),
                            deferSink.getLogicalProperties(),
                            deferSink.child());
                })
                .toRule(
                        RuleType.LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE);
    }
}

View File

@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.implementation;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import java.util.Optional;
/**
 * Implementation rule that turns a {@link LogicalDeferMaterializeTopN} into nested
 * {@link PhysicalDeferMaterializeTopN} nodes.
 *
 * <p>The wrapped logical top-n is implemented by delegating to {@link LogicalTopNToPhysicalTopN}.
 * The first plan produced by that rule must be a {@link PhysicalTopN} whose child is itself a
 * {@link PhysicalTopN}; both casts below fail otherwise. Each of the two top-n nodes is wrapped
 * into its own defer-materialize node.
 */
public class LogicalDeferMaterializeTopNToPhysicalDeferMaterializeTopN extends OneImplementationRuleFactory {

    @Override
    public Rule build() {
        return logicalDeferMaterializeTopN()
                .thenApply(ctx -> {
                    LogicalDeferMaterializeTopN<? extends Plan> deferTopN = ctx.root;
                    Rule topNRule = new LogicalTopNToPhysicalTopN().build();
                    PhysicalTopN<? extends Plan> outerTopN = (PhysicalTopN<? extends Plan>) topNRule
                            .transform(deferTopN.getLogicalTopN(), ctx.cascadesContext)
                            .get(0);
                    PhysicalTopN<? extends Plan> innerTopN = (PhysicalTopN<? extends Plan>) outerTopN.child();
                    // Wrap bottom-up: the inner top-n keeps its own child, the outer one sits on the wrapped inner.
                    PhysicalDeferMaterializeTopN<? extends Plan> wrappedInner
                            = wrap(innerTopN, deferTopN, innerTopN.child());
                    return wrap(outerTopN, deferTopN, wrappedInner);
                })
                .toRule(RuleType.LOGICAL_DEFER_MATERIALIZE_TOP_N_TO_PHYSICAL_DEFER_MATERIALIZE_TOP_N_RULE);
    }

    /**
     * Builds a physical defer-materialize top-n around {@code physicalTopN}.
     *
     * @param physicalTopN the physical top-n to wrap
     * @param logicalWrapped the logical node supplying slot ids, column id slot and logical properties
     * @param child the child of the new node
     * @return the new physical defer-materialize top-n
     */
    private PhysicalDeferMaterializeTopN<? extends Plan> wrap(PhysicalTopN<? extends Plan> physicalTopN,
            LogicalDeferMaterializeTopN<? extends Plan> logicalWrapped, Plan child) {
        return new PhysicalDeferMaterializeTopN<>(
                physicalTopN,
                logicalWrapped.getDeferMaterializeSlotIds(),
                logicalWrapped.getColumnIdSlot(),
                Optional.empty(),
                logicalWrapped.getLogicalProperties(),
                child);
    }
}

View File

@ -0,0 +1,113 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.rules.rewrite;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Type;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/**
 * Rewrites a simple top-n query so that slots used neither by the sort keys nor by the filter
 * predicate are marked for deferred materialization.
 *
 * <p>Two plan shapes are matched: {@code ResultSink -> TopN -> OlapScan} and
 * {@code ResultSink -> TopN -> Filter -> OlapScan}. Both are rewritten into the corresponding
 * defer-materialize result sink, top-n and olap scan nodes.
 */
public class DeferMaterializeTopNResult implements RewriteRuleFactory {

    @Override
    public List<Rule> buildRules() {
        return ImmutableList.of(
                RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
                        logicalResultSink(logicalTopN(logicalOlapScan()))
                                .when(sink -> sink.child().getLimit() < getTopNOptLimitThreshold())
                                .whenNot(sink -> sink.child().getOrderKeys().isEmpty())
                                .when(sink -> sortsOnlyByTableColumns(sink.child()))
                                .when(sink -> sink.child().child().getTable().getEnableLightSchemaChange())
                                .then(sink -> deferMaterialize(
                                        sink, sink.child(), Optional.empty(), sink.child().child()))
                ),
                RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
                        logicalResultSink(logicalTopN(logicalFilter(logicalOlapScan())))
                                .when(sink -> sink.child().getLimit() < getTopNOptLimitThreshold())
                                .whenNot(sink -> sink.child().getOrderKeys().isEmpty())
                                .when(sink -> sortsOnlyByTableColumns(sink.child()))
                                .when(sink -> sink.child().child().child().getTable().getEnableLightSchemaChange())
                                .then(sink -> {
                                    LogicalFilter<LogicalOlapScan> filter = sink.child().child();
                                    return deferMaterialize(sink, sink.child(), Optional.of(filter), filter.child());
                                })
                )
        );
    }

    /**
     * Tells whether every order key expression of {@code topN} reports
     * {@link Expression#isColumnFromTable()}.
     */
    private static boolean sortsOnlyByTableColumns(LogicalTopN<? extends Plan> topN) {
        return topN.getOrderKeys().stream()
                .map(OrderKey::getExpr)
                .allMatch(Expression::isColumnFromTable);
    }

    /**
     * Builds the defer-materialize plan for a matched sink / top-n / optional filter / scan chain.
     *
     * @param logicalResultSink the matched result sink
     * @param logicalTopN the matched top-n
     * @param logicalFilter the matched filter between top-n and scan, if any
     * @param logicalOlapScan the matched olap scan
     * @return a {@link LogicalDeferMaterializeResultSink} on top of the rewritten plan
     */
    private Plan deferMaterialize(LogicalResultSink<? extends Plan> logicalResultSink,
            LogicalTopN<? extends Plan> logicalTopN, Optional<LogicalFilter<? extends Plan>> logicalFilter,
            LogicalOlapScan logicalOlapScan) {
        Column rowIdColumn = new Column(Column.ROWID_COL, Type.STRING, false, null, false, "", "rowid column");
        SlotReference rowIdSlot = SlotReference.fromColumn(rowIdColumn, logicalOlapScan.getQualifier());

        // Start from every scan output and drop whatever the filter or the sort keys read.
        Set<ExprId> deferredIds = Sets.newHashSet(logicalOlapScan.getOutputExprIdSet());
        if (logicalFilter.isPresent()) {
            for (Expression conjunct : logicalFilter.get().getConjuncts()) {
                deferredIds.removeAll(conjunct.getInputSlotExprIds());
            }
        }
        for (OrderKey orderKey : logicalTopN.getOrderKeys()) {
            // The cast to Slot is intentional: a non-slot order key fails here with a ClassCastException.
            NamedExpression sortKey = (Slot) orderKey.getExpr();
            ExprId sortKeyId = sortKey.getExprId();
            if (Objects.nonNull(sortKeyId)) {
                deferredIds.remove(sortKeyId);
            }
        }

        // Rebuild the chain bottom-up: scan, optional filter, top-n, result sink.
        LogicalDeferMaterializeOlapScan deferScan = new LogicalDeferMaterializeOlapScan(
                logicalOlapScan, deferredIds, rowIdSlot);
        Plan belowTopN = deferScan;
        if (logicalFilter.isPresent()) {
            belowTopN = logicalFilter.get().withChildren(deferScan);
        }
        LogicalTopN<? extends Plan> rebuiltTopN = (LogicalTopN<? extends Plan>) logicalTopN.withChildren(belowTopN);
        Plan deferTopN = new LogicalDeferMaterializeTopN<>(rebuiltTopN, deferredIds, rowIdSlot);
        LogicalResultSink<? extends Plan> rebuiltSink
                = (LogicalResultSink<? extends Plan>) logicalResultSink.withChildren(deferTopN);
        return new LogicalDeferMaterializeResultSink<>(rebuiltSink,
                logicalOlapScan.getTable(), logicalOlapScan.getSelectedIndexId());
    }

    /**
     * Returns the limit threshold below which the rewrite applies.
     *
     * @return the session's {@code topnOptLimitThreshold} when a connect context with session
     *         variables is available and {@code enableTwoPhaseReadOpt} is set; {@code -1} otherwise,
     *         which makes the {@code getLimit() < threshold} predicate fail for any non-negative limit
     */
    private long getTopNOptLimitThreshold() {
        ConnectContext connectContext = ConnectContext.get();
        if (connectContext == null || connectContext.getSessionVariable() == null) {
            return -1;
        }
        return connectContext.getSessionVariable().enableTwoPhaseReadOpt
                ? connectContext.getSessionVariable().topnOptLimitThreshold
                : -1;
    }
}

View File

@ -57,6 +57,8 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
@ -83,6 +85,8 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
@ -283,6 +287,12 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
return computeCatalogRelation(olapScan);
}
/**
 * Computes statistics for a logical defer-materialize olap scan by delegating to the
 * catalog relation computation of the wrapped logical olap scan.
 */
@Override
public Statistics visitLogicalDeferMaterializeOlapScan(LogicalDeferMaterializeOlapScan deferMaterializeOlapScan,
        Void context) {
    Statistics wrappedScanStats = computeCatalogRelation(deferMaterializeOlapScan.getLogicalOlapScan());
    return wrappedScanStats;
}
@Override
public Statistics visitLogicalSchemaScan(LogicalSchemaScan schemaScan, Void context) {
return computeCatalogRelation(schemaScan);
@ -326,6 +336,11 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
return computeTopN(topN);
}
/**
 * Computes statistics for a logical defer-materialize top-n by delegating to the
 * top-n computation of the wrapped logical top-n.
 */
@Override
public Statistics visitLogicalDeferMaterializeTopN(LogicalDeferMaterializeTopN<? extends Plan> topN, Void context) {
    Statistics wrappedTopNStats = computeTopN(topN.getLogicalTopN());
    return wrappedTopNStats;
}
@Override
public Statistics visitLogicalPartitionTopN(LogicalPartitionTopN<? extends Plan> partitionTopN, Void context) {
return computePartitionTopN(partitionTopN);
@ -410,6 +425,12 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
return computeCatalogRelation(olapScan);
}
/**
 * Computes statistics for a physical defer-materialize olap scan by delegating to the
 * catalog relation computation of the wrapped physical olap scan.
 */
@Override
public Statistics visitPhysicalDeferMaterializeOlapScan(PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan,
        Void context) {
    Statistics wrappedScanStats = computeCatalogRelation(deferMaterializeOlapScan.getPhysicalOlapScan());
    return wrappedScanStats;
}
@Override
public Statistics visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, Void context) {
return computeCatalogRelation(schemaScan);
@ -451,6 +472,12 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
return computeTopN(topN);
}
/**
 * Computes statistics for a physical defer-materialize top-n by delegating to the
 * top-n computation of the wrapped physical top-n.
 */
@Override
public Statistics visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN,
        Void context) {
    Statistics wrappedTopNStats = computeTopN(topN.getPhysicalTopN());
    return wrappedTopNStats;
}
@Override
public Statistics visitPhysicalHashJoin(
PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, Void context) {

View File

@ -25,6 +25,7 @@ import org.apache.doris.nereids.trees.expressions.MarkJoinSlotReference;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.OrderExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
import org.apache.doris.nereids.trees.expressions.SubqueryExpr;
import org.apache.doris.nereids.trees.expressions.functions.Function;
@ -35,6 +36,8 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
@ -171,6 +174,19 @@ public class LogicalPlanDeepCopier extends DefaultPlanRewriter<DeepCopierContext
return newOlapScan;
}
/**
 * Deep-copies a logical defer-materialize olap scan: the wrapped scan is copied, every deferred
 * slot id is translated through {@code context.exprIdReplaceMap}, and the column id slot is
 * deep-copied.
 */
@Override
public Plan visitLogicalDeferMaterializeOlapScan(LogicalDeferMaterializeOlapScan deferMaterializeOlapScan,
        DeepCopierContext context) {
    // Statement order is kept as-is: the wrapped scan is copied before the ids are looked up.
    // NOTE(review): presumably the lookup relies on replacements registered while copying the
    // scan - confirm against visitLogicalOlapScan.
    LogicalOlapScan copiedScan = (LogicalOlapScan) visitLogicalOlapScan(
            deferMaterializeOlapScan.getLogicalOlapScan(), context);

    ImmutableSet.Builder<ExprId> translatedIds = ImmutableSet.builder();
    for (ExprId originalId : deferMaterializeOlapScan.getDeferMaterializeSlotIds()) {
        // An id missing from the map yields null, which the builder rejects.
        translatedIds.add(context.exprIdReplaceMap.get(originalId));
    }
    Set<ExprId> copiedSlotIds = translatedIds.build();

    SlotReference copiedRowId = (SlotReference) ExpressionDeepCopier.INSTANCE
            .deepCopy(deferMaterializeOlapScan.getColumnIdSlot(), context);
    return new LogicalDeferMaterializeOlapScan(copiedScan, copiedSlotIds, copiedRowId);
}
@Override
public Plan visitLogicalSchemaScan(LogicalSchemaScan schemaScan, DeepCopierContext context) {
if (context.getRelationReplaceMap().containsKey(schemaScan.getRelationId())) {
@ -263,6 +279,19 @@ public class LogicalPlanDeepCopier extends DefaultPlanRewriter<DeepCopierContext
return new LogicalTopN<>(orderKeys, topN.getLimit(), topN.getOffset(), child);
}
/**
 * Deep-copies a logical defer-materialize top-n: the wrapped top-n is copied, every deferred
 * slot id is translated through {@code context.exprIdReplaceMap}, and the column id slot is
 * deep-copied.
 */
@Override
public Plan visitLogicalDeferMaterializeTopN(LogicalDeferMaterializeTopN<? extends Plan> topN,
        DeepCopierContext context) {
    // Statement order is kept as-is: the wrapped top-n is copied before the ids are looked up.
    // NOTE(review): presumably the lookup relies on replacements registered while copying the
    // subtree - confirm against visitLogicalTopN.
    LogicalTopN<? extends Plan> copiedTopN
            = (LogicalTopN<? extends Plan>) visitLogicalTopN(topN.getLogicalTopN(), context);

    ImmutableSet.Builder<ExprId> translatedIds = ImmutableSet.builder();
    for (ExprId originalId : topN.getDeferMaterializeSlotIds()) {
        // An id missing from the map yields null, which the builder rejects.
        translatedIds.add(context.exprIdReplaceMap.get(originalId));
    }
    Set<ExprId> copiedSlotIds = translatedIds.build();

    SlotReference copiedRowId = (SlotReference) ExpressionDeepCopier.INSTANCE
            .deepCopy(topN.getColumnIdSlot(), context);
    return new LogicalDeferMaterializeTopN<>(copiedTopN, copiedSlotIds, copiedRowId);
}
@Override
public Plan visitLogicalPartitionTopN(LogicalPartitionTopN<? extends Plan> partitionTopN,
DeepCopierContext context) {

View File

@ -0,0 +1,168 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/**
 * Logical olap scan used by the defer-materialize top-n optimization.
 *
 * <p>The node wraps a plain {@link LogicalOlapScan} and adds the set of slot ids whose
 * materialization is deferred plus a column id slot. Its output is the wrapped scan's output
 * followed by the column id slot. All {@link OlapScan} accessors delegate to the wrapped scan.
 */
public class LogicalDeferMaterializeOlapScan extends LogicalCatalogRelation implements OlapScan {

    /** The wrapped plain olap scan; never null. */
    private final LogicalOlapScan logicalOlapScan;

    ///////////////////////////////////////////////////////////////////////////
    // Members for defer materialize for top-n opt.
    ///////////////////////////////////////////////////////////////////////////
    private final Set<ExprId> deferMaterializeSlotIds;
    private final SlotReference columnIdSlot;

    /**
     * Creates a node that takes over the group expression of the wrapped scan and has no
     * pre-computed logical properties.
     *
     * @param logicalOlapScan the scan to wrap; must not be null
     * @param deferMaterializeSlotIds ids of the slots whose materialization is deferred; must not be null
     * @param columnIdSlot the extra slot appended to the output; must not be null
     */
    public LogicalDeferMaterializeOlapScan(LogicalOlapScan logicalOlapScan,
            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot) {
        this(logicalOlapScan, deferMaterializeSlotIds, columnIdSlot,
                Objects.requireNonNull(logicalOlapScan, "logicalOlapScan can not be null").getGroupExpression(),
                Optional.empty());
    }

    /**
     * constructor
     *
     * @param logicalOlapScan the scan to wrap; must not be null
     * @param deferMaterializeSlotIds ids of the slots whose materialization is deferred; copied, must not be null
     * @param columnIdSlot the extra slot appended to the output; must not be null
     * @param groupExpression the group expression of this node
     * @param logicalProperties the logical properties of this node
     */
    public LogicalDeferMaterializeOlapScan(LogicalOlapScan logicalOlapScan,
            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot,
            Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties) {
        // The null check has to happen on the first dereference: the super call must come first,
        // so checking afterwards would never produce the descriptive message.
        super(Objects.requireNonNull(logicalOlapScan, "logicalOlapScan can not be null").getRelationId(),
                logicalOlapScan.getType(), logicalOlapScan.getTable(),
                logicalOlapScan.getQualifier(), groupExpression, logicalProperties);
        this.logicalOlapScan = logicalOlapScan;
        this.deferMaterializeSlotIds = ImmutableSet.copyOf(Objects.requireNonNull(deferMaterializeSlotIds,
                "deferMaterializeSlotIds can not be null"));
        this.columnIdSlot = Objects.requireNonNull(columnIdSlot, "columnIdSlot can not be null");
    }

    public LogicalOlapScan getLogicalOlapScan() {
        return logicalOlapScan;
    }

    public Set<ExprId> getDeferMaterializeSlotIds() {
        return deferMaterializeSlotIds;
    }

    public SlotReference getColumnIdSlot() {
        return columnIdSlot;
    }

    @Override
    public OlapTable getTable() {
        return logicalOlapScan.getTable();
    }

    @Override
    public long getSelectedIndexId() {
        return logicalOlapScan.getSelectedIndexId();
    }

    @Override
    public List<Long> getSelectedPartitionIds() {
        return logicalOlapScan.getSelectedPartitionIds();
    }

    /**
     * Returns the tablet ids selected by the wrapped scan.
     *
     * <p>This previously returned the selected partition ids, which handed callers the wrong id list.
     */
    @Override
    public List<Long> getSelectedTabletIds() {
        return logicalOlapScan.getSelectedTabletIds();
    }

    /**
     * Output is the wrapped scan's output followed by the column id slot.
     */
    @Override
    public List<Slot> computeOutput() {
        return ImmutableList.<Slot>builder()
                .addAll(logicalOlapScan.getOutput())
                .add(columnIdSlot)
                .build();
    }

    @Override
    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
        return new LogicalDeferMaterializeOlapScan(logicalOlapScan, deferMaterializeSlotIds, columnIdSlot,
                groupExpression, Optional.of(getLogicalProperties()));
    }

    @Override
    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
        Preconditions.checkArgument(children.isEmpty(), "LogicalDeferMaterializeOlapScan should have no child");
        return new LogicalDeferMaterializeOlapScan(logicalOlapScan, deferMaterializeSlotIds, columnIdSlot,
                groupExpression, logicalProperties);
    }

    /**
     * A scan is a leaf, so the only accepted argument is an empty list and the node itself is returned.
     */
    @Override
    public Plan withChildren(List<Plan> children) {
        Preconditions.checkArgument(children.isEmpty(), "LogicalDeferMaterializeOlapScan should have no child");
        return this;
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitLogicalDeferMaterializeOlapScan(this, context);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        if (!super.equals(o)) {
            return false;
        }
        LogicalDeferMaterializeOlapScan that = (LogicalDeferMaterializeOlapScan) o;
        return Objects.equals(logicalOlapScan, that.logicalOlapScan) && Objects.equals(
                deferMaterializeSlotIds, that.deferMaterializeSlotIds) && Objects.equals(columnIdSlot,
                that.columnIdSlot);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), logicalOlapScan, deferMaterializeSlotIds, columnIdSlot);
    }

    @Override
    public String toString() {
        return Utils.toSqlString("LogicalDeferMaterializeOlapScan[" + id.asInt() + "]",
                "olapScan", logicalOlapScan,
                "deferMaterializeSlotIds", deferMaterializeSlotIds,
                "columnIdSlot", columnIdSlot
        );
    }
}

View File

@ -0,0 +1,146 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
 * Logical result sink used by the defer-materialize top-n optimization.
 *
 * <p>The node wraps a plain {@link LogicalResultSink} and additionally carries an olap table and
 * a selected index id. Its output is the output of its child.
 */
public class LogicalDeferMaterializeResultSink<CHILD_TYPE extends Plan>
        extends LogicalSink<CHILD_TYPE> implements Sink {

    private final LogicalResultSink<? extends Plan> logicalResultSink;
    private final OlapTable olapTable;
    private final long selectedIndexId;

    /**
     * Creates a sink that shares the child of the wrapped result sink and has neither a group
     * expression nor pre-computed logical properties.
     */
    public LogicalDeferMaterializeResultSink(LogicalResultSink<CHILD_TYPE> logicalResultSink,
            OlapTable olapTable, long selectedIndexId) {
        this(logicalResultSink, olapTable, selectedIndexId,
                Optional.empty(), Optional.empty(), logicalResultSink.child());
    }

    /**
     * Full constructor.
     *
     * @param logicalResultSink the wrapped result sink; its type is passed on to the super class
     * @param olapTable the olap table carried by this sink
     * @param selectedIndexId the selected index id carried by this sink
     * @param groupExpression the group expression of this node
     * @param logicalProperties the logical properties of this node
     * @param child the child of this node
     */
    public LogicalDeferMaterializeResultSink(LogicalResultSink<? extends Plan> logicalResultSink,
            OlapTable olapTable, long selectedIndexId,
            Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
            CHILD_TYPE child) {
        super(logicalResultSink.getType(), groupExpression, logicalProperties, child);
        this.logicalResultSink = logicalResultSink;
        this.olapTable = olapTable;
        this.selectedIndexId = selectedIndexId;
    }

    public LogicalResultSink<? extends Plan> getLogicalResultSink() {
        return logicalResultSink;
    }

    public OlapTable getOlapTable() {
        return olapTable;
    }

    public long getSelectedIndexId() {
        return selectedIndexId;
    }

    /**
     * Rebuilds this sink, and the wrapped result sink, on top of the single given child.
     * Group expression and logical properties are reset to empty.
     */
    @Override
    public LogicalDeferMaterializeResultSink<Plan> withChildren(List<Plan> children) {
        Preconditions.checkArgument(children.size() == 1,
                "LogicalDeferMaterializeResultSink only accepts one child");
        Plan newChild = children.get(0);
        LogicalResultSink<? extends Plan> rebuiltSink = logicalResultSink.withChildren(ImmutableList.of(newChild));
        return new LogicalDeferMaterializeResultSink<>(
                rebuiltSink, olapTable, selectedIndexId, Optional.empty(), Optional.empty(), newChild);
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitLogicalDeferMaterializeResultSink(this, context);
    }

    /**
     * Expressions are those of the wrapped result sink.
     */
    @Override
    public List<? extends Expression> getExpressions() {
        return logicalResultSink.getExpressions();
    }

    @Override
    public LogicalDeferMaterializeResultSink<Plan> withGroupExpression(Optional<GroupExpression> groupExpression) {
        Optional<LogicalProperties> currentProperties = Optional.of(getLogicalProperties());
        return new LogicalDeferMaterializeResultSink<>(
                logicalResultSink, olapTable, selectedIndexId, groupExpression, currentProperties, child());
    }

    /**
     * Rebuilds this sink, and the wrapped result sink, on top of the single given child with the
     * given group expression and logical properties.
     */
    @Override
    public LogicalDeferMaterializeResultSink<Plan> withGroupExprLogicalPropChildren(
            Optional<GroupExpression> groupExpression,
            Optional<LogicalProperties> logicalProperties,
            List<Plan> children) {
        Preconditions.checkArgument(children.size() == 1,
                "LogicalDeferMaterializeResultSink only accepts one child");
        Plan newChild = children.get(0);
        LogicalResultSink<? extends Plan> rebuiltSink = logicalResultSink.withChildren(ImmutableList.of(newChild));
        return new LogicalDeferMaterializeResultSink<>(
                rebuiltSink, olapTable, selectedIndexId, groupExpression, logicalProperties, newChild);
    }

    /**
     * Output is the output of the child.
     */
    @Override
    public List<Slot> computeOutput() {
        return child().getOutput();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass() || !super.equals(o)) {
            return false;
        }
        LogicalDeferMaterializeResultSink<?> other = (LogicalDeferMaterializeResultSink<?>) o;
        if (selectedIndexId != other.selectedIndexId) {
            return false;
        }
        return Objects.equals(logicalResultSink, other.logicalResultSink)
                && Objects.equals(olapTable, other.olapTable);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), logicalResultSink, olapTable, selectedIndexId);
    }

    @Override
    public String toString() {
        return Utils.toSqlString("LogicalDeferMaterializeResultSink[" + id.asInt() + "]",
                "logicalResultSink", logicalResultSink,
                "olapTable", olapTable,
                "selectedIndexId", selectedIndexId
        );
    }
}

View File

@ -0,0 +1,171 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.logical;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.TopN;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/**
 * Logical top-n node used by the defer materialize (two phase read) top-n optimization.
 *
 * <p>The node wraps a {@link LogicalTopN}, from which order keys, offset and limit are read, and
 * additionally records the ids of the slots whose materialization is deferred together with the
 * column id slot. The column id slot is reported as an expression of this node but is filtered
 * out of its computed output.
 */
public class LogicalDeferMaterializeTopN<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> implements TopN {

    /** Wrapped top-n that supplies order keys, offset and limit. */
    private final LogicalTopN<? extends Plan> logicalTopN;

    ///////////////////////////////////////////////////////////////////////////
    // Members for defer materialize for top-n opt.
    ///////////////////////////////////////////////////////////////////////////
    private final Set<ExprId> deferMaterializeSlotIds;
    private final SlotReference columnIdSlot;

    /**
     * Builds the node from a top-n, reusing that top-n's group expression, logical properties
     * and child.
     */
    public LogicalDeferMaterializeTopN(LogicalTopN<CHILD_TYPE> logicalTopN,
            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot) {
        this(logicalTopN, deferMaterializeSlotIds, columnIdSlot, logicalTopN.getGroupExpression(),
                Optional.of(logicalTopN.getLogicalProperties()), logicalTopN.child());
    }

    /**
     * Builds the node with an explicitly supplied group expression, logical properties and child.
     */
    public LogicalDeferMaterializeTopN(LogicalTopN<? extends Plan> logicalTopN,
            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot,
            Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
            CHILD_TYPE child) {
        super(PlanType.LOGICAL_TOP_N, groupExpression, logicalProperties, child);
        this.logicalTopN = logicalTopN;
        this.deferMaterializeSlotIds = deferMaterializeSlotIds;
        this.columnIdSlot = columnIdSlot;
    }

    /** Returns the wrapped top-n. */
    public LogicalTopN<? extends Plan> getLogicalTopN() {
        return logicalTopN;
    }

    /** Returns the ids of the slots recorded as deferred for materialization. */
    public Set<ExprId> getDeferMaterializeSlotIds() {
        return deferMaterializeSlotIds;
    }

    /** Returns the column id slot carried by this node. */
    public SlotReference getColumnIdSlot() {
        return columnIdSlot;
    }

    /** Returns the order keys of the wrapped top-n. */
    @Override
    public List<OrderKey> getOrderKeys() {
        return logicalTopN.getOrderKeys();
    }

    /** Returns the offset of the wrapped top-n. */
    @Override
    public long getOffset() {
        return logicalTopN.getOffset();
    }

    /** Returns the limit of the wrapped top-n. */
    @Override
    public long getLimit() {
        return logicalTopN.getLimit();
    }

    /** Returns the expressions of the wrapped top-n followed by the column id slot. */
    @Override
    public List<? extends Expression> getExpressions() {
        ImmutableList.Builder<Expression> expressions = ImmutableList.builder();
        expressions.addAll(logicalTopN.getExpressions());
        expressions.add(columnIdSlot);
        return expressions.build();
    }

    /**
     * Computes the output as the wrapped top-n's output without the slot that has the same
     * expression id as the column id slot.
     */
    @Override
    public List<Slot> computeOutput() {
        ImmutableList.Builder<Slot> visibleSlots = ImmutableList.builder();
        for (Slot slot : logicalTopN.getOutput()) {
            if (!slot.getExprId().equals(columnIdSlot.getExprId())) {
                visibleSlots.add(slot);
            }
        }
        return visibleSlots.build();
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitLogicalDeferMaterializeTopN(this, context);
    }

    /**
     * Creates a copy bound to the given group expression; the wrapped top-n, current logical
     * properties and current child are kept.
     */
    @Override
    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
        Optional<LogicalProperties> currentProperties = Optional.of(getLogicalProperties());
        return new LogicalDeferMaterializeTopN<>(logicalTopN, deferMaterializeSlotIds, columnIdSlot,
                groupExpression, currentProperties, child());
    }

    /**
     * Creates a copy with the given group expression, logical properties and single child.
     * The wrapped top-n is rebuilt on top of the new child as well.
     *
     * @throws IllegalArgumentException if {@code children} does not contain exactly one element
     */
    @Override
    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
        Preconditions.checkArgument(children.size() == 1,
                "LogicalDeferMaterializeTopN should have 1 child, but input is %s", children.size());
        Plan newChild = children.get(0);
        LogicalTopN<? extends Plan> rebuiltTopN = logicalTopN.withChildren(ImmutableList.of(newChild));
        return new LogicalDeferMaterializeTopN<>(rebuiltTopN,
                deferMaterializeSlotIds, columnIdSlot, groupExpression, logicalProperties, newChild);
    }

    /**
     * Creates a copy on top of the given single child, with empty group expression and empty
     * logical properties. The wrapped top-n is rebuilt on top of the new child as well.
     *
     * @throws IllegalArgumentException if {@code children} does not contain exactly one element
     */
    @Override
    public Plan withChildren(List<Plan> children) {
        Preconditions.checkArgument(children.size() == 1,
                "LogicalDeferMaterializeTopN should have 1 child, but input is %s", children.size());
        Plan newChild = children.get(0);
        LogicalTopN<? extends Plan> rebuiltTopN = logicalTopN.withChildren(ImmutableList.of(newChild));
        return new LogicalDeferMaterializeTopN<>(rebuiltTopN,
                deferMaterializeSlotIds, columnIdSlot, Optional.empty(), Optional.empty(), newChild);
    }

    /**
     * Two nodes are equal when they have the same runtime class, the super class considers them
     * equal, and their wrapped top-n, deferred slot ids and column id slot are equal.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass() || !super.equals(o)) {
            return false;
        }
        LogicalDeferMaterializeTopN<?> other = (LogicalDeferMaterializeTopN<?>) o;
        return Objects.equals(logicalTopN, other.logicalTopN)
                && Objects.equals(deferMaterializeSlotIds, other.deferMaterializeSlotIds)
                && Objects.equals(columnIdSlot, other.columnIdSlot);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), logicalTopN, deferMaterializeSlotIds, columnIdSlot);
    }

    @Override
    public String toString() {
        String nodeLabel = "LogicalDeferMaterializeTopN[" + id.asInt() + "]";
        return Utils.toSqlString(nodeLabel,
                "logicalTopN", logicalTopN,
                "deferMaterializeSlotIds", deferMaterializeSlotIds,
                "columnIdSlot", columnIdSlot);
    }
}

View File

@ -141,7 +141,6 @@ public class LogicalOlapScan extends LogicalCatalogRelation implements OlapScan
List<Long> selectedTabletIds, long selectedIndexId, boolean indexSelected,
PreAggStatus preAggStatus, List<Long> specifiedPartitions,
List<String> hints, Map<String, Slot> cacheSlotWithSlotName) {
super(id, PlanType.LOGICAL_OLAP_SCAN, table, qualifier,
groupExpression, logicalProperties);
Preconditions.checkArgument(selectedPartitionIds != null, "selectedPartitionIds can not be null");

View File

@ -59,7 +59,7 @@ public class LogicalResultSink<CHILD_TYPE extends Plan> extends LogicalSink<CHIL
}
@Override
public Plan withChildren(List<Plan> children) {
public LogicalResultSink<Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
"LogicalResultSink's children size must be 1, but real is %s", children.size());
return new LogicalResultSink<>(outputExprs, children.get(0));
@ -76,14 +76,14 @@ public class LogicalResultSink<CHILD_TYPE extends Plan> extends LogicalSink<CHIL
}
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
public LogicalResultSink<Plan> withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalResultSink<>(outputExprs, groupExpression, Optional.of(getLogicalProperties()), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
public LogicalResultSink<Plan> withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "UnboundResultSink only accepts one child");
Preconditions.checkArgument(children.size() == 1, "LogicalResultSink only accepts one child");
return new LogicalResultSink<>(outputExprs, groupExpression, logicalProperties, children.get(0));
}

View File

@ -64,14 +64,17 @@ public class LogicalTopN<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYP
return child().getOutput();
}
/** Returns the {@code orderKeys} held by this top-n node. */
@Override
public List<OrderKey> getOrderKeys() {
return orderKeys;
}
/** Returns the {@code offset} held by this top-n node. */
@Override
public long getOffset() {
return offset;
}
/** Returns the {@code limit} held by this top-n node. */
@Override
public long getLimit() {
return limit;
}
@ -93,7 +96,7 @@ public class LogicalTopN<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYP
if (o == null || getClass() != o.getClass()) {
return false;
}
LogicalTopN that = (LogicalTopN) o;
LogicalTopN<?> that = (LogicalTopN<?>) o;
return this.offset == that.offset && this.limit == that.limit && Objects.equals(this.orderKeys, that.orderKeys);
}
@ -104,7 +107,7 @@ public class LogicalTopN<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYP
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitLogicalTopN((LogicalTopN<Plan>) this, context);
return visitor.visitLogicalTopN(this, context);
}
@Override
@ -121,7 +124,8 @@ public class LogicalTopN<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYP
@Override
public LogicalTopN<Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1);
Preconditions.checkArgument(children.size() == 1,
"LogicalTopN should have 1 child, but input is %s", children.size());
return new LogicalTopN<>(orderKeys, limit, offset, children.get(0));
}

View File

@ -0,0 +1,157 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.Statistics;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/**
 * Physical olap scan node used by the defer materialize (two phase read) top-n optimization.
 *
 * <p>The node wraps a {@link PhysicalOlapScan}, to which the table, selected index id, selected
 * partition ids and selected tablet ids are delegated, and additionally records the ids of the
 * slots whose materialization is deferred together with the column id slot.
 */
public class PhysicalDeferMaterializeOlapScan extends PhysicalCatalogRelation implements OlapScan {

    /** Wrapped scan that supplies relation id, type, table, qualifier and selection info. */
    private final PhysicalOlapScan physicalOlapScan;

    ///////////////////////////////////////////////////////////////////////////
    // Members for defer materialize for top-n opt.
    ///////////////////////////////////////////////////////////////////////////
    private final Set<ExprId> deferMaterializeSlotIds;
    private final SlotReference columnIdSlot;

    /**
     * Builds the node with {@code null} physical properties and {@code null} statistics.
     */
    public PhysicalDeferMaterializeOlapScan(PhysicalOlapScan physicalOlapScan,
            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot,
            Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties) {
        this(physicalOlapScan, deferMaterializeSlotIds, columnIdSlot,
                groupExpression, logicalProperties, null, null);
    }

    /**
     * Builds the node; relation id, plan type, table and qualifier are taken from the wrapped scan.
     */
    public PhysicalDeferMaterializeOlapScan(PhysicalOlapScan physicalOlapScan,
            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot,
            Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
            PhysicalProperties physicalProperties, Statistics statistics) {
        super(physicalOlapScan.getRelationId(), physicalOlapScan.getType(),
                physicalOlapScan.getTable(), physicalOlapScan.getQualifier(),
                groupExpression, logicalProperties, physicalProperties, statistics);
        this.physicalOlapScan = physicalOlapScan;
        this.deferMaterializeSlotIds = deferMaterializeSlotIds;
        this.columnIdSlot = columnIdSlot;
    }

    /** Returns the wrapped scan. */
    public PhysicalOlapScan getPhysicalOlapScan() {
        return physicalOlapScan;
    }

    /** Returns the ids of the slots recorded as deferred for materialization. */
    public Set<ExprId> getDeferMaterializeSlotIds() {
        return deferMaterializeSlotIds;
    }

    /** Returns the column id slot carried by this node. */
    public SlotReference getColumnIdSlot() {
        return columnIdSlot;
    }

    /** Returns the table of the wrapped scan. */
    @Override
    public OlapTable getTable() {
        return physicalOlapScan.getTable();
    }

    /** Returns the selected index id of the wrapped scan. */
    @Override
    public long getSelectedIndexId() {
        return physicalOlapScan.getSelectedIndexId();
    }

    /** Returns the selected partition ids of the wrapped scan. */
    @Override
    public List<Long> getSelectedPartitionIds() {
        return physicalOlapScan.getSelectedPartitionIds();
    }

    /** Returns the selected tablet ids of the wrapped scan. */
    @Override
    public List<Long> getSelectedTabletIds() {
        return physicalOlapScan.getSelectedTabletIds();
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitPhysicalDeferMaterializeOlapScan(this, context);
    }

    /** Creates a copy bound to the given group expression; everything else is kept. */
    @Override
    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
        LogicalProperties currentProperties = getLogicalProperties();
        return new PhysicalDeferMaterializeOlapScan(physicalOlapScan, deferMaterializeSlotIds, columnIdSlot,
                groupExpression, currentProperties, physicalProperties, statistics);
    }

    /**
     * Creates a copy with the given group expression and logical properties.
     * The {@code children} argument is not used by this method.
     */
    @Override
    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
        LogicalProperties newProperties = logicalProperties.get();
        return new PhysicalDeferMaterializeOlapScan(physicalOlapScan, deferMaterializeSlotIds, columnIdSlot,
                groupExpression, newProperties, physicalProperties, statistics);
    }

    /** Creates a copy with the given physical properties and statistics; everything else is kept. */
    @Override
    public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
        LogicalProperties currentProperties = getLogicalProperties();
        return new PhysicalDeferMaterializeOlapScan(physicalOlapScan, deferMaterializeSlotIds, columnIdSlot,
                groupExpression, currentProperties, physicalProperties, statistics);
    }

    /**
     * Two nodes are equal when they have the same runtime class, the super class considers them
     * equal, and their wrapped scan, deferred slot ids and column id slot are equal.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass() || !super.equals(o)) {
            return false;
        }
        PhysicalDeferMaterializeOlapScan other = (PhysicalDeferMaterializeOlapScan) o;
        return Objects.equals(physicalOlapScan, other.physicalOlapScan)
                && Objects.equals(deferMaterializeSlotIds, other.deferMaterializeSlotIds)
                && Objects.equals(columnIdSlot, other.columnIdSlot);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), physicalOlapScan, deferMaterializeSlotIds, columnIdSlot);
    }

    @Override
    public String toString() {
        String nodeLabel = "PhysicalDeferMaterializeOlapScan[" + id.asInt() + "]";
        return Utils.toSqlString(nodeLabel,
                "physicalOlapScan", physicalOlapScan,
                "deferMaterializeSlotIds", deferMaterializeSlotIds,
                "columnIdSlot", columnIdSlot);
    }
}

View File

@ -0,0 +1,167 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.Statistics;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.jetbrains.annotations.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
/**
 * Physical result sink used by the defer materialize (two phase read) top-n optimization.
 *
 * <p>The node wraps a {@link PhysicalResultSink}, which supplies the plan type, the expressions
 * and the output expressions, and additionally carries the olap table and the selected index id.
 */
public class PhysicalDeferMaterializeResultSink<CHILD_TYPE extends Plan>
        extends PhysicalSink<CHILD_TYPE> implements Sink {

    /** Wrapped sink that supplies plan type, expressions and output expressions. */
    private final PhysicalResultSink<? extends Plan> physicalResultSink;
    private final OlapTable olapTable;
    private final long selectedIndexId;

    /**
     * Builds the sink with {@link PhysicalProperties#GATHER} and {@code null} statistics.
     */
    public PhysicalDeferMaterializeResultSink(PhysicalResultSink<? extends Plan> physicalResultSink,
            OlapTable olapTable, long selectedIndexId,
            Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
            CHILD_TYPE child) {
        this(physicalResultSink, olapTable, selectedIndexId,
                groupExpression, logicalProperties, PhysicalProperties.GATHER, null, child);
    }

    /**
     * Builds the sink; the plan type is taken from the wrapped result sink.
     */
    public PhysicalDeferMaterializeResultSink(PhysicalResultSink<? extends Plan> physicalResultSink,
            OlapTable olapTable, long selectedIndexId,
            Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
            @Nullable PhysicalProperties physicalProperties, Statistics statistics,
            CHILD_TYPE child) {
        super(physicalResultSink.getType(), groupExpression, logicalProperties, physicalProperties, statistics, child);
        this.physicalResultSink = physicalResultSink;
        this.olapTable = olapTable;
        this.selectedIndexId = selectedIndexId;
    }

    /** Returns the wrapped result sink. */
    public PhysicalResultSink<? extends Plan> getPhysicalResultSink() {
        return physicalResultSink;
    }

    /** Returns the olap table carried by this sink. */
    public OlapTable getOlapTable() {
        return olapTable;
    }

    /** Returns the selected index id carried by this sink. */
    public long getSelectedIndexId() {
        return selectedIndexId;
    }

    /**
     * Creates a copy on top of the given single child, keeping group expression, logical
     * properties, physical properties and statistics. The wrapped sink is rebuilt on the new child.
     *
     * @throws IllegalArgumentException if {@code children} does not contain exactly one element
     */
    @Override
    public Plan withChildren(List<Plan> children) {
        Preconditions.checkArgument(children.size() == 1,
                "PhysicalDeferMaterializeResultSink's children size must be 1, but real is %s", children.size());
        Plan newChild = children.get(0);
        PhysicalResultSink<? extends Plan> rebuiltSink = physicalResultSink.withChildren(ImmutableList.of(newChild));
        return new PhysicalDeferMaterializeResultSink<>(rebuiltSink,
                olapTable, selectedIndexId, groupExpression, getLogicalProperties(),
                physicalProperties, statistics, newChild);
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitPhysicalDeferMaterializeResultSink(this, context);
    }

    /** Returns the expressions of the wrapped result sink. */
    @Override
    public List<? extends Expression> getExpressions() {
        return physicalResultSink.getExpressions();
    }

    /** Creates a copy bound to the given group expression; everything else is kept. */
    @Override
    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
        LogicalProperties currentProperties = getLogicalProperties();
        return new PhysicalDeferMaterializeResultSink<>(physicalResultSink, olapTable, selectedIndexId,
                groupExpression, currentProperties, physicalProperties, statistics, child());
    }

    /**
     * Creates a copy with the given group expression, logical properties and single child.
     * The wrapped sink is rebuilt on the new child.
     *
     * @throws IllegalArgumentException if {@code children} does not contain exactly one element
     */
    @Override
    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
        Preconditions.checkArgument(children.size() == 1,
                "PhysicalDeferMaterializeResultSink's children size must be 1, but real is %s", children.size());
        Plan newChild = children.get(0);
        PhysicalResultSink<? extends Plan> rebuiltSink = physicalResultSink.withChildren(ImmutableList.of(newChild));
        return new PhysicalDeferMaterializeResultSink<>(rebuiltSink,
                olapTable, selectedIndexId, groupExpression, logicalProperties.get(),
                physicalProperties, statistics, newChild);
    }

    /** Creates a copy with the given physical properties and statistics; everything else is kept. */
    @Override
    public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
        LogicalProperties currentProperties = getLogicalProperties();
        return new PhysicalDeferMaterializeResultSink<>(physicalResultSink, olapTable, selectedIndexId,
                groupExpression, currentProperties, physicalProperties, statistics, child());
    }

    /** Computes the output by converting each output expression of the wrapped sink to a slot. */
    @Override
    public List<Slot> computeOutput() {
        ImmutableList.Builder<Slot> slots = ImmutableList.builder();
        for (NamedExpression outputExpr : physicalResultSink.getOutputExprs()) {
            slots.add(outputExpr.toSlot());
        }
        return slots.build();
    }

    /** Creates a copy whose logical properties are {@code null}; everything else is kept. */
    @Override
    public PhysicalDeferMaterializeResultSink<CHILD_TYPE> resetLogicalProperties() {
        return new PhysicalDeferMaterializeResultSink<>(physicalResultSink, olapTable, selectedIndexId,
                groupExpression, null, physicalProperties, statistics, child());
    }

    /**
     * Two sinks are equal when they have the same runtime class, the super class considers them
     * equal, and their selected index id, wrapped result sink and olap table are equal.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass() || !super.equals(o)) {
            return false;
        }
        PhysicalDeferMaterializeResultSink<?> other = (PhysicalDeferMaterializeResultSink<?>) o;
        return selectedIndexId == other.selectedIndexId
                && Objects.equals(physicalResultSink, other.physicalResultSink)
                && Objects.equals(olapTable, other.olapTable);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), physicalResultSink, olapTable, selectedIndexId);
    }

    @Override
    public String toString() {
        String nodeLabel = "PhysicalDeferMaterializeResultSink[" + id.asInt() + "]";
        return Utils.toSqlString(nodeLabel,
                "physicalResultSink", physicalResultSink,
                "olapTable", olapTable,
                "selectedIndexId", selectedIndexId);
    }
}

View File

@ -0,0 +1,176 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.physical;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.ExprId;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.TopN;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.statistics.Statistics;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
/**
 * Physical top-n node used by the defer materialize (two phase read) top-n optimization.
 *
 * <p>The node wraps a {@link PhysicalTopN}, which supplies the plan type, order keys, sort phase,
 * offset and limit, and additionally records the ids of the slots whose materialization is
 * deferred together with the column id slot.
 */
public class PhysicalDeferMaterializeTopN<CHILD_TYPE extends Plan>
        extends AbstractPhysicalSort<CHILD_TYPE> implements TopN {

    /** Wrapped top-n that supplies plan type, order keys, sort phase, offset and limit. */
    private final PhysicalTopN<? extends Plan> physicalTopN;

    ///////////////////////////////////////////////////////////////////////////
    // Members for defer materialize for top-n opt.
    ///////////////////////////////////////////////////////////////////////////
    private final Set<ExprId> deferMaterializeSlotIds;
    private final SlotReference columnIdSlot;

    /**
     * Builds the node with {@code null} physical properties and {@code null} statistics.
     */
    public PhysicalDeferMaterializeTopN(PhysicalTopN<? extends Plan> physicalTopN,
            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot,
            Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) {
        this(physicalTopN, deferMaterializeSlotIds, columnIdSlot,
                groupExpression, logicalProperties, null, null, child);
    }

    /**
     * Builds the node; plan type, order keys and sort phase are taken from the wrapped top-n.
     */
    public PhysicalDeferMaterializeTopN(PhysicalTopN<? extends Plan> physicalTopN,
            Set<ExprId> deferMaterializeSlotIds, SlotReference columnIdSlot,
            Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
            PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) {
        super(physicalTopN.getType(), physicalTopN.getOrderKeys(), physicalTopN.getSortPhase(),
                groupExpression, logicalProperties, physicalProperties, statistics, child);
        this.physicalTopN = physicalTopN;
        this.deferMaterializeSlotIds = deferMaterializeSlotIds;
        this.columnIdSlot = columnIdSlot;
    }

    /** Returns the wrapped top-n. */
    public PhysicalTopN<? extends Plan> getPhysicalTopN() {
        return physicalTopN;
    }

    /** Returns the ids of the slots recorded as deferred for materialization. */
    public Set<ExprId> getDeferMaterializeSlotIds() {
        return deferMaterializeSlotIds;
    }

    /** Returns the column id slot carried by this node. */
    public SlotReference getColumnIdSlot() {
        return columnIdSlot;
    }

    /** Returns the offset of the wrapped top-n. */
    @Override
    public long getOffset() {
        return physicalTopN.getOffset();
    }

    /** Returns the limit of the wrapped top-n. */
    @Override
    public long getLimit() {
        return physicalTopN.getLimit();
    }

    /**
     * Creates a copy that wraps the given top-n and uses that top-n's child as its own child;
     * all other members are kept.
     */
    public Plan withPhysicalTopN(PhysicalTopN<? extends Plan> physicalTopN) {
        return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot, groupExpression,
                getLogicalProperties(), physicalProperties, statistics, physicalTopN.child());
    }

    /**
     * Creates a copy on top of the given single child, keeping group expression, logical
     * properties, physical properties and statistics. The wrapped top-n is rebuilt on the new child.
     *
     * @throws IllegalArgumentException if {@code children} does not contain exactly one element
     */
    @Override
    public Plan withChildren(List<Plan> children) {
        Preconditions.checkArgument(children.size() == 1,
                "PhysicalDeferMaterializeTopN's children size must be 1, but real is %s", children.size());
        Plan newChild = children.get(0);
        PhysicalTopN<? extends Plan> rebuiltTopN = physicalTopN.withChildren(ImmutableList.of(newChild));
        return new PhysicalDeferMaterializeTopN<>(rebuiltTopN,
                deferMaterializeSlotIds, columnIdSlot, groupExpression, getLogicalProperties(),
                physicalProperties, statistics, newChild);
    }

    @Override
    public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
        return visitor.visitPhysicalDeferMaterializeTopN(this, context);
    }

    /** Creates a copy bound to the given group expression; everything else is kept. */
    @Override
    public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
        LogicalProperties currentProperties = getLogicalProperties();
        return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot,
                groupExpression, currentProperties, physicalProperties, statistics, child());
    }

    /**
     * Creates a copy with the given group expression, logical properties and single child.
     * The wrapped top-n is rebuilt on the new child.
     *
     * @throws IllegalArgumentException if {@code children} does not contain exactly one element
     */
    @Override
    public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
            Optional<LogicalProperties> logicalProperties, List<Plan> children) {
        Preconditions.checkArgument(children.size() == 1,
                "PhysicalDeferMaterializeTopN's children size must be 1, but real is %s", children.size());
        Plan newChild = children.get(0);
        PhysicalTopN<? extends Plan> rebuiltTopN = physicalTopN.withChildren(ImmutableList.of(newChild));
        return new PhysicalDeferMaterializeTopN<>(rebuiltTopN,
                deferMaterializeSlotIds, columnIdSlot, groupExpression, logicalProperties.get(),
                physicalProperties, statistics, newChild);
    }

    /** Creates a copy with the given physical properties and statistics; everything else is kept. */
    @Override
    public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
        LogicalProperties currentProperties = getLogicalProperties();
        return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot,
                groupExpression, currentProperties, physicalProperties, statistics, child());
    }

    /** Computes the output by returning the output of the child unchanged. */
    @Override
    public List<Slot> computeOutput() {
        return child().getOutput();
    }

    /** Creates a copy whose logical properties are {@code null}; everything else is kept. */
    @Override
    public PhysicalDeferMaterializeTopN<CHILD_TYPE> resetLogicalProperties() {
        return new PhysicalDeferMaterializeTopN<>(physicalTopN, deferMaterializeSlotIds, columnIdSlot,
                groupExpression, null, physicalProperties, statistics, child());
    }

    /**
     * Two nodes are equal when they have the same runtime class, the super class considers them
     * equal, and their wrapped top-n, deferred slot ids and column id slot are equal.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass() || !super.equals(o)) {
            return false;
        }
        PhysicalDeferMaterializeTopN<?> other = (PhysicalDeferMaterializeTopN<?>) o;
        return Objects.equals(physicalTopN, other.physicalTopN)
                && Objects.equals(deferMaterializeSlotIds, other.deferMaterializeSlotIds)
                && Objects.equals(columnIdSlot, other.columnIdSlot);
    }

    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), physicalTopN, deferMaterializeSlotIds, columnIdSlot);
    }

    @Override
    public String toString() {
        String nodeLabel = "PhysicalDeferMaterializeTopN[" + id.asInt() + "]";
        return Utils.toSqlString(nodeLabel,
                "physicalTopN", physicalTopN,
                "deferMaterializeSlotIds", deferMaterializeSlotIds,
                "columnIdSlot", columnIdSlot);
    }
}

View File

@ -53,10 +53,7 @@ public class PhysicalFileSink<CHILD_TYPE extends Plan> extends PhysicalSink<CHIL
public PhysicalFileSink(String filePath, String format, Map<String, String> properties,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
CHILD_TYPE child) {
super(PlanType.PHYSICAL_FILE_SINK, groupExpression, logicalProperties, child);
this.filePath = filePath;
this.format = format;
this.properties = properties;
this(filePath, format, properties, groupExpression, logicalProperties, PhysicalProperties.GATHER, null, child);
}
public PhysicalFileSink(String filePath, String format, Map<String, String> properties,

View File

@ -45,8 +45,6 @@ import java.util.Optional;
*/
public class PhysicalOlapScan extends PhysicalCatalogRelation implements OlapScan {
public static final String DEFERRED_MATERIALIZED_SLOTS = "deferred_materialized_slots";
private final DistributionSpec distributionSpec;
private final long selectedIndexId;
private final ImmutableList<Long> selectedTabletIds;

View File

@ -57,26 +57,14 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
private final boolean singleReplicaLoad;
private final boolean isPartialUpdate;
public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds,
List<Column> cols, boolean singleReplicaLoad, boolean isPartialUpdate, LogicalProperties logicalProperties,
CHILD_TYPE child) {
this(database, targetTable, partitionIds, cols, singleReplicaLoad, isPartialUpdate,
Optional.empty(), logicalProperties, child);
}
/**
* Constructor
*/
public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds,
List<Column> cols, boolean singleReplicaLoad, boolean isPartialUpdate,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) {
super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child);
this.database = Objects.requireNonNull(database, "database != null in PhysicalOlapTableSink");
this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalOlapTableSink");
this.cols = Utils.copyRequiredList(cols);
this.partitionIds = Utils.copyRequiredList(partitionIds);
this.singleReplicaLoad = singleReplicaLoad;
this.isPartialUpdate = isPartialUpdate;
public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols,
boolean singleReplicaLoad, boolean isPartialUpdate, Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties, CHILD_TYPE child) {
this(database, targetTable, partitionIds, cols, singleReplicaLoad, isPartialUpdate,
groupExpression, logicalProperties, PhysicalProperties.GATHER, null, child);
}
/**

View File

@ -24,7 +24,6 @@ import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.algebra.Relation;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.statistics.Statistics;
import com.google.common.collect.ImmutableList;
@ -76,11 +75,6 @@ public abstract class PhysicalRelation extends PhysicalLeaf implements Relation
return Objects.hash(relationId);
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitPhysicalRelation(this, context);
}
@Override
public List<? extends Expression> getExpressions() {
return ImmutableList.of();

View File

@ -45,16 +45,9 @@ public class PhysicalResultSink<CHILD_TYPE extends Plan> extends PhysicalSink<CH
private final List<NamedExpression> outputExprs;
public PhysicalResultSink(List<NamedExpression> outputExprs, LogicalProperties logicalProperties,
CHILD_TYPE child) {
super(PlanType.PHYSICAL_RESULT_SINK, logicalProperties, child);
this.outputExprs = outputExprs;
}
public PhysicalResultSink(List<NamedExpression> outputExprs, Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties, CHILD_TYPE child) {
super(PlanType.PHYSICAL_RESULT_SINK, groupExpression, logicalProperties, child);
this.outputExprs = outputExprs;
this(outputExprs, groupExpression, logicalProperties, PhysicalProperties.GATHER, null, child);
}
public PhysicalResultSink(List<NamedExpression> outputExprs, Optional<GroupExpression> groupExpression,
@ -64,11 +57,16 @@ public class PhysicalResultSink<CHILD_TYPE extends Plan> extends PhysicalSink<CH
this.outputExprs = outputExprs;
}
/** Returns the {@code outputExprs} list held by this result sink. */
public List<NamedExpression> getOutputExprs() {
return outputExprs;
}
/**
 * Returns a copy of this sink with its single child replaced.
 *
 * <p>The copy keeps this node's output expressions, group expression, logical
 * properties, physical properties and statistics.
 *
 * @param children the new children; must contain exactly one plan
 * @return a new sink over {@code children.get(0)}
 * @throws IllegalArgumentException if {@code children} does not have exactly one element
 */
@Override
public PhysicalResultSink<Plan> withChildren(List<Plan> children) {
    Preconditions.checkArgument(children.size() == 1,
            "PhysicalResultSink's children size must be 1, but real is %s", children.size());
    return new PhysicalResultSink<>(outputExprs, groupExpression, getLogicalProperties(),
            physicalProperties, statistics, children.get(0));
}
@Override
@ -83,13 +81,17 @@ public class PhysicalResultSink<CHILD_TYPE extends Plan> extends PhysicalSink<CH
/**
 * Returns a copy of this sink bound to the given group expression.
 *
 * <p>The copy keeps this node's output expressions, logical properties,
 * physical properties, statistics and child.
 *
 * @param groupExpression the group expression to attach to the copy
 * @return a new sink that differs from this one only in its group expression
 */
@Override
public PhysicalResultSink<Plan> withGroupExpression(Optional<GroupExpression> groupExpression) {
    return new PhysicalResultSink<>(outputExprs, groupExpression, getLogicalProperties(),
            physicalProperties, statistics, child());
}
/**
 * Returns a copy of this sink with a new group expression, new logical
 * properties and a new child.
 *
 * <p>The supplied {@code children} are used for the copy (not this node's
 * current child); physical properties and statistics are carried over.
 *
 * @param groupExpression the group expression to attach to the copy
 * @param logicalProperties the logical properties for the copy; must be present,
 *     since {@code get()} is called on it unconditionally
 * @param children the new children; must contain exactly one plan
 * @return a new sink over {@code children.get(0)}
 * @throws IllegalArgumentException if {@code children} does not have exactly one element
 */
@Override
public PhysicalResultSink<Plan> withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
        Optional<LogicalProperties> logicalProperties, List<Plan> children) {
    Preconditions.checkArgument(children.size() == 1,
            "PhysicalResultSink's children size must be 1, but real is %s", children.size());
    return new PhysicalResultSink<>(outputExprs, groupExpression, logicalProperties.get(),
            physicalProperties, statistics, children.get(0));
}
@Override

View File

@ -30,18 +30,6 @@ import java.util.Optional;
/** abstract physical sink */
public abstract class PhysicalSink<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_TYPE> {
/**
 * Creates a sink without a group expression; all arguments are forwarded to the
 * parent constructor.
 *
 * @param type the plan type of the concrete sink
 * @param logicalProperties the logical properties of this plan
 * @param child the single child plan
 */
public PhysicalSink(PlanType type,
LogicalProperties logicalProperties, CHILD_TYPE child) {
super(type, logicalProperties, child);
}
/**
 * Creates a sink with an optional group expression; all arguments are forwarded
 * to the parent constructor.
 *
 * @param type the plan type of the concrete sink
 * @param groupExpression the group expression this plan belongs to, if any
 * @param logicalProperties the logical properties of this plan
 * @param child the single child plan
 */
public PhysicalSink(PlanType type,
Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties, CHILD_TYPE child) {
super(type, groupExpression, logicalProperties, child);
}
public PhysicalSink(PlanType type,
Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties,

View File

@ -109,7 +109,8 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
/**
 * Returns a copy of this TopN with its single child replaced.
 *
 * <p>Order keys, limit, offset, phase, group expression, logical properties,
 * physical properties and statistics are carried over from this node.
 *
 * @param children the new children; must contain exactly one plan
 * @return a new TopN over {@code children.get(0)}
 * @throws IllegalArgumentException if {@code children} does not have exactly one element
 */
@Override
public PhysicalTopN<Plan> withChildren(List<Plan> children) {
    // One check is enough; keep the variant that reports the actual size.
    Preconditions.checkArgument(children.size() == 1,
            "PhysicalTopN's children size must be 1, but real is %s", children.size());
    return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression,
            getLogicalProperties(), physicalProperties, statistics, children.get(0));
}
@ -122,7 +123,8 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends AbstractPhysicalSort<
/**
 * Returns a copy of this TopN with a new group expression, new logical
 * properties and a new child.
 *
 * <p>Order keys, limit, offset and phase are carried over. Physical properties
 * and statistics are not passed to the constructor used here.
 *
 * @param groupExpression the group expression to attach to the copy
 * @param logicalProperties the logical properties for the copy; must be present,
 *     since {@code get()} is called on it unconditionally
 * @param children the new children; must contain exactly one plan
 * @return a new TopN over {@code children.get(0)}
 * @throws IllegalArgumentException if {@code children} does not have exactly one element
 */
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
        Optional<LogicalProperties> logicalProperties, List<Plan> children) {
    // One check is enough; keep the variant that reports the actual size.
    Preconditions.checkArgument(children.size() == 1,
            "PhysicalTopN's children size must be 1, but real is %s", children.size());
    return new PhysicalTopN<>(orderKeys, limit, offset, phase, groupExpression, logicalProperties.get(),
            children.get(0));
}

View File

@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalExcept;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalGenerate;
@ -53,6 +54,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEAnchor;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEConsumer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCTEProducer;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalExcept;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
@ -225,6 +227,10 @@ public abstract class PlanVisitor<R, C> implements CommandVisitor<R, C>, Relatio
return visit(topN, context);
}
/**
 * Visits a {@code LogicalDeferMaterializeTopN} node.
 *
 * <p>The default implementation delegates to the generic {@code visit}.
 *
 * @param topN the node being visited
 * @param context caller-supplied context, passed through unchanged
 * @return the result of {@code visit(topN, context)}
 */
public R visitLogicalDeferMaterializeTopN(LogicalDeferMaterializeTopN<? extends Plan> topN, C context) {
return visit(topN, context);
}
/**
 * Visits a {@code LogicalWindow} node.
 *
 * <p>The default implementation delegates to the generic {@code visit}.
 *
 * @param window the node being visited
 * @param context caller-supplied context, passed through unchanged
 * @return the result of {@code visit(window, context)}
 */
public R visitLogicalWindow(LogicalWindow<? extends Plan> window, C context) {
return visit(window, context);
}
@ -323,6 +329,10 @@ public abstract class PlanVisitor<R, C> implements CommandVisitor<R, C>, Relatio
return visitAbstractPhysicalSort(topN, context);
}
/**
 * Visits a {@code PhysicalDeferMaterializeTopN} node.
 *
 * <p>The default implementation treats it like any other physical sort and
 * delegates to {@code visitAbstractPhysicalSort}.
 *
 * @param topN the node being visited
 * @param context caller-supplied context, passed through unchanged
 * @return the result of {@code visitAbstractPhysicalSort(topN, context)}
 */
public R visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan> topN, C context) {
return visitAbstractPhysicalSort(topN, context);
}
/**
 * Visits a {@code PhysicalWindow} node.
 *
 * <p>The default implementation delegates to the generic {@code visit}.
 *
 * @param window the node being visited
 * @param context caller-supplied context, passed through unchanged
 * @return the result of {@code visit(window, context)}
 */
public R visitPhysicalWindow(PhysicalWindow<? extends Plan> window, C context) {
return visit(window, context);
}

View File

@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.visitor;
import org.apache.doris.nereids.analyzer.UnboundOneRowRelation;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalEsScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
@ -29,6 +30,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
@ -92,6 +94,11 @@ public interface RelationVisitor<R, C> {
return visitLogicalRelation(olapScan, context);
}
/**
 * Visits a {@code LogicalDeferMaterializeOlapScan} node.
 *
 * <p>The default implementation delegates to {@code visitLogicalRelation}.
 *
 * @param deferMaterializeOlapScan the node being visited
 * @param context caller-supplied context, passed through unchanged
 * @return the result of {@code visitLogicalRelation(deferMaterializeOlapScan, context)}
 */
default R visitLogicalDeferMaterializeOlapScan(
LogicalDeferMaterializeOlapScan deferMaterializeOlapScan, C context) {
return visitLogicalRelation(deferMaterializeOlapScan, context);
}
/**
 * Visits a {@code LogicalOneRowRelation} node.
 *
 * <p>The default implementation delegates to {@code visitLogicalRelation}.
 *
 * @param oneRowRelation the node being visited
 * @param context caller-supplied context, passed through unchanged
 * @return the result of {@code visitLogicalRelation(oneRowRelation, context)}
 */
default R visitLogicalOneRowRelation(LogicalOneRowRelation oneRowRelation, C context) {
return visitLogicalRelation(oneRowRelation, context);
}
@ -128,6 +135,11 @@ public interface RelationVisitor<R, C> {
return visitPhysicalRelation(olapScan, context);
}
/**
 * Visits a {@code PhysicalDeferMaterializeOlapScan} node.
 *
 * <p>The default implementation delegates to {@code visitPhysicalRelation}.
 *
 * @param deferMaterializeOlapScan the node being visited
 * @param context caller-supplied context, passed through unchanged
 * @return the result of {@code visitPhysicalRelation(deferMaterializeOlapScan, context)}
 */
default R visitPhysicalDeferMaterializeOlapScan(
PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, C context) {
return visitPhysicalRelation(deferMaterializeOlapScan, context);
}
/**
 * Visits a {@code PhysicalOneRowRelation} node.
 *
 * <p>The default implementation delegates to {@code visitPhysicalRelation}.
 *
 * @param oneRowRelation the node being visited
 * @param context caller-supplied context, passed through unchanged
 * @return the result of {@code visitPhysicalRelation(oneRowRelation, context)}
 */
default R visitPhysicalOneRowRelation(PhysicalOneRowRelation oneRowRelation, C context) {
return visitPhysicalRelation(oneRowRelation, context);
}

View File

@ -20,10 +20,12 @@ package org.apache.doris.nereids.trees.plans.visitor;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.analyzer.UnboundResultSink;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeResultSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalFileSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
@ -70,6 +72,11 @@ public interface SinkVisitor<R, C> {
return visitLogicalSink(logicalResultSink, context);
}
/**
 * Visits a {@code LogicalDeferMaterializeResultSink} node.
 *
 * <p>The default implementation delegates to {@code visitLogicalSink}.
 *
 * @param logicalDeferMaterializeResultSink the node being visited
 * @param context caller-supplied context, passed through unchanged
 * @return the result of {@code visitLogicalSink(logicalDeferMaterializeResultSink, context)}
 */
default R visitLogicalDeferMaterializeResultSink(
LogicalDeferMaterializeResultSink<? extends Plan> logicalDeferMaterializeResultSink, C context) {
return visitLogicalSink(logicalDeferMaterializeResultSink, context);
}
// *******************************
// physical
// *******************************
@ -85,4 +92,9 @@ public interface SinkVisitor<R, C> {
/**
 * Visits a {@code PhysicalResultSink} node.
 *
 * <p>The default implementation delegates to {@code visitPhysicalSink}.
 *
 * @param physicalResultSink the node being visited
 * @param context caller-supplied context, passed through unchanged
 * @return the result of {@code visitPhysicalSink(physicalResultSink, context)}
 */
default R visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, C context) {
return visitPhysicalSink(physicalResultSink, context);
}
/**
 * Visits a {@code PhysicalDeferMaterializeResultSink} node.
 *
 * <p>The default implementation delegates to {@code visitPhysicalSink}.
 *
 * @param sink the node being visited
 * @param context caller-supplied context, passed through unchanged
 * @return the result of {@code visitPhysicalSink(sink, context)}
 */
default R visitPhysicalDeferMaterializeResultSink(
PhysicalDeferMaterializeResultSink<? extends Plan> sink, C context) {
return visitPhysicalSink(sink, context);
}
}

View File

@ -147,6 +147,10 @@ public class ExchangeNode extends PlanNode {
}
}
/**
 * Returns the {@code SortInfo} currently held in {@code mergeInfo}.
 *
 * @return the {@code mergeInfo} field as-is
 */
// NOTE(review): presumably null when no merge parameters were set — confirm against the setter.
public SortInfo getMergeInfo() {
return mergeInfo;
}
/**
* Set the parameters used to merge sorted input streams. This can be called
* after init().

View File

@ -1281,33 +1281,12 @@ public class OlapScanNode extends ScanNode {
return shouldColoScan;
}
/**
 * Collects thrift column descriptors and key-column metadata for the selected index.
 *
 * <p>Does nothing when {@code selectedIndexId} is {@code -1}. Each output list
 * is optional: passing {@code null} skips that output.
 *
 * @param columnsDesc receives one {@code TColumn} (with its index flag set) for every
 *     column of the selected index schema; may be {@code null}
 * @param keyColumnNames receives the names of key columns that are visible, or of all
 *     key columns when hidden columns are shown; may be {@code null}
 * @param keyColumnTypes receives the thrift primitive types of those same key columns;
 *     may be {@code null}
 */
public void getColumnDesc(List<TColumn> columnsDesc, List<String> keyColumnNames,
        List<TPrimitiveType> keyColumnTypes) {
    if (selectedIndexId == -1) {
        return;
    }
    for (Column col : olapTable.getSchemaByIndexId(selectedIndexId, true)) {
        TColumn tColumn = col.toThrift();
        col.setIndexFlag(tColumn, olapTable);
        if (columnsDesc != null) {
            columnsDesc.add(tColumn);
        }
        // "show || (!show && visible)" is just "show || visible".
        boolean exposed = Util.showHiddenColumns() || col.isVisible();
        if (!exposed || !col.isKey()) {
            continue;
        }
        if (keyColumnNames != null) {
            keyColumnNames.add(col.getName());
        }
        if (keyColumnTypes != null) {
            keyColumnTypes.add(col.getDataType().toThrift());
        }
    }
}
@Override
protected void toThrift(TPlanNode msg) {
List<String> keyColumnNames = new ArrayList<String>();
List<TPrimitiveType> keyColumnTypes = new ArrayList<TPrimitiveType>();
List<TColumn> columnsDesc = new ArrayList<TColumn>();
getColumnDesc(columnsDesc, keyColumnNames, keyColumnTypes);
olapTable.getColumnDesc(selectedIndexId, columnsDesc, keyColumnNames, keyColumnTypes);
List<TOlapTableIndex> indexDesc = Lists.newArrayList();
// Add extra row id column

View File

@ -51,7 +51,6 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
import org.apache.doris.statistics.query.StatsDelta;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TFetchOption;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TRuntimeFilterMode;
@ -501,17 +500,7 @@ public class OriginalPlanner extends Planner {
}
for (PlanFragment fragment : fragments) {
if (injected && fragment.getSink() instanceof ResultSink) {
TFetchOption fetchOption = new TFetchOption();
fetchOption.setFetchRowStore(olapTable.storeRowColumn());
fetchOption.setUseTwoPhaseFetch(true);
fetchOption.setNodesInfo(Env.getCurrentSystemInfo().createAliveNodesInfo());
// TODO for row store used seperate more faster path for wide tables
if (!olapTable.storeRowColumn()) {
// Set column desc for each column
List<TColumn> columnsDesc = new ArrayList<TColumn>();
scanNode.getColumnDesc(columnsDesc, null, null);
fetchOption.setColumnDesc(columnsDesc);
}
TFetchOption fetchOption = olapTable.generateTwoPhaseReadOption(scanNode.getSelectedIndexId());
((ResultSink) fragment.getSink()).setFetchOption(fetchOption);
break;
}

View File

@ -19,6 +19,8 @@ package org.apache.doris.nereids.postprocess;
import org.apache.doris.nereids.datasets.ssb.SSBTestBase;
import org.apache.doris.nereids.processor.post.PlanPostProcessors;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.nereids.util.PlanChecker;
@ -39,10 +41,12 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
.rewrite()
.implement();
PhysicalPlan plan = checker.getPhysicalPlan();
plan = new PlanPostProcessors(checker.getCascadesContext()).process(plan);
Assertions.assertTrue(plan.children().get(0).child(0) instanceof PhysicalTopN);
PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0).child(0);
Assertions.assertTrue(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
new PlanPostProcessors(checker.getCascadesContext()).process(plan);
Assertions.assertTrue(plan.children().get(0).child(0) instanceof PhysicalDeferMaterializeTopN);
PhysicalDeferMaterializeTopN<? extends Plan> localTopN
= (PhysicalDeferMaterializeTopN<? extends Plan>) plan.child(0).child(0);
Assertions.assertTrue(localTopN.getPhysicalTopN()
.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
}
// topn rf do not apply on string-like and float column
@ -53,9 +57,11 @@ public class TopNRuntimeFilterTest extends SSBTestBase {
.rewrite()
.implement();
PhysicalPlan plan = checker.getPhysicalPlan();
plan = new PlanPostProcessors(checker.getCascadesContext()).process(plan);
Assertions.assertTrue(plan.children().get(0) instanceof PhysicalTopN);
PhysicalTopN localTopN = (PhysicalTopN) plan.children().get(0);
Assertions.assertFalse(localTopN.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
new PlanPostProcessors(checker.getCascadesContext()).process(plan);
Assertions.assertTrue(plan.children().get(0).child(0) instanceof PhysicalDeferMaterializeTopN);
PhysicalDeferMaterializeTopN<? extends Plan> localTopN
= (PhysicalDeferMaterializeTopN<? extends Plan>) plan.child(0).child(0);
Assertions.assertFalse(localTopN.getPhysicalTopN()
.getMutableState(PhysicalTopN.TOPN_RUNTIME_FILTER).isPresent());
}
}