[fix](Nereids) store offset of Limit in exchangeNode (#17548)

When the limit has offset, we should add an exchangeNode and store the offset in it
This commit is contained in:
谢健
2023-03-09 13:43:12 +08:00
committed by GitHub
parent 2d027282f3
commit e1ea2e1f2c
4 changed files with 51 additions and 28 deletions

View File

@ -1470,13 +1470,20 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
// Union contains oneRowRelation
if (inputFragment == null) {
return inputFragment;
return null;
}
// For case globalLimit(l, o) -> LocalLimit(l+o, 0), that is the LocalLimit has already gathered
// The globalLimit can overwrite the limit and offset, so it's still correct
PlanNode child = inputFragment.getPlanRoot();
child.setLimit(physicalLimit.getLimit());
// This case means GlobalLimit's child isn't gatherNode, which suggests the child is UNPARTITIONED
// When there is valid offset, exchangeNode should be added because other node don't support offset
if (physicalLimit.isGlobal() && physicalLimit.hasValidOffset()
&& !(child instanceof ExchangeNode)) {
inputFragment = createParentFragment(inputFragment, DataPartition.UNPARTITIONED, context);
child = inputFragment.getPlanRoot();
}
child.setOffset(physicalLimit.getOffset());
child.setLimit(physicalLimit.getLimit());
return inputFragment;
}

View File

@ -95,7 +95,7 @@ public class PushdownLimit implements RewriteRuleFactory {
return topN;
}).toRule(RuleType.PUSH_LIMIT_INTO_SORT),
logicalLimit(logicalOneRowRelation())
.then(limit -> limit.getLimit() > 0
.then(limit -> limit.getLimit() > 0 && limit.getOffset() == 0
? limit.child() : new LogicalEmptyRelation(limit.child().getOutput()))
.toRule(RuleType.PUSH_LIMIT_THROUGH_ONE_ROW_RELATION),
logicalLimit(logicalEmptyRelation())

View File

@ -67,10 +67,6 @@ public class ExchangeNode extends PlanNode {
// exchange node. Null if this exchange does not merge sorted streams
private SortInfo mergeInfo;
// Offset after which the exchange begins returning rows. Currently valid
// only if mergeInfo_ is non-null, i.e. this is a merging exchange node.
private long offset;
/**
* Create ExchangeNode that consumes output of inputNode.
* An ExchangeNode doesn't have an input node as a child, which is why we
@ -145,25 +141,6 @@ public class ExchangeNode extends PlanNode {
: MERGING_EXCHANGE_NODE;
}
/**
* This function is used to translate PhysicalLimit.
* Ignore the offset if this is not a merging exchange node.
* @param offset
*/
public void setOffset(long offset) {
if (isMergingExchange()) {
this.offset = offset;
}
}
/**
* Used by new optimizer only.
*/
@Override
public void setOffSetDirectly(long offset) {
this.offset = offset;
}
@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.EXCHANGE_NODE;

View File

@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_limit") {
sql 'set enable_nereids_planner=true'
sql 'set enable_fallback_to_original_planner=false'
sql """
drop table if exists test1
"""
sql """
CREATE TABLE IF NOT EXISTS test1(
id int
)
DISTRIBUTED BY HASH(id) properties("replication_num" = "1");
"""
sql """ insert into test1 values(1) """
sql """ insert into test1 values(1) """
test {
sql "select * from test1 limit 2 offset 1"
result([[1]])
}
}