From 4b22fc14d5036994c51c02c68697ed08a0e2aa05 Mon Sep 17 00:00:00 2001
From: bobhan1
Date: Thu, 23 Nov 2023 16:23:31 +0800
Subject: [PATCH] [Feature](update) Support `update on current_timestamp` (#25884)
---
 .../Create/CREATE-TABLE.md | 7 +-
 .../Create/CREATE-TABLE.md | 7 +-
 .../org/apache/doris/nereids/DorisParser.g4 | 3 +-
 .../java/org/apache/doris/catalog/Column.java | 36 +++-
 .../doris/datasource/InternalCatalog.java | 10 +
 .../translator/PhysicalPlanTranslator.java | 22 --
 .../nereids/parser/LogicalPlanBuilder.java | 18 +-
 .../nereids/rules/analysis/BindSink.java | 112 +++++++---
 .../trees/plans/commands/UpdateCommand.java | 10 +-
 .../plans/commands/info/ColumnDefinition.java | 54 ++++-
 .../doris/planner/StreamLoadPlanner.java | 3 +
 .../update_on_current_timestamp.out | 62 ++++++
 .../test_update_on_current_timestamp.out | 29 +++
 .../update_on_current_timestamp1.csv | 2 +
 .../update_on_current_timestamp2.csv | 4 +
 .../update_on_current_timestamp3.csv | 2 +
 .../update_on_current_timestamp.groovy | 195 ++++++++++++++++++
 .../test_update_on_current_timestamp.groovy | 88 ++++++++
 18 files changed, 605 insertions(+), 59 deletions(-)
 create mode 100644 regression-test/data/nereids_p0/insert_into_table/update_on_current_timestamp.out
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/test_update_on_current_timestamp.out
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/update_on_current_timestamp1.csv
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/update_on_current_timestamp2.csv
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/update_on_current_timestamp3.csv
 create mode 100644 regression-test/suites/nereids_p0/insert_into_table/update_on_current_timestamp.groovy
 create mode 100644 regression-test/suites/unique_with_mow_p0/partial_update/test_update_on_current_timestamp.groovy

diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
index 97e9eed0d3..6aa9a4cbbb 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md
@@ -58,7 +58,7 @@ Column definition list:
   Column definition:
-  `column_name column_type [KEY] [aggr_type] [NULL] [AUTO_INCREMENT] [default_value] [column_comment]`
+  `column_name column_type [KEY] [aggr_type] [NULL] [AUTO_INCREMENT] [default_value] [on update current_timestamp] [column_comment]`
   * `column_type`
@@ -142,6 +142,10 @@ Column definition list:
       dt DATETIME DEFAULT CURRENT_TIMESTAMP
       ```
+  * `on update current_timestamp`
+
+    Indicates whether the value of this column should be updated to the current timestamp (`current_timestamp`) whenever the row is updated. The feature is only available on unique tables with merge-on-write enabled. Columns with this feature enabled must declare a default value, and the default value must be `current_timestamp`. If a timestamp precision is declared here, the timestamp precision in the column's default value must be the same as the precision declared here.
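A minimal sketch of these constraints, assuming a merge-on-write unique table; the table name `t_on_update` and its columns are illustrative, not part of the patch:

```sql
-- Accepted: the default is CURRENT_TIMESTAMP and its precision (3) matches the ON UPDATE precision.
CREATE TABLE t_on_update (
    k INT NOT NULL,
    dt DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3)
) UNIQUE KEY(k) DISTRIBUTED BY HASH(k) BUCKETS 1
PROPERTIES ("replication_num" = "1", "enable_unique_key_merge_on_write" = "true");

-- Rejected: no default value, or a default precision that differs from the ON UPDATE precision.
-- dt DATETIME(3) ON UPDATE CURRENT_TIMESTAMP(3)
-- dt DATETIME(6) DEFAULT CURRENT_TIMESTAMP(4) ON UPDATE CURRENT_TIMESTAMP(3)
```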
+ Example: ``` @@ -152,6 +156,7 @@ Column definition list: v2 BITMAP BITMAP_UNION, v3 HLL HLL_UNION, v4 INT SUM NOT NULL DEFAULT "1" COMMENT "This is column v4" + dt datetime(6) default current_timestamp(6) on update current_timestamp(6) ``` #### index_definition_list diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md index 600859009d..6a9c63f8d6 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md @@ -56,7 +56,7 @@ distribution_desc * `column_definition` 列定义: - `column_name column_type [KEY] [aggr_type] [NULL] [AUTO_INCREMENT] [default_value] [column_comment]` + `column_name column_type [KEY] [aggr_type] [NULL] [AUTO_INCREMENT] [default_value] [on update current_timestamp] [column_comment]` * `column_type` 列类型,支持以下类型: ``` @@ -129,6 +129,10 @@ distribution_desc // 只用于DATETIME类型,导入数据缺失该值时系统将赋予当前时间 dt DATETIME DEFAULT CURRENT_TIMESTAMP ``` + * `on update current_timestamp` + + 是否在该行有列更新时将该列的值更新为当前时间(`current_timestamp`)。该特性只能在开启了merge-on-write的unique表上使用,开启了这个特性的列必须声明默认值,且默认值必须为`current_timestamp`。如果此处声明了时间戳的精度,则该列默认值中的时间戳精度必须与该处的时间戳精度相同。 + 示例: @@ -140,6 +144,7 @@ distribution_desc v2 BITMAP BITMAP_UNION, v3 HLL HLL_UNION, v4 INT SUM NOT NULL DEFAULT "1" COMMENT "This is column v4" + dt datetime(6) default current_timestamp(6) on update current_timestamp(6) ``` #### index_definition_list diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 7e2f4237e1..1dca96c1ec 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -462,7 +462,8 @@ columnDef : colName=identifier type=dataType KEY? (aggType=aggTypeDef)? ((NOT NULL) | NULL)? (DEFAULT (nullValue=NULL | INTEGER_VALUE | stringValue=STRING_LITERAL - | CURRENT_TIMESTAMP (LEFT_PAREN precision=number RIGHT_PAREN)?))? + | CURRENT_TIMESTAMP (LEFT_PAREN defaultValuePrecision=number RIGHT_PAREN)?))? + (ON UPDATE CURRENT_TIMESTAMP (LEFT_PAREN onUpdateValuePrecision=number RIGHT_PAREN)?)? (COMMENT comment=STRING_LITERAL)? 
; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index a85e4ec7d6..81c1edc2e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -130,6 +130,12 @@ public class Column implements Writable, GsonPostProcessable { private boolean isCompoundKey = false; + @SerializedName(value = "hasOnUpdateDefaultValue") + private boolean hasOnUpdateDefaultValue = false; + + @SerializedName(value = "onUpdateDefaultValueExprDef") + private DefaultValueExprDef onUpdateDefaultValueExprDef; + public Column() { this.name = ""; this.type = Type.NULL; @@ -170,24 +176,33 @@ public class Column implements Writable, GsonPostProcessable { public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull, String defaultValue, String comment) { this(name, type, isKey, aggregateType, isAllowNull, false, defaultValue, comment, true, null, - COLUMN_UNIQUE_ID_INIT_VALUE, defaultValue); + COLUMN_UNIQUE_ID_INIT_VALUE, defaultValue, false, null); } public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull, String comment, boolean visible, int colUniqueId) { - this(name, type, isKey, aggregateType, isAllowNull, false, null, comment, visible, null, colUniqueId, null); + this(name, type, isKey, aggregateType, isAllowNull, false, null, comment, visible, null, colUniqueId, null, + false, null); } public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull, String defaultValue, String comment, boolean visible, DefaultValueExprDef defaultValueExprDef, int colUniqueId, String realDefaultValue) { this(name, type, isKey, aggregateType, isAllowNull, false, defaultValue, comment, visible, defaultValueExprDef, - colUniqueId, realDefaultValue); + colUniqueId, realDefaultValue, false, null); } public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull, boolean isAutoInc, String defaultValue, String comment, boolean visible, DefaultValueExprDef defaultValueExprDef, int colUniqueId, String realDefaultValue) { + this(name, type, isKey, aggregateType, isAllowNull, isAutoInc, defaultValue, comment, visible, + defaultValueExprDef, colUniqueId, realDefaultValue, false, null); + } + + public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull, + boolean isAutoInc, String defaultValue, String comment, boolean visible, + DefaultValueExprDef defaultValueExprDef, int colUniqueId, String realDefaultValue, + boolean hasOnUpdateDefaultValue, DefaultValueExprDef onUpdateDefaultValueExprDef) { this.name = name; if (this.name == null) { this.name = ""; @@ -212,6 +227,8 @@ public class Column implements Writable, GsonPostProcessable { this.children = new ArrayList<>(); createChildrenColumn(this.type, this); this.uniqueId = colUniqueId; + this.hasOnUpdateDefaultValue = hasOnUpdateDefaultValue; + this.onUpdateDefaultValueExprDef = onUpdateDefaultValueExprDef; if (type.isAggStateType()) { AggStateType aggState = (AggStateType) type; @@ -244,6 +261,8 @@ public class Column implements Writable, GsonPostProcessable { this.uniqueId = column.getUniqueId(); this.defineExpr = column.getDefineExpr(); this.defineName = column.getDefineName(); + this.hasOnUpdateDefaultValue = column.hasOnUpdateDefaultValue; + this.onUpdateDefaultValueExprDef = column.onUpdateDefaultValueExprDef; } public 
void createChildrenColumn(Type type, Column column) { @@ -489,6 +508,14 @@ public class Column implements Writable, GsonPostProcessable { } } + public boolean hasOnUpdateDefaultValue() { + return hasOnUpdateDefaultValue; + } + + public Expr getOnUpdateDefaultValueExpr() { + return onUpdateDefaultValueExprDef.getExpr(type); + } + public TColumn toThrift() { TColumn tColumn = new TColumn(); tColumn.setColumnName(removeNamePrefix(this.name)); @@ -766,6 +793,9 @@ public class Column implements Writable, GsonPostProcessable { sb.append(" DEFAULT \"").append(defaultValue).append("\""); } } + if (hasOnUpdateDefaultValue) { + sb.append(" ON UPDATE ").append(defaultValue).append(""); + } if (StringUtils.isNotBlank(comment)) { sb.append(" COMMENT '").append(getComment(true)).append("'"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 6ab7dc2588..aa8d40afc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -2166,6 +2166,16 @@ public class InternalCatalog implements CatalogIf { } olapTable.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction); + // check `update on current_timestamp` + if (!enableUniqueKeyMergeOnWrite) { + for (Column column : baseSchema) { + if (column.hasOnUpdateDefaultValue()) { + throw new DdlException("'ON UPDATE CURRENT_TIMESTAMP' is only supportted" + + " in unique table with merge-on-write enabled."); + } + } + } + // analyze bloom filter columns Set bfColumns = null; double bfFpp = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 363637d70f..b488dbe926 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -379,31 +379,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor partialUpdateCols = new HashSet<>(); boolean isPartialUpdate = olapTableSink.isPartialUpdate(); if (isPartialUpdate) { - OlapTable olapTable = olapTableSink.getTargetTable(); - if (!olapTable.getEnableUniqueKeyMergeOnWrite()) { - throw new AnalysisException("Partial update is only allowed in" - + "unique table with merge-on-write enabled."); - } - for (Column col : olapTable.getFullSchema()) { - boolean exists = false; - for (Column insertCol : olapTableSink.getCols()) { - if (insertCol.getName() != null && insertCol.getName().equals(col.getName())) { - exists = true; - break; - } - } - if (col.isKey() && !exists) { - throw new AnalysisException("Partial update should include all key columns, missing: " - + col.getName()); - } - } for (Column col : olapTableSink.getCols()) { partialUpdateCols.add(col.getName()); } - if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() != null - && partialUpdateCols.contains(olapTable.getSequenceMapCol())) { - partialUpdateCols.add(Column.SEQUENCE_COL); - } } TupleDescriptor olapTuple = context.generateTupleDesc(); List targetTableColumns = olapTableSink.getTargetTable().getFullSchema(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index bbec310e3d..9fdb4635e7 
100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -2156,6 +2156,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor { boolean isNotNull = ctx.NOT() != null; String aggTypeString = ctx.aggType != null ? ctx.aggType.getText() : null; Optional defaultValue = Optional.empty(); + Optional onUpdateDefaultValue = Optional.empty(); if (ctx.DEFAULT() != null) { if (ctx.INTEGER_VALUE() != null) { defaultValue = Optional.of(new DefaultValue(ctx.INTEGER_VALUE().getText())); @@ -2164,14 +2165,24 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor { } else if (ctx.nullValue != null) { defaultValue = Optional.of(DefaultValue.NULL_DEFAULT_VALUE); } else if (ctx.CURRENT_TIMESTAMP() != null) { - if (ctx.precision == null) { + if (ctx.defaultValuePrecision == null) { defaultValue = Optional.of(DefaultValue.CURRENT_TIMESTAMP_DEFAULT_VALUE); } else { defaultValue = Optional.of(DefaultValue - .currentTimeStampDefaultValueWithPrecision(Long.valueOf(ctx.precision.getText()))); + .currentTimeStampDefaultValueWithPrecision( + Long.valueOf(ctx.defaultValuePrecision.getText()))); } } } + if (ctx.UPDATE() != null) { + if (ctx.onUpdateValuePrecision == null) { + onUpdateDefaultValue = Optional.of(DefaultValue.CURRENT_TIMESTAMP_DEFAULT_VALUE); + } else { + onUpdateDefaultValue = Optional.of(DefaultValue + .currentTimeStampDefaultValueWithPrecision( + Long.valueOf(ctx.onUpdateValuePrecision.getText()))); + } + } AggregateType aggType = null; if (aggTypeString != null) { try { @@ -2182,7 +2193,8 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor { } } String comment = ctx.comment != null ? ctx.comment.getText() : ""; - return new ColumnDefinition(colName, colType, isKey, aggType, !isNotNull, defaultValue, comment); + return new ColumnDefinition(colName, colType, isKey, aggType, !isNotNull, defaultValue, + onUpdateDefaultValue, comment); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java index f3c27f6c9b..79504786cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindSink.java @@ -76,40 +76,70 @@ public class BindSink implements AnalysisRuleFactory { Pair pair = bind(ctx.cascadesContext, sink); Database database = pair.first; OlapTable table = pair.second; + boolean isPartialUpdate = sink.isPartialUpdate(); LogicalPlan child = ((LogicalPlan) sink.child()); - boolean isNeedSequenceCol = child.getOutput().stream() + boolean childHasSeqCol = child.getOutput().stream() .anyMatch(slot -> slot.getName().equals(Column.SEQUENCE_COL)); - - if (sink.getColNames().isEmpty() && sink.isFromNativeInsertStmt() - && sink.isPartialUpdate()) { - throw new AnalysisException("You must explicitly specify the columns to be updated when " - + "updating partial columns using the INSERT statement."); - } + boolean needExtraSeqCol = isPartialUpdate && !childHasSeqCol && table.hasSequenceCol() + && table.getSequenceMapCol() != null + && sink.getColNames().contains(table.getSequenceMapCol()); + Pair, Integer> bindColumnsResult = + bindTargetColumns(table, sink.getColNames(), childHasSeqCol, needExtraSeqCol); + List bindColumns = bindColumnsResult.first; + int extraColumnsNum = bindColumnsResult.second; LogicalOlapTableSink 
boundSink = new LogicalOlapTableSink<>( database, table, - bindTargetColumns(table, sink.getColNames(), isNeedSequenceCol), + bindColumns, bindPartitionIds(table, sink.getPartitions()), child.getOutput().stream() .map(NamedExpression.class::cast) .collect(ImmutableList.toImmutableList()), - sink.isPartialUpdate(), + isPartialUpdate, sink.isFromNativeInsertStmt(), sink.child()); + if (isPartialUpdate) { + // check the necessary conditions for partial updates + if (!table.getEnableUniqueKeyMergeOnWrite()) { + throw new AnalysisException("Partial update is only allowed in" + + "unique table with merge-on-write enabled."); + } + if (sink.getColNames().isEmpty() && sink.isFromNativeInsertStmt()) { + throw new AnalysisException("You must explicitly specify the columns to be updated when " + + "updating partial columns using the INSERT statement."); + } + for (Column col : table.getFullSchema()) { + boolean exists = false; + for (Column insertCol : boundSink.getCols()) { + if (insertCol.getName().equals(col.getName())) { + exists = true; + break; + } + } + if (col.isKey() && !exists) { + throw new AnalysisException("Partial update should include all key columns, missing: " + + col.getName()); + } + } + } + // we need to insert all the columns of the target table // although some columns are not mentions. // so we add a projects to supply the default value. - if (boundSink.getCols().size() != child.getOutput().size()) { + if (boundSink.getCols().size() != child.getOutput().size() + extraColumnsNum) { throw new AnalysisException("insert into cols should be corresponding to the query output"); } try { + // in upserts, users must specify the sequence mapping column explictly + // if the target table has sequence mapping column unless the sequence mapping + // column has the a default value of CURRENT_TIMESTAMP if (table.hasSequenceCol() && table.getSequenceMapCol() != null - && !sink.getColNames().isEmpty() && !boundSink.isPartialUpdate()) { + && !sink.getColNames().isEmpty() && !isPartialUpdate) { Column seqCol = table.getFullSchema().stream() .filter(col -> col.getName().equals(table.getSequenceMapCol())) .findFirst().get(); @@ -127,7 +157,7 @@ public class BindSink implements AnalysisRuleFactory { } Map columnToChildOutput = Maps.newHashMap(); - for (int i = 0; i < boundSink.getCols().size(); ++i) { + for (int i = 0; i < child.getOutput().size(); ++i) { columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i)); } @@ -171,11 +201,24 @@ public class BindSink implements AnalysisRuleFactory { if (columnToOutput.get(seqCol.get().getName()) != null) { columnToOutput.put(column.getName(), columnToOutput.get(seqCol.get().getName())); } - } else if (sink.isPartialUpdate()) { + } else if (isPartialUpdate) { // If the current load is a partial update, the values of unmentioned // columns will be filled in SegmentWriter. And the output of sink node // should not contain these unmentioned columns, so we just skip them. 
- continue; + + // But if the column has 'on update value', we should unconditionally + // update the value of the column to the current timestamp whenever there + // is an update on the row + if (column.hasOnUpdateDefaultValue()) { + Expression defualtValueExpression = FunctionBinder.INSTANCE.rewrite( + new NereidsParser().parseExpression( + column.getOnUpdateDefaultValueExpr().toSqlWithoutTbl()), + new ExpressionRewriteContext(ctx.cascadesContext)); + columnToOutput.put(column.getName(), + new Alias(defualtValueExpression, column.getName())); + } else { + continue; + } } else if (column.getDefaultValue() == null) { // Otherwise, the unmentioned columns should be filled with default values // or null values @@ -291,20 +334,37 @@ public class BindSink implements AnalysisRuleFactory { }).collect(Collectors.toList()); } - private List bindTargetColumns(OlapTable table, List colsName, boolean isNeedSequenceCol) { + private Pair, Integer> bindTargetColumns(OlapTable table, List colsName, + boolean childHasSeqCol, boolean needExtraSeqCol) { // if the table set sequence column in stream load phase, the sequence map column is null, we query it. - return colsName.isEmpty() - ? table.getBaseSchema(true).stream() - .filter(c -> validColumn(c, isNeedSequenceCol)) - .collect(ImmutableList.toImmutableList()) - : colsName.stream().map(cn -> { - Column column = table.getColumn(cn); - if (column == null) { - throw new AnalysisException(String.format("column %s is not found in table %s", - cn, table.getName())); + if (colsName.isEmpty()) { + return Pair.of(table.getBaseSchema(true).stream() + .filter(c -> validColumn(c, childHasSeqCol)) + .collect(ImmutableList.toImmutableList()), 0); + } else { + int extraColumnsNum = (needExtraSeqCol ? 1 : 0); + List processedColsName = Lists.newArrayList(colsName); + for (Column col : table.getFullSchema()) { + if (col.hasOnUpdateDefaultValue()) { + Optional colName = colsName.stream().filter(c -> c.equals(col.getName())).findFirst(); + if (!colName.isPresent()) { + ++extraColumnsNum; + processedColsName.add(col.getName()); } - return column; - }).collect(ImmutableList.toImmutableList()); + } + } + if (!processedColsName.contains(Column.SEQUENCE_COL) && (childHasSeqCol || needExtraSeqCol)) { + processedColsName.add(Column.SEQUENCE_COL); + } + return Pair.of(processedColsName.stream().map(cn -> { + Column column = table.getColumn(cn); + if (column == null) { + throw new AnalysisException(String.format("column %s is not found in table %s", + cn, table.getName())); + } + return column; + }).collect(ImmutableList.toImmutableList()), extraColumnsNum); + } } private boolean isSourceAndTargetStringLikeType(DataType input, DataType target) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java index 742b2ea801..ece44cfcd9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateCommand.java @@ -25,6 +25,7 @@ import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.nereids.analyzer.UnboundOlapTableSink; import org.apache.doris.nereids.analyzer.UnboundSlot; +import org.apache.doris.nereids.parser.NereidsParser; import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.EqualTo; import 
org.apache.doris.nereids.trees.expressions.Expression; @@ -115,7 +116,14 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab ? ((NamedExpression) expr) : new Alias(expr)); } else { - selectItems.add(new UnboundSlot(tableName, column.getName())); + if (column.hasOnUpdateDefaultValue()) { + Expression defualtValueExpression = + new NereidsParser().parseExpression(column.getOnUpdateDefaultValueExpr() + .toSqlWithoutTbl()); + selectItems.add(new Alias(defualtValueExpression, column.getName())); + } else { + selectItems.add(new UnboundSlot(tableName, column.getName())); + } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java index fba382e914..8b2a8df338 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ColumnDefinition.java @@ -51,6 +51,7 @@ public class ColumnDefinition { private AggregateType aggType; private boolean isNullable; private Optional defaultValue; + private Optional onUpdateDefaultValue = Optional.empty(); private final String comment; private final boolean isVisible; private boolean aggTypeImplicit = false; @@ -60,6 +61,11 @@ public class ColumnDefinition { this(name, type, isKey, aggType, isNullable, defaultValue, comment, true); } + public ColumnDefinition(String name, DataType type, boolean isKey, AggregateType aggType, boolean isNullable, + Optional defaultValue, Optional onUpdateDefaultValue, String comment) { + this(name, type, isKey, aggType, isNullable, defaultValue, onUpdateDefaultValue, comment, true); + } + /** * constructor */ @@ -75,6 +81,23 @@ public class ColumnDefinition { this.isVisible = isVisible; } + /** + * constructor + */ + public ColumnDefinition(String name, DataType type, boolean isKey, AggregateType aggType, boolean isNullable, + Optional defaultValue, Optional onUpdateDefaultValue, String comment, + boolean isVisible) { + this.name = name; + this.type = type; + this.isKey = isKey; + this.aggType = aggType; + this.isNullable = isNullable; + this.defaultValue = defaultValue; + this.onUpdateDefaultValue = onUpdateDefaultValue; + this.comment = comment; + this.isVisible = isVisible; + } + public ColumnDefinition(String name, DataType type, boolean isNullable) { this(name, type, false, null, isNullable, Optional.empty(), ""); } @@ -198,6 +221,34 @@ public class ColumnDefinition { throw new AnalysisException(e.getMessage(), e); } } + if (onUpdateDefaultValue.isPresent() + && onUpdateDefaultValue.get().getValue() != null + && type.toCatalogDataType().isScalarType()) { + try { + ColumnDef.validateDefaultValue(type.toCatalogDataType(), + onUpdateDefaultValue.get().getValue(), onUpdateDefaultValue.get().getDefaultValueExprDef()); + } catch (Exception e) { + throw new AnalysisException("meet error when validating the on update value of column[" + + name + "], reason: " + e.getMessage()); + } + if (onUpdateDefaultValue.get().isCurrentTimeStamp()) { + if (!defaultValue.isPresent() || !defaultValue.get().isCurrentTimeStamp()) { + throw new AnalysisException("You must set the default value of the column[" + + name + "] to CURRENT_TIMESTAMP when using 'ON UPDATE CURRENT_TIMESTAMP'."); + } + } else if (onUpdateDefaultValue.get().isCurrentTimeStampWithPrecision()) { + if (!defaultValue.isPresent() || 
!defaultValue.get().isCurrentTimeStampWithPrecision()) { + throw new AnalysisException("You must set the default value of the column[" + + name + "] to CURRENT_TIMESTAMP when using 'ON UPDATE CURRENT_TIMESTAMP'."); + } + long precision1 = onUpdateDefaultValue.get().getCurrentTimeStampPrecision(); + long precision2 = defaultValue.get().getCurrentTimeStampPrecision(); + if (precision1 != precision2) { + throw new AnalysisException("The precision of the default value of column[" + + name + "] should be the same with the precision in 'ON UPDATE CURRENT_TIMESTAMP'."); + } + } + } } /** @@ -231,7 +282,8 @@ public class ColumnDefinition { Column column = new Column(name, type.toCatalogDataType(), isKey, aggType, isNullable, false, defaultValue.map(DefaultValue::getRawValue).orElse(null), comment, isVisible, defaultValue.map(DefaultValue::getDefaultValueExprDef).orElse(null), Column.COLUMN_UNIQUE_ID_INIT_VALUE, - defaultValue.map(DefaultValue::getValue).orElse(null)); + defaultValue.map(DefaultValue::getValue).orElse(null), onUpdateDefaultValue.isPresent(), + onUpdateDefaultValue.map(DefaultValue::getDefaultValueExprDef).orElse(null)); column.setAggregationTypeImplicit(aggTypeImplicit); return column; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index ce8b08086a..64e872125d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -150,6 +150,9 @@ public class StreamLoadPlanner { if (isPartialUpdate) { for (Column col : destTable.getFullSchema()) { boolean existInExpr = false; + if (col.hasOnUpdateDefaultValue()) { + partialUpdateInputColumns.add(col.getName()); + } for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) { if (importColumnDesc.getColumnName() != null && importColumnDesc.getColumnName().equals(col.getName())) { diff --git a/regression-test/data/nereids_p0/insert_into_table/update_on_current_timestamp.out b/regression-test/data/nereids_p0/insert_into_table/update_on_current_timestamp.out new file mode 100644 index 0000000000..0b600e32cd --- /dev/null +++ b/regression-test/data/nereids_p0/insert_into_table/update_on_current_timestamp.out @@ -0,0 +1,62 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 doris 1000 123 1 +2 doris2 2000 223 1 + +-- !1 -- +1 + +-- !1 -- +1 + +-- !2 -- +1 doris 1999 123 1 +2 doris2 2999 223 1 +3 unknown 3999 \N 4321 +4 unknown 4999 \N 4321 + +-- !2 -- +1 + +-- !2 -- +1 + +-- !3 -- +1 doris 1999 123 1 +2 doris2 2999 223 1 +3 unknown 3999 \N 4321 +4 unknown 4999 \N 4321 + +-- !3 -- +1 + +-- !3 -- +3 + +-- !3 -- +2 + +-- !4 -- +1 doris 3998 123 1 +2 doris2 5998 223 1 +3 unknown 3999 \N 4321 +4 unknown 4999 \N 4321 + +-- !4 -- +2 + +-- !4 -- +2 + +-- !5 -- +1 doris 3998 123 1 +2 doris2 5998 223 1 +3 unknown 3999 \N 4321 +4 unknown 4999 \N 4321 + +-- !5 -- +3 + +-- !5 -- +3 + diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_update_on_current_timestamp.out b/regression-test/data/unique_with_mow_p0/partial_update/test_update_on_current_timestamp.out new file mode 100644 index 0000000000..3b28009ccd --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/test_update_on_current_timestamp.out @@ -0,0 +1,29 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !sql -- +1 doris 1000 123 1 +2 doris2 2000 223 1 + +-- !sql -- +1 + +-- !sql -- +1 doris 1999 123 1 +2 doris2 2999 223 1 +3 unknown 3999 \N 4321 +4 unknown 4999 \N 4321 + +-- !sql -- +1 + +-- !sql -- +1 doris 1999 123 1 +2 doris2 2999 223 1 +3 unknown 3999 \N 4321 +4 unknown 4999 \N 4321 + +-- !sql -- +1 + +-- !sql -- +3 + diff --git a/regression-test/data/unique_with_mow_p0/partial_update/update_on_current_timestamp1.csv b/regression-test/data/unique_with_mow_p0/partial_update/update_on_current_timestamp1.csv new file mode 100644 index 0000000000..01febb58f2 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/update_on_current_timestamp1.csv @@ -0,0 +1,2 @@ +2, doris2, 2000, 223, 1 +1, doris, 1000, 123, 1 diff --git a/regression-test/data/unique_with_mow_p0/partial_update/update_on_current_timestamp2.csv b/regression-test/data/unique_with_mow_p0/partial_update/update_on_current_timestamp2.csv new file mode 100644 index 0000000000..d8bdb17241 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/update_on_current_timestamp2.csv @@ -0,0 +1,4 @@ +2, 2999 +1, 1999 +3, 3999 +4, 4999 diff --git a/regression-test/data/unique_with_mow_p0/partial_update/update_on_current_timestamp3.csv b/regression-test/data/unique_with_mow_p0/partial_update/update_on_current_timestamp3.csv new file mode 100644 index 0000000000..4cb6c42402 --- /dev/null +++ b/regression-test/data/unique_with_mow_p0/partial_update/update_on_current_timestamp3.csv @@ -0,0 +1,2 @@ +1,2000-01-01 00:00:01 +2,2000-01-02 00:00:01 diff --git a/regression-test/suites/nereids_p0/insert_into_table/update_on_current_timestamp.groovy b/regression-test/suites/nereids_p0/insert_into_table/update_on_current_timestamp.groovy new file mode 100644 index 0000000000..35a6833883 --- /dev/null +++ b/regression-test/suites/nereids_p0/insert_into_table/update_on_current_timestamp.groovy @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
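The suite that follows drives the feature through the Nereids planner. As a hedged summary of what it exercises (the table name `t` is illustrative; the session variables are the ones set in the suite itself):

```sql
-- Partial-update mode for INSERT, as toggled in the suite below.
SET enable_unique_key_partial_update = true;
SET enable_insert_strict = false;

-- update_time is not listed, so for every written row it is refreshed to the
-- current timestamp instead of keeping (or defaulting to) its previous value.
INSERT INTO t (id, score) VALUES (1, 1999), (2, 2999);

-- When the column is supplied explicitly, the supplied value is used as-is.
INSERT INTO t (id, update_time) VALUES (1, "2000-01-01 00:00:01");
```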
+ +suite("nereids_update_on_current_timestamp") { + sql 'set experimental_enable_nereids_planner=true' + sql 'set enable_fallback_to_original_planner=false' + sql 'set enable_nereids_dml=true' + sql "sync;" + + + def t1 = "nereids_update_on_current_timestamp1" + sql """ DROP TABLE IF EXISTS ${t1};""" + sql """ CREATE TABLE ${t1} ( + `id` int(11) NOT NULL COMMENT "用户 ID", + `name` varchar(65533) DEFAULT "unknown" COMMENT "用户姓名", + `score` int(11) NOT NULL COMMENT "用户得分", + `test` int(11) NULL COMMENT "null test", + `dft` int(11) DEFAULT "4321", + `update_time` datetime default current_timestamp on update current_timestamp, + `update_time2` datetime(6) default current_timestamp(5) on update current_timestamp(5)) + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true" + );""" + + def res = sql "show create table ${t1};" + assertTrue(res.toString().containsIgnoreCase("`update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP")) + assertTrue(res.toString().containsIgnoreCase("`update_time2` datetime(6) NULL DEFAULT CURRENT_TIMESTAMP(5) ON UPDATE CURRENT_TIMESTAMP(5)")) + + // set enable_unique_key_partial_update=false, it's a row update + sql "set enable_unique_key_partial_update=false;" + sql "sync;" + sql """ insert into ${t1}(id,name,score,test,dft) values + (2, "doris2", 2000, 223, 1), + (1, "doris", 1000, 123, 1);""" + qt_sql "select id,name,score,test,dft from ${t1} order by id;" + // rows with id=1 or id=2 will have the same value 't1' in `update_time` and `update_time2` + qt_1 "select count(distinct update_time) from ${t1} where update_time > '2023-10-01 00:00:00';" // 1 + qt_1 "select count(distinct update_time2) from ${t1} where update_time2 > '2023-10-01 00:00:00';" // 1 + sleep(2000) + + + // set enable_unique_key_partial_update=true, it's a partial update + // don't specify the `update_time` column + // it will be automatically updated to current_timestamp() + sql "set enable_unique_key_partial_update=true;" + sql "set enable_insert_strict=false;" + sql "sync;" + sql """ insert into ${t1}(id, score) values + (2, 2999), + (1, 1999), + (3, 3999), + (4, 4999);""" + qt_2 "select id,name,score,test,dft from ${t1} order by id;" + // the existing rows(id=1,2) and newly inserted rows(id=3,4) are updated at the same time + // so they will have the same value 't1 + 2000ms' in `update_time` and `update_time2` + qt_2 "select count(distinct update_time) from ${t1} where update_time > '2023-10-01 00:00:00';" // 1 + qt_2 "select count(distinct update_time2) from ${t1} where update_time2 > '2023-10-01 00:00:00';" // 1 + sleep(2000) + + // when user specify that column, it will be filled with the input value + sql """ insert into ${t1}(id, update_time) values + (1, "2000-01-01 00:00:01"), + (2, "2000-01-02 00:00:01");""" + qt_3 "select id,name,score,test,dft from ${t1} order by id;" + // rows with id=1,2 are updated, the value of `update_time2` will be updated to `t1 + 4000ms` + qt_3 "select count(distinct update_time) from ${t1} where update_time > '2023-10-01 00:00:00';" // 1 + qt_3 "select count(distinct update_time) from ${t1};" // 3 := (1)(2)(3,4) + qt_3 "select count(distinct update_time2) from ${t1} where update_time2 > '2023-10-01 00:00:00';" // 2 := (1,2)(3,4) + sleep(2000) + + // test update statement + sql """ update ${t1} set score = score * 2 where id < 3;""" + // rows with id=1,2 are updated, the value of `update_time`, `update_time2` will be updated to `t1 + 6000ms` + qt_4 
"select id,name,score,test,dft from ${t1} order by id;" + qt_4 "select count(distinct update_time) from ${t1} where update_time > '2023-10-01 00:00:00';" // 2 := (1,2)(3,4) + qt_4 "select count(distinct update_time2) from ${t1} where update_time2 > '2023-10-01 00:00:00';" // 2 := (1,2)(3,4) + sleep(2000) + + sql """ update ${t1} set update_time = "2023-10-02 00:00:00" where id > 3;""" + // rows with id=4 are updated, the value of `update_time`, `update_time2` will be updated to `t1 + 8000ms` + qt_5 "select id,name,score,test,dft from ${t1} order by id;" + qt_5 "select count(distinct update_time) from ${t1} where update_time > '2023-10-01 00:00:00';" // 3 := (1,2)(3)(4) + qt_5 "select count(distinct update_time2) from ${t1} where update_time2 > '2023-10-01 00:00:00';" // 3 := (1,2)(3)(4) + + // illegal case 1: the default value is not current_timestamp + def illegal_t1 = "nereids_update_on_current_timestamp_illegal_1" + test { + sql """ DROP TABLE IF EXISTS ${illegal_t1} """ + sql """ CREATE TABLE ${illegal_t1} ( + `id` int(11) NOT NULL COMMENT "用户 ID", + `name` varchar(65533) DEFAULT "unknown" COMMENT "用户姓名", + `score` int(11) NOT NULL COMMENT "用户得分", + `test` int(11) NULL COMMENT "null test", + `dft` int(11) DEFAULT "4321", + `update_time` datetime default "2020-01-01 00:00:00" on update current_timestamp) + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true");""" + exception "You must set the default value of the column[update_time] to CURRENT_TIMESTAMP when using 'ON UPDATE CURRENT_TIMESTAMP'." + } + + // illegal case 2: the default value is not set + def illegal_t2 = "nereids_update_on_current_timestamp_illegal_2" + test { + sql """ DROP TABLE IF EXISTS ${illegal_t2} """ + sql """ CREATE TABLE ${illegal_t2} ( + `id` int(11) NOT NULL COMMENT "用户 ID", + `name` varchar(65533) DEFAULT "unknown" COMMENT "用户姓名", + `score` int(11) NOT NULL COMMENT "用户得分", + `test` int(11) NULL COMMENT "null test", + `dft` int(11) DEFAULT "4321", + `update_time` datetime on update current_timestamp) + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true");""" + exception "You must set the default value of the column[update_time] to CURRENT_TIMESTAMP when using 'ON UPDATE CURRENT_TIMESTAMP'." + } + + // illegal case 3: the precision of the default value is not the same + def illegal_t3 = "nereids_update_on_current_timestamp_illegal_3" + test { + sql """ DROP TABLE IF EXISTS ${illegal_t3} """ + sql """ CREATE TABLE ${illegal_t3} ( + `id` int(11) NOT NULL COMMENT "用户 ID", + `name` varchar(65533) DEFAULT "unknown" COMMENT "用户姓名", + `score` int(11) NOT NULL COMMENT "用户得分", + `test` int(11) NULL COMMENT "null test", + `dft` int(11) DEFAULT "4321", + `update_time` datetime(6) default current_timestamp(4) on update current_timestamp(3)) + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true");""" + exception "The precision of the default value of column[update_time] should be the same with the precision in 'ON UPDATE CURRENT_TIMESTAMP'." 
+ } + + // illegal case 4: use 'update on current_timestamp' on incorrect table models + def illegal_t4 = "nereids_update_on_current_timestamp_illegal_4" + test { + sql """ DROP TABLE IF EXISTS ${illegal_t4} """ + sql """ CREATE TABLE ${illegal_t4} ( + `id` int(11) NOT NULL COMMENT "用户 ID", + `name` varchar(65533) DEFAULT "unknown" COMMENT "用户姓名", + `score` int(11) NOT NULL COMMENT "用户得分", + `test` int(11) NULL COMMENT "null test", + `dft` int(11) DEFAULT "4321", + `update_time` datetime(6) default current_timestamp(3) on update current_timestamp(3)) + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "false");""" + exception "'ON UPDATE CURRENT_TIMESTAMP' is only supportted in unique table with merge-on-write enabled." + } + + test { + sql """ DROP TABLE IF EXISTS ${illegal_t4} """ + sql """ CREATE TABLE ${illegal_t4} ( + `id` int(11) NOT NULL COMMENT "用户 ID", + `name` varchar(65533) DEFAULT "unknown" COMMENT "用户姓名", + `score` int(11) NOT NULL COMMENT "用户得分", + `test` int(11) NULL COMMENT "null test", + `dft` int(11) DEFAULT "4321", + `update_time` datetime default current_timestamp on update current_timestamp) + DUPLICATE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES("replication_num" = "1");""" + exception "'ON UPDATE CURRENT_TIMESTAMP' is only supportted in unique table with merge-on-write enabled." + } + + test { + sql """ DROP TABLE IF EXISTS ${illegal_t4} """ + sql """ CREATE TABLE IF NOT EXISTS ${illegal_t4} ( + k int, + `update_time` datetime(6) default current_timestamp(4) on update current_timestamp(3)) replace, + ) AGGREGATE KEY(k) DISTRIBUTED BY HASH(k) BUCKETS 1 properties("replication_num" = "1");""" + exception "Syntax error" + } +} diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_update_on_current_timestamp.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_update_on_current_timestamp.groovy new file mode 100644 index 0000000000..29f647fb7a --- /dev/null +++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_update_on_current_timestamp.groovy @@ -0,0 +1,88 @@ + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_mow_update_on_current_timestamp", "p0") { + sql 'set experimental_enable_nereids_planner=true' + sql 'set enable_fallback_to_original_planner=false' + sql 'set enable_nereids_dml=true' + sql "sync;" + + + def t1 = "test_mow_update_on_current_timestamp1" + sql """ DROP TABLE IF EXISTS ${t1};""" + sql """ CREATE TABLE ${t1} ( + `id` int(11) NOT NULL COMMENT "用户 ID", + `name` varchar(65533) DEFAULT "unknown" COMMENT "用户姓名", + `score` int(11) NOT NULL COMMENT "用户得分", + `test` int(11) NULL COMMENT "null test", + `dft` int(11) DEFAULT "4321", + `update_time` datetime default current_timestamp on update current_timestamp) + UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true" + );""" + + // don't set partial_columns header, it's a row update + streamLoad { + table "${t1}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'columns', 'id,name,score,test,dft' + + file 'update_on_current_timestamp1.csv' + time 10000 // limit inflight 10s + } + qt_sql "select id,name,score,test,dft from ${t1} order by id;" + qt_sql "select count(distinct update_time) from ${t1} where update_time > '2023-10-01 00:00:00';" + + + // set partial columns header, it's a partial update + // don't specify the `update_time` column + // it will be automatically updated to current_timestamp() + streamLoad { + table "${t1}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'id,score' + + file 'update_on_current_timestamp2.csv' + time 10000 // limit inflight 10s + } + qt_sql "select id,name,score,test,dft from ${t1} order by id;" + qt_sql "select count(distinct update_time) from ${t1} where update_time > '2023-10-01 00:00:00';" + + // when user specify that column, it will be filled with the input value + streamLoad { + table "${t1}" + + set 'column_separator', ',' + set 'format', 'csv' + set 'partial_columns', 'true' + set 'columns', 'id,update_time' + + file 'update_on_current_timestamp3.csv' + time 10000 // limit inflight 10s + } + qt_sql "select id,name,score,test,dft from ${t1} order by id;" + qt_sql "select count(distinct update_time) from ${t1} where update_time > '2023-10-01 00:00:00';" + qt_sql "select count(distinct update_time) from ${t1};" +}
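Taken together, the behavior added by this patch can be summarized with the following hedged SQL sketch; table and column names are illustrative, and the semantics follow the documentation and regression tests above:

```sql
CREATE TABLE scores (
    id INT NOT NULL,
    score INT,
    update_time DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) UNIQUE KEY(id) DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES ("replication_num" = "1", "enable_unique_key_merge_on_write" = "true");

-- Whole-row insert: update_time is filled from its DEFAULT, i.e. the load time.
INSERT INTO scores (id, score) VALUES (1, 100), (2, 200);

-- UPDATE does not mention update_time, so the matched rows get a fresh timestamp.
UPDATE scores SET score = score * 2 WHERE id = 1;

-- An explicit assignment still overrides the automatic refresh.
UPDATE scores SET score = 300, update_time = "2023-10-02 00:00:00" WHERE id = 2;
```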