[fix](Nereids): fix offset in PlanTranslator (#29789)

Current BE operator don't support `offset`, we need add offset into `ExchangeNode`
This commit is contained in:
jakevin
2024-01-15 17:34:08 +08:00
committed by yiguolei
parent 8433169e5e
commit 0ea5f0f5de
4 changed files with 61 additions and 9 deletions

View File

@ -268,13 +268,14 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
@Override
public PlanFragment visitPhysicalDistribute(PhysicalDistribute<? extends Plan> distribute,
PlanTranslatorContext context) {
PlanFragment inputFragment = distribute.child().accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(distribute.child());
Plan child = distribute.child();
PlanFragment inputFragment = child.accept(this, context);
List<List<Expr>> distributeExprLists = getDistributeExprs(child);
// TODO: why need set streaming here? should remove this.
if (inputFragment.getPlanRoot() instanceof AggregationNode
&& distribute.child() instanceof PhysicalHashAggregate
&& context.getFirstAggregateInFragment(inputFragment) == distribute.child()) {
PhysicalHashAggregate<?> hashAggregate = (PhysicalHashAggregate<?>) distribute.child();
&& child instanceof PhysicalHashAggregate
&& context.getFirstAggregateInFragment(inputFragment) == child) {
PhysicalHashAggregate<?> hashAggregate = (PhysicalHashAggregate<?>) child;
if (hashAggregate.getAggPhase() == AggPhase.LOCAL
&& hashAggregate.getAggMode() == AggMode.INPUT_TO_BUFFER) {
AggregationNode aggregationNode = (AggregationNode) inputFragment.getPlanRoot();
@ -285,23 +286,26 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
ExchangeNode exchangeNode = new ExchangeNode(context.nextPlanNodeId(), inputFragment.getPlanRoot());
updateLegacyPlanIdToPhysicalPlan(exchangeNode, distribute);
List<ExprId> validOutputIds = distribute.getOutputExprIds();
if (distribute.child() instanceof PhysicalHashAggregate) {
if (child instanceof PhysicalHashAggregate) {
// we must add group by keys to output list,
// otherwise we could not process aggregate's output without group by keys
List<ExprId> keys = ((PhysicalHashAggregate<?>) distribute.child()).getGroupByExpressions().stream()
List<ExprId> keys = ((PhysicalHashAggregate<?>) child).getGroupByExpressions().stream()
.filter(SlotReference.class::isInstance)
.map(SlotReference.class::cast)
.map(SlotReference::getExprId)
.collect(Collectors.toList());
keys.addAll(validOutputIds);
validOutputIds = keys;
} else if (child instanceof PhysicalLimit && ((PhysicalLimit<?>) child).getPhase().isGlobal()) {
// because sort already contains Offset, we don't need to handle PhysicalTopN
exchangeNode.setOffset(((PhysicalLimit<?>) child).getOffset());
}
if (inputFragment instanceof MultiCastPlanFragment) {
// TODO: remove this logic when we split to multi-window in logical window to physical window conversion
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) inputFragment.getSink();
DataStreamSink dataStreamSink = multiCastDataSink.getDataStreamSinks().get(
multiCastDataSink.getDataStreamSinks().size() - 1);
if (!(distribute.child() instanceof PhysicalProject)) {
if (!(child instanceof PhysicalProject)) {
List<Expr> projectionExprs = new ArrayList<>();
PhysicalCTEConsumer consumer = getCTEConsumerChild(distribute);
Preconditions.checkState(consumer != null, "consumer not found");
@ -1591,7 +1595,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
PlanFragment inputFragment = physicalLimit.child(0).accept(this, context);
PlanNode child = inputFragment.getPlanRoot();
child.setLimit(MergeLimits.mergeLimit(physicalLimit.getLimit(), physicalLimit.getOffset(), child.getLimit()));
child.setOffset(MergeLimits.mergeOffset(physicalLimit.getOffset(), child.getOffset()));
// TODO: plan node don't support limit
// child.setOffset(MergeLimits.mergeOffset(physicalLimit.getOffset(), child.getOffset()));
updateLegacyPlanIdToPhysicalPlan(child, physicalLimit);
return inputFragment;
}

View File

@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.processor.post;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.properties.DistributionSpecGather;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
/**
* Offset just can be in exchangeNode.
* So, `offset` action is after `limit` action.
* So, `limit` should update with `offset + limit`
*/
public class AddOffsetIntoDistribute extends PlanPostProcessor {
@Override
public Plan visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, CascadesContext context) {
if (limit.getPhase().isLocal() || limit.getOffset() == 0) {
return limit;
}
return new PhysicalDistribute<>(DistributionSpecGather.INSTANCE,
limit.withLimit(limit.getLimit() + limit.getOffset()));
}
}

View File

@ -61,6 +61,7 @@ public class PlanPostProcessors {
builder.add(new PushDownFilterThroughProject());
builder.add(new MergeProjectPostProcessor());
builder.add(new RecomputeLogicalPropertiesProcessor());
builder.add(new AddOffsetIntoDistribute());
builder.add(new TopNScanOpt());
// after generate rf, DO NOT replace PLAN NODE
builder.add(new FragmentProcessor());

View File

@ -101,6 +101,11 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_
return phase == LimitPhase.GLOBAL;
}
public Plan withLimit(long limit) {
return new PhysicalLimit<>(limit, offset, phase, groupExpression, getLogicalProperties(),
physicalProperties, statistics, children.get(0));
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1);