diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index 122cbab49e..f4a0953949 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -85,11 +85,10 @@ public class LoadAction extends RestBaseController {
             @PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
         boolean groupCommit = false;
         String groupCommitStr = request.getHeader("group_commit");
-        if (groupCommitStr != null && groupCommitStr.equals("true")) {
+        if (groupCommitStr != null && groupCommitStr.equals("async_mode")) {
             groupCommit = true;
             try {
                 String[] pair = new String[] {db, table};
-                LOG.info(pair[0] + ":" + pair[1]);
                 if (isGroupCommitBlock(pair)) {
                     String msg = "insert table " + pair[1] + " is blocked on schema change";
                     return new RestBaseResult(msg);
@@ -128,7 +127,7 @@ public class LoadAction extends RestBaseController {
         LOG.info("streaming load sql={}", sql);
         boolean groupCommit = false;
         String groupCommitStr = request.getHeader("group_commit");
-        if (groupCommitStr != null && groupCommitStr.equals("true")) {
+        if (groupCommitStr != null && groupCommitStr.equals("async_mode")) {
             groupCommit = true;
             try {
                 String[] pair = parseDbAndTb(sql);
diff --git a/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_schema_change.out b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_schema_change.out
index 1be3e26c88..2abf38b9be 100644
--- a/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_schema_change.out
+++ b/regression-test/data/insert_p2/test_group_commit_http_stream_lineitem_schema_change.out
@@ -11,6 +11,3 @@
 -- !sql --
 6001215
 
--- !sql --
-6001215
-
diff --git a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.out b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.out
index ef23823e23..da7aa0f145 100644
--- a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.out
+++ b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.out
@@ -1,4 +1,4 @@
 -- This file is automatically generated. You should know what you did if you want to edit this
 -- !sql --
-6001215
+600572
 
diff --git a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.out b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.out
index 86534c1750..71cdc122f3 100644
--- a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.out
+++ b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.out
@@ -1,31 +1,34 @@
 -- This file is automatically generated. You should know what you did if you want to edit this
 -- !sql --
-600572
+60000
 
 -- !sql --
-600659
+60000
 
 -- !sql --
-599397
+60000
 
 -- !sql --
-600124
+60000
 
 -- !sql --
-599647
+60000
 
 -- !sql --
-599931
+60000
 
 -- !sql --
-601365
+60000
 
 -- !sql --
-599301
+60000
 
 -- !sql --
-600504
+60000
 
 -- !sql --
-599715
+60000
+
+-- !sql --
+572
 
diff --git a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_normal.out b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_normal.out
index ef23823e23..da7aa0f145 100644
--- a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_normal.out
+++ b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_normal.out
@@ -1,4 +1,4 @@
 -- This file is automatically generated. You should know what you did if you want to edit this
 -- !sql --
-6001215
+600572
 
diff --git a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.out b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.out
index 4f53091941..f2cd979eb4 100644
--- a/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.out
+++ b/regression-test/data/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.out
@@ -1,16 +1,16 @@
 -- This file is automatically generated. You should know what you did if you want to edit this
 -- !sql --
-3000816
+180000
 
 -- !sql --
-6001215
+300000
 
 -- !sql --
-4673070
+240065
 
 -- !sql --
-6001215
+300000
 
 -- !sql --
-6001215
+300000
 
diff --git a/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_schema_change.out b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_schema_change.out
index 1be3e26c88..50a4e09f4e 100644
--- a/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_schema_change.out
+++ b/regression-test/data/insert_p2/test_group_commit_stream_load_lineitem_schema_change.out
@@ -12,5 +12,5 @@
 6001215
 
 -- !sql --
-6001215
+12002430
 
diff --git a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy
index d1d4e476b4..44d9a0ff94 100644
--- a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy
+++ b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_normal.groovy
@@ -85,6 +85,7 @@ PROPERTIES (
     def process = {
         int total = 0;
         for (int k = 0; k < 3; k++) {
+            logger.info("round:" + k)
             for (int i = 1; i <= 10; i++) {
                 streamLoad {
                     set 'version', '1'
diff --git a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy
index 8a979bf6f3..dd0d40e645 100644
--- a/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy
+++ b/regression-test/suites/insert_p2/test_group_commit_http_stream_lineitem_schema_change.groovy
@@ -151,22 +151,34 @@ PROPERTIES (
     }
 
     def insert_data = { i, table_name ->
-        streamLoad {
-            set 'version', '1'
-            set 'sql', """
-            insert into ${db}.${stream_load_table}(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity,
+        int j = 0;
+        while (true) {
+            if (j >= 18) {
+                throw new Exception("""fail too many times""")
+            }
+            try {
+                streamLoad {
+                    set 'version', '1'
+                    set 'sql', """
+                    insert into ${db}.${table_name}(l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity,
 l_extendedprice, l_discount, l_tax,
 l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,
 l_shipmode,l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13, c14, c15, c16 from http_stream
 ("format"="csv", "column_separator"="|")
 """
-            set 'group_commit', 'async_mode'
-            file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
-            unset 'label'
+                    set 'group_commit', 'async_mode'
+                    file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
+                    unset 'label'
 
-            check { result, exception, startTime, endTime ->
-                checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0)
+                    check { result, exception, startTime, endTime ->
+                        checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0)
+                    }
+                }
+                break
+            } catch (Exception e) {
+                Thread.sleep(10000)
             }
+            j++;
         }
         total += rowCountArray[i - 1];
     }
@@ -328,22 +340,25 @@ PROPERTIES (
     def change_order = { table_name ->
         create_stream_load_table(table_name)
         total = 0;
-        for (int i = 1; i <= 10; i++) {
-            logger.info("process file:" + i)
-            if (i == 2) {
-                def retry = 0
-                while (retry < 10) {
-                    try {
-                        sql """ alter table ${table_name} order by (l_orderkey,l_shipdate,l_linenumber, l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment); """
-                        break
-                    } catch (Exception e) {
-                        log.info("exception:", e)
+        for (int k = 0; k < 2; k++) {
+            logger.info("round:" + k)
+            for (int i = 1; i <= 10; i++) {
+                logger.info("process file:" + i)
+                if (k == 0 && i == 2) {
+                    def retry = 0
+                    while (retry < 10) {
+                        try {
+                            sql """ alter table ${table_name} order by (l_orderkey,l_shipdate,l_linenumber, l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment); """
+                            break
+                        } catch (Exception e) {
+                            log.info("exception:", e)
+                        }
+                        Thread.sleep(2000)
+                        retry++
                     }
-                    Thread.sleep(2000)
-                    retry++
                 }
+                insert_data(i, table_name)
             }
-            insert_data(i, table_name)
         }
         logger.info("process change order total:" + total)
         assertTrue(getAlterTableState(table_name), "modify column order should success")
diff --git a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy
index 3a094c17ce..e53299a001 100644
--- a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy
+++ b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_client.groovy
@@ -35,22 +35,22 @@ String[] getFiles(String dirName, int num) {
 suite("test_group_commit_insert_into_lineitem_multiple_client") {
     String[] file_array;
     def prepare = {
-        def dataDir = "${context.config.cacheDataPath}/insert_into_lineitem_multiple_client/"
+        def dataDir = "${context.config.cacheDataPath}/insert_into_lineitem_multiple_client"
         File dir = new File(dataDir)
         if (!dir.exists()) {
-            new File("${context.config.cacheDataPath}/insert_into_lineitem_multiple_client/").mkdir()
-            for (int i = 1; i <= 10; i++) {
-                logger.info("download lineitem.tbl.${i}")
-                def download_file = """/usr/bin/curl ${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
---output ${context.config.cacheDataPath}/insert_into_lineitem_multiple_client/lineitem.tbl.${i}""".execute().getText()
-            }
+            new File(dataDir).mkdir()
+            logger.info("download lineitem")
+            def download_file = """/usr/bin/curl ${getS3Url()}/regression/tpch/sf1/lineitem.tbl.1
+--output ${dataDir}/lineitem.tbl.1""".execute().getText()
+            def split_file = """split -l 60000 ${dataDir}/lineitem.tbl.1 ${dataDir}/""".execute().getText()
+            def rm_file = """rm ${dataDir}/lineitem.tbl.1""".execute().getText()
         }
-        file_array = getFiles(dataDir, 10)
+        file_array = getFiles(dataDir, 11)
         for (String s : file_array) {
             logger.info(s)
         }
     }
-    def insert_table = "test_insert_into_lineitem_multiple_client_sf1"
+    def insert_table = "test_insert_into_lineitem_multiple_client"
     def batch = 100;
     def total = 0;
     def rwLock = new ReentrantReadWriteLock();
@@ -107,7 +107,24 @@ PROPERTIES (
         sql """ set enable_nereids_dml = false; """
     }
 
-    def do_insert_into = { file_name ->
+    def do_insert_into = { exp_str, num ->
+        def i = 0;
+        while (true) {
+            try {
+                def result = insert_into_sql(exp_str, num);
+                logger.info("result:" + result);
+                break
+            } catch (Exception e) {
+                logger.info("got exception:" + e)
+            }
+            i++;
+            if (i >= 30) {
+                throw new Exception("""fail too many times""")
+            }
+        }
+    }
+
+    def insert_file = { file_name ->
         logger.info("file:" + file_name)
         sql """ set group_commit = async_mode; """
         sql """ set enable_nereids_dml = false; """
@@ -128,15 +145,7 @@ PROPERTIES (
                 if (c == batch) {
                     sb.append(";");
                     String exp = sb.toString();
-                    while (true) {
-                        try {
-                            def result = insert_into_sql(exp, c);
-                            logger.info("result:" + result);
-                            break
-                        } catch (Exception e) {
-                            logger.info("got exception:" + e)
-                        }
-                    }
+                    do_insert_into(exp, c);
                     c = 0;
                 }
                 s = reader.readLine();
@@ -162,15 +171,7 @@ PROPERTIES (
                 } else if (c > 0) {
                     sb.append(";");
                     String exp = sb.toString();
-                    while (true) {
-                        try {
-                            def result = insert_into_sql(exp, c);
-                            logger.info("result:" + result);
-                            break
-                        } catch (Exception e) {
-                            logger.info("got exception:" + e)
-                        }
-                    }
+                    do_insert_into(exp, c);
                     break;
                 } else {
                     break;
@@ -198,7 +199,7 @@ PROPERTIES (
             String file_name = file_array[n]
             logger.info("insert into file:" + file_name)
             threads.add(Thread.startDaemon {
-                do_insert_into(file_name)
+                insert_file(file_name)
             })
         }
         for (Thread th in threads) {
diff --git a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy
index eef2d2bc3e..3241b44b19 100644
--- a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy
+++ b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_multiple_table.groovy
@@ -35,22 +35,22 @@ String[] getFiles(String dirName, int num) {
 suite("test_group_commit_insert_into_lineitem_multiple_table") {
     String[] file_array;
     def prepare = {
-        def dataDir = "${context.config.cacheDataPath}/insert_into_lineitem_multiple_table/"
+        def dataDir = "${context.config.cacheDataPath}/insert_into_lineitem_multiple_table"
         File dir = new File(dataDir)
         if (!dir.exists()) {
-            new File("${context.config.cacheDataPath}/insert_into_lineitem_multiple_table/").mkdir()
-            for (int i = 1; i <= 10; i++) {
-                logger.info("download lineitem.tbl.${i}")
-                def download_file = """/usr/bin/curl ${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
---output ${context.config.cacheDataPath}/insert_into_lineitem_multiple_table/lineitem.tbl.${i}""".execute().getText()
-            }
+            new File(dataDir).mkdir()
+            logger.info("download lineitem")
+            def download_file = """/usr/bin/curl ${getS3Url()}/regression/tpch/sf1/lineitem.tbl.1
+--output ${dataDir}/lineitem.tbl.1""".execute().getText()
+            def split_file = """split -l 60000 ${dataDir}/lineitem.tbl.1 ${dataDir}/""".execute().getText()
+            def rm_file = """rm ${dataDir}/lineitem.tbl.1""".execute().getText()
         }
-        file_array = getFiles(dataDir, 10)
+        file_array = getFiles(dataDir, 11)
         for (String s : file_array) {
             logger.info(s)
         }
     }
-    def insert_table_base = "test_insert_into_lineitem_multiple_table_sf1"
+    def insert_table_base = "test_insert_into_lineitem_multiple_table"
     def batch = 100;
     def total = 0;
     def rwLock = new ReentrantReadWriteLock();
@@ -107,7 +107,24 @@ PROPERTIES (
         sql """ set enable_nereids_dml = false; """
     }
 
-    def do_insert_into = { file_name, table_name ->
+    def do_insert_into = { exp_str, num ->
+        def i = 0;
+        while (true) {
+            try {
+                def result = insert_into_sql(exp_str, num);
+                logger.info("result:" + result);
+                break
+            } catch (Exception e) {
+                logger.info("got exception:" + e)
+            }
+            i++;
+            if (i >= 30) {
+                throw new Exception("""fail too many times""")
+            }
+        }
+    }
+
+    def insert_file = { file_name, table_name ->
         sql """ set group_commit = async_mode; """
         sql """ set enable_nereids_dml = false; """
         logger.info("file:" + file_name)
@@ -128,15 +145,7 @@ PROPERTIES (
                 if (c == batch) {
                     sb.append(";");
                     String exp = sb.toString();
-                    while (true) {
-                        try {
-                            def result = insert_into_sql(exp, c);
-                            logger.info("result:" + result);
-                            break
-                        } catch (Exception e) {
-                            logger.info("got exception:" + e)
-                        }
-                    }
+                    do_insert_into(exp, c);
                     c = 0;
                 }
                 s = reader.readLine();
@@ -162,15 +171,7 @@ PROPERTIES (
                 } else if (c > 0) {
                     sb.append(";");
                     String exp = sb.toString();
-                    while (true) {
-                        try {
-                            def result = insert_into_sql(exp, c);
-                            logger.info("result:" + result);
-                            break
-                        } catch (Exception e) {
-                            logger.info("got exception:" + e)
-                        }
-                    }
+                    do_insert_into(exp, c);
                     break;
                 } else {
                     break;
@@ -201,7 +202,7 @@ PROPERTIES (
             create_insert_table(table_name)
             logger.info("insert into file:" + file_name)
             threads.add(Thread.startDaemon {
-                do_insert_into(file_name, table_name)
+                insert_file(file_name, table_name)
             })
         }
         for (Thread th in threads) {
diff --git a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy
index 9fee850899..a47784e7d5 100644
--- a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy
+++ b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_normal.groovy
@@ -31,22 +31,22 @@ String[] getFiles(String dirName, int num) {
 suite("test_group_commit_insert_into_lineitem_normal") {
     String[] file_array;
     def prepare = {
-        def dataDir = "${context.config.cacheDataPath}/insert_into_lineitem_normal/"
+        def dataDir = "${context.config.cacheDataPath}/insert_into_lineitem_normal"
         File dir = new File(dataDir)
         if (!dir.exists()) {
-            new File("${context.config.cacheDataPath}/insert_into_lineitem_normal/").mkdir()
-            for (int i = 1; i <= 10; i++) {
-                logger.info("download lineitem.tbl.${i}")
-                def download_file = """/usr/bin/curl ${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
---output ${context.config.cacheDataPath}/insert_into_lineitem_normal/lineitem.tbl.${i}""".execute().getText()
-            }
+            new File(dataDir).mkdir()
+            logger.info("download lineitem")
+            def download_file = """/usr/bin/curl ${getS3Url()}/regression/tpch/sf1/lineitem.tbl.1
+--output ${dataDir}/lineitem.tbl.1""".execute().getText()
+            def split_file = """split -l 60000 ${dataDir}/lineitem.tbl.1 ${dataDir}/""".execute().getText()
+            def rm_file = """rm ${dataDir}/lineitem.tbl.1""".execute().getText()
         }
-        file_array = getFiles(dataDir, 10)
+        file_array = getFiles(dataDir, 11)
         for (String s : file_array) {
             logger.info(s)
         }
     }
-    def insert_table = "test_insert_into_lineitem_sf1"
+    def insert_table = "test_insert_into_lineitem_normal"
     def batch = 100;
     def count = 0;
     def total = 0;
@@ -102,6 +102,23 @@ PROPERTIES (
         sql """ set enable_nereids_dml = false; """
     }
 
+    def do_insert_into = { exp_str, num ->
+        def i = 0;
+        while (true) {
+            try {
+                def result = insert_into_sql(exp_str, num);
+                logger.info("result:" + result);
+                break
+            } catch (Exception e) {
+                logger.info("got exception:" + e)
+            }
+            i++;
+            if (i >= 30) {
+                throw new Exception("""fail too many times""")
+            }
+        }
+    }
+
     def process = {
         for (String file : file_array) {
             logger.info("insert into file: " + file)
@@ -120,15 +137,7 @@ PROPERTIES (
                 if (count == batch) {
                     sb.append(";");
                     String exp = sb.toString();
-                    while (true) {
-                        try {
-                            def result = insert_into_sql(exp, count);
-                            logger.info("result:" + result);
-                            break
-                        } catch (Exception e) {
-                            logger.info("got exception:" + e)
-                        }
-                    }
+                    do_insert_into(exp, count);
                     count = 0;
                 }
                 s = reader.readLine();
@@ -154,15 +163,7 @@ PROPERTIES (
                 } else if (count > 0) {
                     sb.append(";");
                     String exp = sb.toString();
-                    while (true) {
-                        try {
-                            def result = insert_into_sql(exp, count);
-                            logger.info("result:" + result);
-                            break
-                        } catch (Exception e) {
-                            logger.info("got exception:" + e)
-                        }
-                    }
+                    do_insert_into(exp, count);
                     break;
                 } else {
                     break;
diff --git a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy
index 917cadf31b..1c77dcbeb5 100644
--- a/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy
+++ b/regression-test/suites/insert_p2/test_group_commit_insert_into_lineitem_scheme_change.groovy
@@ -52,33 +52,37 @@ String[] getFiles(String dirName, int num) {
     if (num != datas.length) {
         throw new Exception("num not equals,expect:" + num + " vs real:" + datas.length)
     }
-    String[] array = new String[datas.length];
+    String[] tmp_array = new String[datas.length];
     for (int i = 0; i < datas.length; i++) {
-        array[i] = datas[i].getPath();
+        tmp_array[i] = datas[i].getPath();
+    }
+    Arrays.sort(tmp_array);
+    String[] array = new String[5];
+    for (int i = 0; i < 5; i++) {
+        array[i] = tmp_array[i];
     }
-    Arrays.sort(array);
     return array;
 }
 
 suite("test_group_commit_insert_into_lineitem_scheme_change") {
     String[] file_array;
     def prepare = {
-        def dataDir = "${context.config.cacheDataPath}/insert_into_lineitem_scheme_change/"
+        def dataDir = "${context.config.cacheDataPath}/insert_into_lineitem_scheme_change"
         File dir = new File(dataDir)
         if (!dir.exists()) {
-            new File("${context.config.cacheDataPath}/insert_into_lineitem_scheme_change/").mkdir()
-            for (int i = 1; i <= 10; i++) {
-                logger.info("download lineitem.tbl.${i}")
-                def download_file = """/usr/bin/curl ${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
---output ${context.config.cacheDataPath}/insert_into_lineitem_scheme_change/lineitem.tbl.${i}""".execute().getText()
-            }
+            new File(dataDir).mkdir()
+            logger.info("download lineitem")
+            def download_file = """/usr/bin/curl ${getS3Url()}/regression/tpch/sf1/lineitem.tbl.1
+--output ${dataDir}/lineitem.tbl.1""".execute().getText()
+            def split_file = """split -l 60000 ${dataDir}/lineitem.tbl.1 ${dataDir}/""".execute().getText()
+            def rm_file = """rm ${dataDir}/lineitem.tbl.1""".execute().getText()
         }
-        file_array = getFiles(dataDir, 10)
+        file_array = getFiles(dataDir, 11)
        for (String s : file_array) {
            logger.info(s)
        }
    }
-    def insert_table = "test_lineitem_scheme_change_sf1"
+    def insert_table = "test_lineitem_scheme_change"
     def batch = 100;
     def count = 0;
     def total = 0;
@@ -168,6 +172,27 @@ PROPERTIES (
 
     }
 
+    def do_insert_into = { exp_str, num ->
+        def i = 0;
+        while (true) {
+            try {
+                def result = insert_into_sql(exp_str, num);
+                logger.info("result:" + result);
+                break
+            } catch (Exception e) {
+                logger.info("msg:" + e.getMessage())
+                logger.info("got exception:" + e)
+                if (e.getMessage().contains("is blocked on schema change")) {
+                    Thread.sleep(10000)
+                }
+            }
+            i++;
+            if (i >= 30) {
+                throw new Exception("""fail too many times""")
+            }
+        }
+    }
+
     def insert_data = { file_name, table_name, index ->
         BufferedReader reader;
         try {
@@ -184,15 +209,7 @@ PROPERTIES (
                 if (count == batch) {
                     sb.append(";");
                     String exp = sb.toString();
-                    while (true) {
-                        try {
-                            def result = insert_into_sql(exp, count);
-                            logger.info("result:" + result);
-                            break
-                        } catch (Exception e) {
-                            logger.info("got exception:" + e)
-                        }
-                    }
+                    do_insert_into(exp, count)
                     count = 0;
                 }
                 s = reader.readLine();
@@ -229,15 +246,7 @@ PROPERTIES (
                 } else if (count > 0) {
                     sb.append(";");
                     String exp = sb.toString();
-                    while (true) {
-                        try {
-                            def result = insert_into_sql(exp, count);
-                            logger.info("result:" + result);
-                            break
-                        } catch (Exception e) {
-                            logger.info("got exception:" + e)
-                        }
-                    }
+                    do_insert_into(exp, count)
                     break;
                 } else {
                     break;
@@ -275,7 +284,7 @@ PROPERTIES (
         for (int i = 0; i < file_array.length; i++) {
             String fileName = file_array[i]
             logger.info("process file:" + fileName)
-            if (i == 5) {
+            if (i == (int) (file_array.length / 2)) {
                 getRowCount(total, table_name)
                 def retry = 0
                 while (retry < 10) {
@@ -303,13 +312,13 @@ PROPERTIES (
         for (int i = 0; i < file_array.length; i++) {
             String fileName = file_array[i]
             logger.info("process file:" + fileName)
-            if (i == 5) {
+            if (i == (int) (file_array.length / 2)) {
                 def retry = 0
                 while (retry < 10) {
                     try {
-                        def rowCount = sql """select count(*) from ${table_name} where l_orderkey >=1000000 and l_orderkey <=5000000;"""
+                        def rowCount = sql """select count(*) from ${table_name} where l_orderkey >=10000;"""
                         log.info("rowCount:" + rowCount[0][0])
-                        sql """ delete from ${table_name} where l_orderkey >=1000000 and l_orderkey <=5000000; """
+                        sql """ delete from ${table_name} where l_orderkey >=10000; """
                         total -= rowCount[0][0]
                         break
                     } catch (Exception e) {
@@ -332,7 +341,7 @@ PROPERTIES (
         for (int i = 0; i < file_array.length; i++) {
             String fileName = file_array[i]
             logger.info("process file:" + fileName)
-            if (i == 5) {
+            if (i == (int) (file_array.length / 2)) {
                 def retry = 0
                 while (retry < 10) {
                     try {
@@ -345,7 +354,7 @@ PROPERTIES (
                     Thread.sleep(2000)
                 }
             }
-            if (i < 5) {
+            if (i < (int) (file_array.length / 2)) {
                 insert_data(fileName, table_name, STATE.NORMAL.value)
             } else {
                 insert_data(fileName, table_name, STATE.DROP_COLUMN.value)
@@ -363,7 +372,7 @@ PROPERTIES (
         for (int i = 0; i < file_array.length; i++) {
             String fileName = file_array[i]
             logger.info("process file:" + fileName)
-            if (i == 5) {
+            if (i == (int) (file_array.length / 2)) {
                 def retry = 0
                 while (retry < 10) {
                     try {
@@ -394,7 +403,7 @@ PROPERTIES (
         for (int i = 0; i < file_array.length; i++) {
             String fileName = file_array[i]
             logger.info("process file:" + fileName)
-            if (i == 5) {
+            if (i == (int) (file_array.length / 2)) {
                 def retry = 0
                 while (retry < 10) {
                     try {
@@ -417,7 +426,7 @@ PROPERTIES (
 
 
     def process = { table_name ->
-        for (int i = 1; i <= 4; i++) {
+        for (int i = 1; i <= 5; i++) {
             switch (i) {
                 case SC.TRUNCATE_TABLE.value:
                     truncate(table_name)
diff --git a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_normal.groovy b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_normal.groovy
index 5f3d86116e..83784555ca 100644
--- a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_normal.groovy
+++ b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_normal.groovy
@@ -86,6 +86,7 @@ l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipin
     def process = {
         int total = 0;
         for (int k = 0; k < 3; k++) {
+            logger.info("round:" + k)
             for (int i = 1; i <= 10; i++) {
                 streamLoad {
                     table stream_load_table
diff --git a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy
index d7cf67d542..a3e362ea32 100644
--- a/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy
+++ b/regression-test/suites/insert_p2/test_group_commit_stream_load_lineitem_schema_change.groovy
@@ -152,18 +152,30 @@ PROPERTIES (
     }
 
     def insert_data = { i, table_name ->
-        streamLoad {
-            table table_name
-
-            set 'column_separator', '|'
-            set 'columns', columns + ",lo_dummy"
-            set 'group_commit', 'async_mode'
-            unset 'label'
-            file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
-
-            check { result, exception, startTime, endTime ->
-                checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0)
+        int j = 0;
+        while (true) {
+            if (j >= 18) {
+                throw new Exception("""fail too many times""")
             }
+            try {
+                streamLoad {
+                    table table_name
+
+                    set 'column_separator', '|'
+                    set 'columns', columns + ",lo_dummy"
+                    set 'group_commit', 'async_mode'
+                    unset 'label'
+                    file """${getS3Url()}/regression/tpch/sf1/lineitem.tbl.""" + i
+
+                    check { result, exception, startTime, endTime ->
+                        checkStreamLoadResult(exception, result, rowCountArray[i - 1], rowCountArray[i - 1], 0, 0)
+                    }
+                }
+                break;
+            } catch (Exception e) {
+                Thread.sleep(10000)
+            }
+            j++;
         }
         total += rowCountArray[i - 1];
     }
@@ -296,22 +308,25 @@ PROPERTIES (
     def change_order = { table_name ->
         create_stream_load_table(table_name)
         total = 0;
-        for (int i = 1; i <= 10; i++) {
-            logger.info("process file:" + i)
-            if (i == 2) {
-                def retry = 0
-                while (retry < 10) {
-                    try {
-                        sql """ alter table ${table_name} order by (l_orderkey,l_shipdate,l_linenumber, l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment); """
-                        break
-                    } catch (Exception e) {
-                        log.info("exception:", e)
+        for (int k = 0; k < 2; k++) {
+            logger.info("round:" + k)
+            for (int i = 1; i <= 10; i++) {
+                logger.info("process file:" + i)
+                if (k == 0 && i == 2) {
+                    def retry = 0
+                    while (retry < 10) {
+                        try {
+                            sql """ alter table ${table_name} order by (l_orderkey,l_shipdate,l_linenumber, l_partkey,l_suppkey,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment); """
+                            break
+                        } catch (Exception e) {
+                            log.info("exception:", e)
+                        }
+                        Thread.sleep(2000)
+                        retry++
                     }
-                    Thread.sleep(2000)
-                    retry++
                 }
+                insert_data(i, table_name)
             }
-            insert_data(i, table_name)
         }
         logger.info("process change order total:" + total)
         assertTrue(getAlterTableState(table_name), "modify column order should success")
@@ -321,7 +336,7 @@ PROPERTIES (
 
 
     def process = { table_name ->
-        for (int i = 1; i <= 4; i++) {
+        for (int i = 1; i <= 5; i++) {
             switch (i) {
                 case SC.TRUNCATE_TABLE.value:
                     truncate(table_name)
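
Review note: every suite touched above replaces its inline retry-forever loop with the same bounded-retry wrapper around the insert helper. A minimal standalone Groovy sketch of that shape follows; the insertIntoSql stub stands in for the suites' insert_into_sql, and the retry cap, sleep interval, and schema-change backoff are illustrative values (the suites themselves hard-code 30 attempts for insert-into, 18 for stream load, and a 10 second sleep):

import java.util.Random

// Stub for the suites' insert_into_sql: in the real tests it issues the
// batched INSERT and returns the affected row count. Here it fails randomly
// so the retry path is exercised.
def random = new Random()
def insertIntoSql = { String stmt, int rows ->
    if (random.nextInt(3) == 0) {
        throw new RuntimeException("insert table t is blocked on schema change")
    }
    return rows
}

// Bounded retry: re-issue the statement until it succeeds, back off longer
// while a schema change is blocking group commit, and give up with a hard
// failure after maxRetries attempts instead of looping forever.
def doInsertInto = { String stmt, int rows, int maxRetries, long sleepMs ->
    int attempt = 0
    while (true) {
        try {
            def result = insertIntoSql(stmt, rows)
            println("result:" + result)
            return result
        } catch (Exception e) {
            println("got exception:" + e.getMessage())
            if (e.getMessage()?.contains("is blocked on schema change")) {
                Thread.sleep(sleepMs * 10)   // schema change in flight: wait longer
            } else {
                Thread.sleep(sleepMs)
            }
        }
        attempt++
        if (attempt >= maxRetries) {
            throw new Exception("fail too many times")
        }
    }
}

// Usage, with small sleep values so the demo finishes quickly.
doInsertInto("insert into t values (1)", 1, 30, 10)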