diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 219c9b9373..d4802ea956 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -260,8 +260,20 @@ public class NativeInsertStmt extends InsertStmt { tblName.setDb(olapTable.getDatabase().getFullName()); tblName.setTbl(olapTable.getName()); if (olapTable.getDeleteSignColumn() != null) { - List<Column> columns = olapTable.getBaseSchema(false); + List<Column> columns = Lists.newArrayList(olapTable.getBaseSchema(false)); + // The same order as GroupCommitTableValuedFunction#getTableColumns + // delete sign col columns.add(olapTable.getDeleteSignColumn()); + // version col + Column versionColumn = olapTable.getFullSchema().stream().filter(Column::isVersionColumn).findFirst() + .orElse(null); + if (versionColumn != null) { + columns.add(versionColumn); + } + // sequence col + if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() == null) { + columns.add(olapTable.getSequenceCol()); + } targetColumnNames = columns.stream().map(c -> c.getName()).collect(Collectors.toList()); } } @@ -1136,6 +1148,9 @@ public class NativeInsertStmt extends InsertStmt { TStreamLoadPutRequest streamLoadPutRequest = new TStreamLoadPutRequest(); if (targetColumnNames != null) { streamLoadPutRequest.setColumns(String.join(",", targetColumnNames)); + if (targetColumnNames.stream().anyMatch(col -> col.equalsIgnoreCase(Column.SEQUENCE_COL))) { + streamLoadPutRequest.setSequenceCol(Column.SEQUENCE_COL); + } } streamLoadPutRequest.setDb(db.getFullName()).setMaxFilterRatio(1) .setTbl(getTbl()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java index 0b08f8c988..4029d612f1 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java @@ -66,13 +66,26 @@ public class GroupCommitTableValuedFunction extends ExternalFileTableValuedFunct throw new AnalysisException("Only support OLAP table, but table type of table_id " + tableId + " is " + table.getType()); } - Column deleteSignColumn = ((OlapTable) table).getDeleteSignColumn(); List<Column> tableColumns = table.getBaseSchema(false); for (int i = 1; i <= tableColumns.size(); i++) { fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getType(), true)); } + OlapTable olapTable = (OlapTable) table; + // delete sign column + Column deleteSignColumn = olapTable.getDeleteSignColumn(); if (deleteSignColumn != null) { - fileColumns.add(new Column("c" + (tableColumns.size() + 1), deleteSignColumn.getType(), true)); + fileColumns.add(new Column("c" + (fileColumns.size() + 1), deleteSignColumn.getType(), true)); + } + // version column + Column versionColumn = olapTable.getFullSchema().stream().filter(Column::isVersionColumn).findFirst() + .orElse(null); + if (versionColumn != null) { + fileColumns.add(new Column("c" + (fileColumns.size() + 1), versionColumn.getType(), true)); + } + // sequence column + if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() == null) { + Column sequenceCol = olapTable.getSequenceCol(); + fileColumns.add(new Column("c" + (fileColumns.size() + 1), sequenceCol.getType(), true)); } return fileColumns; } diff --git a/regression-test/data/insert_p0/insert_group_commit_into_duplicate.out b/regression-test/data/insert_p0/insert_group_commit_into.out similarity index 100% rename from regression-test/data/insert_p0/insert_group_commit_into_duplicate.out rename to regression-test/data/insert_p0/insert_group_commit_into.out diff --git a/regression-test/data/insert_p0/insert_group_commit_into_unique.out 
b/regression-test/data/insert_p0/insert_group_commit_into_unique.out new file mode 100644 index 0000000000..ad77464506 --- /dev/null +++ b/regression-test/data/insert_p0/insert_group_commit_into_unique.out @@ -0,0 +1,71 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +1 a 10 1 +2 b -1 0 +3 c -1 0 +4 \N -1 0 +5 q 50 0 +6 \N -1 0 +11 a 211 0 +12 b 22 1 +13 c 23 0 +14 d 24 0 +15 c 23 0 +16 d 24 1 + +-- !sql -- +2 b -1 0 +3 c -1 0 +4 \N -1 0 +5 q 50 0 +6 \N -1 0 +11 a 211 0 +13 c 23 0 +14 d 24 0 +15 c 23 0 + +-- !sql -- +1 a 10 10 1 +2 b 30 30 0 +3 c 30 30 0 +4 \N 70 70 0 +5 q 50 50 0 +6 \N 60 60 0 +11 a 211 211 0 +12 b 22 22 1 +13 c 23 23 0 +14 d 24 24 0 +15 c 23 23 0 +16 d 24 24 1 + +-- !sql -- +2 b 30 30 0 +3 c 30 30 0 +4 \N 70 70 0 +5 q 50 50 0 +6 \N 60 60 0 +11 a 211 211 0 +13 c 23 23 0 +14 d 24 24 0 +15 c 23 23 0 + +-- !sql -- +1 a 200 200 1 +2 b 30 200 0 +3 c 30 300 0 +5 q 50 500 0 +6 \N 60 600 0 +10 a 10 11 0 +11 a 11 10 1 +12 a 12 10 0 +13 a 13 10 0 + +-- !sql -- +2 b 30 200 0 +3 c 30 300 0 +5 q 50 500 0 +6 \N 60 600 0 +10 a 10 11 0 +12 a 12 10 0 +13 a 13 10 0 + diff --git a/regression-test/data/insert_p0/test_group_commit_1.csv b/regression-test/data/insert_p0/test_group_commit_1.csv new file mode 100644 index 0000000000..df50c885fa --- /dev/null +++ b/regression-test/data/insert_p0/test_group_commit_1.csv @@ -0,0 +1,4 @@ +11,a,21 +12,b,22 +13,c,23 +14,d,24 \ No newline at end of file diff --git a/regression-test/data/insert_p0/test_group_commit_2.csv b/regression-test/data/insert_p0/test_group_commit_2.csv new file mode 100644 index 0000000000..afd03ffd2d --- /dev/null +++ b/regression-test/data/insert_p0/test_group_commit_2.csv @@ -0,0 +1,4 @@ +11,a,211,0 +12,b,22,1 +15,c,23,0 +16,d,24,1 \ No newline at end of file diff --git a/regression-test/data/insert_p0/test_group_commit_3.csv b/regression-test/data/insert_p0/test_group_commit_3.csv new file mode 100644 index 0000000000..16e8e43cc5 --- 
/dev/null +++ b/regression-test/data/insert_p0/test_group_commit_3.csv @@ -0,0 +1,4 @@ +10,a,10,10 +11,a,11,10 +12,a,12,10 +13,a,13,10 \ No newline at end of file diff --git a/regression-test/data/insert_p0/test_group_commit_4.csv b/regression-test/data/insert_p0/test_group_commit_4.csv new file mode 100644 index 0000000000..0724cfec78 --- /dev/null +++ b/regression-test/data/insert_p0/test_group_commit_4.csv @@ -0,0 +1,4 @@ +10,a,10,11,0 +11,a,11,10,1 +12,a,12,9,0 +13,a,13,9,1 \ No newline at end of file diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy similarity index 99% rename from regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy rename to regression-test/suites/insert_p0/insert_group_commit_into.groovy index 9fbc6aaf36..26cc9ca977 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into_duplicate.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy @@ -17,7 +17,7 @@ import com.mysql.cj.jdbc.StatementImpl -suite("insert_group_commit_into_duplicate") { +suite("insert_group_commit_into") { def dbName = "regression_test_insert_p0" def tableName = "insert_group_commit_into_duplicate" def table = dbName + "." + tableName diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy new file mode 100644 index 0000000000..1caafe16b7 --- /dev/null +++ b/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy @@ -0,0 +1,318 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import com.mysql.cj.jdbc.StatementImpl + +suite("insert_group_commit_into_unique") { + def dbName = "regression_test_insert_p0" + def tableName = "insert_group_commit_into_unique" + def dbTableName = dbName + "." + tableName + + def getRowCount = { expectedRowCount -> + def retry = 0 + while (retry < 30) { + sleep(2000) + def rowCount = sql "select count(*) from ${dbTableName}" + logger.info("rowCount: " + rowCount + ", retry: " + retry) + if (rowCount[0][0] >= expectedRowCount) { + break + } + retry++ + } + } + + def group_commit_insert = { sql, expected_row_count -> + def stmt = prepareStatement """ ${sql} """ + def result = stmt.executeUpdate() + logger.info("insert result: " + result) + def serverInfo = (((StatementImpl) stmt).results).getServerInfo() + logger.info("result server info: " + serverInfo) + if (result != expected_row_count) { + logger.warn("insert result: " + result + ", expected_row_count: " + expected_row_count + ", sql: " + sql) + } + // assertEquals(result, expected_row_count) + assertTrue(serverInfo.contains("'status':'PREPARE'")) + assertTrue(serverInfo.contains("'label':'group_commit_")) + } + + def checkStreamLoadResult = { exception, result, total_rows, loaded_rows, filtered_rows, unselected_rows -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", 
json.Status.toLowerCase()) + assertTrue(json.GroupCommit) + assertTrue(json.Label.startsWith("group_commit_")) + assertEquals(total_rows, json.NumberTotalRows) + assertEquals(loaded_rows, json.NumberLoadedRows) + assertEquals(filtered_rows, json.NumberFilteredRows) + assertEquals(unselected_rows, json.NumberUnselectedRows) + if (filtered_rows > 0) { + assertFalse(json.ErrorURL.isEmpty()) + } else { + assertTrue(json.ErrorURL == null || json.ErrorURL.isEmpty()) + } + } + + // 1. table without sequence column + try { + tableName = "insert_group_commit_into_unique" + "1" + dbTableName = dbName + "." + tableName + // create table + sql """ drop table if exists ${dbTableName}; """ + + sql """ + CREATE TABLE ${dbTableName} ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + // 1. insert into + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set enable_insert_group_commit = true; """ + // TODO + sql """ set enable_nereids_dml = false; """ + + group_commit_insert """ insert into ${dbTableName} values (1, 'a', 10),(5, 'q', 50); """, 2 + group_commit_insert """ insert into ${dbTableName}(id) select 6; """, 1 + group_commit_insert """ insert into ${dbTableName}(id) values(4); """, 1 + group_commit_insert """ insert into ${dbTableName}(name, id) values('c', 3); """, 1 + group_commit_insert """ insert into ${dbTableName}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_DELETE_SIGN__) values(1, 'a', 10, 1) """, 1 + + /*getRowCount(5) + qt_sql """ select * from ${dbTableName} order by id, name, score asc; """*/ + } + + // 2. 
stream load + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'group_commit', 'true' + set 'columns', 'id, name, score' + file "test_group_commit_1.csv" + unset 'label' + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 4, 4, 0, 0) + } + } + /*getRowCount(9) + qt_sql """ select * from ${dbTableName} order by id, name, score asc; """*/ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'group_commit', 'true' + set 'columns', 'id, name, score, __DORIS_DELETE_SIGN__' + file "test_group_commit_2.csv" + unset 'label' + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 4, 4, 0, 0) + } + } + getRowCount(9) + sql """ set show_hidden_columns = true """ + qt_sql """ select id, name, score, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + sql """ set show_hidden_columns = false """ + qt_sql """ select id, name, score, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + } finally { + // try_sql("DROP TABLE ${dbTableName}") + } + + // 2. table with "function_column.sequence_col" + try { + tableName = "insert_group_commit_into_unique" + "2" + dbTableName = dbName + "." + tableName + // create table + sql """ drop table if exists ${dbTableName}; """ + + sql """ + CREATE TABLE ${dbTableName} ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "function_column.sequence_col" = "score" + ); + """ + + // 1. 
insert into + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set enable_insert_group_commit = true; """ + // TODO + sql """ set enable_nereids_dml = false; """ + + group_commit_insert """ insert into ${dbTableName} values (1, 'a', 10),(5, 'q', 50); """, 2 + group_commit_insert """ insert into ${dbTableName}(id, score) select 6, 60; """, 1 + group_commit_insert """ insert into ${dbTableName}(id, score) values(4, 70); """, 1 + group_commit_insert """ insert into ${dbTableName}(name, id, score) values('c', 3, 30); """, 1 + group_commit_insert """ insert into ${dbTableName}(score, id, name) values(30, 2, 'b'); """, 1 + group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_DELETE_SIGN__) values(1, 'a', 10, 1) """, 1 + + /*getRowCount(5) + qt_sql """ select * from ${dbTableName} order by id, name, score asc; """*/ + }; + + // 2. stream load + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'group_commit', 'true' + set 'columns', 'id, name, score' + file "test_group_commit_1.csv" + unset 'label' + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 4, 4, 0, 0) + } + } + /*getRowCount(9) + qt_sql """ select * from ${dbTableName} order by id, name, score asc; """*/ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'group_commit', 'true' + set 'columns', 'id, name, score, __DORIS_DELETE_SIGN__' + file "test_group_commit_2.csv" + unset 'label' + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 4, 4, 0, 0) + } + } + getRowCount(9) + sql """ set show_hidden_columns = true """ + qt_sql """ select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + sql """ set show_hidden_columns = false """ + qt_sql """ 
select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + } finally { + // try_sql("DROP TABLE ${dbTableName}") + sql """ set show_hidden_columns = false """ + } + + // 3. table with "function_column.sequence_type" + try { + tableName = "insert_group_commit_into_unique" + "3" + dbTableName = dbName + "." + tableName + // create table + sql """ drop table if exists ${dbTableName}; """ + + sql """ + CREATE TABLE ${dbTableName} ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "function_column.sequence_type" = "int" + ); + """ + + // 1. insert into + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set enable_insert_group_commit = true; """ + // TODO + sql """ set enable_nereids_dml = false; """ + + group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_SEQUENCE_COL__) values (1, 'a', 10, 100),(5, 'q', 50, 500); """, 2 + group_commit_insert """ insert into ${dbTableName}(id, score, __DORIS_SEQUENCE_COL__) select 6, 60, 600; """, 1 + group_commit_insert """ insert into ${dbTableName}(id, score, __DORIS_SEQUENCE_COL__) values(6, 50, 500); """, 1 + group_commit_insert """ insert into ${dbTableName}(name, id, score, __DORIS_SEQUENCE_COL__) values('c', 3, 30, 300); """, 1 + group_commit_insert """ insert into ${dbTableName}(score, id, name, __DORIS_SEQUENCE_COL__) values(30, 2, 'b', 200); """, 1 + group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_DELETE_SIGN__, __DORIS_SEQUENCE_COL__) values(1, 'a', 200, 1, 200) """, 1 + group_commit_insert """ insert into ${dbTableName}(score, id, name, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__) values(30, 2, 'b', 100, 1); """, 1 + + /*getRowCount(4) + qt_sql """ select * from 
${dbTableName} order by id, name, score asc; """*/ + }; + + // 2. stream load + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'group_commit', 'true' + set 'columns', 'id, name, score, __DORIS_SEQUENCE_COL__' + set 'function_column.sequence_col', '__DORIS_SEQUENCE_COL__' + file "test_group_commit_3.csv" + unset 'label' + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 4, 4, 0, 0) + } + } + /*getRowCount(9) + qt_sql """ select * from ${dbTableName} order by id, name, score asc; """*/ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'group_commit', 'true' + set 'columns', 'id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__' + set 'function_column.sequence_col', '__DORIS_SEQUENCE_COL__' + file "test_group_commit_4.csv" + unset 'label' + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 4, 4, 0, 0) + } + } + getRowCount(7) + sql """ set show_hidden_columns = true """ + qt_sql """ select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + sql """ set show_hidden_columns = false """ + qt_sql """ select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + } finally { + // try_sql("DROP TABLE ${dbTableName}") + sql """ set show_hidden_columns = false """ + } +}