From 415c6d854d5e01136abc808076b3722f70603d67 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Fri, 15 Dec 2023 00:04:29 +0800 Subject: [PATCH] [fix](partial update) Fix some bugs about partial update (#28358) --- .../doris/analysis/NativeInsertStmt.java | 2 +- .../nereids/rules/analysis/BindSink.java | 25 --------------- .../trees/plans/commands/InsertExecutor.java | 31 ++++++++++++++++++- .../insert_into_table/partial_update.groovy | 2 +- 4 files changed, 32 insertions(+), 28 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index b339b04254..2d9755e317 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -1021,7 +1021,7 @@ public class NativeInsertStmt extends InsertStmt { throw new DdlException("txn does not exist: " + transactionId); } txnState.addTableIndexes((OlapTable) targetTable); - if (!isFromDeleteOrUpdateStmt && isPartialUpdate) { + if (isPartialUpdate) { txnState.setSchemaForPartialUpdate((OlapTable) targetTable); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index ae86fdd768..4ce7350cf2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -105,31 +105,6 @@ public class BindSink implements AnalysisRuleFactory { sink.getDMLCommandType(), child); - if (isPartialUpdate) { - // check the necessary conditions for partial updates - if (!table.getEnableUniqueKeyMergeOnWrite()) { - throw new AnalysisException("Partial update is only allowed on " - + "unique table with merge-on-write enabled."); - } - if (sink.getColNames().isEmpty() && sink.getDMLCommandType() == DMLCommandType.INSERT) { - throw new AnalysisException("You must explicitly specify the columns to be updated when " - + "updating partial columns using the INSERT statement."); - } - for (Column col : table.getFullSchema()) { - boolean exists = false; - for (Column insertCol : boundSink.getCols()) { - if (insertCol.getName().equals(col.getName())) { - exists = true; - break; - } - } - if (col.isKey() && !exists) { - throw new AnalysisException("Partial update should include all key columns, missing: " - + col.getName()); - } - } - } - // we need to insert all the columns of the target table // although some columns are not mentions. // so we add a projects to supply the default value. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java index 2e86771828..bd16071104 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java @@ -23,6 +23,7 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; @@ -52,6 +53,7 @@ import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.types.DataType; @@ -91,6 +93,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; /** @@ -175,7 +178,7 @@ public class InsertExecutor { throw new AnalysisException("txn does not exist: " + txnId); } state.addTableIndexes((OlapTable) table); - if (isPartialUpdate && isFromInsert) { + if (isPartialUpdate) { state.setSchemaForPartialUpdate((OlapTable) table); } } @@ -495,6 +498,31 @@ public class InsertExecutor { */ public static Plan normalizePlan(Plan plan, TableIf table) { UnboundTableSink unboundTableSink = (UnboundTableSink) plan; + + if (table instanceof OlapTable && ((OlapTable) table).getKeysType() == KeysType.UNIQUE_KEYS + && unboundTableSink.isPartialUpdate()) { + // check the necessary conditions for partial updates + OlapTable olapTable = (OlapTable) table; + if (!olapTable.getEnableUniqueKeyMergeOnWrite()) { + throw new AnalysisException("Partial update is only allowed on " + + "unique table with merge-on-write enabled."); + } + if (unboundTableSink.getDMLCommandType() == DMLCommandType.INSERT) { + if (unboundTableSink.getColNames().isEmpty()) { + throw new AnalysisException("You must explicitly specify the columns to be updated when " + + "updating partial columns using the INSERT statement."); + } + for (Column col : olapTable.getFullSchema()) { + Optional insertCol = unboundTableSink.getColNames().stream() + .filter(c -> c.equalsIgnoreCase(col.getName())).findFirst(); + if (col.isKey() && !insertCol.isPresent()) { + throw new AnalysisException("Partial update should include all key columns, missing: " + + col.getName()); + } + } + } + } + Plan query = unboundTableSink.child(); if (!(query instanceof LogicalInlineTable)) { return plan; @@ -502,6 +530,7 @@ public class InsertExecutor { LogicalInlineTable logicalInlineTable = (LogicalInlineTable) query; ImmutableList.Builder oneRowRelationBuilder = ImmutableList.builder(); List columns = table.getBaseSchema(false); + for (List values : logicalInlineTable.getConstantExprsList()) { ImmutableList.Builder constantExprs = ImmutableList.builder(); if (values.isEmpty()) { diff --git a/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy b/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy index 8c2236afd1..5fcf4e63de 100644 --- a/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy +++ b/regression-test/suites/nereids_p0/insert_into_table/partial_update.groovy @@ -56,7 +56,7 @@ suite("nereids_partial_update_native_insert_stmt", "p0") { qt_1 """ select * from ${tableName} order by id; """ test { sql """insert into ${tableName} values(2,400),(1,200),(4,400)""" - exception "" + exception "You must explicitly specify the columns to be updated when updating partial columns using the INSERT statement." } sql "set enable_unique_key_partial_update=false;" sql "sync;"