[fix](test)change stream load of auto partition (#32544)
This commit is contained in:
@ -41,7 +41,7 @@ suite("stress_test_diff_date_list", "p2,nonConcurrent") {
|
||||
def data_count = 2
|
||||
def cur_rows = 1000000
|
||||
|
||||
// 用doris-dbgen生成数据文件
|
||||
// use doris-dbgen product data file
|
||||
def doris_dbgen_create_data = { db_name, tb_name, part_type, i ->
|
||||
def rows = cur_rows // total rows to load
|
||||
def bulkSize = rows
|
||||
@ -86,63 +86,33 @@ suite("stress_test_diff_date_list", "p2,nonConcurrent") {
|
||||
file.write(content)
|
||||
}
|
||||
|
||||
def cm1
|
||||
def cm2
|
||||
def sql_port_res = sql """show backends;"""
|
||||
println(sql_port_res)
|
||||
if (sql_port_res.size < 2) {
|
||||
assert(false)
|
||||
}
|
||||
def be_http_1 = sql_port_res[0][1]
|
||||
def be_http_2 = sql_port_res[1][1]
|
||||
def be_port_1 = sql_port_res[0][4]
|
||||
def be_port_2 = sql_port_res[1][4]
|
||||
|
||||
def doris_dbgen_load_data = { db_name, tb_name, part_type, i ->
|
||||
def tableName = tb_name
|
||||
|
||||
def jdbcUrl = context.config.jdbcUrl
|
||||
def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
|
||||
def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
|
||||
|
||||
String realDb = db_name
|
||||
String user = context.config.jdbcUser
|
||||
String password = context.config.jdbcPassword
|
||||
|
||||
def doris_dbgen_stream_load_data = { db_name, tb_name, part_type, i ->
|
||||
def list = []
|
||||
def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i)
|
||||
dir.eachFileRecurse (FileType.FILES) { file ->
|
||||
list << file
|
||||
}
|
||||
logger.info(list[0].toString())
|
||||
|
||||
if (password) {
|
||||
if (i == 1) {
|
||||
cm1 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load"""
|
||||
} else if (i == 2) {
|
||||
cm2 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load"""
|
||||
}
|
||||
} else {
|
||||
if (i == 1) {
|
||||
cm1 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load"""
|
||||
} else if (i == 2) {
|
||||
cm2 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load"""
|
||||
streamLoad {
|
||||
db "${db_name}"
|
||||
table "${tb_name}"
|
||||
|
||||
set 'column_separator', '|'
|
||||
|
||||
file """${list[0].toString()}"""
|
||||
time 10000 // limit inflight 10s
|
||||
|
||||
check { result, exception, startTime, endTime ->
|
||||
if (exception != null) {
|
||||
throw exception
|
||||
}
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertEquals(0, json.NumberFilteredRows)
|
||||
}
|
||||
}
|
||||
logger.info("command is: " + cm1)
|
||||
logger.info("command is: " + cm2)
|
||||
|
||||
def load_path_1 = """${context.file.parent}/thread_load_1.sh"""
|
||||
if (i == 1) {
|
||||
write_to_file(load_path_1, cm1)
|
||||
cm1 = "bash " + load_path_1
|
||||
}
|
||||
|
||||
def load_path_2 = """${context.file.parent}/thread_load_2.sh"""
|
||||
if (i == 2) {
|
||||
write_to_file(load_path_2, cm2)
|
||||
cm2 = "bash " + load_path_2
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
def data_delete = { part_type ->
|
||||
@ -167,7 +137,6 @@ suite("stress_test_diff_date_list", "p2,nonConcurrent") {
|
||||
dir.eachFileRecurse (FileType.FILES) { file ->
|
||||
list << file
|
||||
}
|
||||
println(list[0])
|
||||
|
||||
File file = list[0]
|
||||
file.append(content)
|
||||
@ -191,25 +160,11 @@ suite("stress_test_diff_date_list", "p2,nonConcurrent") {
|
||||
append_to_file(add_one_row)
|
||||
doris_dbgen_create_data(database_name, tb_name2, "list", 2)
|
||||
|
||||
doris_dbgen_load_data(database_name, tb_name1, "list", 1)
|
||||
doris_dbgen_load_data(database_name, tb_name1, "list", 2)
|
||||
|
||||
def thread3 = Thread.start {
|
||||
logger.info("load1 start")
|
||||
def proc = cm1.execute()
|
||||
def sout = new StringBuilder(), serr = new StringBuilder()
|
||||
proc.consumeProcessOutput(sout, serr)
|
||||
proc.waitForOrKill(7200000)
|
||||
logger.info("std out: " + sout + "std err: " + serr)
|
||||
doris_dbgen_stream_load_data(database_name, tb_name1, "list", 1)
|
||||
}
|
||||
def thread4 = Thread.start {
|
||||
sleep(1 * 1000)
|
||||
logger.info("load2 start")
|
||||
def proc = cm2.execute()
|
||||
def sout = new StringBuilder(), serr = new StringBuilder()
|
||||
proc.consumeProcessOutput(sout, serr)
|
||||
proc.waitForOrKill(7200000)
|
||||
logger.info("std out: " + sout + "std err: " + serr)
|
||||
doris_dbgen_stream_load_data(database_name, tb_name1, "list", 2)
|
||||
}
|
||||
thread3.join()
|
||||
thread4.join()
|
||||
|
||||
@ -1,17 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
@ -1,17 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
@ -42,7 +42,7 @@ suite("stress_test_high_concurrency_load", "p2,nonConcurrent") {
|
||||
def data_count = 10
|
||||
def cur_rows = 100000
|
||||
|
||||
// 用doris-dbgen生成数据文件
|
||||
// use doris-dbgen product data file
|
||||
def doris_dbgen_create_data = { db_name, tb_name, part_type ->
|
||||
def rows = cur_rows // total rows to load
|
||||
def bulkSize = rows
|
||||
@ -82,54 +82,32 @@ suite("stress_test_high_concurrency_load", "p2,nonConcurrent") {
|
||||
}
|
||||
}
|
||||
|
||||
def write_to_file = { cur_path, content ->
|
||||
File file = new File(cur_path)
|
||||
file.write(content)
|
||||
}
|
||||
|
||||
def cm_list = []
|
||||
def doris_dbgen_load_data = { db_name, tb_name, part_type ->
|
||||
def tableName = tb_name
|
||||
|
||||
def jdbcUrl = context.config.jdbcUrl
|
||||
def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
|
||||
def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
|
||||
def sql_port
|
||||
if (urlWithoutSchema.indexOf("/") >= 0) {
|
||||
// e.g: jdbc:mysql://locahost:8080/?a=b
|
||||
sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/"))
|
||||
} else {
|
||||
// e.g: jdbc:mysql://locahost:8080
|
||||
sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1)
|
||||
def doris_dbgen_stream_load_data = { db_name, tb_name, part_type, i ->
|
||||
def list = []
|
||||
def dir = new File("""${context.file.parent}""" + "/" + part_type + "/" + part_type + "_" + i)
|
||||
dir.eachFileRecurse (FileType.FILES) { file ->
|
||||
list << file
|
||||
}
|
||||
String feHttpAddress = context.config.feHttpAddress
|
||||
def http_port = feHttpAddress.substring(feHttpAddress.indexOf(":") + 1)
|
||||
logger.info(list[0].toString())
|
||||
|
||||
String realDb = db_name
|
||||
String user = context.config.jdbcUser
|
||||
String password = context.config.jdbcPassword
|
||||
streamLoad {
|
||||
db "${db_name}"
|
||||
table "${tb_name}"
|
||||
|
||||
set 'column_separator', '|'
|
||||
|
||||
for (int i = 0; i < data_count; i++) {
|
||||
def cm = ""
|
||||
def list = []
|
||||
def dir = new File("""${context.file.parent}""" + "/" + part_type + "/" + part_type + "_" + i)
|
||||
dir.eachFileRecurse (FileType.FILES) { file ->
|
||||
list << file
|
||||
file """${list[0].toString()}"""
|
||||
time 10000 // limit inflight 10s
|
||||
|
||||
check { result, exception, startTime, endTime ->
|
||||
if (exception != null) {
|
||||
throw exception
|
||||
}
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertEquals(0, json.NumberFilteredRows)
|
||||
}
|
||||
|
||||
if (password) {
|
||||
cm = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${sql_ip}:${http_port}/api/${realDb}/${tableName}/_stream_load"""
|
||||
} else {
|
||||
cm = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${sql_ip}:${http_port}/api/${realDb}/${tableName}/_stream_load"""
|
||||
}
|
||||
logger.info("command is: " + cm)
|
||||
|
||||
def load_path = """${context.file.parent}/range/thread_load_${i}.sh"""
|
||||
write_to_file(load_path, cm)
|
||||
cm = """bash ${context.file.parent}/range/thread_load_${i}.sh"""
|
||||
def cm_copy = cm
|
||||
cm_list = cm_list.toList() + [cm_copy]
|
||||
}
|
||||
}
|
||||
|
||||
@ -150,29 +128,18 @@ suite("stress_test_high_concurrency_load", "p2,nonConcurrent") {
|
||||
|
||||
data_delete("range")
|
||||
doris_dbgen_create_data(database_name, tb_name2, "range")
|
||||
doris_dbgen_load_data(database_name, tb_name2, "range")
|
||||
|
||||
def thread_thread_1000 = []
|
||||
def concurrent_load = { str ->
|
||||
logger.info("load1 start:" + str)
|
||||
logger.info("cm: " + str)
|
||||
def proc = str.execute()
|
||||
def sout = new StringBuilder(), serr = new StringBuilder()
|
||||
proc.consumeProcessOutput(sout, serr)
|
||||
proc.waitForOrKill(7200000)
|
||||
logger.info("std out: " + sout + "std err: " + serr)
|
||||
}
|
||||
|
||||
thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[0])})
|
||||
thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[1])})
|
||||
thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[2])})
|
||||
thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[3])})
|
||||
thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[4])})
|
||||
thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[5])})
|
||||
thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[6])})
|
||||
thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[7])})
|
||||
thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[8])})
|
||||
thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[9])})
|
||||
thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 0)})
|
||||
thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 1)})
|
||||
thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 2)})
|
||||
thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 3)})
|
||||
thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 4)})
|
||||
thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 5)})
|
||||
thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 6)})
|
||||
thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 7)})
|
||||
thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 8)})
|
||||
thread_thread_1000.add(Thread.startDaemon {doris_dbgen_stream_load_data(database_name, tb_name2, "range", 9)})
|
||||
|
||||
for (Thread th in thread_thread_1000) {
|
||||
th.join()
|
||||
|
||||
@ -41,7 +41,7 @@ suite("stress_test_same_date_range", "p2,nonConcurrent") {
|
||||
def data_count = 1
|
||||
def cur_rows = 100000
|
||||
|
||||
// 用doris-dbgen生成数据文件
|
||||
// use doris-dbgen product data file
|
||||
def doris_dbgen_create_data = { db_name, tb_name, part_type ->
|
||||
def rows = cur_rows // total rows to load
|
||||
def bulkSize = rows
|
||||
@ -81,58 +81,32 @@ suite("stress_test_same_date_range", "p2,nonConcurrent") {
|
||||
}
|
||||
}
|
||||
|
||||
def write_to_file = { cur_path, content ->
|
||||
File file = new File(cur_path)
|
||||
file.write(content)
|
||||
}
|
||||
|
||||
def cm1
|
||||
def cm2
|
||||
def doris_dbgen_load_data = { db_name, tb_name, part_type ->
|
||||
def tableName = tb_name
|
||||
|
||||
def jdbcUrl = context.config.jdbcUrl
|
||||
def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
|
||||
def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
|
||||
def sql_port_res = sql """show backends;"""
|
||||
println(sql_port_res)
|
||||
if (sql_port_res.size < 2) {
|
||||
assert(false)
|
||||
def doris_dbgen_stream_load_data = { db_name, tb_name, part_type, i ->
|
||||
def list = []
|
||||
def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i)
|
||||
dir.eachFileRecurse (FileType.FILES) { file ->
|
||||
list << file
|
||||
}
|
||||
def be_http_1 = sql_port_res[0][1]
|
||||
def be_http_2 = sql_port_res[1][1]
|
||||
def be_port_1 = sql_port_res[0][4]
|
||||
def be_port_2 = sql_port_res[1][4]
|
||||
logger.info(list[0].toString())
|
||||
|
||||
String realDb = db_name
|
||||
String user = context.config.jdbcUser
|
||||
String password = context.config.jdbcPassword
|
||||
streamLoad {
|
||||
db "${db_name}"
|
||||
table "${tb_name}"
|
||||
|
||||
for (int i = 1; i <= data_count; i++) {
|
||||
def list = []
|
||||
def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i)
|
||||
dir.eachFileRecurse (FileType.FILES) { file ->
|
||||
list << file
|
||||
set 'column_separator', '|'
|
||||
|
||||
file """${list[0].toString()}"""
|
||||
time 10000 // limit inflight 10s
|
||||
|
||||
check { result, exception, startTime, endTime ->
|
||||
if (exception != null) {
|
||||
throw exception
|
||||
}
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertEquals(0, json.NumberFilteredRows)
|
||||
}
|
||||
|
||||
if (password) {
|
||||
cm1 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load"""
|
||||
cm2 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load"""
|
||||
} else {
|
||||
cm1 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load"""
|
||||
cm2 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load"""
|
||||
}
|
||||
logger.info("command is: " + cm1)
|
||||
logger.info("command is: " + cm2)
|
||||
|
||||
def load_path_1 = """${context.file.parent}/thread_load_1.sh"""
|
||||
write_to_file(load_path_1, cm1)
|
||||
cm1 = "bash " + load_path_1
|
||||
|
||||
def load_path_2 = """${context.file.parent}/thread_load_2.sh"""
|
||||
write_to_file(load_path_2, cm2)
|
||||
cm2 = "bash " + load_path_2
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -165,23 +139,12 @@ suite("stress_test_same_date_range", "p2,nonConcurrent") {
|
||||
|
||||
data_delete("range")
|
||||
doris_dbgen_create_data(database_name, tb_name1, "range")
|
||||
doris_dbgen_load_data(database_name, tb_name2, "range")
|
||||
|
||||
def thread1 = Thread.start {
|
||||
logger.info("load1 start")
|
||||
def proc = cm1.execute()
|
||||
def sout = new StringBuilder(), serr = new StringBuilder()
|
||||
proc.consumeProcessOutput(sout, serr)
|
||||
proc.waitForOrKill(7200000)
|
||||
logger.info("std out: " + sout + "std err: " + serr)
|
||||
doris_dbgen_stream_load_data(database_name, tb_name2, "range", 1)
|
||||
}
|
||||
def thread2 = Thread.start {
|
||||
logger.info("load2 start")
|
||||
def proc = cm2.execute()
|
||||
def sout = new StringBuilder(), serr = new StringBuilder()
|
||||
proc.consumeProcessOutput(sout, serr)
|
||||
proc.waitForOrKill(7200000)
|
||||
logger.info("std out: " + sout + "std err: " + serr)
|
||||
doris_dbgen_stream_load_data(database_name, tb_name2, "range", 1)
|
||||
}
|
||||
thread1.join()
|
||||
thread2.join()
|
||||
|
||||
@ -1,17 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
@ -1,17 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
@ -18,8 +18,6 @@
|
||||
import groovy.io.FileType
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Paths
|
||||
import java.net.URL
|
||||
import java.io.File
|
||||
|
||||
suite("stress_test_two_stream_load", "p2,nonConcurrent") {
|
||||
|
||||
@ -41,7 +39,7 @@ suite("stress_test_two_stream_load", "p2,nonConcurrent") {
|
||||
def data_count = 1
|
||||
def cur_rows = 10000
|
||||
|
||||
// 用doris-dbgen生成数据文件
|
||||
// use doris-dbgen product data file
|
||||
def doris_dbgen_create_data = { db_name, tb_name, part_type ->
|
||||
def rows = cur_rows // total rows to load
|
||||
def bulkSize = rows
|
||||
@ -81,58 +79,32 @@ suite("stress_test_two_stream_load", "p2,nonConcurrent") {
|
||||
}
|
||||
}
|
||||
|
||||
def write_to_file = { cur_path, content ->
|
||||
File file = new File(cur_path)
|
||||
file.write(content)
|
||||
}
|
||||
|
||||
def cm1
|
||||
def cm2
|
||||
def doris_dbgen_load_data = { db_name, tb_name, part_type ->
|
||||
def tableName = tb_name
|
||||
|
||||
def jdbcUrl = context.config.jdbcUrl
|
||||
def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
|
||||
def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":"))
|
||||
def sql_port_res = sql """show backends;"""
|
||||
println(sql_port_res)
|
||||
if (sql_port_res.size < 2) {
|
||||
assert(false)
|
||||
def doris_dbgen_stream_load_data = { db_name, tb_name, part_type, i ->
|
||||
def list = []
|
||||
def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i)
|
||||
dir.eachFileRecurse (FileType.FILES) { file ->
|
||||
list << file
|
||||
}
|
||||
def be_http_1 = sql_port_res[0][1]
|
||||
def be_http_2 = sql_port_res[1][1]
|
||||
def be_port_1 = sql_port_res[0][4]
|
||||
def be_port_2 = sql_port_res[1][4]
|
||||
logger.info(list[0].toString())
|
||||
|
||||
String realDb = db_name
|
||||
String user = context.config.jdbcUser
|
||||
String password = context.config.jdbcPassword
|
||||
streamLoad {
|
||||
db "${db_name}"
|
||||
table "${tb_name}"
|
||||
|
||||
for (int i = 1; i <= data_count; i++) {
|
||||
def list = []
|
||||
def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i)
|
||||
dir.eachFileRecurse (FileType.FILES) { file ->
|
||||
list << file
|
||||
set 'column_separator', '|'
|
||||
|
||||
file """${list[0].toString()}"""
|
||||
time 10000 // limit inflight 10s
|
||||
|
||||
check { result, exception, startTime, endTime ->
|
||||
if (exception != null) {
|
||||
throw exception
|
||||
}
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertEquals(0, json.NumberFilteredRows)
|
||||
}
|
||||
|
||||
if (password) {
|
||||
cm1 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load"""
|
||||
cm2 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load"""
|
||||
} else {
|
||||
cm1 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load"""
|
||||
cm2 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load"""
|
||||
}
|
||||
logger.info("command is: " + cm1)
|
||||
logger.info("command is: " + cm2)
|
||||
|
||||
def load_path_1 = """${context.file.parent}/thread_load_1.sh"""
|
||||
write_to_file(load_path_1, cm1)
|
||||
cm1 = "bash " + load_path_1
|
||||
|
||||
def load_path_2 = """${context.file.parent}/thread_load_2.sh"""
|
||||
write_to_file(load_path_2, cm2)
|
||||
cm2 = "bash " + load_path_2
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -173,23 +145,12 @@ suite("stress_test_two_stream_load", "p2,nonConcurrent") {
|
||||
data_delete("list")
|
||||
|
||||
doris_dbgen_create_data(database_name, tb_name1, "range")
|
||||
doris_dbgen_load_data(database_name, tb_name2, "range")
|
||||
|
||||
def thread1 = Thread.start {
|
||||
logger.info("load1 start")
|
||||
def proc = cm1.execute()
|
||||
def sout = new StringBuilder(), serr = new StringBuilder()
|
||||
proc.consumeProcessOutput(sout, serr)
|
||||
proc.waitForOrKill(7200000)
|
||||
logger.info("std out: " + sout + "std err: " + serr)
|
||||
doris_dbgen_stream_load_data(database_name, tb_name2, "range", 1)
|
||||
}
|
||||
def thread2 = Thread.start {
|
||||
logger.info("load2 start")
|
||||
def proc = cm2.execute()
|
||||
def sout = new StringBuilder(), serr = new StringBuilder()
|
||||
proc.consumeProcessOutput(sout, serr)
|
||||
proc.waitForOrKill(7200000)
|
||||
logger.info("std out: " + sout + "std err: " + serr)
|
||||
doris_dbgen_stream_load_data(database_name, tb_name2, "range", 1)
|
||||
}
|
||||
thread1.join()
|
||||
thread2.join()
|
||||
@ -198,25 +159,14 @@ suite("stress_test_two_stream_load", "p2,nonConcurrent") {
|
||||
def partition_res_range = sql """show partitions from ${tb_name2};"""
|
||||
assertTrue(row_count_range[0][0] == partition_res_range.size)
|
||||
|
||||
doris_dbgen_create_data(database_name, tb_name4, "list")
|
||||
doris_dbgen_load_data(database_name, tb_name3, "list")
|
||||
data_delete("range")
|
||||
doris_dbgen_create_data(database_name, tb_name4, "list")
|
||||
|
||||
def thread3 = Thread.start {
|
||||
logger.info("load1 start")
|
||||
def proc = cm1.execute()
|
||||
def sout = new StringBuilder(), serr = new StringBuilder()
|
||||
proc.consumeProcessOutput(sout, serr)
|
||||
proc.waitForOrKill(7200000)
|
||||
logger.info("std out: " + sout + "std err: " + serr)
|
||||
doris_dbgen_stream_load_data(database_name, tb_name3, "list", 1)
|
||||
}
|
||||
def thread4 = Thread.start {
|
||||
logger.info("load2 start")
|
||||
def proc = cm2.execute()
|
||||
def sout = new StringBuilder(), serr = new StringBuilder()
|
||||
proc.consumeProcessOutput(sout, serr)
|
||||
proc.waitForOrKill(7200000)
|
||||
logger.info("std out: " + sout + "std err: " + serr)
|
||||
doris_dbgen_stream_load_data(database_name, tb_name3, "list", 1)
|
||||
}
|
||||
thread3.join()
|
||||
thread4.join()
|
||||
|
||||
@ -1,17 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
@ -1,17 +0,0 @@
|
||||
#!/usr/bin/env bash
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
Reference in New Issue
Block a user