From dcd6844ea5295aed41f3d1cce4d0b69de56cf7b0 Mon Sep 17 00:00:00 2001
From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com>
Date: Thu, 27 Jul 2023 09:51:42 +0800
Subject: [PATCH] [improvement](regression-test) add partial update with
 schema change case (#22213)

---
 .../partial_update/schema_change/load.csv     |    1 +
 .../partial_update/schema_change/load1.csv    |    1 +
 .../load_with_change_properties.csv           |    1 +
 .../schema_change/load_with_create_index.csv  |    1 +
 .../schema_change/load_with_delete_column.csv |    1 +
 .../schema_change/load_with_key_column.csv    |    1 +
 .../schema_change/load_with_new_column.csv    |    1 +
 .../schema_change/load_with_update_column.csv |    1 +
 .../load_without_delete_column.csv            |    1 +
 .../schema_change/load_without_new_column.csv |    1 +
 .../test_partial_update_schema_change.out     |   78 +-
 .../test_partial_update_schema_change.groovy  | 1073 ++++++++++++++++-
 12 files changed, 1136 insertions(+), 25 deletions(-)
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/schema_change/load.csv
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/schema_change/load1.csv
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_change_properties.csv
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_create_index.csv
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_delete_column.csv
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_key_column.csv
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_new_column.csv
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_update_column.csv
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_without_delete_column.csv
 create mode 100644 regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_without_new_column.csv

diff --git a/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load.csv b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load.csv
new file mode 100644
index 0000000000..fce38cb485
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load.csv
@@ -0,0 +1 @@
+1, 0, 0, 0, 0, 0, 0, 0, 0, 0
\ No newline at end of file
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load1.csv b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load1.csv
new file mode 100644
index 0000000000..56a6051ca2
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load1.csv
@@ -0,0 +1 @@
+1
\ No newline at end of file
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_change_properties.csv b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_change_properties.csv
new file mode 100644
index 0000000000..a0f89729d2
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_change_properties.csv
@@ -0,0 +1 @@
+1, 1, 1
\ No newline at end of file
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_create_index.csv b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_create_index.csv
new file mode 100644
index 0000000000..a0f89729d2
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_create_index.csv
@@ -0,0 +1 @@
+1, 1, 1
\ No newline at end of file
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_delete_column.csv b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_delete_column.csv
new file mode 100644
index 0000000000..a0f89729d2
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_delete_column.csv
@@ -0,0 +1 @@
+1, 1, 1
\ No newline at end of file
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_key_column.csv b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_key_column.csv
new file mode 100644
index 0000000000..128ecf3838
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_key_column.csv
@@ -0,0 +1 @@
+1, 1
\ No newline at end of file
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_new_column.csv b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_new_column.csv
new file mode 100644
index 0000000000..d1afec6173
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_new_column.csv
@@ -0,0 +1 @@
+1, 1, 1, 10
\ No newline at end of file
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_update_column.csv b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_update_column.csv
new file mode 100644
index 0000000000..de674c6356
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_with_update_column.csv
@@ -0,0 +1 @@
+1, 1, 1.0
\ No newline at end of file
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_without_delete_column.csv b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_without_delete_column.csv
new file mode 100644
index 0000000000..a0f89729d2
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_without_delete_column.csv
@@ -0,0 +1 @@
+1, 1, 1
\ No newline at end of file
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_without_new_column.csv b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_without_new_column.csv
new file mode 100644
index 0000000000..a0f89729d2
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/schema_change/load_without_new_column.csv
@@ -0,0 +1 @@
+1, 1, 1
\ No newline at end of file
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_schema_change.out b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_schema_change.out
index 215ce7c181..cd83800d0c 100644
--- a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_schema_change.out
+++ b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_schema_change.out
@@ -1,13 +1,73 @@
 -- This file is automatically generated. You should know what you did if you want to edit this
--- !sql --
-1 1 1
-2 2 2
+-- !sql1 --
+1 0 0 0 0 0 0 0 0 0
 
--- !sql --
-1 3 1 0
-2 2 2 0
+-- !sql2 --
+1 1 1 0 0 0 0 0 0 0 0
 
--- !sql --
-1 4 1
-2 2 2
+-- !sql3 --
+1 1 1 0 0 0 0 0 0 0 10
+
+-- !sql4 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql5 --
+1 1 1 0 0 0 0 0 0
+
+-- !sql6 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql7 --
+1 1 1.0 0 0 0 0 0 0 0
+
+-- !sql8 --
+1
+
+-- !sql10 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql11 --
+1 1 1 0 0 0 0 0 0 0
+
+-- !sql12 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql13 --
+1 1 1 0 0 0 0 0 0 0
+
+-- !sql14 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql15 --
+1 1 1 0 0 0 0 0 0 0 0
+
+-- !sql16 --
+1 1 1 0 0 0 0 0 0 0 10
+
+-- !sql17 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql18 --
+1 1 1 0 0 0 0 0 0
+
+-- !sql19 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql20 --
+1 1 1.0 0 0 0 0 0 0 0
+
+-- !sql21 --
+1
+
+-- !sql23 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql24 --
+1 1 1 0 0 0 0 0 0 0
+
+-- !sql25 --
+1 0 0 0 0 0 0 0 0 0
+
+-- !sql26 --
+1 1 1 0 0 0 0 0 0 0
diff --git a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
index 596fa204d8..905c622159 100644
--- a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
+++ b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_schema_change.groovy
@@ -17,39 +17,1080 @@
 // under the License.
 
 suite("test_partial_update_schema_change", "p0") {
-    def tableName = "test_partial_update_schema_change"
-
-    // create table
+    // test add value column
+    def tableName = "test_partial_update_light_schema_change_add_column"
     sql """ DROP TABLE IF EXISTS ${tableName} """
     sql """
             CREATE TABLE ${tableName} (
-                `c0` int NOT NULL,
-                `c1` int NOT NULL,
-                `c2` int NOT NULL)
+                `c0` int NULL,
+                `c1` int NULL,
+                `c2` int NULL,
+                `c3` int NULL,
+                `c4` int NULL,
+                `c5` int NULL,
+                `c6` int NULL,
+                `c7` int NULL,
+                `c8` int NULL,
+                `c9` int NULL)
             UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
             PROPERTIES(
                 "replication_num" = "1",
                 "light_schema_change" = "true",
                 "enable_unique_key_merge_on_write" = "true")
     """
-    sql " insert into ${tableName} values(1,1,1) "
-    sql " insert into ${tableName} values(2,2,2) "
+    streamLoad {
+        table "${tableName}"
 
-    qt_sql " select * from ${tableName} order by c0 "
+        set 'column_separator', ','
+        set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9'
 
-    sql " ALTER table ${tableName} add column c3 INT DEFAULT '0' "
+        file 'schema_change/load.csv'
+        time 10000 // limit inflight 10s
 
-    sql " update ${tableName} set c1 = 3 where c0 = 1 "
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql1 " select * from ${tableName} order by c0 "
+
+    // schema change
+    sql " ALTER table ${tableName} add column c10 INT DEFAULT '0' "
+    while (true) {
+        def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        if (res[0][9].toString() == "FINISHED") {
+            break;
+        }
+        Thread.sleep(500)
+    }
+
+    // test load data without new column
+    streamLoad {
+        table "${tableName}"
 
-    qt_sql " select * from ${tableName} order by c0 "
+        set 'column_separator', ','
+        set 'partial_columns', 'true'
+        set 'columns', 'c0, c1, c2'
 
-    sql " ALTER table ${tableName} drop column c3 "
+        file 'schema_change/load_without_new_column.csv'
+        time 10000 // limit inflight 10s
 
-    sql " update ${tableName} set c1 = 4 where c0 = 1 "
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    // check data: the new column is filled with its default value.
+    qt_sql2 " select * from ${tableName} order by c0 "
 
-    qt_sql " select * from ${tableName} order by c0 "
+    // test load data with new column
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'partial_columns', 'true'
+        set 'columns', 'c0, c1, c2, c10'
+
+        file 'schema_change/load_with_new_column.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    // check data: the new column is filled with the given value.
+    qt_sql3 " select * from ${tableName} order by c0 "
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+
+    // test delete value column
+    tableName = "test_partial_update_light_schema_change_delete_column"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `c0` int NULL,
+                `c1` int NULL,
+                `c2` int NULL,
+                `c3` int NULL,
+                `c4` int NULL,
+                `c5` int NULL,
+                `c6` int NULL,
+                `c7` int NULL,
+                `c8` int NULL,
+                `c9` int NULL)
+            UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
+            PROPERTIES(
+                "replication_num" = "1",
+                "light_schema_change" = "true",
+                "enable_unique_key_merge_on_write" = "true")
+    """
+
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9'
+
+        file 'schema_change/load.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql4 " select * from ${tableName} order by c0 "
+
+    // schema change
+    sql " ALTER table ${tableName} DROP COLUMN c8 "
+    while (true) {
+        def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        if (res[0][9].toString() == "FINISHED") {
+            break;
+        }
+        Thread.sleep(500)
+    }
+
+    // test load data without the dropped column
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'partial_columns', 'true'
+        set 'columns', 'c0, c1, c2'
+
+        file 'schema_change/load_with_delete_column.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql5 " select * from ${tableName} order by c0 "
+
+    // test load data with the dropped column
+    // TODO: disabled due to a known bug
+    // streamLoad {
+    //     table "${tableName}"
+
+    //     set 'column_separator', ','
+    //     set 'partial_columns', 'true'
+    //     set 'columns', 'c0, c1, c8'
+
+    //     file 'schema_change/load_without_delete_column.csv'
+    //     time 10000 // limit inflight 10s
+
+    //     check { result, exception, startTime, endTime ->
+    //         if (exception != null) {
+    //             throw exception
+    //         }
+    //         // check result: the load should fail because it references a dropped column.
+    //         log.info("Stream load result: ${result}".toString())
+    //         def json = parseJson(result)
+    //         assertEquals("fail", json.Status.toLowerCase())
+    //         assertEquals(1, json.NumberTotalRows)
+    //         assertEquals(1, json.NumberFilteredRows)
+    //         assertEquals(0, json.NumberUnselectedRows)
+    //     }
+    // }
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+
+    // test update value column
+    tableName = "test_partial_update_light_schema_change_update_column"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `c0` int NULL,
+                `c1` int NULL,
+                `c2` int NULL,
+                `c3` int NULL,
+                `c4` int NULL,
+                `c5` int NULL,
+                `c6` int NULL,
+                `c7` int NULL,
+                `c8` int NULL,
+                `c9` int NULL)
+            UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
+            PROPERTIES(
+                "replication_num" = "1",
+                "light_schema_change" = "true",
+                "enable_unique_key_merge_on_write" = "true")
+    """
+
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9'
+
+        file 'schema_change/load.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql6 " select * from ${tableName} order by c0 "
+
+    // schema change
+    sql " ALTER table ${tableName} MODIFY COLUMN c2 double "
+    while (true) {
+        def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        if (res[0][9].toString() == "FINISHED") {
+            break;
+        }
+        Thread.sleep(500)
+    }
+
+    // test load data with update column
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'partial_columns', 'true'
+        set 'columns', 'c0, c1, c2'
+
+        file 'schema_change/load_with_update_column.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql7 " select * from ${tableName} order by c0 "
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+
+    // test add key column
+    tableName = "test_partial_update_light_schema_change_add_key_column"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `c0` int NULL)
+            UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
+            PROPERTIES(
+                "replication_num" = "1",
+                "light_schema_change" = "true",
"enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0' + + file 'schema_change/load1.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + qt_sql8 " select * from ${tableName} order by c0 " + + // schema change + sql " ALTER table ${tableName} ADD COLUMN c1 int key null " + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + if(res[0][9].toString() == "FINISHED"){ + break; + } + Thread.sleep(500) + } + sql " ALTER table ${tableName} ADD COLUMN c2 int null " + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + if(res[0][9].toString() == "FINISHED"){ + break; + } + Thread.sleep(500) + } + + // test load data with all key column + // todo cause core + // streamLoad { + // table "${tableName}" + + // set 'column_separator', ',' + // set 'partial_columns', 'true' + // set 'columns', 'c0, c1' + + // file 'schema_change/load_with_key_column.csv' + // time 10000 // limit inflight 10s + + // check { result, exception, startTime, endTime -> + // if (exception != null) { + // throw exception + // } + // log.info("Stream load result: ${result}".toString()) + // def json = parseJson(result) + // assertEquals("success", json.Status.toLowerCase()) + // assertEquals(1, json.NumberTotalRows) + // assertEquals(0, json.NumberFilteredRows) + // assertEquals(0, json.NumberUnselectedRows) + // } + // } + // //check data + // qt_sql9 " select * from ${tableName} order by c0 " + + sql """ DROP TABLE IF EXISTS ${tableName} """ + + + // test create index + tableName = "test_partial_update_light_schema_change_create_index" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "light_schema_change" = "true", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + qt_sql10 " select * from ${tableName} order by c0 " + + sql " CREATE INDEX test ON ${tableName} (c1) USING BITMAP " + while(true){ + def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 " + if(res[0][9].toString() == "FINISHED"){ + break; + } + Thread.sleep(500) + } + + //test load data with create index + streamLoad { + table "${tableName}" + + set 
+        set 'column_separator', ','
+        set 'partial_columns', 'true'
+        set 'columns', 'c0, c1, c2'
+
+        file 'schema_change/load_with_create_index.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql11 " select * from ${tableName} order by c0 "
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    // test change properties
+    tableName = "test_partial_update_light_schema_change_properties"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `c0` int NULL,
+                `c1` int NULL,
+                `c2` int NULL,
+                `c3` int NULL,
+                `c4` int NULL,
+                `c5` int NULL,
+                `c6` int NULL,
+                `c7` int NULL,
+                `c8` int NULL,
+                `c9` int NULL)
+            UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
+            PROPERTIES(
+                "replication_num" = "1",
+                "light_schema_change" = "true",
+                "enable_unique_key_merge_on_write" = "true")
+    """
+
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9'
+
+        file 'schema_change/load.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql12 " select * from ${tableName} order by c0 "
+
+    sql " ALTER TABLE ${tableName} set ('in_memory' = 'false') "
+
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'partial_columns', 'true'
+        set 'columns', 'c0, c1, c2'
+
+        file 'schema_change/load_with_change_properties.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql13 " select * from ${tableName} order by c0 "
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    // test add value column
+    tableName = "test_partial_update_schema_change_add_column"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `c0` int NULL,
+                `c1` int NULL,
+                `c2` int NULL,
+                `c3` int NULL,
+                `c4` int NULL,
+                `c5` int NULL,
+                `c6` int NULL,
+                `c7` int NULL,
+                `c8` int NULL,
+                `c9` int NULL)
+            UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
+            PROPERTIES(
+                "replication_num" = "1",
+                "enable_unique_key_merge_on_write" = "true")
+    """
+
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9'
+
+        file 'schema_change/load.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql14 " select * from ${tableName} order by c0 "
+
+    // schema change
+    sql " ALTER table ${tableName} add column c10 INT DEFAULT '0' "
+    while (true) {
+        def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        if (res[0][9].toString() == "FINISHED") {
+            break;
+        }
+        Thread.sleep(500)
+    }
+
+    // test load data without new column
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'partial_columns', 'true'
+        set 'columns', 'c0, c1, c2'
+
+        file 'schema_change/load_without_new_column.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    // check data: the new column is filled with its default value.
+    qt_sql15 " select * from ${tableName} order by c0 "
+
+    // test load data with new column
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'partial_columns', 'true'
+        set 'columns', 'c0, c1, c2, c10'
+
+        file 'schema_change/load_with_new_column.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    // check data: the new column is filled with the given value.
+    qt_sql16 " select * from ${tableName} order by c0 "
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+
+    // test delete value column
+    tableName = "test_partial_update_schema_change_delete_column"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `c0` int NULL,
+                `c1` int NULL,
+                `c2` int NULL,
+                `c3` int NULL,
+                `c4` int NULL,
+                `c5` int NULL,
+                `c6` int NULL,
+                `c7` int NULL,
+                `c8` int NULL,
+                `c9` int NULL)
+            UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
+            PROPERTIES(
+                "replication_num" = "1",
+                "enable_unique_key_merge_on_write" = "true")
+    """
+
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9'
+
+        file 'schema_change/load.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql17 " select * from ${tableName} order by c0 "
+
+    // schema change
+    sql " ALTER table ${tableName} DROP COLUMN c8 "
+    while (true) {
+        def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        if (res[0][9].toString() == "FINISHED") {
+            break;
+        }
+        Thread.sleep(500)
+    }
+
+    // test load data without the dropped column
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'partial_columns', 'true'
+        set 'columns', 'c0, c1, c2'
+
+        file 'schema_change/load_with_delete_column.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql18 " select * from ${tableName} order by c0 "
+
+    // test load data with the dropped column
+    // TODO: disabled due to a known bug
+    // streamLoad {
+    //     table "${tableName}"
+
+    //     set 'column_separator', ','
+    //     set 'partial_columns', 'true'
+    //     set 'columns', 'c0, c1, c8'
+
+    //     file 'schema_change/load_without_delete_column.csv'
+    //     time 10000 // limit inflight 10s
+
+    //     check { result, exception, startTime, endTime ->
+    //         if (exception != null) {
+    //             throw exception
+    //         }
+    //         // check result: the load should fail because it references a dropped column.
+    //         log.info("Stream load result: ${result}".toString())
+    //         def json = parseJson(result)
+    //         assertEquals("fail", json.Status.toLowerCase())
+    //         assertEquals(1, json.NumberTotalRows)
+    //         assertEquals(1, json.NumberFilteredRows)
+    //         assertEquals(0, json.NumberUnselectedRows)
+    //     }
+    // }
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+
+    // test update value column
+    tableName = "test_partial_update_schema_change_update_column"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `c0` int NULL,
+                `c1` int NULL,
+                `c2` int NULL,
+                `c3` int NULL,
+                `c4` int NULL,
+                `c5` int NULL,
+                `c6` int NULL,
+                `c7` int NULL,
+                `c8` int NULL,
+                `c9` int NULL)
+            UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
+            PROPERTIES(
+                "replication_num" = "1",
+                "enable_unique_key_merge_on_write" = "true")
+    """
+
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9'
+
+        file 'schema_change/load.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql19 " select * from ${tableName} order by c0 "
+
+    // schema change
+    sql " ALTER table ${tableName} MODIFY COLUMN c2 double "
+    while (true) {
+        def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        if (res[0][9].toString() == "FINISHED") {
+            break;
+        }
+        Thread.sleep(500)
+    }
+
+    // test load data with update column
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'partial_columns', 'true'
+        set 'columns', 'c0, c1, c2'
+
+        file 'schema_change/load_with_update_column.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql20 " select * from ${tableName} order by c0 "
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+
+    // test add key column
+    tableName = "test_partial_update_schema_change_add_key_column"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `c0` int NULL)
+            UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
+            PROPERTIES(
+                "replication_num" = "1",
+                "enable_unique_key_merge_on_write" = "true")
+    """
+
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'columns', 'c0'
+
+        file 'schema_change/load1.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql21 " select * from ${tableName} order by c0 "
+
+    // schema change
+    sql " ALTER table ${tableName} ADD COLUMN c1 int key null "
+    while (true) {
+        def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        if (res[0][9].toString() == "FINISHED") {
+            break;
+        }
+        Thread.sleep(500)
+    }
+    sql " ALTER table ${tableName} ADD COLUMN c2 int null "
+    while (true) {
+        def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        if (res[0][9].toString() == "FINISHED") {
+            break;
+        }
+        Thread.sleep(500)
+    }
+
+    // test load data with all key columns
+    // TODO: disabled because it causes a core dump
+    // streamLoad {
+    //     table "${tableName}"
+
+    //     set 'column_separator', ','
+    //     set 'partial_columns', 'true'
+    //     set 'columns', 'c0, c1'
+
+    //     file 'schema_change/load_with_key_column.csv'
+    //     time 10000 // limit inflight 10s
+
+    //     check { result, exception, startTime, endTime ->
+    //         if (exception != null) {
+    //             throw exception
+    //         }
+    //         log.info("Stream load result: ${result}".toString())
+    //         def json = parseJson(result)
+    //         assertEquals("success", json.Status.toLowerCase())
+    //         assertEquals(1, json.NumberTotalRows)
+    //         assertEquals(0, json.NumberFilteredRows)
+    //         assertEquals(0, json.NumberUnselectedRows)
+    //     }
+    // }
+    // // check data
+    // qt_sql22 " select * from ${tableName} order by c0 "
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+
+    // test create index
+    tableName = "test_partial_update_schema_change_create_index"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+            CREATE TABLE ${tableName} (
+                `c0` int NULL,
+                `c1` int NULL,
+                `c2` int NULL,
+                `c3` int NULL,
+                `c4` int NULL,
+                `c5` int NULL,
+                `c6` int NULL,
+                `c7` int NULL,
+                `c8` int NULL,
+                `c9` int NULL)
+            UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1
+            PROPERTIES(
+                "replication_num" = "1",
+                "enable_unique_key_merge_on_write" = "true")
+    """
+
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9'
+
+        file 'schema_change/load.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql23 " select * from ${tableName} order by c0 "
+
+    sql " CREATE INDEX test ON ${tableName} (c1) USING BITMAP "
+    while (true) {
+        def res = sql " SHOW ALTER TABLE COLUMN WHERE TableName = '${tableName}' ORDER BY CreateTime DESC LIMIT 1 "
+        if (res[0][9].toString() == "FINISHED") {
+            break;
+        }
+        Thread.sleep(500)
+    }
+
+    // test load data after creating the index
+    streamLoad {
+        table "${tableName}"
+
+        set 'column_separator', ','
+        set 'partial_columns', 'true'
+        set 'columns', 'c0, c1, c2'
+
+        file 'schema_change/load_with_create_index.csv'
+        time 10000 // limit inflight 10s
+
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            log.info("Stream load result: ${result}".toString())
+            def json = parseJson(result)
+            assertEquals("success", json.Status.toLowerCase())
+            assertEquals(1, json.NumberTotalRows)
+            assertEquals(0, json.NumberFilteredRows)
+            assertEquals(0, json.NumberUnselectedRows)
+        }
+    }
+    qt_sql24 " select * from ${tableName} order by c0 "
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+
+    // test change properties
+    tableName = "test_partial_update_schema_change_properties"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
""" + sql """ + CREATE TABLE ${tableName} ( + `c0` int NULL, + `c1` int NULL, + `c2` int NULL, + `c3` int NULL, + `c4` int NULL, + `c5` int NULL, + `c6` int NULL, + `c7` int NULL, + `c8` int NULL, + `c9` int NULL) + UNIQUE KEY(`c0`) DISTRIBUTED BY HASH(`c0`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "true") + """ + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'columns', 'c0, c1, c2, c3, c4, c5, c6, c7, c8, c9' + + file 'schema_change/load.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + qt_sql25 " select * from ${tableName} order by c0 " + + sql " ALTER TABLE ${tableName} set ('in_memory' = 'false') " + + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'partial_columns', 'true' + set 'columns', 'c0, c1, c2' + + file 'schema_change/load_with_change_properties.csv' + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(1, json.NumberTotalRows) + assertEquals(0, json.NumberFilteredRows) + assertEquals(0, json.NumberUnselectedRows) + } + } + qt_sql26 " select * from ${tableName} order by c0 " - // drop table sql """ DROP TABLE IF EXISTS ${tableName} """ }