improve workload group regression stress test script (#26104)
This commit is contained in:
@ -1,237 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
import groovy.sql.Sql
|
||||
import org.apache.commons.math3.stat.StatUtils
|
||||
import org.apache.groovy.parser.antlr4.util.StringUtils
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
/*
|
||||
|
||||
how to use:
|
||||
|
||||
third party dependency:
|
||||
1 need install java and groovy
|
||||
2 commons-math3-3.6.1.jar/mysql-connector-java-5.1.38.jar
|
||||
|
||||
command to run:
|
||||
groovy -cp "lib/*" basic_workload_group_test.groovy
|
||||
lib contains commons-math3-3.6.1.jar/mysql-connector-java-5.1.38.jar
|
||||
|
||||
data:
|
||||
default sql and data comes from clickbench
|
||||
|
||||
*/
|
||||
|
||||
|
||||
// Script start time in epoch milliseconds; only the commented-out final report reads it.
def begin_time = System.currentTimeMillis()

// JDBC connection settings shared by every connection this script opens.
def url = 'jdbc:mysql://127.0.0.1:9030/hits?useSSL=false'
def username = 'root'
def password = ''

// Stop flag polled by the worker loops before each query; starts out false.
AtomicBoolean should_stop = new AtomicBoolean()

// Success / failure counters, one pair per test label; all start at zero.
AtomicInteger bigq_succ_num = new AtomicInteger()
AtomicInteger bigq_failed_num = new AtomicInteger()

AtomicInteger smallq_succ_num = new AtomicInteger()
AtomicInteger smallq_failed_num = new AtomicInteger()

AtomicInteger concurrency_succ_num = new AtomicInteger()
AtomicInteger concurrency_failed_num = new AtomicInteger()
|
||||
|
||||
// Runs one statement on an open connection and records the outcome.
//   sql        - connection-like object; only execute(String) is called on it
//   label      - prefix for the printed timing / failure line
//   test_sql   - statement text to execute
//   time_array - list that receives the elapsed milliseconds of a successful run
//   succ_num   - counter incremented when execute() returns normally
//   failed_num - counter incremented when execute() throws
// A failed statement is counted and reported but contributes no timing sample.
def query_func = { sql, label, test_sql, time_array, succ_num, failed_num ->
    def start_time = System.currentTimeMillis()
    try {
        sql.execute(test_sql)
        succ_num.incrementAndGet()
    } catch (Exception e) {
        failed_num.incrementAndGet()
        // Report the cause so a failing run can be diagnosed instead of only counted.
        println(label + " failed : " + e.getMessage())
        println()
        return
    }
    def exec_time = System.currentTimeMillis() - start_time
    time_array.add(exec_time)
    println(label + " : " + exec_time)
    println()
}
|
||||
|
||||
// Runs the same SQL from several connections at once and waits for all of them.
//   label       - test name, used as the prefix of every printed line
//   group_name  - workload group to set on each connection; "" leaves the session default
//   test_sql    - SQL text to run
//   file_name   - file containing the SQL; when non-empty it overrides test_sql
//   concurrency - how many threads send queries at the same time
//   iterations  - how many times each thread sends the query
//   time_array  - receives one list of elapsed times (ms) per thread
//   succ_num / failed_num - shared counters updated by query_func
// When all threads of this call have finished, should_stop is set so that any
// other query loop still polling it stops as well.
def thread_query_func = { label, group_name, test_sql, file_name, concurrency, iterations, time_array, succ_num, failed_num ->
    def threads = []
    def cur_sql = StringUtils.isEmpty(file_name) ? test_sql : new File(file_name).text

    for (int i = 0; i < concurrency; i++) {
        // Each worker appends only to its own list, registered here before the worker starts.
        def cur_array = []
        time_array.add(cur_array)
        threads.add(Thread.startDaemon {
            def sql = Sql.newInstance(url, username, password, 'com.mysql.jdbc.Driver')
            try {
                if (group_name != "") {
                    sql.execute("set workload_group='" + group_name + "'")
                }
                for (int j = 0; j < iterations; j++) {
                    if (should_stop.get()) {
                        break
                    }
                    query_func(sql, label + " " + j, cur_sql, cur_array, succ_num, failed_num)
                }
            } finally {
                // Release the connection even when session setup or a query throws.
                sql.close()
            }
        })
    }

    for (Thread t in threads) {
        t.join()
    }
    println(label + " query finished")
    should_stop.set(true)
}
|
||||
|
||||
// Prints the 50th/75th/90th/95th/99th percentile of all collected query times.
//   label          - prefix for each printed line
//   timecost_array - collection of per-thread timing lists (milliseconds)
def calculate_tpxx = { label, timecost_array ->
    // Merge every per-thread list into one flat sample list.
    List<Double> samples = new ArrayList<>()
    for (int[] per_thread : timecost_array) {
        for (int cost : per_thread) {
            samples.add((double) cost)
        }
    }
    double[] arr = samples.toArray()

    for (int pct : [50, 75, 90, 95, 99]) {
        double tp = StatUtils.percentile(arr, (double) pct)
        println(label + " tp" + pct + "=" + tp)
    }
}
|
||||
|
||||
// Prints the summary of one test: its settings, the latency percentiles and the counters.
//   label     - test name used as line prefix
//   c, i      - concurrency and iteration count the test was run with
//   time_cost - per-thread timing lists, passed on to calculate_tpxx
//   succ_num / failed_num - counters whose current values are printed
def print_test_result = { label, c, i, time_cost, succ_num, failed_num ->
    println("${label} iteration=${i}")
    println("${label} concurrency=${c}")
    calculate_tpxx(label, time_cost)
    println("${label} succ sum=${succ_num.get()}")
    println("${label} failed num=${failed_num.get()}")
    println("")
}
|
||||
|
||||
|
||||
|
||||
// Test 1: runs a heavy aggregation ("bigq") and a light aggregation ("smallq")
// at the same time, each through thread_query_func on its own thread, then
// prints the result of both. Either group's workload group can be set via the
// *_group_name variables; "" keeps the session default.
def test_two_group_query = {
    // thread_query_func leaves should_stop set to true when it finishes, so
    // clear it here; otherwise a test run after another one would stop at once.
    should_stop.set(false)

    def bigquery = 'SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits.hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;'
    int bigquery_c = 1
    int bigquery_i = 1
    // thread_query_func appends one list of timings per worker thread.
    def bigquery_timecost = []
    def bigquery_group_name = ""
    def bigquery_file = ""
    def test_label_1 = "bigq"
    def bigquery_thread = Thread.start {
        thread_query_func(test_label_1, bigquery_group_name, bigquery, bigquery_file, bigquery_c, bigquery_i, bigquery_timecost, bigq_succ_num, bigq_failed_num)
    }

    def smallquery = 'SELECT SUM(AdvEngineID), COUNT(*), AVG(ResolutionWidth) FROM hits.hits'
    int smallquery_c = 1
    int smallquery_i = 10
    def smallquery_timecost = []
    def small_group_name = ""
    def small_query_file = ""
    def test_label_2 = "smallq"
    def smallquery_thread = Thread.start {
        thread_query_func(test_label_2, small_group_name, smallquery, small_query_file, smallquery_c, smallquery_i, smallquery_timecost, smallq_succ_num, smallq_failed_num)
    }

    bigquery_thread.join()
    smallquery_thread.join()

    println ""
    print_test_result(test_label_1, bigquery_c, bigquery_i, bigquery_timecost, bigq_succ_num, bigq_failed_num)
    print_test_result(test_label_2, smallquery_c, smallquery_i, smallquery_timecost, smallq_succ_num, smallq_failed_num)
}
|
||||
|
||||
// Test 2: runs a single query group through thread_query_func with c
// concurrent connections and i iterations each, then prints the result.
def test_concurrency = {
    // thread_query_func leaves should_stop set to true when it finishes, so
    // clear it here; otherwise this test would run no queries after another test.
    should_stop.set(false)

    def test_label = "concurrency"
    def group_name = ""
    def query = 'SELECT WatchID, ClientIP, COUNT(*) AS c, SUM(IsRefresh), AVG(ResolutionWidth) FROM hits.hits GROUP BY WatchID, ClientIP ORDER BY c DESC LIMIT 10;'
    def query_file = ""
    int c = 1
    int i = 10
    // thread_query_func appends one list of timings per worker thread.
    def timecost = []
    def test_thread = Thread.start {
        thread_query_func(test_label, group_name, query, query_file, c, i, timecost, concurrency_succ_num, concurrency_failed_num)
    }
    test_thread.join()

    println ""
    print_test_result(test_label, c, i, timecost, concurrency_succ_num, concurrency_failed_num)
}
|
||||
|
||||
// Prints the settings that influence the test: for each statement below, every
// returned row is printed as "<first column> = <second column>".
def show_global_config = {
    println "========== show global config info"
    def show_sql_con = Sql.newInstance(url, username, password, 'com.mysql.jdbc.Driver')
    try {
        def show_sqls = [
            "show variables like '%experimental_enable_pipeline_engine%'",
            "ADMIN SHOW FRONTEND CONFIG like '%enable_workload_group%';",
            "show variables like '%parallel_fragment_exec_instance_num%';",
            "show variables like '%parallel_pipeline_task_num%';"
        ]
        for (String show_sql : show_sqls) {
            show_sql_con.eachRow(show_sql) { row ->
                println(row[0] + " = " + row[1])
            }
        }
    } finally {
        // Release the connection even if one of the statements fails.
        show_sql_con.close()
    }
}
|
||||
|
||||
// note(wb): uncomment the calls below to run the corresponding test
|
||||
|
||||
// test 1, test two group runs at same time
|
||||
//test_two_group_query()
|
||||
|
||||
// test2, just run one group to test concurrency
|
||||
//test_concurrency()
|
||||
|
||||
// show config
|
||||
//show_global_config()
|
||||
|
||||
//println "==========Test finish, time cost=" + (System.currentTimeMillis() - begin_time) / 1000
|
||||
@ -0,0 +1,46 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
// Master switch: the test script exits immediately unless this is "true".
global_conf.enable_test="false"
// JDBC URL of the FE MySQL-protocol endpoint (opened with the MySQL JDBC driver).
// 9030 is the default FE query port; 8030 is the default FE HTTP port and does
// not accept MySQL-protocol connections. Adjust if your FE uses another query_port.
global_conf.url="jdbc:mysql://127.0.0.1:9030/hits?useSSL=false"
global_conf.username="root"
global_conf.password=""
// "true"/"false": value given to the enable_pipeline_engine session variable.
global_conf.enable_pipe="true"
// "true": each connection sets the workload group configured for its query group.
global_conf.enable_group="true"
|
||||
|
||||
/*
|
||||
about query directory
|
||||
1 All SQL can be placed in one file separated by ";"
|
||||
2 Placing SQL in multiple files, just like
|
||||
q1.sql, q2.sql, q3.sql
|
||||
*/
|
||||
|
||||
// One section per query group. Fields:
//   label - name printed in progress and result lines
//   dir   - directory holding the SQL files of this group
//   c     - concurrency (number of connections)
//   i     - iterations per connection
//   group - workload group used by this query group
//   db    - database selected before running the queries
ckbench_query {
    label = "ckbench query"
    dir = "../query/ckbench_query/"
    c = "1"
    i = "1"
    group = "normal"
    db = "hits"
}

tpch_query {
    label = "tpch query"
    dir = "../query/tpch_query/"
    c = "1"
    i = "1"
    group = "test"
    db = "tpch100"
}
|
||||
|
||||
|
||||
@ -0,0 +1,197 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
import groovy.sql.Sql
|
||||
import org.apache.groovy.parser.antlr4.util.StringUtils
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import org.apache.commons.math3.stat.StatUtils
|
||||
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
|
||||
/*
|
||||
|
||||
how to use:
|
||||
|
||||
third party dependency:
|
||||
1 need install java and groovy
|
||||
2 commons-math3-3.6.1.jar/mysql-connector-java-5.1.38.jar
|
||||
|
||||
command to run:
|
||||
groovy -cp "lib/*" mixed_query_test_conf.groovy
|
||||
lib contains commons-math3-3.6.1.jar/mysql-connector-java-5.1.38.jar
|
||||
|
||||
*/
|
||||
|
||||
// Script start time in epoch milliseconds, used for the final "time cost" line.
def begin_time = System.currentTimeMillis()

// Result lines of all query groups are collected in print_ret and printed at
// the end; write_ret_lock serialises the groups' writes to it.
ReentrantLock write_ret_lock = new ReentrantLock()
List<String> print_ret = new ArrayList<>()

// Load the test configuration, resolved relative to the working directory.
def conf_location = new File("conf/mixed_query_test_conf.groovy").toURI().toURL()
def test_conf = new ConfigSlurper().parse(conf_location)

// Nothing runs unless the configuration explicitly enables the test.
boolean enable_test = Boolean.parseBoolean(test_conf.global_conf.enable_test)
if (!enable_test) {
    System.exit(0)
}

// Connection settings; assigned without a declaration, as in the original script.
url = test_conf.global_conf.url
username = test_conf.global_conf.username
password = test_conf.global_conf.password
boolean enable_pipe = Boolean.parseBoolean(test_conf.global_conf.enable_pipe)
boolean enable_group = Boolean.parseBoolean(test_conf.global_conf.enable_group)

// Polled by the worker loops before every iteration and every query.
AtomicBoolean should_stop = new AtomicBoolean(false)
|
||||
|
||||
// Appends the 50th/75th/90th/95th/99th percentile of the collected query times to list.
//   label          - prefix for each result line
//   timecost_array - query times in milliseconds; either a flat collection of
//                    numbers (what query_func passes) or a collection of
//                    per-thread collections/arrays of numbers
//   list           - output list that receives one "<label> tpNN=<value>" line per percentile
def calculate_tpxx = { label, timecost_array, list ->
    List<Double> samples = new ArrayList<>()
    for (entry in timecost_array) {
        if (entry instanceof Number) {
            // Flat input: the entry itself is one sample.
            samples.add(((Number) entry).doubleValue())
        } else {
            // Nested input: the entry is one thread's collection of samples.
            for (val in entry) {
                samples.add(((Number) val).doubleValue())
            }
        }
    }
    double[] arr = samples as double[]

    for (int pct : [50, 75, 90, 95, 99]) {
        double tp = StatUtils.percentile(arr, (double) pct)
        list.add(label + " tp" + pct + "=" + tp)
    }
}
|
||||
|
||||
|
||||
// Runs one configured query group and appends its summary to print_ret.
//
// conf is one query-group section of the test configuration; fields read here:
//   dir   - directory whose files hold the SQL (each file is split on ";")
//   c     - number of concurrent connections (string, parsed as int)
//   i     - how many times each connection runs the whole SQL list (string, parsed as int)
//   group - workload group set on each connection when enable_group is true and this is non-empty
//   db    - database selected with "use" when non-empty
//   label - prefix for progress and summary lines
def query_func = { conf ->
    AtomicInteger succ_num = new AtomicInteger(0)
    AtomicInteger failed_num = new AtomicInteger(0)
    long query_func_begin_time = System.currentTimeMillis()

    // 1 get sql list
    List<String> sql_array_list = new ArrayList<String>()
    File[] fs_list = new File(conf.dir).listFiles()
    if (fs_list == null) {
        // listFiles() returns null when the path is not a readable directory.
        println(conf.label + " query dir not found or not readable: " + conf.dir)
        fs_list = new File[0]
    }
    // listFiles() guarantees no particular order; sort by path so runs are repeatable.
    Arrays.sort(fs_list)
    for (File sql_file : fs_list) {
        if (!sql_file.isFile()) {
            continue
        }
        for (String raw_sql : sql_file.text.split(";")) {
            String stmt = raw_sql.trim()
            if (!StringUtils.isEmpty(stmt)) {
                sql_array_list.add(stmt)
            }
        }
    }

    // 2 submit query
    // All worker threads append to this list, so it must be synchronized.
    List<Integer> timeCost = Collections.synchronizedList(new ArrayList<Integer>())
    int concurrency = Integer.parseInt(conf.c)
    int iteration = Integer.parseInt(conf.i)
    def threads = []
    for (int i = 0; i < concurrency; i++) {
        int curindex = i
        threads.add(Thread.startDaemon {
            def sql = Sql.newInstance(url, username, password, 'com.mysql.jdbc.Driver')
            try {
                if (enable_group && !StringUtils.isEmpty(conf.group)) {
                    sql.execute("set workload_group='" + conf.group + "'")
                }
                sql.execute("set enable_pipeline_engine=" + (enable_pipe ? "true" : "false"))
                if (!StringUtils.isEmpty(conf.db)) {
                    sql.execute("use " + conf.db + ";")
                }

                for (int j = 0; j < iteration && !should_stop.get(); j++) {
                    for (int k = 0; k < sql_array_list.size() && !should_stop.get(); k++) {
                        String query_sql = sql_array_list.get(k)
                        def query_start_time = System.currentTimeMillis()
                        try {
                            sql.execute(query_sql)
                        } catch (Exception e) {
                            // A failed query is counted and reported, but adds no timing sample.
                            failed_num.incrementAndGet()
                            println(conf.label + " " + curindex + "," + j + "," + k + " failed : " + e.getMessage())
                            continue
                        }
                        succ_num.incrementAndGet()
                        int query_time = (int) (System.currentTimeMillis() - query_start_time)
                        println(conf.label + " " + curindex + "," + j + "," + k + " : " + query_time + " ms")
                        timeCost.add(query_time)
                    }
                }
            } finally {
                // Release the connection even when session setup or a query loop throws.
                sql.close()
            }
        })
    }

    for (Thread t : threads) {
        t.join()
    }
    long query_func_timecost = System.currentTimeMillis() - query_func_begin_time

    // 3 print test result
    // Hold the lock so the summary lines of concurrently running groups do not interleave.
    write_ret_lock.lock()
    try {
        print_ret.add("\n")
        print_ret.add("=============" + conf.label + "=============")
        print_ret.add(conf.label + " iteration=" + iteration)
        print_ret.add(conf.label + " concurrency=" + concurrency)
        calculate_tpxx(conf.label, timeCost, print_ret)
        print_ret.add(conf.label + " succ sum=" + succ_num.get())
        print_ret.add(conf.label + " failed num=" + failed_num.get())
        print_ret.add(conf.label + " workload group=" + conf.group)
        print_ret.add(conf.label + " time cost=" + query_func_timecost)
        print_ret.add("==========================")
    } finally {
        write_ret_lock.unlock()
    }
}
|
||||
|
||||
// Run both configured query groups in parallel, one driver thread per group.
def group_threads = [test_conf.ckbench_query, test_conf.tpch_query].collect { group_conf ->
    Thread.start { query_func(group_conf) }
}
group_threads.each { it.join() }

// Print the collected summaries only after every group has finished.
for (String ret_line : print_ret) {
    println(ret_line)
}

def end_time = System.currentTimeMillis()
println("time cost=" + (end_time - begin_time))
|
||||
@ -1,18 +0,0 @@
|
||||
-- Licensed to the Apache Software Foundation (ASF) under one
|
||||
-- or more contributor license agreements. See the NOTICE file
|
||||
-- distributed with this work for additional information
|
||||
-- regarding copyright ownership. The ASF licenses this file
|
||||
-- to you under the Apache License, Version 2.0 (the
|
||||
-- "License"); you may not use this file except in compliance
|
||||
-- with the License. You may obtain a copy of the License at
|
||||
--
|
||||
-- http://www.apache.org/licenses/LICENSE-2.0
|
||||
--
|
||||
-- Unless required by applicable law or agreed to in writing,
|
||||
-- software distributed under the License is distributed on an
|
||||
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
-- KIND, either express or implied. See the License for the
|
||||
-- specific language governing permissions and limitations
|
||||
-- under the License.
|
||||
|
||||
-- Per referer host (k): average Referer length, row count and smallest Referer,
-- for hosts with more than 100000 rows; the 25 hosts with the longest average.
SELECT
    REGEXP_REPLACE(Referer, '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k,
    AVG(length(Referer)) AS l,
    COUNT(*) AS c,
    MIN(Referer)
FROM hits
WHERE Referer <> ''
GROUP BY k
HAVING COUNT(*) > 100000
ORDER BY l DESC
LIMIT 25;
|
||||
Reference in New Issue
Block a user