[Fix](group commit) Fix wal mem back pressure fault injection case (#29493)
This commit is contained in:
@ -34,7 +34,7 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
|
||||
RETURN_IF_ERROR(status);
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
while (!runtime_state->is_cancelled() && status.ok() &&
|
||||
_all_block_queues_bytes->load(std::memory_order_relaxed) >
|
||||
_all_block_queues_bytes->load(std::memory_order_relaxed) >=
|
||||
config::group_commit_queue_mem_limit) {
|
||||
_put_cond.wait_for(l,
|
||||
std::chrono::milliseconds(LoadBlockQueue::MEM_BACK_PRESSURE_WAIT_TIME));
|
||||
|
||||
@ -20,53 +20,17 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") {
|
||||
|
||||
def tableName = "wal_test"
|
||||
sql """ DROP TABLE IF EXISTS ${tableName} """
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS `wal_baseall` (
|
||||
`k0` boolean null comment "",
|
||||
`k1` tinyint(4) null comment "",
|
||||
`k2` smallint(6) null comment "",
|
||||
`k3` int(11) null comment "",
|
||||
`k4` bigint(20) null comment "",
|
||||
`k5` decimal(9, 3) null comment "",
|
||||
`k6` char(5) null comment "",
|
||||
`k10` date null comment "",
|
||||
`k11` datetime null comment "",
|
||||
`k7` varchar(20) null comment "",
|
||||
`k8` double max null comment "",
|
||||
`k9` float sum null comment "",
|
||||
`k12` string replace null comment "",
|
||||
`k13` largeint(40) replace null comment ""
|
||||
) engine=olap
|
||||
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
|
||||
"""
|
||||
|
||||
sql """
|
||||
CREATE TABLE IF NOT EXISTS `wal_test` (
|
||||
`k0` boolean null comment "",
|
||||
`k1` tinyint(4) null comment "",
|
||||
`k2` smallint(6) null comment "",
|
||||
`k3` int(11) null comment "",
|
||||
`k4` bigint(20) null comment "",
|
||||
`k5` decimal(9, 3) null comment "",
|
||||
`k6` char(5) null comment "",
|
||||
`k10` date null comment "",
|
||||
`k11` datetime null comment "",
|
||||
`k7` varchar(20) null comment "",
|
||||
`k8` double max null comment "",
|
||||
`k9` float sum null comment "",
|
||||
`k12` string replace_if_not_null null comment "",
|
||||
`k13` largeint(40) replace null comment ""
|
||||
CREATE TABLE IF NOT EXISTS ${tableName} (
|
||||
`k` int ,
|
||||
`v` int ,
|
||||
) engine=olap
|
||||
DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
|
||||
DISTRIBUTED BY HASH(`k`)
|
||||
BUCKETS 5
|
||||
properties("replication_num" = "1")
|
||||
"""
|
||||
|
||||
streamLoad {
|
||||
table "wal_baseall"
|
||||
db "regression_test_fault_injection_p0"
|
||||
set 'column_separator', ','
|
||||
file "baseall.txt"
|
||||
}
|
||||
|
||||
def enable_back_pressure = {
|
||||
try {
|
||||
def fes = sql_return_maparray "show frontends"
|
||||
@ -76,14 +40,23 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") {
|
||||
def be = bes[0]
|
||||
def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/"
|
||||
logger.info("observer url: " + url)
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
|
||||
sb.append("/rest/v2/manager/node/set_config/be")
|
||||
sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"")
|
||||
sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"0\\",\\"persist\\": \\"false\\"}}\"""")
|
||||
String command = sb.toString()
|
||||
logger.info(command)
|
||||
def process = command.execute()
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
|
||||
sb.append("/rest/v2/manager/node/set_config/be")
|
||||
sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"")
|
||||
sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"0\\",\\"persist\\": \\"false\\"}}\"""")
|
||||
String command = sb.toString()
|
||||
logger.info(command)
|
||||
def process = command.execute()
|
||||
|
||||
sb = new StringBuilder();
|
||||
sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
|
||||
sb.append("/rest/v2/manager/node/set_config/be")
|
||||
sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"")
|
||||
sb.append(""" -d \"{\\"group_commit_memory_rows_for_max_filter_ratio\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"0\\",\\"persist\\": \\"false\\"}}\"""")
|
||||
command = sb.toString()
|
||||
logger.info(command)
|
||||
process = command.execute()
|
||||
} finally {
|
||||
}
|
||||
}
|
||||
@ -97,14 +70,23 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") {
|
||||
def be = bes[0]
|
||||
def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/"
|
||||
logger.info("observer url: " + url)
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
|
||||
sb.append("/rest/v2/manager/node/set_config/be")
|
||||
sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"")
|
||||
sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"67108864\\",\\"persist\\": \\"false\\"}}\"""")
|
||||
String command = sb.toString()
|
||||
logger.info(command)
|
||||
def process = command.execute()
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
|
||||
sb.append("/rest/v2/manager/node/set_config/be")
|
||||
sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"")
|
||||
sb.append(""" -d \"{\\"group_commit_queue_mem_limit\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"67108864\\",\\"persist\\": \\"false\\"}}\"""")
|
||||
String command = sb.toString()
|
||||
logger.info(command)
|
||||
def process = command.execute()
|
||||
|
||||
sb = new StringBuilder();
|
||||
sb.append("curl -X POST http://${fe.Host}:${fe.HttpPort}")
|
||||
sb.append("/rest/v2/manager/node/set_config/be")
|
||||
sb.append(" -H \"Content-Type: application/json\" -H \"Authorization: Basic cm9vdDo= \"")
|
||||
sb.append(""" -d \"{\\"group_commit_memory_rows_for_max_filter_ratio\\": {\\"node\\": [\\"${be.Host}:${be.HttpPort}\\"],\\"value\\": \\"10000\\",\\"persist\\": \\"false\\"}}\"""")
|
||||
command = sb.toString()
|
||||
logger.info(command)
|
||||
process = command.execute()
|
||||
} finally {
|
||||
}
|
||||
}
|
||||
@ -114,28 +96,30 @@ suite("test_wal_mem_back_pressure_fault_injection","nonConcurrent") {
|
||||
def thread1 = new Thread({
|
||||
sql """ set group_commit = async_mode; """
|
||||
try {
|
||||
sql """insert into ${tableName} select * from wal_baseall where k1 <= 3"""
|
||||
sql """insert into ${tableName} values(1,1)"""
|
||||
} catch (Exception e) {
|
||||
logger.info(e.getMessage())
|
||||
assertTrue(e.getMessage().contains('Communications link failure'))
|
||||
}
|
||||
disable_back_pressure()
|
||||
finish = true
|
||||
} finally {
|
||||
finish = true
|
||||
}
|
||||
})
|
||||
thread1.start()
|
||||
|
||||
for(int i = 0;i<10;i++){
|
||||
while(!finish){
|
||||
def processList = sql "show processlist"
|
||||
logger.info(processList.toString())
|
||||
processList.each { item ->
|
||||
logger.info(item[1].toString())
|
||||
logger.info(item[11].toString())
|
||||
if (item[11].toString() == "insert into ${tableName} select * from wal_baseall where k1 <= 3".toString()){
|
||||
if (item[11].toString() == "".toString()){
|
||||
def res = sql "kill ${item[1]}"
|
||||
logger.info(res.toString())
|
||||
}
|
||||
}
|
||||
sleep(1000)
|
||||
}
|
||||
disable_back_pressure()
|
||||
|
||||
thread1.join()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user