[fix](sequence column) insert into should require sequence column in all scenario (#27780)
This commit is contained in:
@ -441,16 +441,43 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
}
|
||||
}
|
||||
|
||||
if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() != null && targetColumnNames != null) {
|
||||
Optional<String> foundCol = targetColumnNames.stream()
|
||||
.filter(c -> c.equalsIgnoreCase(olapTable.getSequenceMapCol())).findAny();
|
||||
Optional<Column> seqCol = olapTable.getFullSchema().stream()
|
||||
.filter(col -> col.getName().equals(olapTable.getSequenceMapCol()))
|
||||
.findFirst();
|
||||
if (seqCol.isPresent() && !foundCol.isPresent() && !isPartialUpdate && !isFromDeleteOrUpdateStmt
|
||||
// For Unique Key table with sequence column (which default value is not CURRENT_TIMESTAMP),
|
||||
// user MUST specify the sequence column while inserting data
|
||||
//
|
||||
// case1: create table by `function_column.sequence_col`
|
||||
// a) insert with column list, must include the sequence map column
|
||||
// b) insert without column list, already contains the column, don't need to check
|
||||
// case2: create table by `function_column.sequence_type`
|
||||
// a) insert with column list, must include the hidden column __DORIS_SEQUENCE_COL__
|
||||
// b) insert without column list, don't include the hidden column __DORIS_SEQUENCE_COL__
|
||||
// by default, will fail.
|
||||
if (olapTable.hasSequenceCol()) {
|
||||
boolean haveInputSeqCol = false;
|
||||
Optional<Column> seqColInTable = Optional.empty();
|
||||
if (olapTable.getSequenceMapCol() != null) {
|
||||
if (targetColumnNames != null) {
|
||||
if (targetColumnNames.stream()
|
||||
.anyMatch(c -> c.equalsIgnoreCase(olapTable.getSequenceMapCol()))) {
|
||||
haveInputSeqCol = true; // case1.a
|
||||
}
|
||||
} else {
|
||||
haveInputSeqCol = true; // case1.b
|
||||
}
|
||||
seqColInTable = olapTable.getFullSchema().stream()
|
||||
.filter(col -> col.getName().equals(olapTable.getSequenceMapCol())).findFirst();
|
||||
} else {
|
||||
if (targetColumnNames != null) {
|
||||
if (targetColumnNames.stream()
|
||||
.anyMatch(c -> c.equalsIgnoreCase(Column.SEQUENCE_COL))) {
|
||||
haveInputSeqCol = true; // case2.a
|
||||
} // else case2.b
|
||||
}
|
||||
}
|
||||
|
||||
if (!haveInputSeqCol && !isPartialUpdate && !isFromDeleteOrUpdateStmt
|
||||
&& !analyzer.getContext().getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
|
||||
if (seqCol.get().getDefaultValue() == null
|
||||
|| !seqCol.get().getDefaultValue().equals(DefaultValue.CURRENT_TIMESTAMP)) {
|
||||
if (!seqColInTable.isPresent() || seqColInTable.get().getDefaultValue() == null
|
||||
|| !seqColInTable.get().getDefaultValue().equals(DefaultValue.CURRENT_TIMESTAMP)) {
|
||||
throw new AnalysisException("Table " + olapTable.getName()
|
||||
+ " has sequence column, need to specify the sequence column");
|
||||
}
|
||||
|
||||
@ -135,21 +135,44 @@ public class BindSink implements AnalysisRuleFactory {
|
||||
}
|
||||
|
||||
try {
|
||||
// in upserts, users must specify the sequence mapping column explictly
|
||||
// if the target table has sequence mapping column unless the sequence mapping
|
||||
// column has the a default value of CURRENT_TIMESTAMP
|
||||
if (table.hasSequenceCol() && table.getSequenceMapCol() != null
|
||||
&& !sink.getColNames().isEmpty() && !isPartialUpdate) {
|
||||
Column seqCol = table.getFullSchema().stream()
|
||||
.filter(col -> col.getName().equals(table.getSequenceMapCol()))
|
||||
.findFirst().get();
|
||||
Optional<String> foundCol = sink.getColNames().stream()
|
||||
.filter(col -> col.equals(table.getSequenceMapCol()))
|
||||
.findFirst();
|
||||
if (!foundCol.isPresent() && (seqCol.getDefaultValue() == null
|
||||
|| !seqCol.getDefaultValue().equals(DefaultValue.CURRENT_TIMESTAMP))) {
|
||||
throw new AnalysisException("Table " + table.getName()
|
||||
+ " has sequence column, need to specify the sequence column");
|
||||
// For Unique Key table with sequence column (which default value is not CURRENT_TIMESTAMP),
|
||||
// user MUST specify the sequence column while inserting data
|
||||
//
|
||||
// case1: create table by `function_column.sequence_col`
|
||||
// a) insert with column list, must include the sequence map column
|
||||
// b) insert without column list, already contains the column, don't need to check
|
||||
// case2: create table by `function_column.sequence_type`
|
||||
// a) insert with column list, must include the hidden column __DORIS_SEQUENCE_COL__
|
||||
// b) insert without column list, don't include the hidden column __DORIS_SEQUENCE_COL__
|
||||
// by default, will fail.
|
||||
if (table.hasSequenceCol()) {
|
||||
boolean haveInputSeqCol = false;
|
||||
Optional<Column> seqColInTable = Optional.empty();
|
||||
if (table.getSequenceMapCol() != null) {
|
||||
if (!sink.getColNames().isEmpty()) {
|
||||
if (sink.getColNames().contains(table.getSequenceMapCol())) {
|
||||
haveInputSeqCol = true; // case1.a
|
||||
}
|
||||
} else {
|
||||
haveInputSeqCol = true; // case1.b
|
||||
}
|
||||
seqColInTable = table.getFullSchema().stream()
|
||||
.filter(col -> col.getName().equals(table.getSequenceMapCol())).findFirst();
|
||||
} else {
|
||||
if (!sink.getColNames().isEmpty()) {
|
||||
if (sink.getColNames().contains(Column.SEQUENCE_COL)) {
|
||||
haveInputSeqCol = true; // case2.a
|
||||
} // else case2.b
|
||||
}
|
||||
}
|
||||
|
||||
if (!haveInputSeqCol && !isPartialUpdate) {
|
||||
if (!seqColInTable.isPresent() || seqColInTable.get().getDefaultValue() == null
|
||||
|| !seqColInTable.get().getDefaultValue()
|
||||
.equals(DefaultValue.CURRENT_TIMESTAMP)) {
|
||||
throw new org.apache.doris.common.AnalysisException("Table " + table.getName()
|
||||
+ " has sequence column, need to specify the sequence column");
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
||||
@ -35,13 +35,13 @@
|
||||
|
||||
-- !part --
|
||||
1 10 15
|
||||
15 9 18
|
||||
15 8 19
|
||||
2 5 14
|
||||
3 6 11
|
||||
|
||||
-- !all --
|
||||
1 10 15 16 17 0 4 15
|
||||
15 9 18 21 22 0 8 \N
|
||||
15 8 19 20 21 0 7 3
|
||||
2 5 14 13 14 0 5 12
|
||||
3 6 11 14 15 0 6 13
|
||||
|
||||
|
||||
@ -34,7 +34,7 @@ suite("test_unique_table_new_sequence") {
|
||||
"light_schema_change" = "true"
|
||||
);
|
||||
"""
|
||||
// load unique key
|
||||
// test streamload with seq col
|
||||
streamLoad {
|
||||
table "${tableName}"
|
||||
|
||||
@ -60,7 +60,7 @@ suite("test_unique_table_new_sequence") {
|
||||
sql "sync"
|
||||
order_qt_all "SELECT * from ${tableName}"
|
||||
|
||||
// load unique key
|
||||
// test update data, using streamload with seq col
|
||||
streamLoad {
|
||||
table "${tableName}"
|
||||
|
||||
@ -105,9 +105,17 @@ suite("test_unique_table_new_sequence") {
|
||||
|
||||
order_qt_all "SELECT * from ${tableName}"
|
||||
|
||||
// test insert into with column list, which not contains the seq mapping column v2
|
||||
test {
|
||||
sql "INSERT INTO ${tableName} (k1, v1, v3, v4) values(15, 8, 20, 21)"
|
||||
exception "Table ${tableName} has sequence column, need to specify the sequence column"
|
||||
}
|
||||
|
||||
// test insert into without column list
|
||||
sql "INSERT INTO ${tableName} values(15, 8, 19, 20, 21)"
|
||||
|
||||
sql "INSERT INTO ${tableName} values(15, 9, 18, 21, 22)"
|
||||
// test insert into with column list
|
||||
sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4) values(15, 9, 18, 21, 22)"
|
||||
|
||||
sql "SET show_hidden_columns=true"
|
||||
|
||||
@ -141,8 +149,18 @@ suite("test_unique_table_new_sequence") {
|
||||
);
|
||||
"""
|
||||
|
||||
// test insert into with column list, which not contains the seq mapping column v4
|
||||
// in begin/commit
|
||||
sql "begin;"
|
||||
sql "insert into ${tableName} (k1, v1, v2, v3, v4) values (1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3);"
|
||||
test {
|
||||
sql "INSERT INTO ${tableName} (k1, v1, v2, v3) values(1,1,1,1)"
|
||||
exception "Table ${tableName} has sequence column, need to specify the sequence column"
|
||||
}
|
||||
sql "commit;"
|
||||
|
||||
// test insert into without column list, in begin/commit
|
||||
sql "begin;"
|
||||
sql "insert into ${tableName} values (1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3);"
|
||||
sql "commit;"
|
||||
|
||||
qt_1 "select * from ${tableName} order by k1;"
|
||||
|
||||
@ -33,7 +33,7 @@ suite("test_unique_table_sequence") {
|
||||
"replication_allocation" = "tag.location.default: 1"
|
||||
);
|
||||
"""
|
||||
// load unique key
|
||||
// test streamload with seq col
|
||||
streamLoad {
|
||||
table "${tableName}"
|
||||
|
||||
@ -60,7 +60,7 @@ suite("test_unique_table_sequence") {
|
||||
sql "sync"
|
||||
order_qt_all "SELECT * from ${tableName}"
|
||||
|
||||
// load unique key
|
||||
// test update data, using streamload with seq col
|
||||
streamLoad {
|
||||
table "${tableName}"
|
||||
|
||||
@ -92,6 +92,7 @@ suite("test_unique_table_sequence") {
|
||||
|
||||
order_qt_all "SELECT * from ${tableName}"
|
||||
|
||||
// test update on table with seq col
|
||||
sql "UPDATE ${tableName} SET v1 = 10 WHERE k1 = 1"
|
||||
|
||||
sql "UPDATE ${tableName} SET v2 = 14 WHERE k1 = 2"
|
||||
@ -106,9 +107,22 @@ suite("test_unique_table_sequence") {
|
||||
|
||||
order_qt_all "SELECT * from ${tableName}"
|
||||
|
||||
sql "INSERT INTO ${tableName} values(15, 8, 19, 20, 21)"
|
||||
// test insert into without column list
|
||||
test {
|
||||
sql "INSERT INTO ${tableName} values(15, 8, 19, 20, 21)"
|
||||
exception "Table ${tableName} has sequence column, need to specify the sequence column"
|
||||
}
|
||||
|
||||
sql "INSERT INTO ${tableName} values(15, 9, 18, 21, 22)"
|
||||
// test insert into with column list
|
||||
test {
|
||||
sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4) values(15, 8, 19, 20, 21)"
|
||||
exception "Table ${tableName} has sequence column, need to specify the sequence column"
|
||||
}
|
||||
|
||||
// correct way of insert into with seq col
|
||||
sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values(15, 8, 19, 20, 21, 3)"
|
||||
|
||||
sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values(15, 9, 18, 21, 22, 2)"
|
||||
|
||||
sql "SET show_hidden_columns=true"
|
||||
|
||||
@ -139,6 +153,22 @@ suite("test_unique_table_sequence") {
|
||||
);
|
||||
"""
|
||||
|
||||
// test insert into without column list, in begin/commit
|
||||
sql "begin;"
|
||||
test {
|
||||
sql "INSERT INTO ${tableName} values(15, 8, 19, 20, 21)"
|
||||
exception "Table ${tableName} has sequence column, need to specify the sequence column"
|
||||
}
|
||||
sql "commit;"
|
||||
|
||||
// test insert into with column list, in begin/commit
|
||||
sql "begin;"
|
||||
test {
|
||||
sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4) values(15, 8, 19, 20, 21)"
|
||||
exception "Table ${tableName} has sequence column, need to specify the sequence column"
|
||||
}
|
||||
sql "commit;"
|
||||
|
||||
sql "begin;"
|
||||
sql "insert into ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values (1,1,1,1,1,1),(2,2,2,2,2,2),(3,3,3,3,3,3);"
|
||||
sql "commit;"
|
||||
|
||||
@ -115,18 +115,15 @@ suite("test_point_query") {
|
||||
""", property)
|
||||
}
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
for (int i = 0; i < 2; i++) {
|
||||
tableName = realDb + ".tbl_point_query" + i
|
||||
sql """DROP TABLE IF EXISTS ${tableName}"""
|
||||
if (i == 0) {
|
||||
def sql0 = create_table_sql("")
|
||||
sql """ ${sql0} """
|
||||
} else if (i == 1) {
|
||||
def sql1 = create_table_sql("\"function_column.sequence_type\" = 'int',")
|
||||
sql """ ${sql1} """
|
||||
} else {
|
||||
def sql2 = create_table_sql("\"function_column.sequence_col\" = 'k6',")
|
||||
sql """ ${sql2} """
|
||||
def sql1 = create_table_sql("\"function_column.sequence_col\" = 'k6',")
|
||||
sql """ ${sql1} """
|
||||
}
|
||||
sql """ INSERT INTO ${tableName} VALUES(1231, 119291.11, "ddd", "laooq", null, "2020-01-01 12:36:38", null, "1022-01-01 11:30:38", null, 1.111112, [119181.1111, 819019.1191, null], null) """
|
||||
sql """ INSERT INTO ${tableName} VALUES(1232, 12222.99121135, "xxx", "laooq", "2023-01-02", "2020-01-01 12:36:38", 522.762, "2022-01-01 11:30:38", 1, 212.111, null, null) """
|
||||
|
||||
@ -116,18 +116,15 @@ suite("test_point_query_cluster_key") {
|
||||
""", property)
|
||||
}
|
||||
|
||||
for (int i = 0; i < 3; i++) {
|
||||
for (int i = 0; i < 2; i++) {
|
||||
tableName = realDb + ".tbl_point_query_cluster_key" + i
|
||||
sql """DROP TABLE IF EXISTS ${tableName}"""
|
||||
if (i == 0) {
|
||||
def sql0 = create_table_sql("")
|
||||
sql """ ${sql0} """
|
||||
} else if (i == 1) {
|
||||
def sql1 = create_table_sql("\"function_column.sequence_type\" = 'int',")
|
||||
sql """ ${sql1} """
|
||||
} else {
|
||||
def sql2 = create_table_sql("\"function_column.sequence_col\" = 'k6',")
|
||||
sql """ ${sql2} """
|
||||
def sql1 = create_table_sql("\"function_column.sequence_col\" = 'k6',")
|
||||
sql """ ${sql1} """
|
||||
}
|
||||
sql """ INSERT INTO ${tableName} VALUES(1231, 119291.11, "ddd", "laooq", null, "2020-01-01 12:36:38", null, "1022-01-01 11:30:38", null, 1.111112, [119181.1111, 819019.1191, null], null) """
|
||||
sql """ INSERT INTO ${tableName} VALUES(1232, 12222.99121135, "xxx", "laooq", "2023-01-02", "2020-01-01 12:36:38", 522.762, "2022-01-01 11:30:38", 1, 212.111, null, null) """
|
||||
|
||||
@ -50,7 +50,7 @@ suite("test_uniq_seq_col_schema_change", "schema_change") {
|
||||
|
||||
sql "insert into ${tbName1} ${columnWithHidden_2}values(5,5,5,5,5,0,5);"
|
||||
sql "insert into ${tbName1} ${columnWithHidden_2}values(5,6,6,6,6,0,6);"
|
||||
sql "insert into ${tbName1} values(5,6,6,7,6);"
|
||||
sql "insert into ${tbName1} ${columnWithHidden_2}values(5,6,6,7,6,0,4);"
|
||||
qt_sql "select * from ${tbName1} order by k1;"
|
||||
sql "insert into ${tbName1} ${columnWithHidden_2}values(5,6,6,7,6,0,7);"
|
||||
qt_sql "select * from ${tbName1} order by k1;"
|
||||
|
||||
@ -47,9 +47,11 @@ suite("test_primary_key_partial_update_seq_type", "p0") {
|
||||
"store_row_column" = "${use_row_store}"); """
|
||||
// insert 2 lines
|
||||
sql """
|
||||
insert into ${tableName} values
|
||||
(2, "doris2", 2000, 223, 1, '2023-01-01'),
|
||||
(1, "doris", 1000, 123, 1, '2023-01-01')
|
||||
insert into ${tableName}
|
||||
(id, name, score, test, dft, update_time, __DORIS_SEQUENCE_COL__)
|
||||
values
|
||||
(2, "doris2", 2000, 223, 1, '2023-01-01', 1),
|
||||
(1, "doris", 1000, 123, 1, '2023-01-01', 1)
|
||||
"""
|
||||
|
||||
sql "sync"
|
||||
|
||||
@ -47,9 +47,11 @@ suite("test_primary_key_partial_update_seq_type_delete", "p0") {
|
||||
"store_row_column" = "${use_row_store}"); """
|
||||
// insert 2 lines
|
||||
sql """
|
||||
insert into ${tableName} values
|
||||
(2, "doris2", 2000, 223, 1, '2023-01-01'),
|
||||
(1, "doris", 1000, 123, 1, '2023-01-01')
|
||||
insert into ${tableName}
|
||||
(id, name, score, test, dft, update_time, __DORIS_SEQUENCE_COL__)
|
||||
values
|
||||
(2, "doris2", 2000, 223, 1, '2023-01-01', 1),
|
||||
(1, "doris", 1000, 123, 1, '2023-01-01', 1)
|
||||
"""
|
||||
|
||||
sql "sync"
|
||||
|
||||
Reference in New Issue
Block a user