diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java index e11e384b2f..9bf097f994 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundOlapTableSink.java @@ -44,20 +44,30 @@ public class UnboundOlapTableSink extends LogicalSink colNames; private final List hints; private final List partitions; + private final boolean isPartialUpdate; public UnboundOlapTableSink(List nameParts, List colNames, List hints, List partitions, CHILD_TYPE child) { - this(nameParts, colNames, hints, partitions, Optional.empty(), Optional.empty(), child); + this(nameParts, colNames, hints, partitions, false, Optional.empty(), Optional.empty(), child); } public UnboundOlapTableSink(List nameParts, List colNames, List hints, - List partitions, Optional groupExpression, + List partitions, boolean isPartialUpdate, CHILD_TYPE child) { + this(nameParts, colNames, hints, partitions, isPartialUpdate, Optional.empty(), Optional.empty(), child); + } + + /** + * constructor + */ + public UnboundOlapTableSink(List nameParts, List colNames, List hints, + List partitions, boolean isPartialUpdate, Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); this.nameParts = Utils.copyRequiredList(nameParts); this.colNames = Utils.copyRequiredList(colNames); this.hints = Utils.copyRequiredList(hints); this.partitions = Utils.copyRequiredList(partitions); + this.isPartialUpdate = isPartialUpdate; } public List getColNames() { @@ -72,11 +82,15 @@ public class UnboundOlapTableSink extends LogicalSink children) { Preconditions.checkArgument(children.size() == 1, "UnboundOlapTableSink only accepts one child"); - return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, groupExpression, - Optional.of(getLogicalProperties()), children.get(0)); + return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, isPartialUpdate, + groupExpression, Optional.of(getLogicalProperties()), children.get(0)); } @Override @@ -111,15 +125,15 @@ public class UnboundOlapTableSink extends LogicalSink groupExpression) { - return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, groupExpression, - Optional.of(getLogicalProperties()), child()); + return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, + isPartialUpdate, groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { - return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, groupExpression, - logicalProperties, children.get(0)); + return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, + isPartialUpdate, groupExpression, logicalProperties, children.get(0)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index d7840c3ff9..dbfeade1a4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -340,6 +340,14 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor partialUpdateCols = new HashSet(); + for (Column col : olapTableSink.getCols()) { + partialUpdateCols.add(col.getName()); + } + sink.setPartialUpdateInputColumns(true, partialUpdateCols); + } + rootFragment.setSink(sink); rootFragment.setOutputPartition(DataPartition.UNPARTITIONED); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java index fc1ae2e21d..750413b919 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindInsertTargetTable.java @@ -75,6 +75,7 @@ public class BindInsertTargetTable extends OneAnalysisRuleFactory { table, bindTargetColumns(table, sink.getColNames()), bindPartitionIds(table, sink.getPartitions()), + sink.isPartialUpdate(), sink.child()); // we need to insert all the columns of the target table although some columns are not mentions. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java index 75927ed6f2..8927b8bd13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java @@ -39,6 +39,7 @@ public class LogicalOlapTableSinkToPhysicalOlapTableSink extends OneImplementati sink.getPartitionIds(), sink.getCols(), ctx.connectContext.getSessionVariable().isEnableSingleReplicaInsert(), + sink.isPartialUpdate(), Optional.empty(), sink.getLogicalProperties(), sink.child()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteCommand.java index d7216d257b..ae0900ced5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteCommand.java @@ -111,9 +111,15 @@ public class DeleteCommand extends Command implements ForwardWithSync, Explainab logicalQuery = new LogicalProject<>(selectLists, logicalQuery); + boolean isPartialUpdate = false; + if (((OlapTable) targetTable).getEnableUniqueKeyMergeOnWrite() + && cols.size() < targetTable.getColumns().size()) { + isPartialUpdate = true; + } + // make UnboundTableSink return new UnboundOlapTableSink<>(nameParts, cols, ImmutableList.of(), - partitions, logicalQuery); + partitions, isPartialUpdate, logicalQuery); } public LogicalPlan getLogicalQuery() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java index fc683a8621..631b6b6ab3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java @@ -46,27 +46,32 @@ public class LogicalOlapTableSink extends LogicalSink cols; private final List partitionIds; + private final boolean isPartialUpdate; public LogicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, - CHILD_TYPE child) { - this(database, targetTable, cols, partitionIds, Optional.empty(), Optional.empty(), child); + boolean isPartialUpdate, CHILD_TYPE child) { + this(database, targetTable, cols, partitionIds, isPartialUpdate, Optional.empty(), Optional.empty(), child); } + /** + * constructor + */ public LogicalOlapTableSink(Database database, OlapTable targetTable, List cols, - List partitionIds, Optional groupExpression, + List partitionIds, boolean isPartialUpdate, Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { super(PlanType.LOGICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); this.database = Objects.requireNonNull(database, "database != null in LogicalOlapTableSink"); this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalOlapTableSink"); this.cols = Utils.copyRequiredList(cols); + this.isPartialUpdate = isPartialUpdate; this.partitionIds = Utils.copyRequiredList(partitionIds); } @Override public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "LogicalOlapTableSink only accepts one child"); - return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, + return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, isPartialUpdate, Optional.empty(), Optional.empty(), children.get(0)); } @@ -86,6 +91,10 @@ public class LogicalOlapTableSink extends LogicalSink extends LogicalSink sink = (LogicalOlapTableSink) o; - return Objects.equals(database, sink.database) + return isPartialUpdate == sink.isPartialUpdate() + && Objects.equals(database, sink.database) && Objects.equals(targetTable, sink.targetTable) && Objects.equals(partitionIds, sink.partitionIds) && Objects.equals(cols, sink.cols); @@ -103,7 +113,7 @@ public class LogicalOlapTableSink extends LogicalSink extends LogicalSink groupExpression) { - return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, groupExpression, + return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, isPartialUpdate, groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { - return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, groupExpression, + return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, isPartialUpdate, groupExpression, logicalProperties, children.get(0)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java index 2dbc6b16f3..8e86c8c16d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java @@ -55,26 +55,35 @@ public class PhysicalOlapTableSink extends PhysicalSink private final List cols; private final List partitionIds; private final boolean singleReplicaLoad; + private final boolean isPartialUpdate; - /** - * Constructor - */ public PhysicalOlapTableSink(Database database, OlapTable targetTable, List partitionIds, - List cols, boolean singleReplicaLoad, Optional groupExpression, - LogicalProperties logicalProperties, CHILD_TYPE child) { - super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); - this.database = Objects.requireNonNull(database, "database != null in PhysicalOlapTableSink"); - this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalOlapTableSink"); - this.cols = Utils.copyRequiredList(cols); - this.partitionIds = Utils.copyRequiredList(partitionIds); - this.singleReplicaLoad = singleReplicaLoad; + List cols, boolean singleReplicaLoad, boolean isPartialUpdate, LogicalProperties logicalProperties, + CHILD_TYPE child) { + this(database, targetTable, partitionIds, cols, singleReplicaLoad, isPartialUpdate, + Optional.empty(), logicalProperties, child); } /** * Constructor */ public PhysicalOlapTableSink(Database database, OlapTable targetTable, List partitionIds, - List cols, boolean singleReplicaLoad, Optional groupExpression, + List cols, boolean singleReplicaLoad, boolean isPartialUpdate, + Optional groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { + super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, child); + this.database = Objects.requireNonNull(database, "database != null in PhysicalOlapTableSink"); + this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in PhysicalOlapTableSink"); + this.cols = Utils.copyRequiredList(cols); + this.partitionIds = Utils.copyRequiredList(partitionIds); + this.singleReplicaLoad = singleReplicaLoad; + this.isPartialUpdate = isPartialUpdate; + } + + /** + * Constructor + */ + public PhysicalOlapTableSink(Database database, OlapTable targetTable, List partitionIds, List cols, + boolean singleReplicaLoad, boolean isPartialUpdate, Optional groupExpression, LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, physicalProperties, @@ -84,6 +93,7 @@ public class PhysicalOlapTableSink extends PhysicalSink this.cols = Utils.copyRequiredList(cols); this.partitionIds = Utils.copyRequiredList(partitionIds); this.singleReplicaLoad = singleReplicaLoad; + this.isPartialUpdate = isPartialUpdate; } public Database getDatabase() { @@ -106,11 +116,15 @@ public class PhysicalOlapTableSink extends PhysicalSink return singleReplicaLoad; } + public boolean isPartialUpdate() { + return isPartialUpdate; + } + @Override public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "PhysicalOlapTableSink only accepts one child"); return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, - singleReplicaLoad, groupExpression, getLogicalProperties(), physicalProperties, + singleReplicaLoad, isPartialUpdate, groupExpression, getLogicalProperties(), physicalProperties, statistics, children.get(0)); } @@ -124,6 +138,7 @@ public class PhysicalOlapTableSink extends PhysicalSink } PhysicalOlapTableSink that = (PhysicalOlapTableSink) o; return singleReplicaLoad == that.singleReplicaLoad + && isPartialUpdate == that.isPartialUpdate && Objects.equals(database, that.database) && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols) @@ -132,7 +147,7 @@ public class PhysicalOlapTableSink extends PhysicalSink @Override public int hashCode() { - return Objects.hash(database, targetTable, cols, partitionIds, singleReplicaLoad); + return Objects.hash(database, targetTable, cols, partitionIds, singleReplicaLoad, isPartialUpdate); } @Override @@ -166,20 +181,20 @@ public class PhysicalOlapTableSink extends PhysicalSink @Override public Plan withGroupExpression(Optional groupExpression) { return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad, - groupExpression, getLogicalProperties(), child()); + isPartialUpdate, groupExpression, getLogicalProperties(), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad, - groupExpression, logicalProperties.get(), children.get(0)); + isPartialUpdate, groupExpression, logicalProperties.get(), children.get(0)); } @Override public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad, - groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); + isPartialUpdate, groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); } /** diff --git a/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out b/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out new file mode 100644 index 0000000000..0b7c6bf68e --- /dev/null +++ b/regression-test/data/nereids_p0/delete/delete_mow_partial_update.out @@ -0,0 +1,26 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 1 +2 2 +3 3 +4 4 +5 5 + +-- !sql -- +4 4 +5 5 + +-- !sql_skip_delete_predicate -- +4 4 +5 5 + +-- !sql -- +1 1 0 +1 1 1 +2 2 0 +2 2 1 +3 3 0 +3 3 1 +4 4 0 +5 5 0 + diff --git a/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy b/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy new file mode 100644 index 0000000000..b70bfc2986 --- /dev/null +++ b/regression-test/suites/nereids_p0/delete/delete_mow_partial_update.groovy @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite('nereids_delete_mow_partial_update') { + sql 'set enable_nereids_planner=true' + sql 'set enable_fallback_to_original_planner=false' + sql "set experimental_enable_nereids_planner=true;" + sql 'set enable_nereids_dml=true' + + def tableName1 = "nereids_delete_mow_partial_update1" + sql "DROP TABLE IF EXISTS ${tableName1};" + + sql """ CREATE TABLE IF NOT EXISTS ${tableName1} ( + `uid` BIGINT NULL, + `v1` BIGINT NULL + )UNIQUE KEY(uid) + DISTRIBUTED BY HASH(uid) BUCKETS 3 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1" + );""" + def tableName2 = "nereids_delete_mow_partial_update2" + sql "DROP TABLE IF EXISTS ${tableName2};" + + sql """ CREATE TABLE IF NOT EXISTS ${tableName2} ( + `uid` BIGINT NULL + ) UNIQUE KEY(uid) + DISTRIBUTED BY HASH(uid) BUCKETS 3 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1" + );""" + + sql "insert into ${tableName1} values(1,1),(2,2),(3,3),(4,4),(5,5);" + qt_sql "select * from ${tableName1} order by uid;" + sql "insert into ${tableName2} values(1),(2),(3);" + sql "delete from ${tableName1} A using ${tableName2} B where A.uid=B.uid;" + qt_sql "select * from ${tableName1} order by uid;" + // when using parital update insert stmt for delete stmt, it will use delete bitmap or delete sign rather than + // delete predicate to "delete" the rows + sql "set skip_delete_predicate=true;" + qt_sql_skip_delete_predicate "select * from ${tableName1} order by uid;" + + sql "set skip_delete_sign=true;" + sql "set skip_storage_engine_merge=true;" + sql "set skip_delete_bitmap=true;" + qt_sql "select uid,v1,__DORIS_DELETE_SIGN__ from ${tableName1} order by uid;" +}