From 609761567c24dc1d31ea49bc74afdf46cc45f1b7 Mon Sep 17 00:00:00 2001 From: abmdocrt Date: Fri, 8 Mar 2024 13:47:56 +0800 Subject: [PATCH] [Fix](partial-update) Fix wrong column number passing to BE when partial and enable nereids (#31461) * Problem: Inconsistent behavior occurs when executing partial column update `UPDATE` statements and `INSERT` statements on merge-on-write tables with the Nereids optimizer enabled. The number of columns passed to BE differs; `UPDATE` operations incorrectly pass all columns, while `INSERT` operations correctly pass only the updated columns. Reason: The Nereids optimizer does not handle partial column update `UPDATE` statements properly. The processing logic for `UPDATE` statements rewrites them as equivalent `INSERT` statements, which are then processed according to the logic of `INSERT` statements. For example, assuming a MoW table structure with columns k1, k2, v1, v2, the correct rewrite should be: * `UPDATE` table t1 set v1 = v1 + 1 where k1 = 1 and k2 = 2 * => * `INSERT` into table (v1) select v1 + 1 from table t1 where k1 = 1 and k2 = 2 However, the actual rewriting process does not consider the logic for partial column updates, leading to all columns being included in the `INSERT` statement, i.e., the result is: * `INSERT` into table (k1, k2, v1, v2) select k1, k2, v1 + 1, v2 from table t1 where k1 = 1 and k2 = 2 This results in `UPDATE` operations incorrectly passing all columns to BE. Solution: Having analyzed the cause, the solution is straightforward: when rewriting partial column update `UPDATE` statements to `INSERT` statements, only retain the updated columns and all key columns (as partial column updates must include all key columns). Additionally, this PR includes error injection cases to verify the number of columns passed to BE is correct. * 2 * 3 * 4 * 5 --- be/src/olap/rowset_builder.cpp | 6 ++ .../trees/plans/commands/UpdateCommand.java | 44 ++++++++-- ...tial_update_column_num_fault_injection.out | 11 +++ .../test_auto_partition_behavior.out | 15 ---- .../data/update/test_unique_table_update.out | 26 ++++++ .../data/update/test_update_mow.out | 15 ++++ ...l_update_column_num_fault_injection.groovy | 51 +++++++++++ .../test_auto_partition_behavior.groovy | 7 -- .../update/test_unique_table_update.groovy | 84 +++++++++++++++++++ .../suites/update/test_update_mow.groovy | 32 +++++++ 10 files changed, 263 insertions(+), 28 deletions(-) create mode 100644 regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out create mode 100644 regression-test/data/update/test_unique_table_update.out create mode 100644 regression-test/suites/fault_injection_p0/test_partial_update_column_num_fault_injection.groovy create mode 100644 regression-test/suites/update/test_unique_table_update.groovy diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp index 8e9d87ec25..248aaba5c9 100644 --- a/be/src/olap/rowset_builder.cpp +++ b/be/src/olap/rowset_builder.cpp @@ -184,6 +184,12 @@ Status RowsetBuilder::init() { RETURN_IF_ERROR(prepare_txn()); + DBUG_EXECUTE_IF("BaseRowsetBuilder::init.check_partial_update_column_num", { + if (_req.table_schema_param->partial_update_input_columns().size() != + dp->param("column_num")) { + return Status::InternalError("partial update input column num wrong!"); + }; + }) // build tablet schema in request level _build_current_tablet_schema(_req.index_id, _req.table_schema_param.get(), *_tablet->tablet_schema()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java index 133fc36a1d..cde07c957a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java @@ -49,6 +49,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -103,10 +104,17 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab checkTable(ctx); Map colNameToExpression = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + Map partialUpdateColNameToExpression = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); for (EqualTo equalTo : assignments) { List nameParts = ((UnboundSlot) equalTo.left()).getNameParts(); checkAssignmentColumn(ctx, nameParts); colNameToExpression.put(nameParts.get(nameParts.size() - 1), equalTo.right()); + partialUpdateColNameToExpression.put(nameParts.get(nameParts.size() - 1), equalTo.right()); + } + // check if any key in update clause + if (targetTable.getFullSchema().stream().filter(Column::isKey) + .anyMatch(column -> partialUpdateColNameToExpression.containsKey(column.getName()))) { + throw new AnalysisException("Only value columns of unique table could be updated"); } List selectItems = Lists.newArrayList(); String tableName = tableAlias != null ? tableAlias : targetTable.getName(); @@ -147,16 +155,40 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab + String.join(", ", colNameToExpression.keySet())); } - logicalQuery = new LogicalProject<>(selectItems, logicalQuery); + boolean isPartialUpdate = targetTable.getEnableUniqueKeyMergeOnWrite() + && selectItems.size() < targetTable.getColumns().size() + && !targetTable.hasVariantColumns() && targetTable.getSequenceCol() == null + && partialUpdateColNameToExpression.size() <= targetTable.getFullSchema().size() * 3 / 10; + + List partialUpdateColNames = new ArrayList<>(); + List partialUpdateSelectItems = new ArrayList<>(); + if (isPartialUpdate) { + for (Column column : targetTable.getFullSchema()) { + Expression expr = new NereidsParser().parseExpression(tableName + "." + column.getName()); + boolean existInExpr = false; + for (String colName : partialUpdateColNameToExpression.keySet()) { + if (colName.equalsIgnoreCase(column.getName())) { + expr = partialUpdateColNameToExpression.get(column.getName()); + existInExpr = true; + break; + } + } + if (column.isKey() || existInExpr) { + partialUpdateSelectItems.add(expr instanceof UnboundSlot + ? ((NamedExpression) expr) + : new UnboundAlias(expr)); + partialUpdateColNames.add(column.getName()); + } + } + } + + logicalQuery = new LogicalProject<>(isPartialUpdate ? partialUpdateSelectItems : selectItems, logicalQuery); if (cte.isPresent()) { logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery)); } - boolean isPartialUpdate = targetTable.getEnableUniqueKeyMergeOnWrite() - && selectItems.size() < targetTable.getColumns().size() - && !targetTable.hasVariantColumns(); - // make UnboundTableSink - return new UnboundTableSink<>(nameParts, ImmutableList.of(), ImmutableList.of(), + return new UnboundTableSink<>(nameParts, isPartialUpdate ? partialUpdateColNames : ImmutableList.of(), + ImmutableList.of(), false, ImmutableList.of(), isPartialUpdate, DMLCommandType.UPDATE, logicalQuery); } diff --git a/regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out b/regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out new file mode 100644 index 0000000000..3d9ecbcede --- /dev/null +++ b/regression-test/data/fault_injection_p0/test_partial_update_column_num_fault_injection.out @@ -0,0 +1,11 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_1 -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !select_2 -- +1 1 1 1 1 +2 1 1 2 2 +3 3 3 3 3 + diff --git a/regression-test/data/partition_p0/auto_partition/test_auto_partition_behavior.out b/regression-test/data/partition_p0/auto_partition/test_auto_partition_behavior.out index d24d4db21d..5fe25cce3d 100644 --- a/regression-test/data/partition_p0/auto_partition/test_auto_partition_behavior.out +++ b/regression-test/data/partition_p0/auto_partition/test_auto_partition_behavior.out @@ -26,21 +26,6 @@ xxX 3 Xxx 3 xxX 3 --- !sql4 -- - 2 - --- !sql5 -- -1 - --- !sql6 -- - ! - - -- -- - --- -modified -xxX - -- !sql1 -- diff --git a/regression-test/data/update/test_unique_table_update.out b/regression-test/data/update/test_unique_table_update.out new file mode 100644 index 0000000000..d0c0dda6ad --- /dev/null +++ b/regression-test/data/update/test_unique_table_update.out @@ -0,0 +1,26 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_1 -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !select_2 -- +1 1 1 1 1 +2 2 2 2 2 +3 3 3 3 3 + +-- !select_3 -- +1 1 1 1 1 +2 1 1 2 2 +3 3 3 3 3 + +-- !select_4 -- +1 1 1 1 1 +2 1 1 2 2 +3 3 3 3 3 + +-- !select_5 -- +1 1 1 1 1 +2 1 1 2 2 +3 3 3 3 3 + diff --git a/regression-test/data/update/test_update_mow.out b/regression-test/data/update/test_update_mow.out index ed072a5c0a..0cc8c44855 100644 --- a/regression-test/data/update/test_update_mow.out +++ b/regression-test/data/update/test_update_mow.out @@ -38,3 +38,18 @@ a 1 2023-11-12T00:00 test1 999 b 2 2023-11-12T00:00 test2 2 c 3 2022-01-01T00:00 update value 3 +-- !sql -- +a 1 2023-11-12T00:00 test1 1 +b 2 2023-11-12T00:00 test2 2 +c 3 2023-11-12T00:00 test3 3 + +-- !sql -- +a 1 2023-11-12T00:00 test1 999 +b 2 2023-11-12T00:00 test2 2 +c 3 2023-11-12T00:00 test3 3 + +-- !sql -- +a 1 2023-11-12T00:00 test1 999 +b 2 2023-11-12T00:00 test2 2 +c 3 2022-01-01T00:00 update value 3 + diff --git a/regression-test/suites/fault_injection_p0/test_partial_update_column_num_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_partial_update_column_num_fault_injection.groovy new file mode 100644 index 0000000000..50973fb897 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/test_partial_update_column_num_fault_injection.groovy @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_partial_update_column_num_fault_injection","nonConcurrent") { + + + def tableName = "test_partial_update_column_num_fault_injection" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + sql """ + CREATE TABLE ${tableName} + (k bigint, v1 string, v2 string, v3 string, v4 string ) + UNIQUE KEY(k) + DISTRIBUTED BY HASH (k) + BUCKETS 32 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write"="true"); + """ + + GetDebugPoint().clearDebugPointsForAllBEs() + + try { + sql "insert into ${tableName} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3);" + qt_select_1 "select * from ${tableName} order by k;" + sql "set enable_fallback_to_original_planner=false;" + GetDebugPoint().enableDebugPointForAllBEs("BaseRowsetBuilder::init.check_partial_update_column_num", [column_num: 3]) + sql "update ${tableName} set v1=1, v2=1 where k=2;" + } catch (Exception e) { + logger.info(e.getMessage()) + AssertTrue(false) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("BaseRowsetBuilder::init.check_partial_update_column_num") + qt_select_2 "select * from ${tableName} order by k;" + } +} \ No newline at end of file diff --git a/regression-test/suites/partition_p0/auto_partition/test_auto_partition_behavior.groovy b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_behavior.groovy index 02623bb5c9..a2429214d6 100644 --- a/regression-test/suites/partition_p0/auto_partition/test_auto_partition_behavior.groovy +++ b/regression-test/suites/partition_p0/auto_partition/test_auto_partition_behavior.groovy @@ -60,13 +60,6 @@ suite("test_auto_partition_behavior") { result = sql "show partitions from unique_table" assertEquals(result.size(), 9) qt_sql3 """ select str,length(str) from unique_table order by `str` """ - // modify value - sql """ update unique_table set str = "modified" where str in (" ", " ") """ // only " " - qt_sql4 """ select str,length(str) from unique_table where str = ' ' order by `str` """ // modified - qt_sql5 """ select count() from unique_table where str = 'modified' """ - // crop - qt_sql6 """ select str from unique_table where ((str > ' ! ' || str = 'modified') && str != 'Xxx') order by str """ - /// duplicate key table sql "drop table if exists dup_table" diff --git a/regression-test/suites/update/test_unique_table_update.groovy b/regression-test/suites/update/test_unique_table_update.groovy new file mode 100644 index 0000000000..d886b4fcdf --- /dev/null +++ b/regression-test/suites/update/test_unique_table_update.groovy @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_unique_table_update","nonConcurrent") { + + + def tableName = "test_unique_table_update" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + sql """ + CREATE TABLE ${tableName} + (k bigint, v1 string, v2 string, v3 string, v4 string ) + UNIQUE KEY(k) + DISTRIBUTED BY HASH (k) + BUCKETS 32 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write"="true"); + """ + + sql "insert into ${tableName} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3);" + qt_select_1 "select * from ${tableName} order by k;" + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + + // update key is not allowed + try { + sql "update ${tableName} set k=1, v1=1, v2=1 where k=2;" + assertTrue(false) + } catch (Exception e) { + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains("Only value columns of unique table could be updated")) + } finally { + qt_select_2 "select * from ${tableName} order by k;" + } + + // update value is allowed + try { + sql "update ${tableName} set v1=1, v2=1 where k=2;" + } catch (Exception e) { + logger.info(e.getMessage()) + assertTrue(false) + } finally { + qt_select_3 "select * from ${tableName} order by k;" + } + + sql "set enable_nereids_planner=false" + // update key is not allowed + try { + sql "update ${tableName} set k=1, v1=1, v2=1 where k=2;" + assertTrue(false) + } catch (Exception e) { + logger.info(e.getMessage()) + assertTrue(e.getMessage().contains("Only value columns of unique table could be updated")) + } finally { + qt_select_4 "select * from ${tableName} order by k;" + } + + // update key is allowed + try { + sql "update ${tableName} set v1=1, v2=1 where k=2;" + } catch (Exception e) { + logger.info(e.getMessage()) + assertTrue(false) + } finally { + qt_select_5 "select * from ${tableName} order by k;" + } +} \ No newline at end of file diff --git a/regression-test/suites/update/test_update_mow.groovy b/regression-test/suites/update/test_update_mow.groovy index 27a663484f..6b1462f69b 100644 --- a/regression-test/suites/update/test_update_mow.groovy +++ b/regression-test/suites/update/test_update_mow.groovy @@ -16,6 +16,8 @@ // under the License. suite("test_update_mow", "p0") { + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" def tbName1 = "test_update_mow_1" def tbName2 = "test_update_mow_2" def tbName3 = "test_update_mow_3" @@ -127,4 +129,34 @@ suite("test_update_mow", "p0") { qt_sql "select * from ${tableName5} order by k1,k2" sql "DROP TABLE IF EXISTS ${tableName5}" + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "sync" + def tableName6 = "test_update_mow_6" + sql "DROP TABLE IF EXISTS ${tableName6}" + sql """ CREATE TABLE ${tableName6} ( + k1 varchar(100) NOT NULL, + k2 int(11) NOT NULL, + v1 datetime NULL, + v2 varchar(100) NULL, + v3 int NULL) ENGINE=OLAP UNIQUE KEY(k1, k2) COMMENT 'OLAP' + DISTRIBUTED BY HASH(k1, k2) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "enable_unique_key_merge_on_write" = "true", + "light_schema_change" = "true", + "store_row_column" = "true", + "enable_single_replica_compaction" = "false");""" + sql """insert into ${tableName6} values + ("a",1,"2023-11-12 00:00:00","test1",1), + ("b",2,"2023-11-12 00:00:00","test2",2), + ("c",3,"2023-11-12 00:00:00","test3",3);""" + qt_sql "select * from ${tableName6} order by k1,k2" + sql """update ${tableName6} set v3=999 where k1="a" and k2=1;""" + qt_sql "select * from ${tableName6} order by k1,k2" + sql """update ${tableName6} set v2="update value", v1="2022-01-01 00:00:00" where k1="c" and k2=3;""" + qt_sql "select * from ${tableName6} order by k1,k2" + + sql "DROP TABLE IF EXISTS ${tableName6}" }