diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index e9d9c152aa..c3080fd14d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -1361,7 +1361,7 @@ public class NativeInsertStmt extends InsertStmt { if (hasEmptyTargetColumns) { return; } - boolean hasMissingColExceptAutoInc = false; + boolean hasMissingColExceptAutoIncKey = false; for (Column col : olapTable.getFullSchema()) { boolean exists = false; for (Column insertCol : targetColumns) { @@ -1374,16 +1374,16 @@ public class NativeInsertStmt extends InsertStmt { break; } } - if (!exists && !col.isAutoInc()) { - if (col.isKey()) { + if (!exists) { + if (col.isKey() && !col.isAutoInc()) { throw new UserException("Partial update should include all key columns, missing: " + col.getName()); } - if (col.isVisible()) { - hasMissingColExceptAutoInc = true; + if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) { + hasMissingColExceptAutoIncKey = true; } } } - if (!hasMissingColExceptAutoInc) { + if (!hasMissingColExceptAutoIncKey) { return; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java index 6a283ca023..48ea98ff9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java @@ -276,21 +276,19 @@ public class InsertUtils { if (unboundLogicalSink.getColNames().isEmpty()) { ((UnboundTableSink) unboundLogicalSink).setPartialUpdate(false); } else { - boolean hasMissingColExceptAutoInc = false; + boolean hasMissingColExceptAutoIncKey = false; for (Column col : olapTable.getFullSchema()) { Optional insertCol = unboundLogicalSink.getColNames().stream() .filter(c -> c.equalsIgnoreCase(col.getName())).findFirst(); - if (!col.isAutoInc() && !insertCol.isPresent()) { - if (col.isKey()) { - throw new AnalysisException("Partial update should include all key columns," - + " missing: " + col.getName()); - } - if (col.isVisible()) { - hasMissingColExceptAutoInc = true; - } + if (col.isKey() && !col.isAutoInc() && !insertCol.isPresent()) { + throw new AnalysisException("Partial update should include all key columns," + + " missing: " + col.getName()); + } + if (!(col.isAutoInc() && col.isKey()) && !insertCol.isPresent() && col.isVisible()) { + hasMissingColExceptAutoIncKey = true; } } - if (!hasMissingColExceptAutoInc) { + if (!hasMissingColExceptAutoIncKey) { ((UnboundTableSink) unboundLogicalSink).setPartialUpdate(false); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 3b012b173a..d89f7b5579 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -148,6 +148,13 @@ public class StreamLoadPlanner { if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) { throw new UserException("Only unique key merge on write support partial update"); } + + // try to convert to upsert if only has missing auto-increment key column + boolean hasMissingColExceptAutoIncKey = false; + if (taskInfo.getColumnExprDescs().descs.isEmpty()) { + isPartialUpdate = false; + } + HashSet partialUpdateInputColumns = new HashSet<>(); if (isPartialUpdate) { for (Column col : destTable.getFullSchema()) { @@ -172,14 +179,23 @@ public class StreamLoadPlanner { break; } } - if (col.isKey() && !existInExpr) { - throw new UserException("Partial update should include all key columns, missing: " + col.getName()); + if (!existInExpr) { + if (col.isKey() && !col.isAutoInc()) { + throw new UserException("Partial update should include all key columns, missing: " + + col.getName()); + } + if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) { + hasMissingColExceptAutoIncKey = true; + } } } if (taskInfo.getMergeType() == LoadTask.MergeType.DELETE) { partialUpdateInputColumns.add(Column.DELETE_SIGN); } } + if (isPartialUpdate && !hasMissingColExceptAutoIncKey) { + isPartialUpdate = false; + } // here we should be full schema to fill the descriptor table for (Column col : destTable.getFullSchema()) { if (isPartialUpdate && !partialUpdateInputColumns.contains(col.getName())) { @@ -247,7 +263,7 @@ public class StreamLoadPlanner { // The load id will pass to csv reader to find the stream load context from new load stream manager fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(), fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType(), taskInfo.getHiddenColumns(), - taskInfo.isPartialUpdate()); + isPartialUpdate); scanNode = fileScanNode; scanNode.init(analyzer); @@ -383,6 +399,13 @@ public class StreamLoadPlanner { if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) { throw new UserException("Only unique key merge on write support partial update"); } + + // try to convert to upsert if only has missing auto-increment key column + boolean hasMissingColExceptAutoIncKey = false; + if (taskInfo.getColumnExprDescs().descs.isEmpty()) { + isPartialUpdate = false; + } + HashSet partialUpdateInputColumns = new HashSet<>(); if (isPartialUpdate) { for (Column col : destTable.getFullSchema()) { @@ -407,14 +430,23 @@ public class StreamLoadPlanner { break; } } - if (col.isKey() && !existInExpr) { - throw new UserException("Partial update should include all key columns, missing: " + col.getName()); + if (!existInExpr) { + if (col.isKey() && !col.isAutoInc()) { + throw new UserException("Partial update should include all key columns, missing: " + + col.getName()); + } + if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) { + hasMissingColExceptAutoIncKey = true; + } } } if (taskInfo.getMergeType() == LoadTask.MergeType.DELETE) { partialUpdateInputColumns.add(Column.DELETE_SIGN); } } + if (isPartialUpdate && !hasMissingColExceptAutoIncKey) { + isPartialUpdate = false; + } // here we should be full schema to fill the descriptor table for (Column col : destTable.getFullSchema()) { if (isPartialUpdate && !partialUpdateInputColumns.contains(col.getName())) { diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc1.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc1.csv new file mode 100644 index 0000000000..63b02b8a30 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc1.csv @@ -0,0 +1,2 @@ +doris3 +doris4 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc2.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc2.csv new file mode 100644 index 0000000000..737c9a056f --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc2.csv @@ -0,0 +1,2 @@ +102,doris8 +103,doris9 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc3.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc3.csv new file mode 100644 index 0000000000..c06e9e00d0 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc3.csv @@ -0,0 +1,2 @@ +104,"doris10" +105,"doris11" \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc4.csv b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc4.csv new file mode 100644 index 0000000000..3a227dba5f --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/partial_update_autoinc4.csv @@ -0,0 +1,2 @@ +2,888,888 +3,888,888 \ No newline at end of file diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out index 380575499e..d157f501a8 100644 --- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.out @@ -2,64 +2,72 @@ -- !select_1 -- doris1 doris2 +doris3 +doris4 -- !select_2 -- -2 +4 -- !select_3 -- +"doris10" +"doris11" doris1 doris2 doris3 doris4 +doris5 +doris7 +doris8 +doris9 -- !select_4 -- -4 +10 + +-- !select_5 -- +1 10 10 10 +2 20 20 20 +3 30 30 30 +4 40 40 40 + +-- !select_6 -- +1 99 99 10 +2 888 888 20 +3 888 888 30 +4 40 40 40 -- !select_1 -- doris1 doris2 +doris3 +doris4 -- !select_2 -- -2 +4 -- !select_3 -- +"doris10" +"doris11" doris1 doris2 doris3 doris4 +doris5 +doris7 +doris8 +doris9 -- !select_4 -- -4 +10 --- !select_1 -- -doris1 -doris2 +-- !select_5 -- +1 10 10 10 +2 20 20 20 +3 30 30 30 +4 40 40 40 --- !select_2 -- -2 - --- !select_3 -- -doris1 -doris2 -doris3 -doris4 - --- !select_4 -- -4 - --- !select_1 -- -doris1 -doris2 - --- !select_2 -- -2 - --- !select_3 -- -doris1 -doris2 -doris3 -doris4 - --- !select_4 -- -4 +-- !select_6 -- +1 99 99 10 +2 888 888 20 +3 888 888 30 +4 40 40 40 diff --git a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy index 7d1efdc978..8598d791e0 100644 --- a/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy +++ b/regression-test/suites/fault_injection_p0/test_delete_from_timeout.groovy @@ -53,7 +53,7 @@ suite("test_delete_from_timeout","nonConcurrent") { GetDebugPoint().disableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail") } - sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """ + sql """delete from ${tableName} where col1 = "false" and col3 = "-25"; """ t1.join() qt_sql "select * from ${tableName} order by col1, col2, col3;" diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy index d0d1ecf954..ec46939b2f 100644 --- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_auto_inc.groovy @@ -1,4 +1,3 @@ - // Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information @@ -20,48 +19,100 @@ suite("test_partial_update_auto_inc") { String db = context.config.getDbNameByFile(context.file) sql "select 1;" // to create database - for (def use_mow : [false, true]) { - for (def use_nereids_planner : [false, true]) { - logger.info("current params: use_mow: ${use_mow}, use_nereids_planner: ${use_nereids_planner}") - connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql "use ${db};" + for (def use_nereids_planner : [false, true]) { + logger.info("current params: use_nereids_planner: ${use_nereids_planner}") + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql "use ${db};" - if (use_nereids_planner) { - sql """ set enable_nereids_dml = true; """ - sql """ set enable_nereids_planner=true; """ - sql """ set enable_fallback_to_original_planner=false; """ - } else { - sql """ set enable_nereids_dml = false; """ - sql """ set enable_nereids_planner = false; """ - } - - // create table - sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc """ - sql """ CREATE TABLE test_primary_key_partial_update_auto_inc ( - `id` BIGINT NOT NULL AUTO_INCREMENT, - `name` varchar(65533) NOT NULL COMMENT "用户姓名" ) - UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 - PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "${use_mow}"); """ - - sql """ set enable_unique_key_partial_update=true; """ - sql """ insert into test_primary_key_partial_update_auto_inc(name) values("doris1"); """ - sql """ set enable_unique_key_partial_update=false; """ - sql """ insert into test_primary_key_partial_update_auto_inc(name) values("doris2"); """ - sql "sync" - - qt_select_1 """ select name from test_primary_key_partial_update_auto_inc order by name; """ - qt_select_2 """ select count(distinct id) from test_primary_key_partial_update_auto_inc; """ - - sql """ set enable_unique_key_partial_update=true; """ - sql """ insert into test_primary_key_partial_update_auto_inc values(100,"doris3"); """ - sql """ set enable_unique_key_partial_update=false; """ - sql """ insert into test_primary_key_partial_update_auto_inc values(101, "doris4"); """ - sql "sync" - qt_select_3 """ select name from test_primary_key_partial_update_auto_inc order by name; """ - qt_select_4 """ select count(distinct id) from test_primary_key_partial_update_auto_inc; """ - - sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc """ + if (use_nereids_planner) { + sql """ set enable_nereids_planner=true; """ + sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_planner = false; """ } + + sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc """ + sql """ CREATE TABLE test_primary_key_partial_update_auto_inc ( + `id` BIGINT NOT NULL AUTO_INCREMENT, + `name` varchar(65533) NOT NULL COMMENT "用户姓名" ) + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); """ + + sql """ set enable_unique_key_partial_update=true; """ + sql "sync" + // insert stmt only misses auto-inc key column + sql """ insert into test_primary_key_partial_update_auto_inc(name) values("doris1"); """ + sql """ set enable_unique_key_partial_update=false; """ + sql "sync" + sql """ insert into test_primary_key_partial_update_auto_inc(name) values("doris2"); """ + // stream load only misses auto-inc key column + streamLoad { + table "test_primary_key_partial_update_auto_inc" + set 'partial_columns', 'true' + set 'column_separator', ',' + set 'columns', 'name' + file 'partial_update_autoinc1.csv' + time 10000 + } + qt_select_1 """ select name from test_primary_key_partial_update_auto_inc order by name; """ + qt_select_2 """ select count(distinct id) from test_primary_key_partial_update_auto_inc; """ + + sql """ set enable_unique_key_partial_update=true; """ + sql "sync" + // insert stmt withou column list + sql """ insert into test_primary_key_partial_update_auto_inc values(100,"doris5"); """ + // insert stmt, column list include all visible columns + sql """ insert into test_primary_key_partial_update_auto_inc(id,name) values(102,"doris6"); """ + sql """ set enable_unique_key_partial_update=false; """ + sql "sync" + sql """ insert into test_primary_key_partial_update_auto_inc values(101, "doris7"); """ + // stream load withou column list + streamLoad { + table "test_primary_key_partial_update_auto_inc" + set 'partial_columns', 'true' + set 'column_separator', ',' + file 'partial_update_autoinc2.csv' + time 10000 + } + // stream load, column list include all visible columns + streamLoad { + table "test_primary_key_partial_update_auto_inc" + set 'partial_columns', 'true' + set 'column_separator', ',' + set 'columns', 'id,name' + file 'partial_update_autoinc3.csv' + time 10000 + } + qt_select_3 """ select name from test_primary_key_partial_update_auto_inc order by name; """ + qt_select_4 """ select count(distinct id) from test_primary_key_partial_update_auto_inc; """ + sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc """ + + + sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc2 """ + sql """ CREATE TABLE test_primary_key_partial_update_auto_inc2 ( + `id` BIGINT NOT NULL, + `c1` int, + `c2` int, + `cid` BIGINT NOT NULL AUTO_INCREMENT) + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); """ + sql "insert into test_primary_key_partial_update_auto_inc2 values(1,10,10,10),(2,20,20,20),(3,30,30,30),(4,40,40,40);" + order_qt_select_5 "select * from test_primary_key_partial_update_auto_inc2" + sql """ set enable_unique_key_partial_update=true; """ + sql "sync;" + // insert stmt only misses auto-inc value column, its value should not change when do partial update + sql "insert into test_primary_key_partial_update_auto_inc2(id,c1,c2) values(1,99,99),(2,99,99);" + // stream load only misses auto-inc value column, its value should not change when do partial update + streamLoad { + table "test_primary_key_partial_update_auto_inc2" + set 'partial_columns', 'true' + set 'column_separator', ',' + set 'columns', 'id,c1,c2' + file 'partial_update_autoinc4.csv' + time 10000 + } + order_qt_select_6 "select * from test_primary_key_partial_update_auto_inc2" + sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc2 """ } } }