[Fix](merge-on-write) Fix some bugs about sequence column (#24915)

1. add checks and handling of sequence column in #21896 to insert statement in origin planner and Nereids planner.
2. disable drop sequence mapping column in schema change.
This commit is contained in:
bobhan1
2023-10-18 20:40:12 +08:00
committed by GitHub
parent 9c9fc84f39
commit 2a442972a8
19 changed files with 378 additions and 52 deletions

View File

@ -323,6 +323,11 @@ public class SchemaChangeHandler extends AlterHandler {
throw new DdlException("Can not drop key column in Unique data model table");
}
if (olapTable.hasSequenceCol() && dropColName.equalsIgnoreCase(olapTable.getSequenceMapCol())) {
throw new DdlException("Can not drop sequence mapping column[" + dropColName
+ "] in Unique data model table[" + olapTable.getName() + "]");
}
} else if (KeysType.AGG_KEYS == olapTable.getKeysType()) {
if (null == targetIndexName) {
// drop column in base table

View File

@ -18,6 +18,7 @@
package org.apache.doris.analysis;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.analysis.ColumnDef.DefaultValue;
import org.apache.doris.catalog.BrokerTable;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@ -89,6 +90,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -442,6 +444,23 @@ public class NativeInsertStmt extends InsertStmt {
targetPartitionIds.add(part.getId());
}
}
if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() != null && targetColumnNames != null) {
Optional<String> foundCol = targetColumnNames.stream()
.filter(c -> c.equalsIgnoreCase(olapTable.getSequenceMapCol())).findAny();
Optional<Column> seqCol = olapTable.getFullSchema().stream()
.filter(col -> col.getName().equals(olapTable.getSequenceMapCol()))
.findFirst();
if (seqCol.isPresent() && !foundCol.isPresent() && !isPartialUpdate && !isFromDeleteOrUpdateStmt
&& !analyzer.getContext().getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
if (seqCol.get().getDefaultValue() == null
|| !seqCol.get().getDefaultValue().equals(DefaultValue.CURRENT_TIMESTAMP)) {
throw new AnalysisException("Table " + olapTable.getName()
+ " has sequence column, need to specify the sequence column");
}
}
}
if (isPartialUpdate && olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() != null
&& partialUpdateCols.contains(olapTable.getSequenceMapCol())) {
partialUpdateCols.add(Column.SEQUENCE_COL);

View File

@ -17,6 +17,7 @@
package org.apache.doris.nereids.rules.analysis;
import org.apache.doris.analysis.ColumnDef.DefaultValue;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
@ -107,6 +108,25 @@ public class BindSink implements AnalysisRuleFactory {
throw new AnalysisException("insert into cols should be corresponding to the query output");
}
try {
if (table.hasSequenceCol() && table.getSequenceMapCol() != null
&& !sink.getColNames().isEmpty() && !boundSink.isPartialUpdate()) {
Column seqCol = table.getFullSchema().stream()
.filter(col -> col.getName().equals(table.getSequenceMapCol()))
.findFirst().get();
Optional<String> foundCol = sink.getColNames().stream()
.filter(col -> col.equals(table.getSequenceMapCol()))
.findFirst();
if (!foundCol.isPresent() && (seqCol.getDefaultValue() == null
|| !seqCol.getDefaultValue().equals(DefaultValue.CURRENT_TIMESTAMP))) {
throw new AnalysisException("Table " + table.getName()
+ " has sequence column, need to specify the sequence column");
}
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}
Map<Column, NamedExpression> columnToChildOutput = Maps.newHashMap();
for (int i = 0; i < boundSink.getCols().size(); ++i) {
columnToChildOutput.put(boundSink.getCols().get(i), child.getOutput().get(i));
@ -149,7 +169,9 @@ public class BindSink implements AnalysisRuleFactory {
throw new AnalysisException("sequence column is not contained in"
+ " target table " + table.getName());
}
columnToOutput.put(column.getName(), columnToOutput.get(seqCol.get().getName()));
if (columnToOutput.get(seqCol.get().getName()) != null) {
columnToOutput.put(column.getName(), columnToOutput.get(seqCol.get().getName()));
}
} else if (sink.isPartialUpdate()) {
// If the current load is a partial update, the values of unmentioned
// columns will be filled in SegmentWriter. And the output of sink node

View File

@ -18,6 +18,7 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.ColumnDef.DefaultValue;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SlotRef;
@ -188,7 +189,8 @@ public class LoadScanProvider {
// add columnExpr for sequence column
TableIf targetTable = getTargetTable();
if (targetTable instanceof OlapTable && ((OlapTable) targetTable).hasSequenceCol()) {
String sequenceCol = ((OlapTable) targetTable).getSequenceMapCol();
OlapTable olapTable = (OlapTable) targetTable;
String sequenceCol = olapTable.getSequenceMapCol();
if (sequenceCol != null) {
String finalSequenceCol = sequenceCol;
Optional<ImportColumnDesc> foundCol = columnDescs.descs.stream()
@ -199,8 +201,14 @@ public class LoadScanProvider {
columnDescs.descs.add(new ImportColumnDesc(Column.SEQUENCE_COL,
new SlotRef(null, sequenceCol)));
} else if (!fileGroupInfo.isPartialUpdate()) {
throw new UserException("Table " + targetTable.getName()
+ " has sequence column, need to specify the sequence column");
Column seqCol = olapTable.getFullSchema().stream()
.filter(col -> col.getName().equals(olapTable.getSequenceMapCol()))
.findFirst().get();
if (seqCol.getDefaultValue() == null
|| !seqCol.getDefaultValue().equals(DefaultValue.CURRENT_TIMESTAMP)) {
throw new UserException("Table " + olapTable.getName()
+ " has sequence column, need to specify the sequence column");
}
}
} else {
sequenceCol = context.fileGroup.getSequenceCol();