[opt](Nereids) add partial update support for delete stmt (#22184)
Currently, the new optimizer don't consider anything about partial update. This PR add the ability to convert a delete statement to a partial update insert statement for merge-on-write unique table
This commit is contained in:
@ -44,20 +44,30 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
|
||||
private final List<String> colNames;
|
||||
private final List<String> hints;
|
||||
private final List<String> partitions;
|
||||
private final boolean isPartialUpdate;
|
||||
|
||||
public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
|
||||
List<String> partitions, CHILD_TYPE child) {
|
||||
this(nameParts, colNames, hints, partitions, Optional.empty(), Optional.empty(), child);
|
||||
this(nameParts, colNames, hints, partitions, false, Optional.empty(), Optional.empty(), child);
|
||||
}
|
||||
|
||||
public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
|
||||
List<String> partitions, Optional<GroupExpression> groupExpression,
|
||||
List<String> partitions, boolean isPartialUpdate, CHILD_TYPE child) {
|
||||
this(nameParts, colNames, hints, partitions, isPartialUpdate, Optional.empty(), Optional.empty(), child);
|
||||
}
|
||||
|
||||
/**
|
||||
* constructor
|
||||
*/
|
||||
public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
|
||||
List<String> partitions, boolean isPartialUpdate, Optional<GroupExpression> groupExpression,
|
||||
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
|
||||
super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, groupExpression, logicalProperties, child);
|
||||
this.nameParts = Utils.copyRequiredList(nameParts);
|
||||
this.colNames = Utils.copyRequiredList(colNames);
|
||||
this.hints = Utils.copyRequiredList(hints);
|
||||
this.partitions = Utils.copyRequiredList(partitions);
|
||||
this.isPartialUpdate = isPartialUpdate;
|
||||
}
|
||||
|
||||
public List<String> getColNames() {
|
||||
@ -72,11 +82,15 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
|
||||
return partitions;
|
||||
}
|
||||
|
||||
public boolean isPartialUpdate() {
|
||||
return isPartialUpdate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plan withChildren(List<Plan> children) {
|
||||
Preconditions.checkArgument(children.size() == 1, "UnboundOlapTableSink only accepts one child");
|
||||
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, groupExpression,
|
||||
Optional.of(getLogicalProperties()), children.get(0));
|
||||
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, isPartialUpdate,
|
||||
groupExpression, Optional.of(getLogicalProperties()), children.get(0));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -111,15 +125,15 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
|
||||
|
||||
@Override
|
||||
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
|
||||
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, groupExpression,
|
||||
Optional.of(getLogicalProperties()), child());
|
||||
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions,
|
||||
isPartialUpdate, groupExpression, Optional.of(getLogicalProperties()), child());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
|
||||
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
|
||||
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, groupExpression,
|
||||
logicalProperties, children.get(0));
|
||||
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions,
|
||||
isPartialUpdate, groupExpression, logicalProperties, children.get(0));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -340,6 +340,14 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
|
||||
olapTableSink.isSingleReplicaLoad()
|
||||
);
|
||||
|
||||
if (olapTableSink.isPartialUpdate()) {
|
||||
HashSet<String> partialUpdateCols = new HashSet<String>();
|
||||
for (Column col : olapTableSink.getCols()) {
|
||||
partialUpdateCols.add(col.getName());
|
||||
}
|
||||
sink.setPartialUpdateInputColumns(true, partialUpdateCols);
|
||||
}
|
||||
|
||||
rootFragment.setSink(sink);
|
||||
|
||||
rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);
|
||||
|
||||
@ -75,6 +75,7 @@ public class BindInsertTargetTable extends OneAnalysisRuleFactory {
|
||||
table,
|
||||
bindTargetColumns(table, sink.getColNames()),
|
||||
bindPartitionIds(table, sink.getPartitions()),
|
||||
sink.isPartialUpdate(),
|
||||
sink.child());
|
||||
|
||||
// we need to insert all the columns of the target table although some columns are not mentions.
|
||||
|
||||
@ -39,6 +39,7 @@ public class LogicalOlapTableSinkToPhysicalOlapTableSink extends OneImplementati
|
||||
sink.getPartitionIds(),
|
||||
sink.getCols(),
|
||||
ctx.connectContext.getSessionVariable().isEnableSingleReplicaInsert(),
|
||||
sink.isPartialUpdate(),
|
||||
Optional.empty(),
|
||||
sink.getLogicalProperties(),
|
||||
sink.child());
|
||||
|
||||
@ -111,9 +111,15 @@ public class DeleteCommand extends Command implements ForwardWithSync, Explainab
|
||||
|
||||
logicalQuery = new LogicalProject<>(selectLists, logicalQuery);
|
||||
|
||||
boolean isPartialUpdate = false;
|
||||
if (((OlapTable) targetTable).getEnableUniqueKeyMergeOnWrite()
|
||||
&& cols.size() < targetTable.getColumns().size()) {
|
||||
isPartialUpdate = true;
|
||||
}
|
||||
|
||||
// make UnboundTableSink
|
||||
return new UnboundOlapTableSink<>(nameParts, cols, ImmutableList.of(),
|
||||
partitions, logicalQuery);
|
||||
partitions, isPartialUpdate, logicalQuery);
|
||||
}
|
||||
|
||||
public LogicalPlan getLogicalQuery() {
|
||||
|
||||
@ -46,27 +46,32 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
|
||||
private final OlapTable targetTable;
|
||||
private final List<Column> cols;
|
||||
private final List<Long> partitionIds;
|
||||
private final boolean isPartialUpdate;
|
||||
|
||||
public LogicalOlapTableSink(Database database, OlapTable targetTable, List<Column> cols, List<Long> partitionIds,
|
||||
CHILD_TYPE child) {
|
||||
this(database, targetTable, cols, partitionIds, Optional.empty(), Optional.empty(), child);
|
||||
boolean isPartialUpdate, CHILD_TYPE child) {
|
||||
this(database, targetTable, cols, partitionIds, isPartialUpdate, Optional.empty(), Optional.empty(), child);
|
||||
}
|
||||
|
||||
/**
|
||||
* constructor
|
||||
*/
|
||||
public LogicalOlapTableSink(Database database, OlapTable targetTable, List<Column> cols,
|
||||
List<Long> partitionIds, Optional<GroupExpression> groupExpression,
|
||||
List<Long> partitionIds, boolean isPartialUpdate, Optional<GroupExpression> groupExpression,
|
||||
Optional<LogicalProperties> logicalProperties,
|
||||
CHILD_TYPE child) {
|
||||
super(PlanType.LOGICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child);
|
||||
this.database = Objects.requireNonNull(database, "database != null in LogicalOlapTableSink");
|
||||
this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalOlapTableSink");
|
||||
this.cols = Utils.copyRequiredList(cols);
|
||||
this.isPartialUpdate = isPartialUpdate;
|
||||
this.partitionIds = Utils.copyRequiredList(partitionIds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plan withChildren(List<Plan> children) {
|
||||
Preconditions.checkArgument(children.size() == 1, "LogicalOlapTableSink only accepts one child");
|
||||
return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds,
|
||||
return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, isPartialUpdate,
|
||||
Optional.empty(), Optional.empty(), children.get(0));
|
||||
}
|
||||
|
||||
@ -86,6 +91,10 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
|
||||
return partitionIds;
|
||||
}
|
||||
|
||||
public boolean isPartialUpdate() {
|
||||
return isPartialUpdate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
@ -95,7 +104,8 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
|
||||
return false;
|
||||
}
|
||||
LogicalOlapTableSink<?> sink = (LogicalOlapTableSink<?>) o;
|
||||
return Objects.equals(database, sink.database)
|
||||
return isPartialUpdate == sink.isPartialUpdate()
|
||||
&& Objects.equals(database, sink.database)
|
||||
&& Objects.equals(targetTable, sink.targetTable)
|
||||
&& Objects.equals(partitionIds, sink.partitionIds)
|
||||
&& Objects.equals(cols, sink.cols);
|
||||
@ -103,7 +113,7 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(database, targetTable, partitionIds, cols);
|
||||
return Objects.hash(database, targetTable, partitionIds, cols, isPartialUpdate);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -118,14 +128,14 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
|
||||
|
||||
@Override
|
||||
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
|
||||
return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, groupExpression,
|
||||
return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, isPartialUpdate, groupExpression,
|
||||
Optional.of(getLogicalProperties()), child());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
|
||||
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
|
||||
return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, groupExpression,
|
||||
return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, isPartialUpdate, groupExpression,
|
||||
logicalProperties, children.get(0));
|
||||
}
|
||||
|
||||
|
||||
@ -55,26 +55,35 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
|
||||
private final List<Column> cols;
|
||||
private final List<Long> partitionIds;
|
||||
private final boolean singleReplicaLoad;
|
||||
private final boolean isPartialUpdate;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds,
|
||||
List<Column> cols, boolean singleReplicaLoad, Optional<GroupExpression> groupExpression,
|
||||
LogicalProperties logicalProperties, CHILD_TYPE child) {
|
||||
super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child);
|
||||
this.database = Objects.requireNonNull(database, "database != null in PhysicalOlapTableSink");
|
||||
this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalOlapTableSink");
|
||||
this.cols = Utils.copyRequiredList(cols);
|
||||
this.partitionIds = Utils.copyRequiredList(partitionIds);
|
||||
this.singleReplicaLoad = singleReplicaLoad;
|
||||
List<Column> cols, boolean singleReplicaLoad, boolean isPartialUpdate, LogicalProperties logicalProperties,
|
||||
CHILD_TYPE child) {
|
||||
this(database, targetTable, partitionIds, cols, singleReplicaLoad, isPartialUpdate,
|
||||
Optional.empty(), logicalProperties, child);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds,
|
||||
List<Column> cols, boolean singleReplicaLoad, Optional<GroupExpression> groupExpression,
|
||||
List<Column> cols, boolean singleReplicaLoad, boolean isPartialUpdate,
|
||||
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) {
|
||||
super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child);
|
||||
this.database = Objects.requireNonNull(database, "database != null in PhysicalOlapTableSink");
|
||||
this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalOlapTableSink");
|
||||
this.cols = Utils.copyRequiredList(cols);
|
||||
this.partitionIds = Utils.copyRequiredList(partitionIds);
|
||||
this.singleReplicaLoad = singleReplicaLoad;
|
||||
this.isPartialUpdate = isPartialUpdate;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols,
|
||||
boolean singleReplicaLoad, boolean isPartialUpdate, Optional<GroupExpression> groupExpression,
|
||||
LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics,
|
||||
CHILD_TYPE child) {
|
||||
super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, physicalProperties,
|
||||
@ -84,6 +93,7 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
|
||||
this.cols = Utils.copyRequiredList(cols);
|
||||
this.partitionIds = Utils.copyRequiredList(partitionIds);
|
||||
this.singleReplicaLoad = singleReplicaLoad;
|
||||
this.isPartialUpdate = isPartialUpdate;
|
||||
}
|
||||
|
||||
public Database getDatabase() {
|
||||
@ -106,11 +116,15 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
|
||||
return singleReplicaLoad;
|
||||
}
|
||||
|
||||
public boolean isPartialUpdate() {
|
||||
return isPartialUpdate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plan withChildren(List<Plan> children) {
|
||||
Preconditions.checkArgument(children.size() == 1, "PhysicalOlapTableSink only accepts one child");
|
||||
return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols,
|
||||
singleReplicaLoad, groupExpression, getLogicalProperties(), physicalProperties,
|
||||
singleReplicaLoad, isPartialUpdate, groupExpression, getLogicalProperties(), physicalProperties,
|
||||
statistics, children.get(0));
|
||||
}
|
||||
|
||||
@ -124,6 +138,7 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
|
||||
}
|
||||
PhysicalOlapTableSink<?> that = (PhysicalOlapTableSink<?>) o;
|
||||
return singleReplicaLoad == that.singleReplicaLoad
|
||||
&& isPartialUpdate == that.isPartialUpdate
|
||||
&& Objects.equals(database, that.database)
|
||||
&& Objects.equals(targetTable, that.targetTable)
|
||||
&& Objects.equals(cols, that.cols)
|
||||
@ -132,7 +147,7 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(database, targetTable, cols, partitionIds, singleReplicaLoad);
|
||||
return Objects.hash(database, targetTable, cols, partitionIds, singleReplicaLoad, isPartialUpdate);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -166,20 +181,20 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
|
||||
@Override
|
||||
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
|
||||
return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad,
|
||||
groupExpression, getLogicalProperties(), child());
|
||||
isPartialUpdate, groupExpression, getLogicalProperties(), child());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
|
||||
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
|
||||
return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad,
|
||||
groupExpression, logicalProperties.get(), children.get(0));
|
||||
isPartialUpdate, groupExpression, logicalProperties.get(), children.get(0));
|
||||
}
|
||||
|
||||
@Override
|
||||
public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
|
||||
return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad,
|
||||
groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
|
||||
isPartialUpdate, groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@ -0,0 +1,26 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !sql --
|
||||
1 1
|
||||
2 2
|
||||
3 3
|
||||
4 4
|
||||
5 5
|
||||
|
||||
-- !sql --
|
||||
4 4
|
||||
5 5
|
||||
|
||||
-- !sql_skip_delete_predicate --
|
||||
4 4
|
||||
5 5
|
||||
|
||||
-- !sql --
|
||||
1 1 0
|
||||
1 1 1
|
||||
2 2 0
|
||||
2 2 1
|
||||
3 3 0
|
||||
3 3 1
|
||||
4 4 0
|
||||
5 5 0
|
||||
|
||||
@ -0,0 +1,64 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite('nereids_delete_mow_partial_update') {
|
||||
sql 'set enable_nereids_planner=true'
|
||||
sql 'set enable_fallback_to_original_planner=false'
|
||||
sql "set experimental_enable_nereids_planner=true;"
|
||||
sql 'set enable_nereids_dml=true'
|
||||
|
||||
def tableName1 = "nereids_delete_mow_partial_update1"
|
||||
sql "DROP TABLE IF EXISTS ${tableName1};"
|
||||
|
||||
sql """ CREATE TABLE IF NOT EXISTS ${tableName1} (
|
||||
`uid` BIGINT NULL,
|
||||
`v1` BIGINT NULL
|
||||
)UNIQUE KEY(uid)
|
||||
DISTRIBUTED BY HASH(uid) BUCKETS 3
|
||||
PROPERTIES (
|
||||
"enable_unique_key_merge_on_write" = "true",
|
||||
"disable_auto_compaction" = "true",
|
||||
"replication_num" = "1"
|
||||
);"""
|
||||
def tableName2 = "nereids_delete_mow_partial_update2"
|
||||
sql "DROP TABLE IF EXISTS ${tableName2};"
|
||||
|
||||
sql """ CREATE TABLE IF NOT EXISTS ${tableName2} (
|
||||
`uid` BIGINT NULL
|
||||
) UNIQUE KEY(uid)
|
||||
DISTRIBUTED BY HASH(uid) BUCKETS 3
|
||||
PROPERTIES (
|
||||
"enable_unique_key_merge_on_write" = "true",
|
||||
"disable_auto_compaction" = "true",
|
||||
"replication_num" = "1"
|
||||
);"""
|
||||
|
||||
sql "insert into ${tableName1} values(1,1),(2,2),(3,3),(4,4),(5,5);"
|
||||
qt_sql "select * from ${tableName1} order by uid;"
|
||||
sql "insert into ${tableName2} values(1),(2),(3);"
|
||||
sql "delete from ${tableName1} A using ${tableName2} B where A.uid=B.uid;"
|
||||
qt_sql "select * from ${tableName1} order by uid;"
|
||||
// when using parital update insert stmt for delete stmt, it will use delete bitmap or delete sign rather than
|
||||
// delete predicate to "delete" the rows
|
||||
sql "set skip_delete_predicate=true;"
|
||||
qt_sql_skip_delete_predicate "select * from ${tableName1} order by uid;"
|
||||
|
||||
sql "set skip_delete_sign=true;"
|
||||
sql "set skip_storage_engine_merge=true;"
|
||||
sql "set skip_delete_bitmap=true;"
|
||||
qt_sql "select uid,v1,__DORIS_DELETE_SIGN__ from ${tableName1} order by uid;"
|
||||
}
|
||||
Reference in New Issue
Block a user