[Fix](topn opt) only allow duplicate key or MOW model to use 2 phase read opt in nereids planner (#24485)

The fetch phase is not support aggregation at present
This commit is contained in:
lihangyu
2023-09-16 10:01:36 +08:00
committed by GitHub
parent 4dad7c94da
commit 81b6ab9b68
3 changed files with 84 additions and 0 deletions

View File

@ -60,6 +60,7 @@ public class DeferMaterializeTopNResult implements RewriteRuleFactory {
.when(r -> r.child().getOrderKeys().stream().map(OrderKey::getExpr)
.allMatch(Expression::isColumnFromTable))
.when(r -> r.child().child().getTable().getEnableLightSchemaChange())
.when(r -> r.child().child().getTable().isDupKeysOrMergeOnWrite())
.then(r -> deferMaterialize(r, r.child(), Optional.empty(), r.child().child()))
),
RuleType.DEFER_MATERIALIZE_TOP_N_RESULT.build(
@ -69,6 +70,7 @@ public class DeferMaterializeTopNResult implements RewriteRuleFactory {
.when(r -> r.child().getOrderKeys().stream().map(OrderKey::getExpr)
.allMatch(Expression::isColumnFromTable))
.when(r -> r.child().child().child().getTable().getEnableLightSchemaChange())
.when(r -> r.child().child().child().getTable().isDupKeysOrMergeOnWrite())
.then(r -> {
LogicalFilter<LogicalOlapScan> filter = r.child().child();
return deferMaterialize(r, r.child(), Optional.of(filter), filter.child());

View File

@ -101,6 +101,10 @@ suite("sort") {
qt_sql_orderby_non_overlap_desc """ select * from sort_non_overlap order by time_period desc limit 4; """
// test topn 2phase opt with light schema change
sql """set topn_opt_limit_threshold = 1024"""
sql """set enable_two_phase_read_opt= true"""
sql """ DROP TABLE if exists `sort_default_value`; """
sql """ CREATE TABLE `sort_default_value` (
`k1` int NOT NULL
@ -118,4 +122,8 @@ suite("sort") {
sql "insert into sort_default_value values (3, 0)"
sql "insert into sort_default_value values (4, null)"
qt_sql "select * from sort_default_value order by k1 limit 10"
explain {
sql("select * from sort_default_value order by k1 limit 10")
contains "OPT TWO PHASE"
}
}

View File

@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("topn_2pr_rule") {
sql """set topn_opt_limit_threshold = 1024"""
sql """set enable_two_phase_read_opt= true"""
def create_table = { table_name, key_type="DUPLICATE" ->
sql "DROP TABLE IF EXISTS ${table_name}"
value_type = "v string"
if ("${key_type}" == "AGGREGATE") {
value_type = "v string REPLACE_IF_NOT_NULL NULL"
}
sql """
CREATE TABLE IF NOT EXISTS ${table_name} (
k bigint,
${value_type}
)
${key_type} KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 1
properties("replication_num" = "1", "disable_auto_compaction" = "false");
"""
}
def verify = { table_name, key_type->
if("${key_type}" == "DUPLICATE") {
explain {
sql("select * from ${table_name} order by k limit 1;")
contains "OPT TWO PHASE"
}
explain {
sql("select * from ${table_name} where k = 1 order by k limit 1;")
contains "OPT TWO PHASE"
}
explain {
sql("select * from ${table_name} where k order by k + 1 limit 1;")
notContains "OPT TWO PHASE"
}
} else if("${key_type}" == "UNIQUE") {
explain {
sql("select * from ${table_name} order by k limit 1;")
notContains "OPT TWO PHASE"
}
} else if("${key_type}" == "AGGREGATE") {
explain {
sql("select * from ${table_name} order by k limit 1;")
notContains "OPT TWO PHASE"
}
}
}
def key_types = ["DUPLICATE", "UNIQUE", "AGGREGATE"]
for (int i = 0; i < key_types.size(); i++) {
def table_name = "topn_2pr_rule_${key_types[i]}"
create_table.call(table_name, key_types[i])
sql """insert into ${table_name} values(1, "1")"""
sql """insert into ${table_name} values(2, "2")"""
sql """insert into ${table_name} values(3, "3")"""
verify.call(table_name, key_types[i])
}
}