From a7fa9f290ca8533f5041d6304dbe25d05413cab4 Mon Sep 17 00:00:00 2001 From: zfr95 <87513668+zfr9527@users.noreply.github.com> Date: Thu, 21 Mar 2024 11:10:21 +0800 Subject: [PATCH] [fix](test)change stream load of auto partition (#32544) --- .../stress_test_diff_date_list.groovy | 89 ++++----------- .../auto_partition/diff_data/thread_load_1.sh | 17 --- .../auto_partition/diff_data/thread_load_2.sh | 17 --- .../stress_test_high_concurrency_load.groovy | 97 ++++++---------- .../stress_test_same_date_range.groovy | 87 +++++--------- .../auto_partition/same_data/thread_load_1.sh | 17 --- .../auto_partition/same_data/thread_load_2.sh | 17 --- .../stress_test_two_stream_load.groovy | 106 +++++------------- .../two_stream_load/thread_load_1.sh | 17 --- .../two_stream_load/thread_load_2.sh | 17 --- 10 files changed, 107 insertions(+), 374 deletions(-) delete mode 100644 regression-test/suites/partition_p2/auto_partition/diff_data/thread_load_1.sh delete mode 100644 regression-test/suites/partition_p2/auto_partition/diff_data/thread_load_2.sh delete mode 100644 regression-test/suites/partition_p2/auto_partition/same_data/thread_load_1.sh delete mode 100644 regression-test/suites/partition_p2/auto_partition/same_data/thread_load_2.sh delete mode 100644 regression-test/suites/partition_p2/auto_partition/two_stream_load/thread_load_1.sh delete mode 100644 regression-test/suites/partition_p2/auto_partition/two_stream_load/thread_load_2.sh diff --git a/regression-test/suites/partition_p2/auto_partition/diff_data/stress_test_diff_date_list.groovy b/regression-test/suites/partition_p2/auto_partition/diff_data/stress_test_diff_date_list.groovy index 72549e18d9..fc8df758c7 100644 --- a/regression-test/suites/partition_p2/auto_partition/diff_data/stress_test_diff_date_list.groovy +++ b/regression-test/suites/partition_p2/auto_partition/diff_data/stress_test_diff_date_list.groovy @@ -41,7 +41,7 @@ suite("stress_test_diff_date_list", "p2,nonConcurrent") { def data_count = 2 def cur_rows = 1000000 - // 用doris-dbgen生成数据文件 + // use doris-dbgen product data file def doris_dbgen_create_data = { db_name, tb_name, part_type, i -> def rows = cur_rows // total rows to load def bulkSize = rows @@ -86,63 +86,33 @@ suite("stress_test_diff_date_list", "p2,nonConcurrent") { file.write(content) } - def cm1 - def cm2 - def sql_port_res = sql """show backends;""" - println(sql_port_res) - if (sql_port_res.size < 2) { - assert(false) - } - def be_http_1 = sql_port_res[0][1] - def be_http_2 = sql_port_res[1][1] - def be_port_1 = sql_port_res[0][4] - def be_port_2 = sql_port_res[1][4] - - def doris_dbgen_load_data = { db_name, tb_name, part_type, i -> - def tableName = tb_name - - def jdbcUrl = context.config.jdbcUrl - def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) - def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) - - String realDb = db_name - String user = context.config.jdbcUser - String password = context.config.jdbcPassword - + def doris_dbgen_stream_load_data = { db_name, tb_name, part_type, i -> def list = [] def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i) dir.eachFileRecurse (FileType.FILES) { file -> list << file } + logger.info(list[0].toString()) - if (password) { - if (i == 1) { - cm1 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load""" - } else if (i == 2) { - cm2 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load""" - } - } else { - if (i == 1) { - cm1 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load""" - } else if (i == 2) { - cm2 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load""" + streamLoad { + db "${db_name}" + table "${tb_name}" + + set 'column_separator', '|' + + file """${list[0].toString()}""" + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(0, json.NumberFilteredRows) } } - logger.info("command is: " + cm1) - logger.info("command is: " + cm2) - - def load_path_1 = """${context.file.parent}/thread_load_1.sh""" - if (i == 1) { - write_to_file(load_path_1, cm1) - cm1 = "bash " + load_path_1 - } - - def load_path_2 = """${context.file.parent}/thread_load_2.sh""" - if (i == 2) { - write_to_file(load_path_2, cm2) - cm2 = "bash " + load_path_2 - } - } def data_delete = { part_type -> @@ -167,7 +137,6 @@ suite("stress_test_diff_date_list", "p2,nonConcurrent") { dir.eachFileRecurse (FileType.FILES) { file -> list << file } - println(list[0]) File file = list[0] file.append(content) @@ -191,25 +160,11 @@ suite("stress_test_diff_date_list", "p2,nonConcurrent") { append_to_file(add_one_row) doris_dbgen_create_data(database_name, tb_name2, "list", 2) - doris_dbgen_load_data(database_name, tb_name1, "list", 1) - doris_dbgen_load_data(database_name, tb_name1, "list", 2) - def thread3 = Thread.start { - logger.info("load1 start") - def proc = cm1.execute() - def sout = new StringBuilder(), serr = new StringBuilder() - proc.consumeProcessOutput(sout, serr) - proc.waitForOrKill(7200000) - logger.info("std out: " + sout + "std err: " + serr) + doris_dbgen_stream_load_data(database_name, tb_name1, "list", 1) } def thread4 = Thread.start { - sleep(1 * 1000) - logger.info("load2 start") - def proc = cm2.execute() - def sout = new StringBuilder(), serr = new StringBuilder() - proc.consumeProcessOutput(sout, serr) - proc.waitForOrKill(7200000) - logger.info("std out: " + sout + "std err: " + serr) + doris_dbgen_stream_load_data(database_name, tb_name1, "list", 2) } thread3.join() thread4.join() diff --git a/regression-test/suites/partition_p2/auto_partition/diff_data/thread_load_1.sh b/regression-test/suites/partition_p2/auto_partition/diff_data/thread_load_1.sh deleted file mode 100644 index 6edd950cc7..0000000000 --- a/regression-test/suites/partition_p2/auto_partition/diff_data/thread_load_1.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/regression-test/suites/partition_p2/auto_partition/diff_data/thread_load_2.sh b/regression-test/suites/partition_p2/auto_partition/diff_data/thread_load_2.sh deleted file mode 100644 index 6edd950cc7..0000000000 --- a/regression-test/suites/partition_p2/auto_partition/diff_data/thread_load_2.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/regression-test/suites/partition_p2/auto_partition/high_concur_load/stress_test_high_concurrency_load.groovy b/regression-test/suites/partition_p2/auto_partition/high_concur_load/stress_test_high_concurrency_load.groovy index 124088047f..df9b647b20 100644 --- a/regression-test/suites/partition_p2/auto_partition/high_concur_load/stress_test_high_concurrency_load.groovy +++ b/regression-test/suites/partition_p2/auto_partition/high_concur_load/stress_test_high_concurrency_load.groovy @@ -42,7 +42,7 @@ suite("stress_test_high_concurrency_load", "p2,nonConcurrent") { def data_count = 10 def cur_rows = 100000 - // 用doris-dbgen生成数据文件 + // use doris-dbgen product data file def doris_dbgen_create_data = { db_name, tb_name, part_type -> def rows = cur_rows // total rows to load def bulkSize = rows @@ -82,54 +82,32 @@ suite("stress_test_high_concurrency_load", "p2,nonConcurrent") { } } - def write_to_file = { cur_path, content -> - File file = new File(cur_path) - file.write(content) - } - - def cm_list = [] - def doris_dbgen_load_data = { db_name, tb_name, part_type -> - def tableName = tb_name - - def jdbcUrl = context.config.jdbcUrl - def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) - def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) - def sql_port - if (urlWithoutSchema.indexOf("/") >= 0) { - // e.g: jdbc:mysql://locahost:8080/?a=b - sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/")) - } else { - // e.g: jdbc:mysql://locahost:8080 - sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1) + def doris_dbgen_stream_load_data = { db_name, tb_name, part_type, i -> + def list = [] + def dir = new File("""${context.file.parent}""" + "/" + part_type + "/" + part_type + "_" + i) + dir.eachFileRecurse (FileType.FILES) { file -> + list << file } - String feHttpAddress = context.config.feHttpAddress - def http_port = feHttpAddress.substring(feHttpAddress.indexOf(":") + 1) + logger.info(list[0].toString()) - String realDb = db_name - String user = context.config.jdbcUser - String password = context.config.jdbcPassword + streamLoad { + db "${db_name}" + table "${tb_name}" + set 'column_separator', '|' - for (int i = 0; i < data_count; i++) { - def cm = "" - def list = [] - def dir = new File("""${context.file.parent}""" + "/" + part_type + "/" + part_type + "_" + i) - dir.eachFileRecurse (FileType.FILES) { file -> - list << file + file """${list[0].toString()}""" + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(0, json.NumberFilteredRows) } - - if (password) { - cm = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${sql_ip}:${http_port}/api/${realDb}/${tableName}/_stream_load""" - } else { - cm = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${sql_ip}:${http_port}/api/${realDb}/${tableName}/_stream_load""" - } - logger.info("command is: " + cm) - - def load_path = """${context.file.parent}/range/thread_load_${i}.sh""" - write_to_file(load_path, cm) - cm = """bash ${context.file.parent}/range/thread_load_${i}.sh""" - def cm_copy = cm - cm_list = cm_list.toList() + [cm_copy] } } @@ -150,29 +128,18 @@ suite("stress_test_high_concurrency_load", "p2,nonConcurrent") { data_delete("range") doris_dbgen_create_data(database_name, tb_name2, "range") - doris_dbgen_load_data(database_name, tb_name2, "range") def thread_thread_1000 = [] - def concurrent_load = { str -> - logger.info("load1 start:" + str) - logger.info("cm: " + str) - def proc = str.execute() - def sout = new StringBuilder(), serr = new StringBuilder() - proc.consumeProcessOutput(sout, serr) - proc.waitForOrKill(7200000) - logger.info("std out: " + sout + "std err: " + serr) - } - - thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[0])}) - thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[1])}) - thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[2])}) - thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[3])}) - thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[4])}) - thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[5])}) - thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[6])}) - thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[7])}) - thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[8])}) - thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[9])}) + thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 0)}) + thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 1)}) + thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 2)}) + thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 3)}) + thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 4)}) + thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 5)}) + thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 6)}) + thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 7)}) + thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 8)}) + thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 9)}) for (Thread th in thread_thread_1000) { th.join() diff --git a/regression-test/suites/partition_p2/auto_partition/same_data/stress_test_same_date_range.groovy b/regression-test/suites/partition_p2/auto_partition/same_data/stress_test_same_date_range.groovy index e081e3bf09..af53c945f5 100644 --- a/regression-test/suites/partition_p2/auto_partition/same_data/stress_test_same_date_range.groovy +++ b/regression-test/suites/partition_p2/auto_partition/same_data/stress_test_same_date_range.groovy @@ -41,7 +41,7 @@ suite("stress_test_same_date_range", "p2,nonConcurrent") { def data_count = 1 def cur_rows = 100000 - // 用doris-dbgen生成数据文件 + // use doris-dbgen product data file def doris_dbgen_create_data = { db_name, tb_name, part_type -> def rows = cur_rows // total rows to load def bulkSize = rows @@ -81,58 +81,32 @@ suite("stress_test_same_date_range", "p2,nonConcurrent") { } } - def write_to_file = { cur_path, content -> - File file = new File(cur_path) - file.write(content) - } - - def cm1 - def cm2 - def doris_dbgen_load_data = { db_name, tb_name, part_type -> - def tableName = tb_name - - def jdbcUrl = context.config.jdbcUrl - def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) - def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) - def sql_port_res = sql """show backends;""" - println(sql_port_res) - if (sql_port_res.size < 2) { - assert(false) + def doris_dbgen_stream_load_data = { db_name, tb_name, part_type, i -> + def list = [] + def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i) + dir.eachFileRecurse (FileType.FILES) { file -> + list << file } - def be_http_1 = sql_port_res[0][1] - def be_http_2 = sql_port_res[1][1] - def be_port_1 = sql_port_res[0][4] - def be_port_2 = sql_port_res[1][4] + logger.info(list[0].toString()) - String realDb = db_name - String user = context.config.jdbcUser - String password = context.config.jdbcPassword + streamLoad { + db "${db_name}" + table "${tb_name}" - for (int i = 1; i <= data_count; i++) { - def list = [] - def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i) - dir.eachFileRecurse (FileType.FILES) { file -> - list << file + set 'column_separator', '|' + + file """${list[0].toString()}""" + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(0, json.NumberFilteredRows) } - - if (password) { - cm1 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load""" - cm2 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load""" - } else { - cm1 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load""" - cm2 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load""" - } - logger.info("command is: " + cm1) - logger.info("command is: " + cm2) - - def load_path_1 = """${context.file.parent}/thread_load_1.sh""" - write_to_file(load_path_1, cm1) - cm1 = "bash " + load_path_1 - - def load_path_2 = """${context.file.parent}/thread_load_2.sh""" - write_to_file(load_path_2, cm2) - cm2 = "bash " + load_path_2 - } } @@ -165,23 +139,12 @@ suite("stress_test_same_date_range", "p2,nonConcurrent") { data_delete("range") doris_dbgen_create_data(database_name, tb_name1, "range") - doris_dbgen_load_data(database_name, tb_name2, "range") def thread1 = Thread.start { - logger.info("load1 start") - def proc = cm1.execute() - def sout = new StringBuilder(), serr = new StringBuilder() - proc.consumeProcessOutput(sout, serr) - proc.waitForOrKill(7200000) - logger.info("std out: " + sout + "std err: " + serr) + doris_dbgen_stream_load_data(database_name, tb_name2, "range", 1) } def thread2 = Thread.start { - logger.info("load2 start") - def proc = cm2.execute() - def sout = new StringBuilder(), serr = new StringBuilder() - proc.consumeProcessOutput(sout, serr) - proc.waitForOrKill(7200000) - logger.info("std out: " + sout + "std err: " + serr) + doris_dbgen_stream_load_data(database_name, tb_name2, "range", 1) } thread1.join() thread2.join() diff --git a/regression-test/suites/partition_p2/auto_partition/same_data/thread_load_1.sh b/regression-test/suites/partition_p2/auto_partition/same_data/thread_load_1.sh deleted file mode 100644 index 6edd950cc7..0000000000 --- a/regression-test/suites/partition_p2/auto_partition/same_data/thread_load_1.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/regression-test/suites/partition_p2/auto_partition/same_data/thread_load_2.sh b/regression-test/suites/partition_p2/auto_partition/same_data/thread_load_2.sh deleted file mode 100644 index 6edd950cc7..0000000000 --- a/regression-test/suites/partition_p2/auto_partition/same_data/thread_load_2.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/regression-test/suites/partition_p2/auto_partition/two_stream_load/stress_test_two_stream_load.groovy b/regression-test/suites/partition_p2/auto_partition/two_stream_load/stress_test_two_stream_load.groovy index bd0f7d1aa2..49db0f3e9a 100644 --- a/regression-test/suites/partition_p2/auto_partition/two_stream_load/stress_test_two_stream_load.groovy +++ b/regression-test/suites/partition_p2/auto_partition/two_stream_load/stress_test_two_stream_load.groovy @@ -18,8 +18,6 @@ import groovy.io.FileType import java.nio.file.Files import java.nio.file.Paths -import java.net.URL -import java.io.File suite("stress_test_two_stream_load", "p2,nonConcurrent") { @@ -41,7 +39,7 @@ suite("stress_test_two_stream_load", "p2,nonConcurrent") { def data_count = 1 def cur_rows = 10000 - // 用doris-dbgen生成数据文件 + // use doris-dbgen product data file def doris_dbgen_create_data = { db_name, tb_name, part_type -> def rows = cur_rows // total rows to load def bulkSize = rows @@ -81,58 +79,32 @@ suite("stress_test_two_stream_load", "p2,nonConcurrent") { } } - def write_to_file = { cur_path, content -> - File file = new File(cur_path) - file.write(content) - } - - def cm1 - def cm2 - def doris_dbgen_load_data = { db_name, tb_name, part_type -> - def tableName = tb_name - - def jdbcUrl = context.config.jdbcUrl - def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) - def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) - def sql_port_res = sql """show backends;""" - println(sql_port_res) - if (sql_port_res.size < 2) { - assert(false) + def doris_dbgen_stream_load_data = { db_name, tb_name, part_type, i -> + def list = [] + def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i) + dir.eachFileRecurse (FileType.FILES) { file -> + list << file } - def be_http_1 = sql_port_res[0][1] - def be_http_2 = sql_port_res[1][1] - def be_port_1 = sql_port_res[0][4] - def be_port_2 = sql_port_res[1][4] + logger.info(list[0].toString()) - String realDb = db_name - String user = context.config.jdbcUser - String password = context.config.jdbcPassword + streamLoad { + db "${db_name}" + table "${tb_name}" - for (int i = 1; i <= data_count; i++) { - def list = [] - def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i) - dir.eachFileRecurse (FileType.FILES) { file -> - list << file + set 'column_separator', '|' + + file """${list[0].toString()}""" + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(0, json.NumberFilteredRows) } - - if (password) { - cm1 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load""" - cm2 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load""" - } else { - cm1 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load""" - cm2 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load""" - } - logger.info("command is: " + cm1) - logger.info("command is: " + cm2) - - def load_path_1 = """${context.file.parent}/thread_load_1.sh""" - write_to_file(load_path_1, cm1) - cm1 = "bash " + load_path_1 - - def load_path_2 = """${context.file.parent}/thread_load_2.sh""" - write_to_file(load_path_2, cm2) - cm2 = "bash " + load_path_2 - } } @@ -173,23 +145,12 @@ suite("stress_test_two_stream_load", "p2,nonConcurrent") { data_delete("list") doris_dbgen_create_data(database_name, tb_name1, "range") - doris_dbgen_load_data(database_name, tb_name2, "range") def thread1 = Thread.start { - logger.info("load1 start") - def proc = cm1.execute() - def sout = new StringBuilder(), serr = new StringBuilder() - proc.consumeProcessOutput(sout, serr) - proc.waitForOrKill(7200000) - logger.info("std out: " + sout + "std err: " + serr) + doris_dbgen_stream_load_data(database_name, tb_name2, "range", 1) } def thread2 = Thread.start { - logger.info("load2 start") - def proc = cm2.execute() - def sout = new StringBuilder(), serr = new StringBuilder() - proc.consumeProcessOutput(sout, serr) - proc.waitForOrKill(7200000) - logger.info("std out: " + sout + "std err: " + serr) + doris_dbgen_stream_load_data(database_name, tb_name2, "range", 1) } thread1.join() thread2.join() @@ -198,25 +159,14 @@ suite("stress_test_two_stream_load", "p2,nonConcurrent") { def partition_res_range = sql """show partitions from ${tb_name2};""" assertTrue(row_count_range[0][0] == partition_res_range.size) - doris_dbgen_create_data(database_name, tb_name4, "list") - doris_dbgen_load_data(database_name, tb_name3, "list") data_delete("range") + doris_dbgen_create_data(database_name, tb_name4, "list") def thread3 = Thread.start { - logger.info("load1 start") - def proc = cm1.execute() - def sout = new StringBuilder(), serr = new StringBuilder() - proc.consumeProcessOutput(sout, serr) - proc.waitForOrKill(7200000) - logger.info("std out: " + sout + "std err: " + serr) + doris_dbgen_stream_load_data(database_name, tb_name3, "list", 1) } def thread4 = Thread.start { - logger.info("load2 start") - def proc = cm2.execute() - def sout = new StringBuilder(), serr = new StringBuilder() - proc.consumeProcessOutput(sout, serr) - proc.waitForOrKill(7200000) - logger.info("std out: " + sout + "std err: " + serr) + doris_dbgen_stream_load_data(database_name, tb_name3, "list", 1) } thread3.join() thread4.join() diff --git a/regression-test/suites/partition_p2/auto_partition/two_stream_load/thread_load_1.sh b/regression-test/suites/partition_p2/auto_partition/two_stream_load/thread_load_1.sh deleted file mode 100644 index 6edd950cc7..0000000000 --- a/regression-test/suites/partition_p2/auto_partition/two_stream_load/thread_load_1.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. diff --git a/regression-test/suites/partition_p2/auto_partition/two_stream_load/thread_load_2.sh b/regression-test/suites/partition_p2/auto_partition/two_stream_load/thread_load_2.sh deleted file mode 100644 index 6edd950cc7..0000000000 --- a/regression-test/suites/partition_p2/auto_partition/two_stream_load/thread_load_2.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env bash -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License.