[fix](insert) fix group commit regression test (#28142)
This commit is contained in:
@ -214,54 +214,54 @@ suite("insert_group_commit_into") {
|
||||
// try_sql("DROP TABLE ${table}")
|
||||
}
|
||||
|
||||
// test connect to observer fe
|
||||
try {
|
||||
def fes = sql_return_maparray "show frontends"
|
||||
logger.info("frontends: ${fes}")
|
||||
if (fes.size() > 1) {
|
||||
def observer_fe = null
|
||||
for (def fe : fes) {
|
||||
if (fe.IsMaster == "false") {
|
||||
observer_fe = fe
|
||||
break
|
||||
}
|
||||
}
|
||||
if (observer_fe != null) {
|
||||
def url = "jdbc:mysql://${observer_fe.Host}:${observer_fe.QueryPort}/"
|
||||
logger.info("observer url: " + url)
|
||||
connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = url) {
|
||||
sql """ set enable_insert_group_commit = true; """
|
||||
sql """ set enable_nereids_dml = false; """
|
||||
sql """ set enable_profile= true; """
|
||||
|
||||
// 1. insert into
|
||||
def server_info = group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
|
||||
assertTrue(server_info.contains('query_id'))
|
||||
// get query_id, such as 43f87963586a482a-b0496bcf9e2b5555
|
||||
def query_id_index = server_info.indexOf("'query_id':'") + "'query_id':'".length()
|
||||
def query_id = server_info.substring(query_id_index, query_id_index + 33)
|
||||
logger.info("query_id: " + query_id)
|
||||
// 2. check profile
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("curl -X GET -u ${context.config.jdbcUser}:${context.config.jdbcPassword} http://${observer_fe.Host}:${observer_fe.HttpPort}")
|
||||
sb.append("/api/profile?query_id=").append(query_id)
|
||||
String command = sb.toString()
|
||||
logger.info(command)
|
||||
def process = command.execute()
|
||||
def code = process.waitFor()
|
||||
def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
|
||||
def out = process.getText()
|
||||
logger.info("Get profile: code=" + code + ", out=" + out + ", err=" + err)
|
||||
assertEquals(code, 0)
|
||||
def json = parseJson(out)
|
||||
assertEquals("success", json.msg.toLowerCase())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.info("only one fe, skip test connect to observer fe")
|
||||
}
|
||||
} finally {
|
||||
}
|
||||
// // test connect to observer fe
|
||||
// try {
|
||||
// def fes = sql_return_maparray "show frontends"
|
||||
// logger.info("frontends: ${fes}")
|
||||
// if (fes.size() > 1) {
|
||||
// def observer_fe = null
|
||||
// for (def fe : fes) {
|
||||
// if (fe.IsMaster == "false") {
|
||||
// observer_fe = fe
|
||||
// break
|
||||
// }
|
||||
// }
|
||||
// if (observer_fe != null) {
|
||||
// def url = "jdbc:mysql://${observer_fe.Host}:${observer_fe.QueryPort}/"
|
||||
// logger.info("observer url: " + url)
|
||||
// connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = url) {
|
||||
// sql """ set enable_insert_group_commit = true; """
|
||||
// sql """ set enable_nereids_dml = false; """
|
||||
// sql """ set enable_profile= true; """
|
||||
//
|
||||
// // 1. insert into
|
||||
// def server_info = group_commit_insert """ insert into ${table}(name, id) values('c', 3); """, 1
|
||||
// assertTrue(server_info.contains('query_id'))
|
||||
// // get query_id, such as 43f87963586a482a-b0496bcf9e2b5555
|
||||
// def query_id_index = server_info.indexOf("'query_id':'") + "'query_id':'".length()
|
||||
// def query_id = server_info.substring(query_id_index, query_id_index + 33)
|
||||
// logger.info("query_id: " + query_id)
|
||||
// // 2. check profile
|
||||
// StringBuilder sb = new StringBuilder();
|
||||
// sb.append("curl -X GET -u ${context.config.jdbcUser}:${context.config.jdbcPassword} http://${observer_fe.Host}:${observer_fe.HttpPort}")
|
||||
// sb.append("/api/profile?query_id=").append(query_id)
|
||||
// String command = sb.toString()
|
||||
// logger.info(command)
|
||||
// def process = command.execute()
|
||||
// def code = process.waitFor()
|
||||
// def err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
|
||||
// def out = process.getText()
|
||||
// logger.info("Get profile: code=" + code + ", out=" + out + ", err=" + err)
|
||||
// assertEquals(code, 0)
|
||||
// def json = parseJson(out)
|
||||
// assertEquals("success", json.msg.toLowerCase())
|
||||
// }
|
||||
// }
|
||||
// } else {
|
||||
// logger.info("only one fe, skip test connect to observer fe")
|
||||
// }
|
||||
// } finally {
|
||||
// }
|
||||
|
||||
// table with array type
|
||||
tableName = "insert_group_commit_into_duplicate_array"
|
||||
|
||||
@ -70,7 +70,8 @@ suite("test_group_commit_interval_ms_property") {
|
||||
sql """ set enable_nereids_dml = false; """
|
||||
}
|
||||
|
||||
qt_1 "show create table ${test_table}"
|
||||
def res1 = sql """show create table ${test_table}"""
|
||||
assertTrue(res1.toString().contains("\"group_commit_interval_ms\" = \"10000\""))
|
||||
|
||||
def msg1 = group_commit_insert """insert into ${test_table} values(1,1); """, 1
|
||||
|
||||
@ -82,7 +83,8 @@ suite("test_group_commit_interval_ms_property") {
|
||||
|
||||
sql "ALTER table ${test_table} SET (\"group_commit_interval_ms\"=\"1000\"); "
|
||||
|
||||
qt_2 "show create table ${test_table}"
|
||||
def res2 = sql """show create table ${test_table}"""
|
||||
assertTrue(res2.toString().contains("\"group_commit_interval_ms\" = \"1000\""))
|
||||
|
||||
def msg3 = group_commit_insert """insert into ${test_table} values(3,3); """, 1
|
||||
|
||||
|
||||
@ -50,8 +50,6 @@ suite("test_group_commit_http_stream_lineitem_multiple_client") {
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertTrue(json.GroupCommit)
|
||||
assertTrue(json.Label.startsWith("group_commit_"))
|
||||
assertEquals(total_rows, json.NumberTotalRows)
|
||||
assertEquals(loaded_rows, json.NumberLoadedRows)
|
||||
assertEquals(filtered_rows, json.NumberFilteredRows)
|
||||
|
||||
@ -74,8 +74,6 @@ PROPERTIES (
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertTrue(json.GroupCommit)
|
||||
assertTrue(json.Label.startsWith("group_commit_"))
|
||||
assertEquals(total_rows, json.NumberTotalRows)
|
||||
assertEquals(loaded_rows, json.NumberLoadedRows)
|
||||
assertEquals(filtered_rows, json.NumberFilteredRows)
|
||||
|
||||
@ -39,8 +39,6 @@ suite("test_group_commit_http_stream_lineitem_normal") {
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertTrue(json.GroupCommit)
|
||||
assertTrue(json.Label.startsWith("group_commit_"))
|
||||
assertEquals(total_rows, json.NumberTotalRows)
|
||||
assertEquals(loaded_rows, json.NumberLoadedRows)
|
||||
assertEquals(filtered_rows, json.NumberFilteredRows)
|
||||
@ -52,7 +50,7 @@ suite("test_group_commit_http_stream_lineitem_normal") {
|
||||
}
|
||||
}
|
||||
def db = "regression_test_insert_p2"
|
||||
def stream_load_table = "test_stream_load_lineitem_normal_sf1"
|
||||
def stream_load_table = "test_http_stream_lineitem_normal_sf1"
|
||||
def create_stream_load_table = {
|
||||
sql """ drop table if exists ${stream_load_table}; """
|
||||
|
||||
|
||||
@ -75,8 +75,6 @@ suite("test_group_commit_http_stream_lineitem_schema_change") {
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertTrue(json.GroupCommit)
|
||||
assertTrue(json.Label.startsWith("group_commit_"))
|
||||
assertEquals(total_rows, json.NumberTotalRows)
|
||||
assertEquals(loaded_rows, json.NumberLoadedRows)
|
||||
assertEquals(filtered_rows, json.NumberFilteredRows)
|
||||
@ -355,7 +353,7 @@ l_comment) select c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c14, c15, c
|
||||
|
||||
|
||||
def process = { table_name ->
|
||||
for (int i = 1; i <= 5; i++) {
|
||||
for (int i = 1; i <= 4; i++) {
|
||||
switch (i) {
|
||||
case SC.TRUNCATE_TABLE.value:
|
||||
truncate(table_name)
|
||||
|
||||
@ -35,14 +35,14 @@ String[] getFiles(String dirName, int num) {
|
||||
suite("test_group_commit_insert_into_lineitem_multiple_client") {
|
||||
String[] file_array;
|
||||
def prepare = {
|
||||
def dataDir = "${context.config.cacheDataPath}/lineitem/"
|
||||
def dataDir = "${context.config.cacheDataPath}/insert_into_lineitem_multiple_client/"
|
||||
File dir = new File(dataDir)
|
||||
if (!dir.exists()) {
|
||||
new File("${context.config.cacheDataPath}/lineitem/").mkdir()
|
||||
new File("${context.config.cacheDataPath}/insert_into_lineitem_multiple_client/").mkdir()
|
||||
for (int i = 1; i <= 10; i++) {
|
||||
logger.info("download lineitem.tbl.${i}")
|
||||
def download_file = """/usr/bin/curl ${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
|
||||
--output ${context.config.cacheDataPath}/lineitem/lineitem.tbl.${i}""".execute().getText()
|
||||
--output ${context.config.cacheDataPath}/insert_into_lineitem_multiple_client/lineitem.tbl.${i}""".execute().getText()
|
||||
}
|
||||
}
|
||||
file_array = getFiles(dataDir, 10)
|
||||
|
||||
@ -35,14 +35,14 @@ String[] getFiles(String dirName, int num) {
|
||||
suite("test_group_commit_insert_into_lineitem_multiple_table") {
|
||||
String[] file_array;
|
||||
def prepare = {
|
||||
def dataDir = "${context.config.cacheDataPath}/lineitem/"
|
||||
def dataDir = "${context.config.cacheDataPath}/insert_into_lineitem_multiple_table/"
|
||||
File dir = new File(dataDir)
|
||||
if (!dir.exists()) {
|
||||
new File("${context.config.cacheDataPath}/lineitem/").mkdir()
|
||||
new File("${context.config.cacheDataPath}/insert_into_lineitem_multiple_table/").mkdir()
|
||||
for (int i = 1; i <= 10; i++) {
|
||||
logger.info("download lineitem.tbl.${i}")
|
||||
def download_file = """/usr/bin/curl ${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
|
||||
--output ${context.config.cacheDataPath}/lineitem/lineitem.tbl.${i}""".execute().getText()
|
||||
--output ${context.config.cacheDataPath}/insert_into_lineitem_multiple_table/lineitem.tbl.${i}""".execute().getText()
|
||||
}
|
||||
}
|
||||
file_array = getFiles(dataDir, 10)
|
||||
|
||||
@ -31,14 +31,14 @@ String[] getFiles(String dirName, int num) {
|
||||
suite("test_group_commit_insert_into_lineitem_normal") {
|
||||
String[] file_array;
|
||||
def prepare = {
|
||||
def dataDir = "${context.config.cacheDataPath}/lineitem/"
|
||||
def dataDir = "${context.config.cacheDataPath}/insert_into_lineitem_normal/"
|
||||
File dir = new File(dataDir)
|
||||
if (!dir.exists()) {
|
||||
new File("${context.config.cacheDataPath}/lineitem/").mkdir()
|
||||
new File("${context.config.cacheDataPath}/insert_into_lineitem_normal/").mkdir()
|
||||
for (int i = 1; i <= 10; i++) {
|
||||
logger.info("download lineitem.tbl.${i}")
|
||||
def download_file = """/usr/bin/curl ${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
|
||||
--output ${context.config.cacheDataPath}/lineitem/lineitem.tbl.${i}""".execute().getText()
|
||||
--output ${context.config.cacheDataPath}/insert_into_lineitem_normal/lineitem.tbl.${i}""".execute().getText()
|
||||
}
|
||||
}
|
||||
file_array = getFiles(dataDir, 10)
|
||||
|
||||
@ -63,14 +63,14 @@ String[] getFiles(String dirName, int num) {
|
||||
suite("test_group_commit_insert_into_lineitem_scheme_change") {
|
||||
String[] file_array;
|
||||
def prepare = {
|
||||
def dataDir = "${context.config.cacheDataPath}/lineitem/"
|
||||
def dataDir = "${context.config.cacheDataPath}/insert_into_lineitem_scheme_change/"
|
||||
File dir = new File(dataDir)
|
||||
if (!dir.exists()) {
|
||||
new File("${context.config.cacheDataPath}/lineitem/").mkdir()
|
||||
new File("${context.config.cacheDataPath}/insert_into_lineitem_scheme_change/").mkdir()
|
||||
for (int i = 1; i <= 10; i++) {
|
||||
logger.info("download lineitem.tbl.${i}")
|
||||
def download_file = """/usr/bin/curl ${getS3Url()}/regression/tpch/sf1/lineitem.tbl.${i}
|
||||
--output ${context.config.cacheDataPath}/lineitem/lineitem.tbl.${i}""".execute().getText()
|
||||
--output ${context.config.cacheDataPath}/insert_into_lineitem_scheme_change/lineitem.tbl.${i}""".execute().getText()
|
||||
}
|
||||
}
|
||||
file_array = getFiles(dataDir, 10)
|
||||
@ -417,7 +417,7 @@ PROPERTIES (
|
||||
|
||||
|
||||
def process = { table_name ->
|
||||
for (int i = 1; i <= 5; i++) {
|
||||
for (int i = 1; i <= 4; i++) {
|
||||
switch (i) {
|
||||
case SC.TRUNCATE_TABLE.value:
|
||||
truncate(table_name)
|
||||
|
||||
@ -51,8 +51,6 @@ l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipin
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertTrue(json.GroupCommit)
|
||||
assertTrue(json.Label.startsWith("group_commit_"))
|
||||
assertEquals(total_rows, json.NumberTotalRows)
|
||||
assertEquals(loaded_rows, json.NumberLoadedRows)
|
||||
assertEquals(filtered_rows, json.NumberFilteredRows)
|
||||
|
||||
@ -75,8 +75,6 @@ PROPERTIES (
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertTrue(json.GroupCommit)
|
||||
assertTrue(json.Label.startsWith("group_commit_"))
|
||||
assertEquals(total_rows, json.NumberTotalRows)
|
||||
assertEquals(loaded_rows, json.NumberLoadedRows)
|
||||
assertEquals(filtered_rows, json.NumberFilteredRows)
|
||||
|
||||
@ -39,8 +39,6 @@ suite("test_group_commit_stream_load_lineitem_normal") {
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertTrue(json.GroupCommit)
|
||||
assertTrue(json.Label.startsWith("group_commit_"))
|
||||
assertEquals(total_rows, json.NumberTotalRows)
|
||||
assertEquals(loaded_rows, json.NumberLoadedRows)
|
||||
assertEquals(filtered_rows, json.NumberFilteredRows)
|
||||
|
||||
@ -76,8 +76,6 @@ l_tax, l_returnflag,l_linestatus, l_shipdate,l_commitdate,l_receiptdate,l_shipin
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertTrue(json.GroupCommit)
|
||||
assertTrue(json.Label.startsWith("group_commit_"))
|
||||
assertEquals(total_rows, json.NumberTotalRows)
|
||||
assertEquals(loaded_rows, json.NumberLoadedRows)
|
||||
assertEquals(filtered_rows, json.NumberFilteredRows)
|
||||
@ -323,7 +321,7 @@ PROPERTIES (
|
||||
|
||||
|
||||
def process = { table_name ->
|
||||
for (int i = 1; i <= 5; i++) {
|
||||
for (int i = 1; i <= 4; i++) {
|
||||
switch (i) {
|
||||
case SC.TRUNCATE_TABLE.value:
|
||||
truncate(table_name)
|
||||
|
||||
@ -69,6 +69,17 @@ suite("test_group_commit_http_stream") {
|
||||
}
|
||||
}
|
||||
|
||||
def checkStreamLoadResult2 = { exception, result ->
|
||||
if (exception != null) {
|
||||
throw exception
|
||||
}
|
||||
log.info("Stream load result: ${result}".toString())
|
||||
def json = parseJson(result)
|
||||
assertEquals("success", json.Status.toLowerCase())
|
||||
assertTrue(json.GroupCommit)
|
||||
assertTrue(json.Label.startsWith("group_commit_"))
|
||||
}
|
||||
|
||||
try {
|
||||
// create table
|
||||
sql """ drop table if exists ${tableName}; """
|
||||
@ -166,7 +177,8 @@ suite("test_group_commit_http_stream") {
|
||||
|
||||
check { result, exception, startTime, endTime ->
|
||||
// TODO different with stream load: 2, 1, 0, 1
|
||||
checkStreamLoadResult(exception, result, 1, 1, 0, 0)
|
||||
//checkStreamLoadResult(exception, result, 1, 1, 0, 0)
|
||||
checkStreamLoadResult2(exception, result)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user