[fix](txn insert) Fix txn insert into values for sequence column or column name is keyword (#33336)
This commit is contained in:
@ -1326,4 +1326,9 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
slotDesc.setIsNullable(col.isAllowNull());
|
||||
}
|
||||
}
|
||||
|
||||
public boolean containTargetColumnName(String columnName) {
|
||||
return targetColumnNames != null && targetColumnNames.stream()
|
||||
.anyMatch(col -> col.equalsIgnoreCase(columnName));
|
||||
}
|
||||
}
|
||||
|
||||
@ -1838,13 +1838,11 @@ public class StmtExecutor {
|
||||
int schemaSize = tbl.getBaseSchema(false).size();
|
||||
if (parsedStmt instanceof NativeInsertStmt
|
||||
&& ((NativeInsertStmt) parsedStmt).getTargetColumnNames() != null) {
|
||||
|
||||
if (((NativeInsertStmt) parsedStmt).getTargetColumnNames()
|
||||
.contains(Column.SEQUENCE_COL)) {
|
||||
NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) parsedStmt;
|
||||
if (nativeInsertStmt.containTargetColumnName(Column.SEQUENCE_COL)) {
|
||||
schemaSize++;
|
||||
}
|
||||
if (((NativeInsertStmt) parsedStmt).getTargetColumnNames()
|
||||
.contains(Column.DELETE_SIGN)) {
|
||||
if (nativeInsertStmt.containTargetColumnName(Column.DELETE_SIGN)) {
|
||||
schemaSize++;
|
||||
}
|
||||
}
|
||||
@ -1922,19 +1920,13 @@ public class StmtExecutor {
|
||||
.setExecMemLimit(maxExecMemByte).setTimeout((int) timeoutSecond)
|
||||
.setTimezone(timeZone).setSendBatchParallelism(sendBatchParallelism).setTrimDoubleQuotes(true);
|
||||
if (parsedStmt instanceof NativeInsertStmt && ((NativeInsertStmt) parsedStmt).getTargetColumnNames() != null) {
|
||||
List<String> targetColumnNames = ((NativeInsertStmt) parsedStmt).getTargetColumnNames();
|
||||
if (targetColumnNames.contains(Column.SEQUENCE_COL) || targetColumnNames.contains(Column.DELETE_SIGN)) {
|
||||
if (targetColumnNames.contains(Column.SEQUENCE_COL)) {
|
||||
NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) parsedStmt;
|
||||
if (nativeInsertStmt.containTargetColumnName(Column.SEQUENCE_COL)
|
||||
|| nativeInsertStmt.containTargetColumnName(Column.DELETE_SIGN)) {
|
||||
if (nativeInsertStmt.containTargetColumnName(Column.SEQUENCE_COL)) {
|
||||
request.setSequenceCol(Column.SEQUENCE_COL);
|
||||
}
|
||||
StringBuilder allCols = new StringBuilder();
|
||||
for (String col : ((NativeInsertStmt) parsedStmt).getTargetColumnNames()) {
|
||||
allCols.append(col);
|
||||
allCols.append(",");
|
||||
}
|
||||
allCols.deleteCharAt(allCols.length() - 1);
|
||||
request.setColumns(String.valueOf(allCols));
|
||||
request.setColumnSeparator(",");
|
||||
request.setColumns("`" + String.join("`,`", nativeInsertStmt.getTargetColumnNames()) + "`");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -13,7 +13,7 @@ k1 INT Yes true \N
|
||||
v1 TINYINT Yes false \N NONE
|
||||
v2 INT Yes false \N NONE
|
||||
v3 INT Yes false \N NONE
|
||||
v4 INT Yes false \N NONE
|
||||
or INT Yes false \N NONE
|
||||
__DORIS_DELETE_SIGN__ TINYINT No false 0 NONE
|
||||
__DORIS_VERSION_COL__ BIGINT No false 0 NONE
|
||||
|
||||
|
||||
@ -75,7 +75,7 @@ k1 INT Yes true \N
|
||||
v1 TINYINT Yes false \N REPLACE
|
||||
v2 INT Yes false \N REPLACE
|
||||
v3 INT Yes false \N REPLACE
|
||||
v4 INT Yes false \N REPLACE
|
||||
or INT Yes false \N REPLACE
|
||||
__DORIS_DELETE_SIGN__ TINYINT No false 0 REPLACE
|
||||
__DORIS_VERSION_COL__ BIGINT No false 0 REPLACE
|
||||
__DORIS_SEQUENCE_COL__ INT Yes false \N REPLACE
|
||||
@ -156,7 +156,7 @@ k1 INT Yes true \N
|
||||
v1 TINYINT Yes false \N REPLACE
|
||||
v2 INT Yes false \N REPLACE
|
||||
v3 INT Yes false \N REPLACE
|
||||
v4 INT Yes false \N REPLACE
|
||||
or INT Yes false \N REPLACE
|
||||
__DORIS_DELETE_SIGN__ TINYINT No false 0 REPLACE
|
||||
__DORIS_VERSION_COL__ BIGINT No false 0 REPLACE
|
||||
__DORIS_SEQUENCE_COL__ INT Yes false \N REPLACE
|
||||
|
||||
@ -50,7 +50,7 @@ suite("test_unique_table") {
|
||||
`v1` tinyint NULL,
|
||||
`v2` int,
|
||||
`v3` int,
|
||||
`v4` int
|
||||
`or` int
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(k1)
|
||||
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
|
||||
@ -61,19 +61,19 @@ suite("test_unique_table") {
|
||||
sql "SET show_hidden_columns=true"
|
||||
qt_0 "desc ${tbName}"
|
||||
sql "begin;"
|
||||
sql "insert into ${tbName} (k1, v1, v2, v3, v4, __DORIS_DELETE_SIGN__) values (1,1,1,1,1,0),(2,2,2,2,2,0),(3,3,3,3,3,0);"
|
||||
sql "insert into ${tbName} (k1, v1, v2, v3, `or`, __doris_delete_sign__) values (1,1,1,1,1,0),(2,2,2,2,2,0),(3,3,3,3,3,0);"
|
||||
sql "commit;"
|
||||
|
||||
qt_1 "select * from ${tbName} order by k1;"
|
||||
|
||||
sql "begin;"
|
||||
sql "insert into ${tbName} (k1, v1, v2, v3, v4, __DORIS_DELETE_SIGN__) values (2,20,20,20,20,0);"
|
||||
sql "insert into ${tbName} (k1, v1, v2, v3, `or`, __DORIS_DELETE_SIGN__) values (2,20,20,20,20,0);"
|
||||
sql "commit;"
|
||||
|
||||
qt_2 "select * from ${tbName} order by k1;"
|
||||
|
||||
sql "begin;"
|
||||
sql "insert into ${tbName} (k1, v1, v2, v3, v4, __DORIS_DELETE_SIGN__) values (3,30,30,30,30,1);"
|
||||
sql "insert into ${tbName} (k1, v1, v2, v3, `or`, __DORIS_DELETE_SIGN__) values (3,30,30,30,30,1);"
|
||||
sql "commit;"
|
||||
|
||||
qt_3 "select * from ${tbName} order by k1"
|
||||
|
||||
@ -16,7 +16,7 @@
|
||||
// under the License.
|
||||
|
||||
suite("test_unique_table_new_sequence") {
|
||||
for (def enable_fall_back : [false, true]) {
|
||||
for (def enable_nereids_planner : [false, true]) {
|
||||
def tableName = "test_uniq_new_sequence"
|
||||
sql """ DROP TABLE IF EXISTS ${tableName} """
|
||||
sql """
|
||||
@ -36,7 +36,9 @@ suite("test_unique_table_new_sequence") {
|
||||
"light_schema_change" = "true"
|
||||
);
|
||||
"""
|
||||
sql "set enable_fallback_to_original_planner=${enable_fall_back}"
|
||||
sql """ set enable_nereids_dml = ${enable_nereids_planner}; """
|
||||
sql """ set enable_nereids_planner=${enable_nereids_planner}; """
|
||||
sql "set enable_fallback_to_original_planner=false; "
|
||||
// test streamload with seq col
|
||||
streamLoad {
|
||||
table "${tableName}"
|
||||
@ -141,13 +143,13 @@ suite("test_unique_table_new_sequence") {
|
||||
`v1` tinyint NULL,
|
||||
`v2` int,
|
||||
`v3` int,
|
||||
`v4` int
|
||||
`or` int
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(k1)
|
||||
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
|
||||
PROPERTIES (
|
||||
"enable_unique_key_merge_on_write" = "false",
|
||||
"function_column.sequence_col" = "v4",
|
||||
"function_column.sequence_col" = "or",
|
||||
"replication_allocation" = "tag.location.default: 1",
|
||||
"light_schema_change" = "true"
|
||||
);
|
||||
@ -170,13 +172,13 @@ suite("test_unique_table_new_sequence") {
|
||||
qt_1 "select * from ${tableName} order by k1;"
|
||||
|
||||
sql "begin;"
|
||||
sql "insert into ${tableName} (k1, v1, v2, v3, v4) values (2,20,20,20,20);"
|
||||
sql "insert into ${tableName} (k1, v1, v2, v3, `OR`) values (2,20,20,20,20);"
|
||||
sql "commit;"
|
||||
|
||||
qt_2 "select * from ${tableName} order by k1;"
|
||||
|
||||
sql "begin;"
|
||||
sql "insert into ${tableName} (k1, v1, v2, v3, v4) values (3,30,30,30,1);"
|
||||
sql "insert into ${tableName} (k1, v1, v2, v3, `or`) values (3,30,30,30,1);"
|
||||
sql "commit;"
|
||||
|
||||
qt_3 "select * from ${tableName} order by k1"
|
||||
|
||||
@ -16,7 +16,7 @@
|
||||
// under the License.
|
||||
|
||||
suite("test_unique_table_sequence") {
|
||||
for (def enable_fall_back : [false, true]) {
|
||||
for (def enable_nereids_planner : [false, true]) {
|
||||
def tableName = "test_uniq_sequence"
|
||||
sql "DROP TABLE IF EXISTS ${tableName}"
|
||||
sql """
|
||||
@ -34,7 +34,9 @@ suite("test_unique_table_sequence") {
|
||||
"replication_allocation" = "tag.location.default: 1"
|
||||
);
|
||||
"""
|
||||
sql "set enable_fallback_to_original_planner=${enable_fall_back}"
|
||||
sql """ set enable_nereids_dml = ${enable_nereids_planner}; """
|
||||
sql """ set enable_nereids_planner=${enable_nereids_planner}; """
|
||||
sql "set enable_fallback_to_original_planner=false; "
|
||||
// test streamload with seq col
|
||||
streamLoad {
|
||||
table "${tableName}"
|
||||
@ -145,7 +147,7 @@ suite("test_unique_table_sequence") {
|
||||
`v1` tinyint NULL,
|
||||
`v2` int,
|
||||
`v3` int,
|
||||
`v4` int
|
||||
`or` int
|
||||
) ENGINE=OLAP
|
||||
UNIQUE KEY(k1)
|
||||
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
|
||||
@ -166,25 +168,25 @@ suite("test_unique_table_sequence") {
|
||||
// test insert into with column list, in begin/commit
|
||||
sql "begin;"
|
||||
test {
|
||||
sql "INSERT INTO ${tableName} (k1, v1, v2, v3, v4) values(15, 8, 19, 20, 21)"
|
||||
sql "INSERT INTO ${tableName} (k1, v1, v2, v3, `or`) values(15, 8, 19, 20, 21)"
|
||||
exception "Table ${tableName} has sequence column, need to specify the sequence column"
|
||||
}
|
||||
sql "commit;"
|
||||
|
||||
sql "begin;"
|
||||
sql "insert into ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values (1,1,1,1,1,1),(2,2,2,2,2,2),(3,3,3,3,3,3);"
|
||||
sql "insert into ${tableName} (k1, v1, v2, v3, `or`, __doris_sequence_col__) values (1,1,1,1,1,1),(2,2,2,2,2,2),(3,3,3,3,3,3);"
|
||||
sql "commit;"
|
||||
|
||||
qt_1 "select * from ${tableName} order by k1;"
|
||||
|
||||
sql "begin;"
|
||||
sql "insert into ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values (2,20,20,20,20,20);"
|
||||
sql "insert into ${tableName} (k1, v1, v2, v3, `or`, __DORIS_SEQUENCE_COL__) values (2,20,20,20,20,20);"
|
||||
sql "commit;"
|
||||
|
||||
qt_2 "select * from ${tableName} order by k1;"
|
||||
|
||||
sql "begin;"
|
||||
sql "insert into ${tableName} (k1, v1, v2, v3, v4, __DORIS_SEQUENCE_COL__) values (3,30,30,30,30,1);"
|
||||
sql "insert into ${tableName} (k1, v1, v2, v3, `or`, __DORIS_SEQUENCE_COL__) values (3,30,30,30,30,1);"
|
||||
sql "commit;"
|
||||
|
||||
qt_3 "select * from ${tableName} order by k1"
|
||||
|
||||
Reference in New Issue
Block a user