[Feature](update) Support update on current_timestamp (#25884)

This commit is contained in:
bobhan1
2023-11-23 16:23:31 +08:00
committed by GitHub
parent 5d9c555dcf
commit 4b22fc14d5
18 changed files with 605 additions and 59 deletions

View File

@ -130,6 +130,12 @@ public class Column implements Writable, GsonPostProcessable {
private boolean isCompoundKey = false;
@SerializedName(value = "hasOnUpdateDefaultValue")
private boolean hasOnUpdateDefaultValue = false;
@SerializedName(value = "onUpdateDefaultValueExprDef")
private DefaultValueExprDef onUpdateDefaultValueExprDef;
public Column() {
this.name = "";
this.type = Type.NULL;
@ -170,24 +176,33 @@ public class Column implements Writable, GsonPostProcessable {
public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull,
String defaultValue, String comment) {
this(name, type, isKey, aggregateType, isAllowNull, false, defaultValue, comment, true, null,
COLUMN_UNIQUE_ID_INIT_VALUE, defaultValue);
COLUMN_UNIQUE_ID_INIT_VALUE, defaultValue, false, null);
}
public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull,
String comment, boolean visible, int colUniqueId) {
this(name, type, isKey, aggregateType, isAllowNull, false, null, comment, visible, null, colUniqueId, null);
this(name, type, isKey, aggregateType, isAllowNull, false, null, comment, visible, null, colUniqueId, null,
false, null);
}
public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull,
String defaultValue, String comment, boolean visible, DefaultValueExprDef defaultValueExprDef,
int colUniqueId, String realDefaultValue) {
this(name, type, isKey, aggregateType, isAllowNull, false, defaultValue, comment, visible, defaultValueExprDef,
colUniqueId, realDefaultValue);
colUniqueId, realDefaultValue, false, null);
}
public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull,
boolean isAutoInc, String defaultValue, String comment, boolean visible,
DefaultValueExprDef defaultValueExprDef, int colUniqueId, String realDefaultValue) {
this(name, type, isKey, aggregateType, isAllowNull, isAutoInc, defaultValue, comment, visible,
defaultValueExprDef, colUniqueId, realDefaultValue, false, null);
}
public Column(String name, Type type, boolean isKey, AggregateType aggregateType, boolean isAllowNull,
boolean isAutoInc, String defaultValue, String comment, boolean visible,
DefaultValueExprDef defaultValueExprDef, int colUniqueId, String realDefaultValue,
boolean hasOnUpdateDefaultValue, DefaultValueExprDef onUpdateDefaultValueExprDef) {
this.name = name;
if (this.name == null) {
this.name = "";
@ -212,6 +227,8 @@ public class Column implements Writable, GsonPostProcessable {
this.children = new ArrayList<>();
createChildrenColumn(this.type, this);
this.uniqueId = colUniqueId;
this.hasOnUpdateDefaultValue = hasOnUpdateDefaultValue;
this.onUpdateDefaultValueExprDef = onUpdateDefaultValueExprDef;
if (type.isAggStateType()) {
AggStateType aggState = (AggStateType) type;
@ -244,6 +261,8 @@ public class Column implements Writable, GsonPostProcessable {
this.uniqueId = column.getUniqueId();
this.defineExpr = column.getDefineExpr();
this.defineName = column.getDefineName();
this.hasOnUpdateDefaultValue = column.hasOnUpdateDefaultValue;
this.onUpdateDefaultValueExprDef = column.onUpdateDefaultValueExprDef;
}
public void createChildrenColumn(Type type, Column column) {
@ -489,6 +508,14 @@ public class Column implements Writable, GsonPostProcessable {
}
}
public boolean hasOnUpdateDefaultValue() {
return hasOnUpdateDefaultValue;
}
public Expr getOnUpdateDefaultValueExpr() {
return onUpdateDefaultValueExprDef.getExpr(type);
}
public TColumn toThrift() {
TColumn tColumn = new TColumn();
tColumn.setColumnName(removeNamePrefix(this.name));
@ -766,6 +793,9 @@ public class Column implements Writable, GsonPostProcessable {
sb.append(" DEFAULT \"").append(defaultValue).append("\"");
}
}
if (hasOnUpdateDefaultValue) {
sb.append(" ON UPDATE ").append(defaultValue).append("");
}
if (StringUtils.isNotBlank(comment)) {
sb.append(" COMMENT '").append(getComment(true)).append("'");
}

View File

@ -2166,6 +2166,16 @@ public class InternalCatalog implements CatalogIf<Database> {
}
olapTable.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction);
// check `update on current_timestamp`
if (!enableUniqueKeyMergeOnWrite) {
for (Column column : baseSchema) {
if (column.hasOnUpdateDefaultValue()) {
throw new DdlException("'ON UPDATE CURRENT_TIMESTAMP' is only supportted"
+ " in unique table with merge-on-write enabled.");
}
}
}
// analyze bloom filter columns
Set<String> bfColumns = null;
double bfFpp = 0;

View File

@ -379,31 +379,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
HashSet<String> partialUpdateCols = new HashSet<>();
boolean isPartialUpdate = olapTableSink.isPartialUpdate();
if (isPartialUpdate) {
OlapTable olapTable = olapTableSink.getTargetTable();
if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("Partial update is only allowed in"
+ "unique table with merge-on-write enabled.");
}
for (Column col : olapTable.getFullSchema()) {
boolean exists = false;
for (Column insertCol : olapTableSink.getCols()) {
if (insertCol.getName() != null && insertCol.getName().equals(col.getName())) {
exists = true;
break;
}
}
if (col.isKey() && !exists) {
throw new AnalysisException("Partial update should include all key columns, missing: "
+ col.getName());
}
}
for (Column col : olapTableSink.getCols()) {
partialUpdateCols.add(col.getName());
}
if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() != null
&& partialUpdateCols.contains(olapTable.getSequenceMapCol())) {
partialUpdateCols.add(Column.SEQUENCE_COL);
}
}
TupleDescriptor olapTuple = context.generateTupleDesc();
List<Column> targetTableColumns = olapTableSink.getTargetTable().getFullSchema();

View File

@ -2156,6 +2156,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
boolean isNotNull = ctx.NOT() != null;
String aggTypeString = ctx.aggType != null ? ctx.aggType.getText() : null;
Optional<DefaultValue> defaultValue = Optional.empty();
Optional<DefaultValue> onUpdateDefaultValue = Optional.empty();
if (ctx.DEFAULT() != null) {
if (ctx.INTEGER_VALUE() != null) {
defaultValue = Optional.of(new DefaultValue(ctx.INTEGER_VALUE().getText()));
@ -2164,14 +2165,24 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
} else if (ctx.nullValue != null) {
defaultValue = Optional.of(DefaultValue.NULL_DEFAULT_VALUE);
} else if (ctx.CURRENT_TIMESTAMP() != null) {
if (ctx.precision == null) {
if (ctx.defaultValuePrecision == null) {
defaultValue = Optional.of(DefaultValue.CURRENT_TIMESTAMP_DEFAULT_VALUE);
} else {
defaultValue = Optional.of(DefaultValue
.currentTimeStampDefaultValueWithPrecision(Long.valueOf(ctx.precision.getText())));
.currentTimeStampDefaultValueWithPrecision(
Long.valueOf(ctx.defaultValuePrecision.getText())));
}
}
}
if (ctx.UPDATE() != null) {
if (ctx.onUpdateValuePrecision == null) {
onUpdateDefaultValue = Optional.of(DefaultValue.CURRENT_TIMESTAMP_DEFAULT_VALUE);
} else {
onUpdateDefaultValue = Optional.of(DefaultValue
.currentTimeStampDefaultValueWithPrecision(
Long.valueOf(ctx.onUpdateValuePrecision.getText())));
}
}
AggregateType aggType = null;
if (aggTypeString != null) {
try {
@ -2182,7 +2193,8 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
}
}
String comment = ctx.comment != null ? ctx.comment.getText() : "";
return new ColumnDefinition(colName, colType, isKey, aggType, !isNotNull, defaultValue, comment);
return new ColumnDefinition(colName, colType, isKey, aggType, !isNotNull, defaultValue,
onUpdateDefaultValue, comment);
}
@Override

View File

@ -76,40 +76,70 @@ public class BindSink implements AnalysisRuleFactory {
Pair<Database, OlapTable> pair = bind(ctx.cascadesContext, sink);
Database database = pair.first;
OlapTable table = pair.second;
boolean isPartialUpdate = sink.isPartialUpdate();
LogicalPlan child = ((LogicalPlan) sink.child());
boolean isNeedSequenceCol = child.getOutput().stream()
boolean childHasSeqCol = child.getOutput().stream()
.anyMatch(slot -> slot.getName().equals(Column.SEQUENCE_COL));
if (sink.getColNames().isEmpty() && sink.isFromNativeInsertStmt()
&& sink.isPartialUpdate()) {
throw new AnalysisException("You must explicitly specify the columns to be updated when "
+ "updating partial columns using the INSERT statement.");
}
boolean needExtraSeqCol = isPartialUpdate && !childHasSeqCol && table.hasSequenceCol()
&& table.getSequenceMapCol() != null
&& sink.getColNames().contains(table.getSequenceMapCol());
Pair<List<Column>, Integer> bindColumnsResult =
bindTargetColumns(table, sink.getColNames(), childHasSeqCol, needExtraSeqCol);
List<Column> bindColumns = bindColumnsResult.first;
int extraColumnsNum = bindColumnsResult.second;
LogicalOlapTableSink<?> boundSink = new LogicalOlapTableSink<>(
database,
table,
bindTargetColumns(table, sink.getColNames(), isNeedSequenceCol),
bindColumns,
bindPartitionIds(table, sink.getPartitions()),
child.getOutput().stream()
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()),
sink.isPartialUpdate(),
isPartialUpdate,
sink.isFromNativeInsertStmt(),
sink.child());
if (isPartialUpdate) {
// check the necessary conditions for partial updates
if (!table.getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("Partial update is only allowed in"
+ "unique table with merge-on-write enabled.");
}
if (sink.getColNames().isEmpty() && sink.isFromNativeInsertStmt()) {
throw new AnalysisException("You must explicitly specify the columns to be updated when "
+ "updating partial columns using the INSERT statement.");
}
for (Column col : table.getFullSchema()) {
boolean exists = false;
for (Column insertCol : boundSink.getCols()) {
if (insertCol.getName().equals(col.getName())) {
exists = true;
break;
}
}
if (col.isKey() && !exists) {
throw new AnalysisException("Partial update should include all key columns, missing: "
+ col.getName());
}
}
}
// we need to insert all the columns of the target table
// although some columns are not mentions.
// so we add a projects to supply the default value.
if (boundSink.getCols().size() != child.getOutput().size()) {
if (boundSink.getCols().size() != child.getOutput().size() + extraColumnsNum) {
throw new AnalysisException("insert into cols should be corresponding to the query output");
}
try {
// in upserts, users must specify the sequence mapping column explictly
// if the target table has sequence mapping column unless the sequence mapping
// column has the a default value of CURRENT_TIMESTAMP
if (table.hasSequenceCol() && table.getSequenceMapCol() != null
&& !sink.getColNames().isEmpty() && !boundSink.isPartialUpdate()) {
&& !sink.getColNames().isEmpty() && !isPartialUpdate) {
Column seqCol = table.getFullSchema().stream()
.filter(col -> col.getName().equals(table.getSequenceMapCol()))
.findFirst().get();
@ -127,7 +157,7 @@ public class BindSink implements AnalysisRuleFactory {
}
Map<Column, NamedExpression> columnToChildOutput = Maps.newHashMap();
for (int i = 0; i < boundSink.getCols().size(); ++i) {
for (int i = 0; i < child.getOutput().size(); ++i) {
columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i));
}
@ -171,11 +201,24 @@ public class BindSink implements AnalysisRuleFactory {
if (columnToOutput.get(seqCol.get().getName()) != null) {
columnToOutput.put(column.getName(), columnToOutput.get(seqCol.get().getName()));
}
} else if (sink.isPartialUpdate()) {
} else if (isPartialUpdate) {
// If the current load is a partial update, the values of unmentioned
// columns will be filled in SegmentWriter. And the output of sink node
// should not contain these unmentioned columns, so we just skip them.
continue;
// But if the column has 'on update value', we should unconditionally
// update the value of the column to the current timestamp whenever there
// is an update on the row
if (column.hasOnUpdateDefaultValue()) {
Expression defualtValueExpression = FunctionBinder.INSTANCE.rewrite(
new NereidsParser().parseExpression(
column.getOnUpdateDefaultValueExpr().toSqlWithoutTbl()),
new ExpressionRewriteContext(ctx.cascadesContext));
columnToOutput.put(column.getName(),
new Alias(defualtValueExpression, column.getName()));
} else {
continue;
}
} else if (column.getDefaultValue() == null) {
// Otherwise, the unmentioned columns should be filled with default values
// or null values
@ -291,20 +334,37 @@ public class BindSink implements AnalysisRuleFactory {
}).collect(Collectors.toList());
}
private List<Column> bindTargetColumns(OlapTable table, List<String> colsName, boolean isNeedSequenceCol) {
private Pair<List<Column>, Integer> bindTargetColumns(OlapTable table, List<String> colsName,
boolean childHasSeqCol, boolean needExtraSeqCol) {
// if the table set sequence column in stream load phase, the sequence map column is null, we query it.
return colsName.isEmpty()
? table.getBaseSchema(true).stream()
.filter(c -> validColumn(c, isNeedSequenceCol))
.collect(ImmutableList.toImmutableList())
: colsName.stream().map(cn -> {
Column column = table.getColumn(cn);
if (column == null) {
throw new AnalysisException(String.format("column %s is not found in table %s",
cn, table.getName()));
if (colsName.isEmpty()) {
return Pair.of(table.getBaseSchema(true).stream()
.filter(c -> validColumn(c, childHasSeqCol))
.collect(ImmutableList.toImmutableList()), 0);
} else {
int extraColumnsNum = (needExtraSeqCol ? 1 : 0);
List<String> processedColsName = Lists.newArrayList(colsName);
for (Column col : table.getFullSchema()) {
if (col.hasOnUpdateDefaultValue()) {
Optional<String> colName = colsName.stream().filter(c -> c.equals(col.getName())).findFirst();
if (!colName.isPresent()) {
++extraColumnsNum;
processedColsName.add(col.getName());
}
return column;
}).collect(ImmutableList.toImmutableList());
}
}
if (!processedColsName.contains(Column.SEQUENCE_COL) && (childHasSeqCol || needExtraSeqCol)) {
processedColsName.add(Column.SEQUENCE_COL);
}
return Pair.of(processedColsName.stream().map(cn -> {
Column column = table.getColumn(cn);
if (column == null) {
throw new AnalysisException(String.format("column %s is not found in table %s",
cn, table.getName()));
}
return column;
}).collect(ImmutableList.toImmutableList()), extraColumnsNum);
}
}
private boolean isSourceAndTargetStringLikeType(DataType input, DataType target) {

View File

@ -25,6 +25,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.nereids.analyzer.UnboundOlapTableSink;
import org.apache.doris.nereids.analyzer.UnboundSlot;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
@ -115,7 +116,14 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab
? ((NamedExpression) expr)
: new Alias(expr));
} else {
selectItems.add(new UnboundSlot(tableName, column.getName()));
if (column.hasOnUpdateDefaultValue()) {
Expression defualtValueExpression =
new NereidsParser().parseExpression(column.getOnUpdateDefaultValueExpr()
.toSqlWithoutTbl());
selectItems.add(new Alias(defualtValueExpression, column.getName()));
} else {
selectItems.add(new UnboundSlot(tableName, column.getName()));
}
}
}

View File

@ -51,6 +51,7 @@ public class ColumnDefinition {
private AggregateType aggType;
private boolean isNullable;
private Optional<DefaultValue> defaultValue;
private Optional<DefaultValue> onUpdateDefaultValue = Optional.empty();
private final String comment;
private final boolean isVisible;
private boolean aggTypeImplicit = false;
@ -60,6 +61,11 @@ public class ColumnDefinition {
this(name, type, isKey, aggType, isNullable, defaultValue, comment, true);
}
public ColumnDefinition(String name, DataType type, boolean isKey, AggregateType aggType, boolean isNullable,
Optional<DefaultValue> defaultValue, Optional<DefaultValue> onUpdateDefaultValue, String comment) {
this(name, type, isKey, aggType, isNullable, defaultValue, onUpdateDefaultValue, comment, true);
}
/**
* constructor
*/
@ -75,6 +81,23 @@ public class ColumnDefinition {
this.isVisible = isVisible;
}
/**
* constructor
*/
public ColumnDefinition(String name, DataType type, boolean isKey, AggregateType aggType, boolean isNullable,
Optional<DefaultValue> defaultValue, Optional<DefaultValue> onUpdateDefaultValue, String comment,
boolean isVisible) {
this.name = name;
this.type = type;
this.isKey = isKey;
this.aggType = aggType;
this.isNullable = isNullable;
this.defaultValue = defaultValue;
this.onUpdateDefaultValue = onUpdateDefaultValue;
this.comment = comment;
this.isVisible = isVisible;
}
public ColumnDefinition(String name, DataType type, boolean isNullable) {
this(name, type, false, null, isNullable, Optional.empty(), "");
}
@ -198,6 +221,34 @@ public class ColumnDefinition {
throw new AnalysisException(e.getMessage(), e);
}
}
if (onUpdateDefaultValue.isPresent()
&& onUpdateDefaultValue.get().getValue() != null
&& type.toCatalogDataType().isScalarType()) {
try {
ColumnDef.validateDefaultValue(type.toCatalogDataType(),
onUpdateDefaultValue.get().getValue(), onUpdateDefaultValue.get().getDefaultValueExprDef());
} catch (Exception e) {
throw new AnalysisException("meet error when validating the on update value of column["
+ name + "], reason: " + e.getMessage());
}
if (onUpdateDefaultValue.get().isCurrentTimeStamp()) {
if (!defaultValue.isPresent() || !defaultValue.get().isCurrentTimeStamp()) {
throw new AnalysisException("You must set the default value of the column["
+ name + "] to CURRENT_TIMESTAMP when using 'ON UPDATE CURRENT_TIMESTAMP'.");
}
} else if (onUpdateDefaultValue.get().isCurrentTimeStampWithPrecision()) {
if (!defaultValue.isPresent() || !defaultValue.get().isCurrentTimeStampWithPrecision()) {
throw new AnalysisException("You must set the default value of the column["
+ name + "] to CURRENT_TIMESTAMP when using 'ON UPDATE CURRENT_TIMESTAMP'.");
}
long precision1 = onUpdateDefaultValue.get().getCurrentTimeStampPrecision();
long precision2 = defaultValue.get().getCurrentTimeStampPrecision();
if (precision1 != precision2) {
throw new AnalysisException("The precision of the default value of column["
+ name + "] should be the same with the precision in 'ON UPDATE CURRENT_TIMESTAMP'.");
}
}
}
}
/**
@ -231,7 +282,8 @@ public class ColumnDefinition {
Column column = new Column(name, type.toCatalogDataType(), isKey, aggType, isNullable,
false, defaultValue.map(DefaultValue::getRawValue).orElse(null), comment, isVisible,
defaultValue.map(DefaultValue::getDefaultValueExprDef).orElse(null), Column.COLUMN_UNIQUE_ID_INIT_VALUE,
defaultValue.map(DefaultValue::getValue).orElse(null));
defaultValue.map(DefaultValue::getValue).orElse(null), onUpdateDefaultValue.isPresent(),
onUpdateDefaultValue.map(DefaultValue::getDefaultValueExprDef).orElse(null));
column.setAggregationTypeImplicit(aggTypeImplicit);
return column;
}

View File

@ -150,6 +150,9 @@ public class StreamLoadPlanner {
if (isPartialUpdate) {
for (Column col : destTable.getFullSchema()) {
boolean existInExpr = false;
if (col.hasOnUpdateDefaultValue()) {
partialUpdateInputColumns.add(col.getName());
}
for (ImportColumnDesc importColumnDesc : taskInfo.getColumnExprDescs().descs) {
if (importColumnDesc.getColumnName() != null
&& importColumnDesc.getColumnName().equals(col.getName())) {