[Feature](Nereids): support cte for update and delete statements of Nereids (#23384)
This commit is contained in:
@ -45,11 +45,11 @@ statement
|
||||
(WITH LABEL labelName=identifier)? cols=identifierList? // label and columns define
|
||||
(LEFT_BRACKET hints=identifierSeq RIGHT_BRACKET)? // hint define
|
||||
query #insertIntoQuery
|
||||
| explain? UPDATE tableName=multipartIdentifier tableAlias
|
||||
| explain? cte? UPDATE tableName=multipartIdentifier tableAlias
|
||||
SET updateAssignmentSeq
|
||||
fromClause?
|
||||
whereClause #update
|
||||
| explain? DELETE FROM tableName=multipartIdentifier tableAlias
|
||||
| explain? cte? DELETE FROM tableName=multipartIdentifier tableAlias
|
||||
(PARTITION partition=identifierList)?
|
||||
(USING relation (COMMA relation)*)
|
||||
whereClause #delete
|
||||
|
||||
@ -394,8 +394,12 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
|
||||
if (ctx.tableAlias().strictIdentifier() != null) {
|
||||
tableAlias = ctx.tableAlias().getText();
|
||||
}
|
||||
Optional<LogicalPlan> cte = Optional.empty();
|
||||
if (ctx.cte() != null) {
|
||||
cte = Optional.ofNullable(withCte(query, ctx.cte()));
|
||||
}
|
||||
return withExplain(new UpdateCommand(visitMultipartIdentifier(ctx.tableName), tableAlias,
|
||||
visitUpdateAssignmentSeq(ctx.updateAssignmentSeq()), query), ctx.explain());
|
||||
visitUpdateAssignmentSeq(ctx.updateAssignmentSeq()), query, cte), ctx.explain());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -412,8 +416,11 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
|
||||
if (ctx.tableAlias().strictIdentifier() != null) {
|
||||
tableAlias = ctx.tableAlias().getText();
|
||||
}
|
||||
return withExplain(new DeleteCommand(tableName, tableAlias, partitions, query),
|
||||
ctx.explain());
|
||||
Optional<LogicalPlan> cte = Optional.empty();
|
||||
if (ctx.cte() != null) {
|
||||
cte = Optional.ofNullable(withCte(query, ctx.cte()));
|
||||
}
|
||||
return withExplain(new DeleteCommand(tableName, tableAlias, partitions, query, cte), ctx.explain());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -53,17 +53,19 @@ public class DeleteCommand extends Command implements ForwardWithSync, Explainab
|
||||
private final List<String> partitions;
|
||||
private LogicalPlan logicalQuery;
|
||||
private OlapTable targetTable;
|
||||
private final Optional<LogicalPlan> cte;
|
||||
|
||||
/**
|
||||
* constructor
|
||||
*/
|
||||
public DeleteCommand(List<String> nameParts, String tableAlias, List<String> partitions,
|
||||
LogicalPlan logicalQuery) {
|
||||
LogicalPlan logicalQuery, Optional<LogicalPlan> cte) {
|
||||
super(PlanType.DELETE_COMMAND);
|
||||
this.nameParts = Utils.copyRequiredList(nameParts);
|
||||
this.tableAlias = tableAlias;
|
||||
this.partitions = Utils.copyRequiredList(partitions);
|
||||
this.logicalQuery = logicalQuery;
|
||||
this.cte = cte;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -110,13 +112,13 @@ public class DeleteCommand extends Command implements ForwardWithSync, Explainab
|
||||
}
|
||||
|
||||
logicalQuery = new LogicalProject<>(selectLists, logicalQuery);
|
||||
|
||||
boolean isPartialUpdate = false;
|
||||
if (targetTable.getEnableUniqueKeyMergeOnWrite()
|
||||
&& cols.size() < targetTable.getColumns().size()) {
|
||||
isPartialUpdate = true;
|
||||
if (cte.isPresent()) {
|
||||
logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery));
|
||||
}
|
||||
|
||||
boolean isPartialUpdate = targetTable.getEnableUniqueKeyMergeOnWrite()
|
||||
&& cols.size() < targetTable.getColumns().size();
|
||||
|
||||
// make UnboundTableSink
|
||||
return new UnboundOlapTableSink<>(nameParts, cols, ImmutableList.of(),
|
||||
partitions, isPartialUpdate, logicalQuery);
|
||||
|
||||
@ -71,17 +71,19 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab
|
||||
private final @Nullable String tableAlias;
|
||||
private final LogicalPlan logicalQuery;
|
||||
private OlapTable targetTable;
|
||||
private final Optional<LogicalPlan> cte;
|
||||
|
||||
/**
|
||||
* constructor
|
||||
*/
|
||||
public UpdateCommand(List<String> nameParts, @Nullable String tableAlias, List<EqualTo> assignments,
|
||||
LogicalPlan logicalQuery) {
|
||||
LogicalPlan logicalQuery, Optional<LogicalPlan> cte) {
|
||||
super(PlanType.UPDATE_COMMAND);
|
||||
this.nameParts = Utils.copyRequiredList(nameParts);
|
||||
this.assignments = Utils.copyRequiredList(assignments);
|
||||
this.tableAlias = tableAlias;
|
||||
this.logicalQuery = Objects.requireNonNull(logicalQuery, "logicalQuery is required in update command");
|
||||
this.cte = cte;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -117,6 +119,9 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab
|
||||
}
|
||||
|
||||
logicalQuery = new LogicalProject<>(selectItems, logicalQuery);
|
||||
if (cte.isPresent()) {
|
||||
logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery));
|
||||
}
|
||||
|
||||
// make UnboundTableSink
|
||||
return new UnboundOlapTableSink<>(nameParts, ImmutableList.of(), ImmutableList.of(),
|
||||
|
||||
@ -70,7 +70,7 @@ public class DeleteCommandTest extends TestWithFeService implements PlanPatternM
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFromClauseUpdate() throws AnalysisException {
|
||||
public void testFromClauseDelete() throws AnalysisException {
|
||||
String sql = "delete from t1 a using src join t2 on src.k1 = t2.k1 where t2.k1 = a.k1";
|
||||
LogicalPlan parsed = new NereidsParser().parseSingle(sql);
|
||||
Assertions.assertTrue(parsed instanceof DeleteCommand);
|
||||
|
||||
15
regression-test/data/nereids_p0/delete/delete_cte.out
Normal file
15
regression-test/data/nereids_p0/delete/delete_cte.out
Normal file
@ -0,0 +1,15 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !sql --
|
||||
1 \N 2 1 1.0 \N
|
||||
1 10 1 1 1.0 2000-01-01
|
||||
2 \N 4 2 2.0 \N
|
||||
2 20 2 2 2.0 2000-01-02
|
||||
3 \N 6 3 3.0 \N
|
||||
3 30 3 3 3.0 2000-01-03
|
||||
|
||||
-- !sql --
|
||||
2 \N 4 2 2.0 \N
|
||||
2 20 2 2 2.0 2000-01-02
|
||||
3 \N 6 3 3.0 \N
|
||||
3 30 3 3 3.0 2000-01-03
|
||||
|
||||
16
regression-test/data/nereids_p0/update/update_cte.out
Normal file
16
regression-test/data/nereids_p0/update/update_cte.out
Normal file
@ -0,0 +1,16 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !sql --
|
||||
1 10 1 1 1.0 2000-01-01
|
||||
2 20 2 2 2.0 2000-01-02
|
||||
3 30 5 3 3.0 2000-01-03
|
||||
|
||||
-- !sql --
|
||||
1 10 2 1 2.0 2000-01-01
|
||||
2 20 2 2 2.0 2000-01-02
|
||||
3 30 5 3 3.0 2000-01-03
|
||||
|
||||
-- !sql --
|
||||
1 10 10 1 1000.0 2000-01-01
|
||||
2 20 2 2 2.0 2000-01-02
|
||||
3 30 5 3 3.0 2000-01-03
|
||||
|
||||
108
regression-test/suites/nereids_p0/delete/delete_cte.groovy
Normal file
108
regression-test/suites/nereids_p0/delete/delete_cte.groovy
Normal file
@ -0,0 +1,108 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite('nereids_delete_cte') {
|
||||
def t1 = 't1_cte'
|
||||
def t2 = 't2_cte'
|
||||
def t3 = 't3_cte'
|
||||
|
||||
sql "drop table if exists ${t1}"
|
||||
sql """
|
||||
create table ${t1} (
|
||||
id int,
|
||||
id1 int,
|
||||
c1 bigint,
|
||||
c2 string,
|
||||
c3 double,
|
||||
c4 date
|
||||
) unique key (id, id1)
|
||||
distributed by hash(id, id1)
|
||||
properties(
|
||||
"replication_num"="1",
|
||||
"function_column.sequence_col" = "c4",
|
||||
"enable_unique_key_merge_on_write" = "true"
|
||||
);
|
||||
"""
|
||||
|
||||
sql "drop table if exists ${t2}"
|
||||
sql """
|
||||
create table ${t2} (
|
||||
id int,
|
||||
c1 bigint,
|
||||
c2 string,
|
||||
c3 double,
|
||||
c4 date
|
||||
) unique key (id)
|
||||
distributed by hash(id)
|
||||
properties(
|
||||
"replication_num"="1"
|
||||
);
|
||||
"""
|
||||
|
||||
sql "drop table if exists ${t3}"
|
||||
sql """
|
||||
create table ${t3} (
|
||||
id int
|
||||
) distributed by hash(id)
|
||||
properties(
|
||||
"replication_num"="1"
|
||||
);
|
||||
"""
|
||||
|
||||
sql """
|
||||
INSERT INTO ${t1} VALUES
|
||||
(1, 10, 1, '1', 1.0, '2000-01-01'),
|
||||
(2, 20, 2, '2', 2.0, '2000-01-02'),
|
||||
(3, 30, 3, '3', 3.0, '2000-01-03');
|
||||
"""
|
||||
|
||||
sql """
|
||||
|
||||
INSERT INTO ${t2} VALUES
|
||||
(1, 10, '10', 10.0, '2000-01-10'),
|
||||
(2, 20, '20', 20.0, '2000-01-20'),
|
||||
(3, 30, '30', 30.0, '2000-01-30'),
|
||||
(4, 4, '4', 4.0, '2000-01-04'),
|
||||
(5, 5, '5', 5.0, '2000-01-05');
|
||||
"""
|
||||
|
||||
sql """
|
||||
INSERT INTO ${t3} VALUES
|
||||
(1),
|
||||
(4),
|
||||
(5);
|
||||
"""
|
||||
|
||||
sql "set enable_nereids_planner=true"
|
||||
sql "set enable_fallback_to_original_planner=false"
|
||||
sql "set enable_nereids_dml=true"
|
||||
|
||||
sql "insert into ${t1}(id, c1, c2, c3) select id, c1 * 2, c2, c3 from ${t1}"
|
||||
sql "insert into ${t2}(id, c1, c2, c3) select id, c1, c2 * 2, c3 from ${t2}"
|
||||
sql "insert into ${t2}(c1, c3) select c1 + 1, c3 + 1 from (select id, c1, c3 from ${t1} order by id, c1 limit 10) ${t1}, ${t3}"
|
||||
|
||||
qt_sql "select * from ${t1} order by id, id1"
|
||||
|
||||
sql """
|
||||
with cte as (select * from ${t3})
|
||||
delete from ${t1}
|
||||
using ${t2} join cte on ${t2}.id = cte.id
|
||||
where ${t1}.id = ${t2}.id;
|
||||
"""
|
||||
|
||||
qt_sql "select * from ${t1} order by id, id1"
|
||||
}
|
||||
109
regression-test/suites/nereids_p0/update/update_cte.groovy
Normal file
109
regression-test/suites/nereids_p0/update/update_cte.groovy
Normal file
@ -0,0 +1,109 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite('nereids_update_cte') {
|
||||
def t1 = 't1_cte'
|
||||
def t2 = 't2_cte'
|
||||
def t3 = 't3_cte'
|
||||
|
||||
sql "drop table if exists ${t1}"
|
||||
sql """
|
||||
create table ${t1} (
|
||||
id int,
|
||||
id1 int,
|
||||
c1 bigint,
|
||||
c2 string,
|
||||
c3 double,
|
||||
c4 date
|
||||
) unique key (id, id1)
|
||||
distributed by hash(id, id1)
|
||||
properties(
|
||||
"replication_num"="1",
|
||||
"function_column.sequence_col" = "c4"
|
||||
);
|
||||
"""
|
||||
|
||||
sql "drop table if exists ${t2}"
|
||||
sql """
|
||||
create table ${t2} (
|
||||
id int,
|
||||
c1 bigint,
|
||||
c2 string,
|
||||
c3 double,
|
||||
c4 date
|
||||
) unique key (id)
|
||||
distributed by hash(id)
|
||||
properties(
|
||||
"replication_num"="1"
|
||||
);
|
||||
"""
|
||||
|
||||
sql "drop table if exists ${t3}"
|
||||
sql """
|
||||
create table ${t3} (
|
||||
id int
|
||||
) distributed by hash(id)
|
||||
properties(
|
||||
"replication_num"="1"
|
||||
);
|
||||
"""
|
||||
|
||||
sql """
|
||||
INSERT INTO ${t1} VALUES
|
||||
(1, 10, 1, '1', 1.0, '2000-01-01'),
|
||||
(2, 20, 2, '2', 2.0, '2000-01-02'),
|
||||
(3, 30, 3, '3', 3.0, '2000-01-03');
|
||||
"""
|
||||
|
||||
sql """
|
||||
|
||||
INSERT INTO ${t2} VALUES
|
||||
(1, 10, '10', 10.0, '2000-01-10'),
|
||||
(2, 20, '20', 20.0, '2000-01-20'),
|
||||
(3, 30, '30', 30.0, '2000-01-30'),
|
||||
(4, 4, '4', 4.0, '2000-01-04'),
|
||||
(5, 5, '5', 5.0, '2000-01-05');
|
||||
"""
|
||||
|
||||
sql """
|
||||
INSERT INTO ${t3} VALUES
|
||||
(1),
|
||||
(4),
|
||||
(5);
|
||||
"""
|
||||
sql "set enable_nereids_planner=true"
|
||||
sql "set enable_fallback_to_original_planner=false"
|
||||
sql "set enable_nereids_dml=true"
|
||||
|
||||
sql "update ${t1} set c1 = 5 where id = 3"
|
||||
|
||||
qt_sql "select * from ${t1} order by id"
|
||||
|
||||
sql "update ${t1} set c1 = c1 + 1, c3 = c2 * 2 where id = 1"
|
||||
|
||||
qt_sql "select * from ${t1} order by id"
|
||||
|
||||
sql """
|
||||
with cte as (select * from ${t3})
|
||||
update ${t1}
|
||||
set ${t1}.c1 = ${t2}.c1, ${t1}.c3 = ${t2}.c3 * 100
|
||||
from ${t2} inner join cte on ${t2}.id = cte.id
|
||||
where ${t1}.id = ${t2}.id;
|
||||
"""
|
||||
|
||||
qt_sql "select * from ${t1} order by id"
|
||||
}
|
||||
Reference in New Issue
Block a user