diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java index e5a6741895..1bd4e994ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSink.java @@ -27,6 +27,7 @@ import org.apache.doris.nereids.trees.plans.BlockFuncDepsPropagation; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; @@ -50,36 +51,36 @@ public class UnboundTableSink extends LogicalSink partitions; private final boolean isPartialUpdate; - private final boolean isFromNativeInsertStmt; + private final DMLCommandType dmlCommandType; public UnboundTableSink(List nameParts, List colNames, List hints, List partitions, CHILD_TYPE child) { this(nameParts, colNames, hints, false, partitions, - false, false, Optional.empty(), Optional.empty(), child); + false, DMLCommandType.NONE, Optional.empty(), Optional.empty(), child); } public UnboundTableSink(List nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, CHILD_TYPE child) { this(nameParts, colNames, hints, temporaryPartition, partitions, - false, false, Optional.empty(), Optional.empty(), child); + false, DMLCommandType.NONE, Optional.empty(), Optional.empty(), child); } public UnboundTableSink(List nameParts, List colNames, List hints, List partitions, boolean isPartialUpdate, CHILD_TYPE child) { - this(nameParts, colNames, hints, false, partitions, isPartialUpdate, false, + this(nameParts, colNames, hints, false, partitions, isPartialUpdate, DMLCommandType.NONE, Optional.empty(), Optional.empty(), child); } public UnboundTableSink(List nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, boolean isPartialUpdate, CHILD_TYPE child) { - this(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, false, + this(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, DMLCommandType.NONE, Optional.empty(), Optional.empty(), child); } public UnboundTableSink(List nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, - boolean isPartialUpdate, boolean isFromNativeInsertStmt, CHILD_TYPE child) { - this(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, isFromNativeInsertStmt, + boolean isPartialUpdate, DMLCommandType dmlCommandType, CHILD_TYPE child) { + this(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, dmlCommandType, Optional.empty(), Optional.empty(), child); } @@ -88,7 +89,7 @@ public class UnboundTableSink extends LogicalSink nameParts, List colNames, List hints, boolean temporaryPartition, List partitions, - boolean isPartialUpdate, boolean isFromNativeInsertStmt, + boolean isPartialUpdate, DMLCommandType dmlCommandType, Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, logicalProperties, child); @@ -98,7 +99,7 @@ public class UnboundTableSink extends LogicalSink getColNames() { @@ -125,15 +126,15 @@ 
public class UnboundTableSink extends LogicalSink children) { Preconditions.checkArgument(children.size() == 1, "UnboundOlapTableSink only accepts one child"); return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, - isFromNativeInsertStmt, groupExpression, Optional.empty(), children.get(0)); + dmlCommandType, groupExpression, Optional.empty(), children.get(0)); } @Override @@ -169,14 +170,14 @@ public class UnboundTableSink extends LogicalSink groupExpression) { return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, isPartialUpdate, - isFromNativeInsertStmt, groupExpression, Optional.of(getLogicalProperties()), child()); + dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions, - isPartialUpdate, isFromNativeInsertStmt, groupExpression, logicalProperties, children.get(0)); + isPartialUpdate, dmlCommandType, groupExpression, logicalProperties, children.get(0)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 6b644bc723..eba033e816 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -346,6 +346,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc; import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition; import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo; import org.apache.doris.nereids.trees.plans.commands.info.CreateTableInfo; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.commands.info.DefaultValue; import org.apache.doris.nereids.trees.plans.commands.info.DistributionDescriptor; import org.apache.doris.nereids.trees.plans.commands.info.DropMTMVInfo; @@ -495,7 +496,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor { temporaryPartition, partitions, ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate(), - true, + DMLCommandType.INSERT, plan); LogicalPlan command; if (isOverwrite) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index ca2108c908..ae86fdd768 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -46,6 +46,7 @@ import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink; import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; @@ -101,7 +102,7 @@ public class BindSink implements AnalysisRuleFactory { 
.map(NamedExpression.class::cast) .collect(ImmutableList.toImmutableList()), isPartialUpdate, - sink.isFromNativeInsertStmt(), + sink.getDMLCommandType(), child); if (isPartialUpdate) { @@ -110,7 +111,7 @@ public class BindSink implements AnalysisRuleFactory { throw new AnalysisException("Partial update is only allowed on " + "unique table with merge-on-write enabled."); } - if (sink.getColNames().isEmpty() && sink.isFromNativeInsertStmt()) { + if (sink.getColNames().isEmpty() && sink.getDMLCommandType() == DMLCommandType.INSERT) { throw new AnalysisException("You must explicitly specify the columns to be updated when " + "updating partial columns using the INSERT statement."); } @@ -168,7 +169,13 @@ public class BindSink implements AnalysisRuleFactory { } } - if (!haveInputSeqCol && !isPartialUpdate) { + // Don't require the user to provide a value for the sequence column + // in the following cases: + // 1. partial update, i.e. a load job with `partial_columns=true` + // 2. UPDATE and DELETE, where the planner adds the hidden columns automatically + if (!haveInputSeqCol && !isPartialUpdate && ( + boundSink.getDmlCommandType() != DMLCommandType.UPDATE + && boundSink.getDmlCommandType() != DMLCommandType.DELETE)) { if (!seqColInTable.isPresent() || seqColInTable.get().getDefaultValue() == null || !seqColInTable.get().getDefaultValue() .equals(DefaultValue.CURRENT_TIMESTAMP)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java index 2c727ebb8b..37fefecb9e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapTableSinkToPhysicalOlapTableSink.java @@ -41,7 +41,7 @@ public class LogicalOlapTableSinkToPhysicalOlapTableSink extends OneImplementati sink.getOutputExprs(), ctx.connectContext.getSessionVariable().isEnableSingleReplicaInsert(), sink.isPartialUpdate(), - sink.isFromNativeInsertStmt(), + sink.getDmlCommandType(), Optional.empty(), sink.getLogicalProperties(), sink.child()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteCommand.java index 92ac505e42..8c0568724b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/DeleteCommand.java @@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral; import org.apache.doris.nereids.trees.plans.Explainable; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -121,7 +122,7 @@ public class DeleteCommand extends Command implements ForwardWithSync, Explainab // make UnboundTableSink return new UnboundTableSink<>(nameParts, cols, ImmutableList.of(), - partitions, isPartialUpdate, logicalQuery); + false, partitions, isPartialUpdate, DMLCommandType.DELETE, logicalQuery); } public
LogicalPlan getLogicalQuery() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java index 9b16497d3c..2e86771828 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertExecutor.java @@ -474,7 +474,12 @@ public class InsertExecutor { .setTimeout((int) timeoutSecond) .setTimezone(timeZone) .setSendBatchParallelism(sendBatchParallelism) - .setTrimDoubleQuotes(true); + .setTrimDoubleQuotes(true) + .setSequenceCol(columns.stream() + .filter(c -> Column.SEQUENCE_COL.equalsIgnoreCase(c.getName())) + .map(Column::getName) + .findFirst() + .orElse(null)); // execute begin txn InsertStreamTxnExecutor executor = new InsertStreamTxnExecutor(txnEntry); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java index a14f0f4285..d555bede4c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java @@ -36,6 +36,7 @@ import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Explainable; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -156,7 +157,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, physicalOlapTableSink.getTargetTable(), label, planner); insertExecutor.beginTransaction(); insertExecutor.finalizeSink(sink, physicalOlapTableSink.isPartialUpdate(), - physicalOlapTableSink.isFromNativeInsertStmt()); + physicalOlapTableSink.getDmlCommandType() == DMLCommandType.INSERT); } finally { targetTableIf.readUnlock(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertOverwriteTableCommand.java index 47fee19c00..84d3ffc39c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertOverwriteTableCommand.java @@ -199,7 +199,7 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS true, tempPartitionNames, sink.isPartialUpdate(), - sink.isFromNativeInsertStmt(), + sink.getDMLCommandType(), (LogicalPlan) (sink.child(0))); new InsertIntoTableCommand(copySink, labelName).run(ctx, executor); if (ctx.getState().getStateType() == MysqlStateType.ERR) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java index ac62faa1cd..ed4133ffd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/LoadCommand.java @@ -44,6 +44,7 @@ import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.commands.info.BulkLoadDataDesc; import org.apache.doris.nereids.trees.plans.commands.info.BulkStorageDesc; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalCheckPolicy; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; @@ -226,7 +227,7 @@ public class LoadCommand extends Command implements ForwardWithSync { boolean isPartialUpdate = olapTable.getEnableUniqueKeyMergeOnWrite() && sinkCols.size() < olapTable.getColumns().size(); return new UnboundTableSink<>(dataDesc.getNameParts(), sinkCols, ImmutableList.of(), - dataDesc.getPartitionNames(), isPartialUpdate, tvfLogicalPlan); + false, dataDesc.getPartitionNames(), isPartialUpdate, DMLCommandType.LOAD, tvfLogicalPlan); } private static void fillDeleteOnColumn(BulkLoadDataDesc dataDesc, OlapTable olapTable, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java index 6c9608122e..28a78021e0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java @@ -33,6 +33,7 @@ import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.plans.Explainable; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; @@ -137,7 +138,7 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab // make UnboundTableSink return new UnboundTableSink<>(nameParts, ImmutableList.of(), ImmutableList.of(), - ImmutableList.of(), isPartialUpdate, logicalQuery); + false, ImmutableList.of(), isPartialUpdate, DMLCommandType.UPDATE, logicalQuery); } private void checkTable(ConnectContext ctx) throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/DMLCommandType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/DMLCommandType.java new file mode 100644 index 0000000000..18d8179abe --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/DMLCommandType.java @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.info; + +/** + * Type of DML Command. + * For Unique Key table, we need to process some + * hidden columns in different way for different DML type. + */ +public enum DMLCommandType { + // Not a DML command + NONE, + // for INSERT INTO or INSERT INTO SELECT + INSERT, + // for UPDATE + UPDATE, + // for DELETE + DELETE, + // for all other load jobs, including Stream Load, Broker Load, S3 Load + // Routine Load etc. + LOAD +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java index 664ac8f993..a1147a9617 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapTableSink.java @@ -27,6 +27,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.PropagateFuncDeps; import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; @@ -48,12 +49,12 @@ public class LogicalOlapTableSink extends LogicalSink cols; private final List partitionIds; private final boolean isPartialUpdate; - private final boolean isFromNativeInsertStmt; + private final DMLCommandType dmlCommandType; public LogicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, - List outputExprs, boolean isPartialUpdate, boolean isFromNativeInsertStmt, + List outputExprs, boolean isPartialUpdate, DMLCommandType dmlCommandType, CHILD_TYPE child) { - this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, isFromNativeInsertStmt, + this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, dmlCommandType, Optional.empty(), Optional.empty(), child); } @@ -62,14 +63,14 @@ public class LogicalOlapTableSink extends LogicalSink cols, List partitionIds, List outputExprs, boolean isPartialUpdate, - boolean isFromNativeInsertStmt, Optional groupExpression, + DMLCommandType dmlCommandType, Optional groupExpression, Optional logicalProperties, CHILD_TYPE child) { super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child); this.database = Objects.requireNonNull(database, "database != null in LogicalOlapTableSink"); this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalOlapTableSink"); this.cols = Utils.copyRequiredList(cols); this.isPartialUpdate = isPartialUpdate; - this.isFromNativeInsertStmt = isFromNativeInsertStmt; + this.dmlCommandType = dmlCommandType; this.partitionIds = Utils.copyRequiredList(partitionIds); } @@ -78,14 +79,14 @@ public class LogicalOlapTableSink extends LogicalSink(database, targetTable, cols, partitionIds, output, isPartialUpdate, - 
isFromNativeInsertStmt, Optional.empty(), Optional.empty(), child); + dmlCommandType, Optional.empty(), Optional.empty(), child); } @Override public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "LogicalOlapTableSink only accepts one child"); return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, - isFromNativeInsertStmt, Optional.empty(), Optional.empty(), children.get(0)); + dmlCommandType, Optional.empty(), Optional.empty(), children.get(0)); } public Database getDatabase() { @@ -108,8 +109,8 @@ public class LogicalOlapTableSink extends LogicalSink extends LogicalSink that = (LogicalOlapTableSink) o; - return isPartialUpdate == that.isPartialUpdate && isFromNativeInsertStmt == that.isFromNativeInsertStmt + return isPartialUpdate == that.isPartialUpdate && dmlCommandType == that.dmlCommandType && Objects.equals(database, that.database) && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols) && Objects.equals(partitionIds, that.partitionIds); @@ -133,7 +134,7 @@ public class LogicalOlapTableSink extends LogicalSink extends LogicalSink extends LogicalSink groupExpression) { return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, - isFromNativeInsertStmt, groupExpression, Optional.of(getLogicalProperties()), child()); + dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, - isFromNativeInsertStmt, groupExpression, logicalProperties, children.get(0)); + dmlCommandType, groupExpression, logicalProperties, children.get(0)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java index 736c9d2a60..11a85ed100 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapTableSink.java @@ -34,6 +34,7 @@ import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Sink; +import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; import org.apache.doris.statistics.Statistics; @@ -60,17 +61,17 @@ public class PhysicalOlapTableSink extends PhysicalSink private final List partitionIds; private final boolean singleReplicaLoad; private final boolean isPartialUpdate; - private final boolean isFromNativeInsertStmt; + private final DMLCommandType dmlCommandType; /** * Constructor */ public PhysicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, List outputExprs, boolean singleReplicaLoad, - boolean isPartialUpdate, boolean isFromNativeInsertStmt, + boolean isPartialUpdate, DMLCommandType dmlCommandType, Optional groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { this(database, targetTable, cols, partitionIds, outputExprs, - singleReplicaLoad, isPartialUpdate, 
isFromNativeInsertStmt, + singleReplicaLoad, isPartialUpdate, dmlCommandType, groupExpression, logicalProperties, PhysicalProperties.GATHER, null, child); } @@ -79,7 +80,7 @@ public class PhysicalOlapTableSink extends PhysicalSink */ public PhysicalOlapTableSink(Database database, OlapTable targetTable, List cols, List partitionIds, List outputExprs, boolean singleReplicaLoad, - boolean isPartialUpdate, boolean isFromNativeInsertStmt, + boolean isPartialUpdate, DMLCommandType dmlCommandType, Optional groupExpression, LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { super(PlanType.PHYSICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, @@ -90,7 +91,7 @@ public class PhysicalOlapTableSink extends PhysicalSink this.partitionIds = Utils.copyRequiredList(partitionIds); this.singleReplicaLoad = singleReplicaLoad; this.isPartialUpdate = isPartialUpdate; - this.isFromNativeInsertStmt = isFromNativeInsertStmt; + this.dmlCommandType = dmlCommandType; } public Database getDatabase() { @@ -117,15 +118,15 @@ public class PhysicalOlapTableSink extends PhysicalSink return isPartialUpdate; } - public boolean isFromNativeInsertStmt() { - return isFromNativeInsertStmt; + public DMLCommandType getDmlCommandType() { + return dmlCommandType; } @Override public Plan withChildren(List children) { Preconditions.checkArgument(children.size() == 1, "PhysicalOlapTableSink only accepts one child"); return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, - singleReplicaLoad, isPartialUpdate, isFromNativeInsertStmt, groupExpression, + singleReplicaLoad, isPartialUpdate, dmlCommandType, groupExpression, getLogicalProperties(), physicalProperties, statistics, children.get(0)); } @@ -140,7 +141,7 @@ public class PhysicalOlapTableSink extends PhysicalSink PhysicalOlapTableSink that = (PhysicalOlapTableSink) o; return singleReplicaLoad == that.singleReplicaLoad && isPartialUpdate == that.isPartialUpdate - && isFromNativeInsertStmt == that.isFromNativeInsertStmt + && dmlCommandType == that.dmlCommandType && Objects.equals(database, that.database) && Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols) @@ -150,7 +151,7 @@ public class PhysicalOlapTableSink extends PhysicalSink @Override public int hashCode() { return Objects.hash(database, targetTable, cols, partitionIds, singleReplicaLoad, - isPartialUpdate, isFromNativeInsertStmt); + isPartialUpdate, dmlCommandType); } @Override @@ -163,7 +164,7 @@ public class PhysicalOlapTableSink extends PhysicalSink "partitionIds", partitionIds, "singleReplicaLoad", singleReplicaLoad, "isPartialUpdate", isPartialUpdate, - "isFromNativeInsertStmt", isFromNativeInsertStmt + "dmlCommandType", dmlCommandType ); } @@ -193,20 +194,20 @@ public class PhysicalOlapTableSink extends PhysicalSink @Override public Plan withGroupExpression(Optional groupExpression) { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, singleReplicaLoad, - isPartialUpdate, isFromNativeInsertStmt, groupExpression, getLogicalProperties(), child()); + isPartialUpdate, dmlCommandType, groupExpression, getLogicalProperties(), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional groupExpression, Optional logicalProperties, List children) { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, singleReplicaLoad, - isPartialUpdate, isFromNativeInsertStmt, groupExpression, logicalProperties.get(), 
children.get(0)); + isPartialUpdate, dmlCommandType, groupExpression, logicalProperties.get(), children.get(0)); } @Override public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, singleReplicaLoad, - isPartialUpdate, isFromNativeInsertStmt, groupExpression, getLogicalProperties(), + isPartialUpdate, dmlCommandType, groupExpression, getLogicalProperties(), physicalProperties, statistics, child()); } @@ -247,7 +248,7 @@ public class PhysicalOlapTableSink extends PhysicalSink @Override public PhysicalOlapTableSink resetLogicalProperties() { return new PhysicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, singleReplicaLoad, - isPartialUpdate, isFromNativeInsertStmt, groupExpression, + isPartialUpdate, dmlCommandType, groupExpression, null, physicalProperties, statistics, child()); } } diff --git a/regression-test/data/data_model_p0/unique/test_unique_table_new_sequence.out b/regression-test/data/data_model_p0/unique/test_unique_table_new_sequence.out index 5bb6122519..5c37168a5e 100644 --- a/regression-test/data/data_model_p0/unique/test_unique_table_new_sequence.out +++ b/regression-test/data/data_model_p0/unique/test_unique_table_new_sequence.out @@ -70,3 +70,74 @@ __DORIS_SEQUENCE_COL__ INT Yes false \N REPLACE 2 20 20 20 20 0 3 20 3 3 3 3 3 0 2 3 +-- !all -- +1 4 11 12 13 +2 5 12 13 14 +3 6 13 14 15 + +-- !count -- +3 + +-- !part -- +1 2 15 +2 5 12 +3 6 13 + +-- !all -- +1 2 15 16 17 +2 5 12 13 14 +3 6 13 14 15 + +-- !count -- +3 + +-- !part -- +1 10 15 +2 5 14 +3 6 11 + +-- !all -- +1 10 15 16 17 +2 5 14 13 14 +3 6 11 14 15 + +-- !count -- +4 + +-- !part -- +1 10 15 +15 8 19 +2 5 14 +3 6 11 + +-- !all -- +1 10 15 16 17 0 4 15 +15 8 19 20 21 0 7 19 +2 5 14 13 14 0 5 12 +3 6 11 14 15 0 6 13 + +-- !desc -- +k1 INT Yes true \N +v1 TINYINT Yes false \N REPLACE +v2 INT Yes false \N REPLACE +v3 INT Yes false \N REPLACE +v4 INT Yes false \N REPLACE +__DORIS_DELETE_SIGN__ TINYINT No false 0 REPLACE +__DORIS_VERSION_COL__ BIGINT No false 0 REPLACE +__DORIS_SEQUENCE_COL__ INT Yes false \N REPLACE + +-- !1 -- +1 1 1 1 1 0 2 1 +2 2 2 2 2 0 2 2 +3 3 3 3 3 0 2 3 + +-- !2 -- +1 1 1 1 1 0 2 1 +2 20 20 20 20 0 3 20 +3 3 3 3 3 0 2 3 + +-- !3 -- +1 1 1 1 1 0 2 1 +2 20 20 20 20 0 3 20 +3 3 3 3 3 0 2 3 + diff --git a/regression-test/data/data_model_p0/unique/test_unique_table_sequence.out b/regression-test/data/data_model_p0/unique/test_unique_table_sequence.out index 4f1d633250..5197721c92 100644 --- a/regression-test/data/data_model_p0/unique/test_unique_table_sequence.out +++ b/regression-test/data/data_model_p0/unique/test_unique_table_sequence.out @@ -60,3 +60,64 @@ 2 20 20 20 20 0 3 20 3 3 3 3 3 0 2 3 +-- !all -- +1 4 11 12 13 +2 5 12 13 14 +3 6 13 14 15 + +-- !count -- +3 + +-- !part -- +1 2 15 +2 5 12 +3 6 13 + +-- !all -- +1 2 15 16 17 +2 5 12 13 14 +3 6 13 14 15 + +-- !count -- +3 + +-- !part -- +1 10 15 +2 5 14 +3 6 11 + +-- !all -- +1 10 15 16 17 +2 5 14 13 14 +3 6 11 14 15 + +-- !count -- +4 + +-- !part -- +1 10 15 +15 8 19 +2 5 14 +3 6 11 + +-- !all -- +1 10 15 16 17 0 4 15 +15 8 19 20 21 0 7 3 +2 5 14 13 14 0 5 12 +3 6 11 14 15 0 6 13 + +-- !1 -- +1 1 1 1 1 0 2 1 +2 2 2 2 2 0 2 2 +3 3 3 3 3 0 2 3 + +-- !2 -- +1 1 1 1 1 0 2 1 +2 20 20 20 20 0 3 20 +3 3 3 3 3 0 2 3 + +-- !3 -- +1 1 1 1 1 0 2 1 +2 20 20 20 20 0 3 20 +3 3 3 3 3 0 2 3 + diff --git 
a/regression-test/suites/data_model_p0/unique/test_unique_table_new_sequence.groovy b/regression-test/suites/data_model_p0/unique/test_unique_table_new_sequence.groovy index 861c6878e1..a2198924f3 100644 --- a/regression-test/suites/data_model_p0/unique/test_unique_table_new_sequence.groovy +++ b/regression-test/suites/data_model_p0/unique/test_unique_table_new_sequence.groovy @@ -16,167 +16,172 @@ // under the License. suite("test_unique_table_new_sequence") { - def tableName = "test_uniq_new_sequence" - sql """ DROP TABLE IF EXISTS ${tableName} """ - sql """ - CREATE TABLE IF NOT EXISTS ${tableName} ( - `k1` int NULL, - `v1` tinyint NULL, - `v2` int, - `v3` int, - `v4` int - ) ENGINE=OLAP - UNIQUE KEY(k1) - DISTRIBUTED BY HASH(`k1`) BUCKETS 3 - PROPERTIES ( - "function_column.sequence_col" = "v2", - "replication_allocation" = "tag.location.default: 1", - "light_schema_change" = "true" - ); - """ - // test streamload with seq col - streamLoad { - table "${tableName}" + for (def enable_fall_back : [false, true]) { + def tableName = "test_uniq_new_sequence" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int NULL, + `v1` tinyint NULL, + `v2` int, + `v3` int, + `v4` int + ) ENGINE=OLAP + UNIQUE KEY(k1) + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ( + "function_column.sequence_col" = "v2", + "replication_allocation" = "tag.location.default: 1", + "light_schema_change" = "true" + ); + """ + sql "set enable_fallback_to_original_planner=${enable_fall_back}" + // test streamload with seq col + streamLoad { + table "${tableName}" - set 'column_separator', ',' - set 'columns', 'k1,v1,v2,v3,v4' + set 'column_separator', ',' + set 'columns', 'k1,v1,v2,v3,v4' - file 'unique_key_data1.csv' - time 10000 // limit inflight 10s + file 'unique_key_data1.csv' + time 10000 // limit inflight 10s - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(4, json.NumberTotalRows) + assertEquals(4, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(4, json.NumberTotalRows) - assertEquals(4, json.NumberLoadedRows) - assertEquals(0, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) } - } - sql "sync" - order_qt_all "SELECT * from ${tableName}" + sql "sync" + order_qt_all "SELECT * from ${tableName}" - // test update data, using streamload with seq col - streamLoad { - table "${tableName}" + // test update data, using streamload with seq col + streamLoad { + table "${tableName}" - set 'column_separator', ',' - set 'columns', 'k1,v1,v2,v3,v4' + set 'column_separator', ',' + set 'columns', 'k1,v1,v2,v3,v4' - file 'unique_key_data2.csv' - time 10000 // limit inflight 10s + file 'unique_key_data2.csv' + time 10000 // limit inflight 10s - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", 
json.Status.toLowerCase()) + assertEquals(4, json.NumberTotalRows) + assertEquals(4, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(4, json.NumberTotalRows) - assertEquals(4, json.NumberLoadedRows) - assertEquals(0, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) } + sql "sync" + + qt_count "SELECT COUNT(*) from ${tableName}" + + order_qt_part "SELECT k1, v1, v2 from ${tableName}" + + order_qt_all "SELECT * from ${tableName}" + + sql "UPDATE ${tableName} SET v1 = 10 WHERE k1 = 1" + + sql "UPDATE ${tableName} SET v2 = 14 WHERE k1 = 2" + + sql "UPDATE ${tableName} SET v2 = 11 WHERE k1 = 3" + + sql "sync" + + qt_count "SELECT COUNT(*) from ${tableName}" + + order_qt_part "SELECT k1, v1, v2 from ${tableName}" + + order_qt_all "SELECT * from ${tableName}" + + // test insert into with column list, which not contains the seq mapping column v2 + test { + sql "INSERT INTO ${tableName} (k1, v1, v3, v4) values(15, 8, 20, 21)" + exception "Table ${tableName} has sequence column, need to specify the sequence column" + } + + // test insert into without column list + sql "INSERT INTO ${tableName} values(15, 8, 19, 20, 21)" + + // test insert into with column list + sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4) values(15, 9, 18, 21, 22)" + + sql "SET show_hidden_columns=true" + + sql "sync" + + qt_count "SELECT COUNT(*) from ${tableName}" + + order_qt_part "SELECT k1, v1, v2 from ${tableName}" + + order_qt_all "SELECT * from ${tableName}" + + qt_desc "desc ${tableName}" + + sql "DROP TABLE ${tableName}" + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int NULL, + `v1` tinyint NULL, + `v2` int, + `v3` int, + `v4` int + ) ENGINE=OLAP + UNIQUE KEY(k1) + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ( + "function_column.sequence_col" = "v4", + "replication_allocation" = "tag.location.default: 1", + "light_schema_change" = "true" + ); + """ + + // test insert into with column list, which not contains the seq mapping column v4 + // in begin/commit + sql "begin;" + test { + sql "INSERT INTO ${tableName} (k1, v1, v2, v3) values(1,1,1,1)" + exception "Table ${tableName} has sequence column, need to specify the sequence column" + } + sql "commit;" + + // test insert into without column list, in begin/commit + sql "begin;" + sql "insert into ${tableName} values (1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3);" + sql "commit;" + + qt_1 "select * from ${tableName} order by k1;" + + sql "begin;" + sql "insert into ${tableName} (k1, v1, v2, v3, v4) values (2,20,20,20,20);" + sql "commit;" + + qt_2 "select * from ${tableName} order by k1;" + + sql "begin;" + sql "insert into ${tableName} (k1, v1, v2, v3, v4) values (3,30,30,30,1);" + sql "commit;" + + qt_3 "select * from ${tableName} order by k1" + + sql "SET show_hidden_columns=false" + + sql "DROP TABLE ${tableName}" } - sql "sync" - - qt_count "SELECT COUNT(*) from ${tableName}" - - order_qt_part "SELECT k1, v1, v2 from ${tableName}" - - order_qt_all "SELECT * from ${tableName}" - - sql "UPDATE ${tableName} SET v1 = 10 WHERE k1 = 1" - - sql "UPDATE ${tableName} SET v2 = 14 WHERE k1 = 2" - - sql "UPDATE ${tableName} SET v2 = 11 WHERE k1 = 3" - - sql "sync" - - qt_count "SELECT COUNT(*) from ${tableName}" - - order_qt_part "SELECT k1, v1, v2 from ${tableName}" - - 
order_qt_all "SELECT * from ${tableName}" - - // test insert into with column list, which not contains the seq mapping column v2 - test { - sql "INSERT INTO ${tableName} (k1, v1, v3, v4) values(15, 8, 20, 21)" - exception "Table ${tableName} has sequence column, need to specify the sequence column" - } - - // test insert into without column list - sql "INSERT INTO ${tableName} values(15, 8, 19, 20, 21)" - - // test insert into with column list - sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4) values(15, 9, 18, 21, 22)" - - sql "SET show_hidden_columns=true" - - sql "sync" - - qt_count "SELECT COUNT(*) from ${tableName}" - - order_qt_part "SELECT k1, v1, v2 from ${tableName}" - - order_qt_all "SELECT * from ${tableName}" - - qt_desc "desc ${tableName}" - - sql "DROP TABLE ${tableName}" - - sql """ DROP TABLE IF EXISTS ${tableName} """ - sql """ - CREATE TABLE IF NOT EXISTS ${tableName} ( - `k1` int NULL, - `v1` tinyint NULL, - `v2` int, - `v3` int, - `v4` int - ) ENGINE=OLAP - UNIQUE KEY(k1) - DISTRIBUTED BY HASH(`k1`) BUCKETS 3 - PROPERTIES ( - "function_column.sequence_col" = "v4", - "replication_allocation" = "tag.location.default: 1", - "light_schema_change" = "true" - ); - """ - - // test insert into with column list, which not contains the seq mapping column v4 - // in begin/commit - sql "begin;" - test { - sql "INSERT INTO ${tableName} (k1, v1, v2, v3) values(1,1,1,1)" - exception "Table ${tableName} has sequence column, need to specify the sequence column" - } - sql "commit;" - - // test insert into without column list, in begin/commit - sql "begin;" - sql "insert into ${tableName} values (1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3);" - sql "commit;" - - qt_1 "select * from ${tableName} order by k1;" - - sql "begin;" - sql "insert into ${tableName} (k1, v1, v2, v3, v4) values (2,20,20,20,20);" - sql "commit;" - - qt_2 "select * from ${tableName} order by k1;" - - sql "begin;" - sql "insert into ${tableName} (k1, v1, v2, v3, v4) values (3,30,30,30,1);" - sql "commit;" - - qt_3 "select * from ${tableName} order by k1" - - sql "DROP TABLE ${tableName}" } diff --git a/regression-test/suites/data_model_p0/unique/test_unique_table_sequence.groovy b/regression-test/suites/data_model_p0/unique/test_unique_table_sequence.groovy index 47ce7fde4d..61d9069199 100644 --- a/regression-test/suites/data_model_p0/unique/test_unique_table_sequence.groovy +++ b/regression-test/suites/data_model_p0/unique/test_unique_table_sequence.groovy @@ -16,182 +16,182 @@ // under the License. 
suite("test_unique_table_sequence") { + for (def enable_fall_back : [false, true]) { + def tableName = "test_uniq_sequence" + sql "DROP TABLE IF EXISTS ${tableName}" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int NULL, + `v1` tinyint NULL, + `v2` int, + `v3` int, + `v4` int + ) ENGINE=OLAP + UNIQUE KEY(k1) + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ( + "function_column.sequence_type" = "int", + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql "set enable_fallback_to_original_planner=${enable_fall_back}" + // test streamload with seq col + streamLoad { + table "${tableName}" - // TODO: remove this when nereids could pass this test - sql "set enable_nereids_planner=false" + set 'column_separator', ',' + set 'columns', 'k1,v1,v2,v3,v4' + set 'function_column.sequence_col', 'v2' - def tableName = "test_uniq_sequence" - sql "DROP TABLE IF EXISTS ${tableName}" - sql """ - CREATE TABLE IF NOT EXISTS ${tableName} ( - `k1` int NULL, - `v1` tinyint NULL, - `v2` int, - `v3` int, - `v4` int - ) ENGINE=OLAP - UNIQUE KEY(k1) - DISTRIBUTED BY HASH(`k1`) BUCKETS 3 - PROPERTIES ( - "function_column.sequence_type" = "int", - "replication_allocation" = "tag.location.default: 1" - ); - """ - // test streamload with seq col - streamLoad { - table "${tableName}" + file 'unique_key_data1.csv' + time 10000 // limit inflight 10s - set 'column_separator', ',' - set 'columns', 'k1,v1,v2,v3,v4' - set 'function_column.sequence_col', 'v2' - - file 'unique_key_data1.csv' - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(4, json.NumberTotalRows) + assertEquals(4, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(4, json.NumberTotalRows) - assertEquals(4, json.NumberLoadedRows) - assertEquals(0, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) } - } - sql "sync" - order_qt_all "SELECT * from ${tableName}" + sql "sync" + order_qt_all "SELECT * from ${tableName}" - // test update data, using streamload with seq col - streamLoad { - table "${tableName}" + // test update data, using streamload with seq col + streamLoad { + table "${tableName}" - set 'column_separator', ',' - set 'columns', 'k1,v1,v2,v3,v4' - set 'function_column.sequence_col', 'v2' + set 'column_separator', ',' + set 'columns', 'k1,v1,v2,v3,v4' + set 'function_column.sequence_col', 'v2' - file 'unique_key_data2.csv' - time 10000 // limit inflight 10s + file 'unique_key_data2.csv' + time 10000 // limit inflight 10s - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(4, json.NumberTotalRows) + assertEquals(4, json.NumberLoadedRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) } - log.info("Stream load result: 
${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(4, json.NumberTotalRows) - assertEquals(4, json.NumberLoadedRows) - assertEquals(0, json.NumberFilteredRows) - assertEquals(0, json.NumberUnselectedRows) } + sql "sync" + + qt_count "SELECT COUNT(*) from ${tableName}" + + order_qt_part "SELECT k1, v1, v2 from ${tableName}" + + order_qt_all "SELECT * from ${tableName}" + + // test update on table with seq col + sql "UPDATE ${tableName} SET v1 = 10 WHERE k1 = 1" + + sql "UPDATE ${tableName} SET v2 = 14 WHERE k1 = 2" + + sql "UPDATE ${tableName} SET v2 = 11 WHERE k1 = 3" + + sql "sync" + + qt_count "SELECT COUNT(*) from ${tableName}" + + order_qt_part "SELECT k1, v1, v2 from ${tableName}" + + order_qt_all "SELECT * from ${tableName}" + + // test insert into without column list + test { + sql "INSERT INTO ${tableName} values(15, 8, 19, 20, 21)" + exception "Table ${tableName} has sequence column, need to specify the sequence column" + } + + // test insert into with column list + test { + sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4) values(15, 8, 19, 20, 21)" + exception "Table ${tableName} has sequence column, need to specify the sequence column" + } + + // correct way of insert into with seq col + sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values(15, 8, 19, 20, 21, 3)" + + sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values(15, 9, 18, 21, 22, 2)" + + sql "SET show_hidden_columns=true" + + sql "sync" + + qt_count "SELECT COUNT(*) from ${tableName}" + + order_qt_part "SELECT k1, v1, v2 from ${tableName}" + + order_qt_all "SELECT * from ${tableName}" + + sql "DROP TABLE ${tableName}" + + sql "DROP TABLE IF EXISTS ${tableName}" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int NULL, + `v1` tinyint NULL, + `v2` int, + `v3` int, + `v4` int + ) ENGINE=OLAP + UNIQUE KEY(k1) + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ( + "function_column.sequence_type" = "int", + "replication_allocation" = "tag.location.default: 1" + ); + """ + + // test insert into without column list, in begin/commit + sql "begin;" + test { + sql "INSERT INTO ${tableName} values(15, 8, 19, 20, 21)" + exception "Table ${tableName} has sequence column, need to specify the sequence column" + } + sql "commit;" + + // test insert into with column list, in begin/commit + sql "begin;" + test { + sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4) values(15, 8, 19, 20, 21)" + exception "Table ${tableName} has sequence column, need to specify the sequence column" + } + sql "commit;" + + sql "begin;" + sql "insert into ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values (1,1,1,1,1,1),(2,2,2,2,2,2),(3,3,3,3,3,3);" + sql "commit;" + + qt_1 "select * from ${tableName} order by k1;" + + sql "begin;" + sql "insert into ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values (2,20,20,20,20,20);" + sql "commit;" + + qt_2 "select * from ${tableName} order by k1;" + + sql "begin;" + sql "insert into ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values (3,30,30,30,30,1);" + sql "commit;" + + qt_3 "select * from ${tableName} order by k1" + + sql "SET show_hidden_columns=false" + + sql "DROP TABLE ${tableName}" } - sql "sync" - - qt_count "SELECT COUNT(*) from ${tableName}" - - order_qt_part "SELECT k1, v1, v2 from ${tableName}" - - order_qt_all "SELECT * from ${tableName}" - - // test update on table with seq col - sql "UPDATE ${tableName} SET v1 = 10 WHERE k1 = 
1" - - sql "UPDATE ${tableName} SET v2 = 14 WHERE k1 = 2" - - sql "UPDATE ${tableName} SET v2 = 11 WHERE k1 = 3" - - sql "sync" - - qt_count "SELECT COUNT(*) from ${tableName}" - - order_qt_part "SELECT k1, v1, v2 from ${tableName}" - - order_qt_all "SELECT * from ${tableName}" - - // test insert into without column list - test { - sql "INSERT INTO ${tableName} values(15, 8, 19, 20, 21)" - exception "Table ${tableName} has sequence column, need to specify the sequence column" - } - - // test insert into with column list - test { - sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4) values(15, 8, 19, 20, 21)" - exception "Table ${tableName} has sequence column, need to specify the sequence column" - } - - // correct way of insert into with seq col - sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values(15, 8, 19, 20, 21, 3)" - - sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values(15, 9, 18, 21, 22, 2)" - - sql "SET show_hidden_columns=true" - - sql "sync" - - qt_count "SELECT COUNT(*) from ${tableName}" - - order_qt_part "SELECT k1, v1, v2 from ${tableName}" - - order_qt_all "SELECT * from ${tableName}" - - sql "DROP TABLE ${tableName}" - - sql "DROP TABLE IF EXISTS ${tableName}" - sql """ - CREATE TABLE IF NOT EXISTS ${tableName} ( - `k1` int NULL, - `v1` tinyint NULL, - `v2` int, - `v3` int, - `v4` int - ) ENGINE=OLAP - UNIQUE KEY(k1) - DISTRIBUTED BY HASH(`k1`) BUCKETS 3 - PROPERTIES ( - "function_column.sequence_type" = "int", - "replication_allocation" = "tag.location.default: 1" - ); - """ - - // test insert into without column list, in begin/commit - sql "begin;" - test { - sql "INSERT INTO ${tableName} values(15, 8, 19, 20, 21)" - exception "Table ${tableName} has sequence column, need to specify the sequence column" - } - sql "commit;" - - // test insert into with column list, in begin/commit - sql "begin;" - test { - sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4) values(15, 8, 19, 20, 21)" - exception "Table ${tableName} has sequence column, need to specify the sequence column" - } - sql "commit;" - - sql "begin;" - sql "insert into ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values (1,1,1,1,1,1),(2,2,2,2,2,2),(3,3,3,3,3,3);" - sql "commit;" - - qt_1 "select * from ${tableName} order by k1;" - - sql "begin;" - sql "insert into ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values (2,20,20,20,20,20);" - sql "commit;" - - qt_2 "select * from ${tableName} order by k1;" - - sql "begin;" - sql "insert into ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values (3,30,30,30,30,1);" - sql "commit;" - - qt_3 "select * from ${tableName} order by k1" - - sql "DROP TABLE ${tableName}" - }