[test](regression-test) use unified trigger_and_wait_compaction metho… (#45908)

This commit is contained in:
py023
2024-12-25 13:59:51 +08:00
committed by GitHub
parent a6e4497811
commit 303f6bd849
60 changed files with 344 additions and 1605 deletions

View File

@ -0,0 +1,156 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.apache.doris.regression.suite.Suite
import java.util.concurrent.TimeUnit
import org.awaitility.Awaitility;
Suite.metaClass.be_get_compaction_status{ String ip, String port, String tablet_id /* param */->
return curl("GET", String.format("http://%s:%s/api/compaction/run_status?tablet_id=%s", ip, port, tablet_id))
}
Suite.metaClass.be_get_overall_compaction_status{ String ip, String port /* param */->
return curl("GET", String.format("http://%s:%s/api/compaction/run_status", ip, port))
}
Suite.metaClass.be_show_tablet_status{ String ip, String port, String tablet_id /* param */->
return curl("GET", String.format("http://%s:%s/api/compaction/show?tablet_id=%s", ip, port, tablet_id))
}
Suite.metaClass._be_run_compaction = { String ip, String port, String tablet_id, String compact_type ->
return curl("POST", String.format("http://%s:%s/api/compaction/run?tablet_id=%s&compact_type=%s",
ip, port, tablet_id, compact_type))
}
Suite.metaClass.be_run_base_compaction = { String ip, String port, String tablet_id /* param */->
return _be_run_compaction(ip, port, tablet_id, "base")
}
logger.info("Added 'be_run_base_compaction' function to Suite")
Suite.metaClass.be_run_cumulative_compaction = { String ip, String port, String tablet_id /* param */->
return _be_run_compaction(ip, port, tablet_id, "cumulative")
}
logger.info("Added 'be_run_cumulative_compaction' function to Suite")
Suite.metaClass.be_run_full_compaction = { String ip, String port, String tablet_id /* param */->
return _be_run_compaction(ip, port, tablet_id, "full")
}
Suite.metaClass.be_run_full_compaction_by_table_id = { String ip, String port, String table_id /* param */->
return curl("POST", String.format("http://%s:%s/api/compaction/run?table_id=%s&compact_type=full", ip, port, table_id))
}
logger.info("Added 'be_run_full_compaction' function to Suite")
Suite.metaClass.trigger_and_wait_compaction = { String table_name, String compaction_type, int timeout_seconds=300 ->
if (!(compaction_type in ["cumulative", "base", "full"])) {
throw new IllegalArgumentException("invalid compaction type: ${compaction_type}, supported types: cumulative, base, full")
}
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
def tablets = sql_return_maparray """show tablets from ${table_name}"""
def exit_code, stdout, stderr
def auto_compaction_disabled = sql("show create table ${table_name}")[0][1].contains('"disable_auto_compaction" = "true"')
def is_time_series_compaction = sql("show create table ${table_name}")[0][1].contains('"compaction_policy" = "time_series"')
// 1. cache compaction status
def be_tablet_compaction_status = [:]
for (tablet in tablets) {
def be_host = backendId_to_backendIP["${tablet.BackendId}"]
def be_port = backendId_to_backendHttpPort["${tablet.BackendId}"]
(exit_code, stdout, stderr) = be_show_tablet_status(be_host, be_port, tablet.TabletId)
assert exit_code == 0: "get tablet status failed, exit code: ${exit_code}, stdout: ${stdout}, stderr: ${stderr}"
def tabletStatus = parseJson(stdout.trim())
be_tablet_compaction_status.put("${be_host}-${tablet.TabletId}", tabletStatus)
}
// 2. trigger compaction
def triggered_tablets = []
for (tablet in tablets) {
def be_host = backendId_to_backendIP["${tablet.BackendId}"]
def be_port = backendId_to_backendHttpPort["${tablet.BackendId}"]
switch (compaction_type) {
case "cumulative":
(exit_code, stdout, stderr) = be_run_cumulative_compaction(be_host, be_port, tablet.TabletId)
break
case "base":
(exit_code, stdout, stderr) = be_run_base_compaction(be_host, be_port, tablet.TabletId)
break
case "full":
(exit_code, stdout, stderr) = be_run_full_compaction(be_host, be_port, tablet.TabletId)
break
}
assert exit_code == 0: "trigger compaction failed, exit code: ${exit_code}, stdout: ${stdout}, stderr: ${stderr}"
def trigger_status = parseJson(stdout.trim())
if (trigger_status.status.toLowerCase() != "success") {
if (trigger_status.status.toLowerCase() == "already_exist") {
triggered_tablets.add(tablet) // compaction already in queue, treat it as successfully triggered
} else if (!auto_compaction_disabled) {
// ignore the error if auto compaction enabled
} else {
throw new Exception("trigger compaction failed, be host: ${be_host}, tablet id: ${tablet.TabletId}, status: ${trigger_status.status}")
}
} else {
triggered_tablets.add(tablet)
}
}
// 3. wait all compaction finished
def running = triggered_tablets.size() > 0
Awaitility.await().atMost(timeout_seconds, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).until(() -> {
for (tablet in triggered_tablets) {
def be_host = backendId_to_backendIP["${tablet.BackendId}"]
def be_port = backendId_to_backendHttpPort["${tablet.BackendId}"]
(exit_code, stdout, stderr) = be_get_compaction_status(be_host, be_port, tablet.TabletId)
assert exit_code == 0: "get compaction status failed, exit code: ${exit_code}, stdout: ${stdout}, stderr: ${stderr}"
def compactionStatus = parseJson(stdout.trim())
assert compactionStatus.status.toLowerCase() == "success": "compaction failed, be host: ${be_host}, tablet id: ${tablet.TabletId}, status: ${compactionStatus.status}"
// running is true means compaction is still running
running = compactionStatus.run_status
if (!isCloudMode() && !is_time_series_compaction) {
(exit_code, stdout, stderr) = be_show_tablet_status(be_host, be_port, tablet.TabletId)
assert exit_code == 0: "get tablet status failed, exit code: ${exit_code}, stdout: ${stdout}, stderr: ${stderr}"
def tabletStatus = parseJson(stdout.trim())
def oldStatus = be_tablet_compaction_status.get("${be_host}-${tablet.TabletId}")
// last compaction success time isn't updated, indicates compaction is not started(so we treat it as running and wait)
running = running || (oldStatus["last ${compaction_type} success time"] == tabletStatus["last ${compaction_type} success time"])
if (running) {
logger.info("compaction is still running, be host: ${be_host}, tablet id: ${tablet.TabletId}, run status: ${compactionStatus.run_status}, old status: ${oldStatus}, new status: ${tabletStatus}")
return false
}
} else {
// 1. cloud mode doesn't show compaction success time in tablet status for the time being,
// 2. time series compaction sometimes doesn't update compaction success time
// so we solely check run_status for these two cases
if (running) {
logger.info("compaction is still running, be host: ${be_host}, tablet id: ${tablet.TabletId}")
return false
}
}
}
return true
})
assert !running: "wait compaction timeout, be host: ${be_host}"
}

View File

@ -27,6 +27,7 @@ import org.apache.http.conn.ConnectTimeoutException
import org.apache.http.conn.HttpHostConnectException
import org.codehaus.groovy.runtime.IOGroovyMethods
Suite.metaClass.http_client = { String method, String url /* param */ ->
Suite suite = delegate as Suite
if (method != "GET" && method != "POST") {
@ -35,7 +36,7 @@ Suite.metaClass.http_client = { String method, String url /* param */ ->
if (!url || !(url =~ /^https?:\/\/.+/)) {
throw new Exception("Invalid url: ${url}")
}
Integer timeout = 300 // seconds
Integer maxRetries = 10
Integer retryCount = 0
@ -119,7 +120,7 @@ Suite.metaClass.curl = { String method, String url, String body = null /* param
if (url.isBlank()) {
throw new Exception("invalid curl url, blank")
}
Integer timeout = 10; // 10 seconds;
Integer maxRetries = 10; // Maximum number of retries
Integer retryCount = 0; // Current retry count
@ -161,10 +162,8 @@ Suite.metaClass.curl = { String method, String url, String body = null /* param
return [code, out, err]
}
logger.info("Added 'curl' function to Suite")
Suite.metaClass.show_be_config = { String ip, String port /*param */ ->
return curl("GET", String.format("http://%s:%s/api/show_config", ip, port))
}
@ -231,7 +230,6 @@ Suite.metaClass.update_all_be_config = { String key, Object value ->
logger.info("Added 'update_all_be_config' function to Suite")
Suite.metaClass._be_report = { String ip, int port, String reportName ->
def url = "http://${ip}:${port}/api/report/${reportName}"
def result = Http.GET(url, true)

View File

@ -88,32 +88,7 @@ suite('compaction_width_array_column', "p2") {
while (isOverLap && tryCnt < 3) {
isOverLap = false
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
assertEquals("success", compactJson.status.toLowerCase())
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "cumulative")
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
(code, out, err) = curl("GET", tablet.CompactionStatus)

View File

@ -27,7 +27,7 @@ suite("test_base_compaction") {
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())

View File

@ -19,11 +19,11 @@ import org.codehaus.groovy.runtime.IOGroovyMethods
suite("test_base_compaction_with_dup_key_max_file_size_limit", "p2") {
def tableName = "test_base_compaction_with_dup_key_max_file_size_limit"
// use customer table of tpch_sf100
def rows = 15000000
def load_tpch_sf100_customer = {
def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
def load_tpch_sf100_customer = {
def uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
def rowCount = sql "select count(*) from ${tableName}"
def s3BucketName = getS3BucketName()
def s3WithProperties = """WITH S3 (
@ -62,7 +62,7 @@ suite("test_base_compaction_with_dup_key_max_file_size_limit", "p2") {
}
sleep(5000)
}
}
}
}
try {
String backend_id;
@ -72,7 +72,7 @@ suite("test_base_compaction_with_dup_key_max_file_size_limit", "p2") {
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
@ -106,29 +106,6 @@ suite("test_base_compaction_with_dup_key_max_file_size_limit", "p2") {
}
assertEquals(code, 0)
return out
}
def waitForCompaction = { be_host, be_http_port, tablet_id ->
// wait for all compactions done
boolean running = true
do {
Thread.sleep(1000)
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://${be_host}:${be_http_port}")
sb.append("/api/compaction/run_status?tablet_id=")
sb.append(tablet_id)
String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
out = process.getText()
logger.info("Get compaction status: code=" + code + ", out=" + out)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
sql """ DROP TABLE IF EXISTS ${tableName}; """
@ -148,7 +125,7 @@ suite("test_base_compaction_with_dup_key_max_file_size_limit", "p2") {
PROPERTIES (
"replication_num" = "1", "disable_auto_compaction" = "true"
)
"""
"""
def tablet = (sql_return_maparray """ show tablets from ${tableName}; """)[0]
String tablet_id = tablet.TabletId
@ -164,10 +141,7 @@ suite("test_base_compaction_with_dup_key_max_file_size_limit", "p2") {
// [0-1] 0
// [2-2] 1G nooverlapping
// cp: 3
assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id],
"cumulative", tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
trigger_and_wait_compaction(tableName, "cumulative")
// rowsets:
// [0-1] 0
@ -180,21 +154,15 @@ suite("test_base_compaction_with_dup_key_max_file_size_limit", "p2") {
// [0-1] 0
// [2-2] 1G nooverlapping
// [3-3] 1G nooverlapping
// cp: 4
assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id],
"cumulative", tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
// cp: 4
trigger_and_wait_compaction(tableName, "cumulative")
// The conditions for base compaction have been satisfied.
// Since the size of first input rowset is 0, there is no file size limitation. (maybe fix it?)
// rowsets:
// [0-3] 2G nooverlapping
// cp: 4
assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id],
"base", tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
// cp: 4
trigger_and_wait_compaction(tableName, "base")
// rowsets:
// [0-3] 2G nooverlapping
@ -206,10 +174,7 @@ suite("test_base_compaction_with_dup_key_max_file_size_limit", "p2") {
// [0-3] 2G nooverlapping
// [4-4] 1G nooverlapping
// cp: 5
assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id],
"cumulative", tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id], tablet_id)
trigger_and_wait_compaction(tableName, "cumulative")
// Due to the limit of config::base_compaction_dup_key_max_file_size_mbytes(1G),
// can not do base compaction, return E-808
@ -217,6 +182,7 @@ suite("test_base_compaction_with_dup_key_max_file_size_limit", "p2") {
// [0-3] 2G nooverlapping
// [4-4] 1G nooverlapping
// cp: 5
// WHAT: replace with plugin and handle fail?
assertTrue(triggerCompaction(backendId_to_backendIP[trigger_backend_id], backendId_to_backendHttpPort[trigger_backend_id],
"base", tablet_id).contains("E-808"));

View File

@ -99,37 +99,7 @@ suite("test_compaction_with_delete") {
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "cumulative")
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)

View File

@ -26,7 +26,7 @@ suite("test_compaction_agg_keys") {
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
@ -145,7 +145,7 @@ suite("test_compaction_agg_keys") {
(code, out, err) = curl("GET", tablet.CompactionStatus)
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def tabletJson = parseJson(out.trim())
assert tabletJson.rowsets instanceof List

View File

@ -26,7 +26,7 @@ suite("test_compaction_agg_keys_with_array_map") {
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
@ -96,38 +96,7 @@ suite("test_compaction_agg_keys_with_array_map") {
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "cumulative")
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
@ -137,7 +106,7 @@ suite("test_compaction_agg_keys_with_array_map") {
(code, out, err) = curl("GET", tablet.CompactionStatus)
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def tabletJson = parseJson(out.trim())
assert tabletJson.rowsets instanceof List

View File

@ -115,44 +115,14 @@ suite("test_compaction_agg_keys_with_delete") {
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "cumulative")
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
int rowCount = 0
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
(code, out, err) = curl("GET", tablet.CompactionStatus)
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)

View File

@ -66,53 +66,6 @@ suite("test_compaction_cumu_delete") {
return
}
def triggerCompaction = { be_host, be_http_port, compact_type ->
// trigger compactions for all tablets in ${tableName}
String tablet_id = tablet[0]
StringBuilder sb = new StringBuilder();
sb.append("curl -X POST http://${be_host}:${be_http_port}")
sb.append("/api/compaction/run?tablet_id=")
sb.append(tablet_id)
sb.append("&compact_type=${compact_type}")
String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Run compaction: code=" + code + ", out=" + out + ", disableAutoCompaction " + disableAutoCompaction + ", err=" + err)
if (!disableAutoCompaction) {
return "Success"
}
assertEquals(code, 0)
return out
}
def waitForCompaction = { be_host, be_http_port ->
// wait for all compactions done
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet[0]
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://${be_host}:${be_http_port}")
sb.append("/api/compaction/run_status?tablet_id=")
sb.append(tablet_id)
String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
out = process.getText()
logger.info("Get compaction status: code=" + code + ", out=" + out)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
// insert 11 values for 11 version
sql """ INSERT INTO ${tableName} VALUES (1, "a", 100); """
sql """ INSERT INTO ${tableName} VALUES (2, "a", 100); """
@ -127,7 +80,7 @@ suite("test_compaction_cumu_delete") {
// [0-1] [2-12]
// write some key in version 13, delete it in version 14, write same key in version 15
// make sure the key in version 15 will not be deleted
assertTrue(triggerCompaction(backendId_to_backendIP[backend_id], backendId_to_backendHttpPort[backend_id], "base").contains("Success"));
trigger_and_wait_compaction(tableName, "base")
sql """ INSERT INTO ${tableName} VALUES (4, "a", 100); """
qt_select_default """ SELECT * FROM ${tableName}; """
sql """ DELETE FROM ${tableName} WHERE id = 4; """
@ -147,12 +100,10 @@ suite("test_compaction_cumu_delete") {
sql """ INSERT INTO ${tableName} VALUES (7, "a", 100); """
qt_select_default """ SELECT * FROM ${tableName}; """
assertTrue(triggerCompaction(backendId_to_backendIP[backend_id], backendId_to_backendHttpPort[backend_id], "cumulative").contains("Success"));
waitForCompaction(backendId_to_backendIP[backend_id], backendId_to_backendHttpPort[backend_id])
trigger_and_wait_compaction(tableName, "cumulative")
qt_select_default """ SELECT * FROM ${tableName}; """
assertTrue(triggerCompaction(backendId_to_backendIP[backend_id], backendId_to_backendHttpPort[backend_id], "base").contains("Success"));
waitForCompaction(backendId_to_backendIP[backend_id], backendId_to_backendHttpPort[backend_id])
trigger_and_wait_compaction(tableName, "base")
qt_select_default """ SELECT * FROM ${tableName}; """
} finally {
// try_sql("DROP TABLE IF EXISTS ${tableName}")

View File

@ -28,7 +28,7 @@ suite("test_compaction_dup_keys") {
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
@ -103,38 +103,7 @@ suite("test_compaction_dup_keys") {
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "cumulative")
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
@ -145,7 +114,7 @@ suite("test_compaction_dup_keys") {
(code, out, err) = curl("GET", tablet.CompactionStatus)
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def tabletJson = parseJson(out.trim())
assert tabletJson.rowsets instanceof List
for (String rowset in (List<String>) tabletJson.rowsets) {

View File

@ -28,7 +28,7 @@ suite("test_compaction_dup_keys_with_delete") {
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
@ -104,7 +104,7 @@ suite("test_compaction_dup_keys_with_delete") {
sql """
DELETE FROM ${tableName} where user_id <= 5
"""
sql """ INSERT INTO ${tableName} VALUES
(4, '2017-10-01', '2017-10-01', '2017-10-01 11:11:11.110000', '2017-10-01 11:11:11.110111', 'Beijing', 10, 1, NULL, NULL, NULL, NULL, '2020-01-05', 1, 34, 20)
"""
@ -115,50 +115,18 @@ suite("test_compaction_dup_keys_with_delete") {
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "cumulative")
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
int rowCount = 0
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
(code, out, err) = curl("GET", tablet.CompactionStatus)
logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def tabletJson = parseJson(out.trim())
assert tabletJson.rowsets instanceof List
for (String rowset in (List<String>) tabletJson.rowsets) {

View File

@ -28,7 +28,7 @@ suite("test_compaction_uniq_keys") {
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
@ -137,7 +137,7 @@ suite("test_compaction_uniq_keys") {
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)
int rowCount = 0
for (def tablet in tablets) {
String tablet_id = tablet.TabletId

View File

@ -44,7 +44,7 @@ suite("test_compaction_uniq_keys_row_store", "p0") {
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
@ -159,38 +159,7 @@ suite("test_compaction_uniq_keys_row_store", "p0") {
checkValue()
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "cumulative")
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)

View File

@ -28,7 +28,7 @@ suite("test_compaction_uniq_keys_with_delete") {
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
@ -119,37 +119,7 @@ suite("test_compaction_uniq_keys_with_delete") {
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "cumulative")
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)

View File

@ -46,9 +46,9 @@ suite("test_full_compaction") {
sql """
CREATE TABLE ${tableName} (
`user_id` INT NOT NULL, `value` INT NOT NULL)
UNIQUE KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`)
BUCKETS 1
UNIQUE KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`)
BUCKETS 1
PROPERTIES ("replication_allocation" = "tag.location.default: 1",
"disable_auto_compaction" = "true",
"enable_mow_light_delete" = "false",
@ -117,46 +117,9 @@ suite("test_full_compaction") {
assert (rowsetCount == 7 * replicaNum)
// trigger full compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
def times = 1
do{
(code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
++times
sleep(2000)
} while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for full compaction done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "full")
// after full compaction, there is only 1 rowset.
rowsetCount = 0
for (def tablet in tablets) {
String tablet_id = tablet.TabletId

View File

@ -48,9 +48,9 @@ suite("test_full_compaction_by_table_id") {
sql """
CREATE TABLE ${tableName} (
`user_id` INT NOT NULL, `value` INT NOT NULL)
UNIQUE KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`)
BUCKETS 8
UNIQUE KEY(`user_id`)
DISTRIBUTED BY HASH(`user_id`)
BUCKETS 8
PROPERTIES ("replication_allocation" = "tag.location.default: 1",
"disable_auto_compaction" = "true",
"enable_mow_light_delete" = "false",
@ -116,52 +116,9 @@ suite("test_full_compaction_by_table_id") {
}
// trigger full compactions for all tablets by table id in ${tableName}
// TODO: get table id
for (def tablet : tablets) {
String tablet_id = tablet.TabletId
def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """
logger.info("tablet"+tablet_info)
def table_id = tablet_info[0].TableId
backend_id = tablet.BackendId
def times = 1
def code, out, err
do{
(code, out, err) = be_run_full_compaction_by_table_id(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), table_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
++times
sleep(2000)
} while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for full compaction done
{
for (def tablet : tablets) {
boolean running = true
do {
Thread.sleep(1000)
def tablet_id = tablet.TabletId
backend_id = tablet.BackendId
def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
}
trigger_and_wait_compaction(tableName, "full")
// after full compaction, there is only 1 rowset.
for (def tablet : tablets) {
int rowsetCount = 0
def (code, out, err) = curl("GET", tablet.CompactionStatus)

View File

@ -22,7 +22,7 @@ suite("test_single_compaction_p2", "p2") {
return;
}
def tableName = "test_single_replica_compaction"
def calc_file_crc_on_tablet = { ip, port, tablet ->
return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet))
}
@ -146,7 +146,7 @@ suite("test_single_compaction_p2", "p2") {
// wait for update replica infos
Thread.sleep(70000)
// find the master be for single replica compaction
Boolean found = false
String master_backend_id;
@ -210,12 +210,12 @@ suite("test_single_compaction_p2", "p2") {
// trigger master be to do compaction
assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id],
"full", tablet_id).contains("Success"));
"full", tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], tablet_id)
// trigger follower be to fetch compaction result
for (String id in follower_backend_id) {
assertTrue(triggerSingleCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id).contains("Success"));
assertTrue(triggerSingleCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id)
}
@ -231,12 +231,12 @@ suite("test_single_compaction_p2", "p2") {
// trigger master be to do compaction with delete
assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id],
"full", tablet_id).contains("Success"));
"full", tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[master_backend_id], backendId_to_backendHttpPort[master_backend_id], tablet_id)
// trigger follower be to fetch compaction result
for (String id in follower_backend_id) {
assertTrue(triggerSingleCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id).contains("Success"));
assertTrue(triggerSingleCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[id], backendId_to_backendHttpPort[id], tablet_id)
}

View File

@ -22,45 +22,6 @@ suite("test_time_series_compaction_polciy", "p0") {
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
def trigger_cumulative_compaction_on_tablets = { tablets ->
for (def tablet : tablets) {
String tablet_id = tablet.TabletId
String backend_id = tablet.BackendId
int times = 1
String compactionStatus;
do{
def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
++times
sleep(1000)
compactionStatus = parseJson(out.trim()).status.toLowerCase();
} while (compactionStatus!="success" && times<=3)
if (compactionStatus!="success") {
assertTrue(compactionStatus.contains("2000"))
continue;
}
assertEquals("success", compactionStatus)
}
}
def wait_cumulative_compaction_done = { tablets ->
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
String backend_id = tablet.BackendId
def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
}
def get_rowset_count = { tablets ->
int rowsetCount = 0
@ -109,7 +70,7 @@ suite("test_time_series_compaction_polciy", "p0") {
sql """ INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99); """
sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100); """
sql """ INSERT INTO ${tableName} VALUES (100, "andy", "andy love apple", 100); """
qt_sql_1 """ select count() from ${tableName} """
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,PathHash,MetaUrl,CompactionStatus
@ -123,17 +84,14 @@ suite("test_time_series_compaction_polciy", "p0") {
assert(false)
}
}
// BUCKETS = 2
// before cumulative compaction, there are 17 * 2 = 34 rowsets.
int rowsetCount = get_rowset_count.call(tablets);
assert (rowsetCount == 34 * replicaNum)
// trigger cumulative compactions for all tablets in table
trigger_cumulative_compaction_on_tablets.call(tablets)
// wait for cumulative compaction done
wait_cumulative_compaction_done.call(tablets)
trigger_and_wait_compaction(tableName, "cumulative")
// after cumulative compaction, there is only 26 rowset.
// 5 consecutive empty versions are merged into one empty version
@ -142,10 +100,7 @@ suite("test_time_series_compaction_polciy", "p0") {
assert (rowsetCount == 26 * replicaNum)
// trigger cumulative compactions for all tablets in ${tableName}
trigger_cumulative_compaction_on_tablets.call(tablets)
// wait for cumulative compaction done
wait_cumulative_compaction_done.call(tablets)
trigger_and_wait_compaction(tableName, "cumulative")
// after cumulative compaction, there is only 22 rowset.
// 26 - 4 = 22
@ -159,10 +114,7 @@ suite("test_time_series_compaction_polciy", "p0") {
sql """ alter table ${tableName} set ("time_series_compaction_file_count_threshold"="10")"""
sql """sync"""
// trigger cumulative compactions for all tablets in ${tableName}
trigger_cumulative_compaction_on_tablets.call(tablets)
// wait for cumulative compaction done
wait_cumulative_compaction_done.call(tablets)
trigger_and_wait_compaction(tableName, "cumulative")
// after cumulative compaction, there is only 11 rowset.
rowsetCount = get_rowset_count.call(tablets);

View File

@ -116,37 +116,7 @@ suite("test_vertical_compaction_agg_keys") {
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "cumulative")
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)

View File

@ -77,37 +77,7 @@ suite("test_vertical_compaction_agg_state") {
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "cumulative")
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)

View File

@ -28,7 +28,7 @@ suite("test_vertical_compaction_dup_keys") {
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
@ -76,7 +76,7 @@ suite("test_vertical_compaction_dup_keys") {
sql """
DELETE from ${tableName} where user_id <= 0
"""
qt_select_default """ SELECT * FROM ${tableName} t ORDER BY user_id,date,city,age,sex,last_visit_date,last_update_date,last_visit_date_not_null,cost,max_dwell_time,min_dwell_time; """
@ -116,37 +116,7 @@ suite("test_vertical_compaction_dup_keys") {
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "cumulative")
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)

View File

@ -113,37 +113,7 @@ suite("test_vertical_compaction_uniq_keys") {
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "cumulative")
def replicaNum = get_table_replica_num(tableName)
logger.info("get table replica num: " + replicaNum)

View File

@ -54,7 +54,7 @@ suite("test_partial_update_skip_compaction", "nonConcurrent") {
}
logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");
def check_rs_metas = { expected_rs_meta_size, check_func ->
def check_rs_metas = { expected_rs_meta_size, check_func ->
if (isCloudMode()) {
return
}

View File

@ -19,7 +19,7 @@ suite("test_full_compaction_run_status","nonConcurrent") {
def tableName = "full_compaction_run_status_test"
// test successful group commit async load
sql """ DROP TABLE IF EXISTS ${tableName} """
@ -36,8 +36,8 @@ suite("test_full_compaction_run_status","nonConcurrent") {
`k` int ,
`v` int ,
) engine=olap
DISTRIBUTED BY HASH(`k`)
BUCKETS 2
DISTRIBUTED BY HASH(`k`)
BUCKETS 2
properties(
"replication_num" = "1",
"disable_auto_compaction" = "true")

View File

@ -22,7 +22,7 @@ suite("test_full_compaction_with_ordered_data","nonConcurrent") {
return
}
def tableName = "test_full_compaction_with_ordered_data"
sql """ DROP TABLE IF EXISTS ${tableName} """
String backend_id;
@ -39,8 +39,8 @@ suite("test_full_compaction_with_ordered_data","nonConcurrent") {
`v` int ,
) engine=olap
DUPLICATE KEY(k)
DISTRIBUTED BY HASH(k)
BUCKETS 3
DISTRIBUTED BY HASH(k)
BUCKETS 3
properties(
"replication_num" = "1",
"disable_auto_compaction" = "true")
@ -70,38 +70,9 @@ suite("test_full_compaction_with_ordered_data","nonConcurrent") {
assert (rowsetCount == 5 * replicaNum * 3)
// trigger full compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
times = 1
do{
(code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
++times
sleep(2000)
} while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10)
}
// wait for full compaction done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "full")
// after full compaction, there is only 1 rowset.
rowsetCount = 0
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
@ -155,38 +126,9 @@ suite("test_full_compaction_with_ordered_data","nonConcurrent") {
assert (rowsetCount == 12 * replicaNum * 3)
// trigger full compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
times = 1
do{
(code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
++times
sleep(2000)
} while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10)
}
// wait for full compaction done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "full")
// after full compaction, there is only 1 rowset.
rowsetCount = 0
for (def tablet in tablets) {
String tablet_id = tablet.TabletId

View File

@ -28,8 +28,8 @@ suite("test_variant_bloom_filter", "nonConcurrent") {
table "${table_name}"
// set http request header params
set 'read_json_by_line', 'true'
set 'format', 'json'
set 'read_json_by_line', 'true'
set 'format', 'json'
set 'max_filter_ratio', '0.1'
set 'memtable_on_sink_node', 'true'
file file_name // import json file
@ -72,7 +72,7 @@ suite("test_variant_bloom_filter", "nonConcurrent") {
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
def tablets = sql_return_maparray """ show tablets from ${index_table}; """
for (def tablet in tablets) {
int beforeSegmentCount = 0
String tablet_id = tablet.TabletId
@ -88,29 +88,7 @@ suite("test_variant_bloom_filter", "nonConcurrent") {
}
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
assertEquals("success", compactJson.status.toLowerCase())
}
// wait for all compactions done
for (def tablet in tablets) {
Awaitility.await().atMost(3, TimeUnit.MINUTES).untilAsserted(() -> {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("compaction task for this tablet is not running", compactionStatus.msg.toLowerCase())
return compactionStatus.run_status;
});
}
trigger_and_wait_compaction(index_table, "full")
for (def tablet in tablets) {
int afterSegmentCount = 0
@ -126,7 +104,7 @@ suite("test_variant_bloom_filter", "nonConcurrent") {
}
assertEquals(afterSegmentCount, 1)
}
try {
GetDebugPoint().enableDebugPointForAllBEs("bloom_filter_must_filter_data")
@ -139,4 +117,4 @@ suite("test_variant_bloom_filter", "nonConcurrent") {
} finally {
GetDebugPoint().disableDebugPointForAllBEs("bloom_filter_must_filter_data")
}
}
}

View File

@ -142,69 +142,13 @@ suite("test_index_change_with_cumulative_compaction") {
sql """ CREATE INDEX idx_city ON ${tableName}(`city`) USING INVERTED """
wait_for_latest_op_on_table_finish(tableName, timeout)
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
StringBuilder sb = new StringBuilder();
sb.append("curl -X POST http://")
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run?tablet_id=")
sb.append(tablet_id)
sb.append("&compact_type=cumulative")
String command = sb.toString()
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// build index
sql "build index idx_user_id on ${tableName}"
sql "build index idx_date on ${tableName}"
sql "build index idx_city on ${tableName}"
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://")
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run_status?tablet_id=")
sb.append(tablet_id)
String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
// trigger compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "cumulative")
int rowCount = 0
for (def tablet in tablets) {

View File

@ -142,36 +142,6 @@ suite("test_index_change_with_full_compaction") {
sql """ CREATE INDEX idx_city ON ${tableName}(`city`) USING INVERTED """
wait_for_latest_op_on_table_finish(tableName, timeout)
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
StringBuilder sb = new StringBuilder();
sb.append("curl -X POST http://")
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run?tablet_id=")
sb.append(tablet_id)
sb.append("&compact_type=full")
String command = sb.toString()
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// build index
if (!isCloudMode()) {
sql "build index idx_user_id on ${tableName}"
@ -179,34 +149,8 @@ suite("test_index_change_with_full_compaction") {
sql "build index idx_city on ${tableName}"
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://")
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run_status?tablet_id=")
sb.append(tablet_id)
String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
// trigger compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "full")
int rowCount = 0
for (def tablet in tablets) {

View File

@ -24,7 +24,7 @@ suite("test_index_compaction_dup_keys", "nonConcurrent") {
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
boolean disableAutoCompaction = false
def set_be_config = { key, value ->
for (String backend_id: backendId_to_backendIP.keySet()) {
def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
@ -32,49 +32,6 @@ suite("test_index_compaction_dup_keys", "nonConcurrent") {
}
}
def trigger_full_compaction_on_tablets = { tablets ->
for (def tablet : tablets) {
String tablet_id = tablet.TabletId
String backend_id = tablet.BackendId
int times = 1
String compactionStatus;
do{
def (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
++times
sleep(2000)
compactionStatus = parseJson(out.trim()).status.toLowerCase();
} while (compactionStatus!="success" && times<=10)
if (compactionStatus == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactionStatus)
}
}
}
def wait_full_compaction_done = { tablets ->
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
String backend_id = tablet.BackendId
def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
}
def get_rowset_count = { tablets ->
int rowsetCount = 0
for (def tablet in tablets) {
@ -110,7 +67,7 @@ suite("test_index_compaction_dup_keys", "nonConcurrent") {
String backend_id;
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
@ -179,10 +136,7 @@ suite("test_index_compaction_dup_keys", "nonConcurrent") {
assert (rowsetCount == 7 * replicaNum)
// trigger full compactions for all tablets in ${tableName}
trigger_full_compaction_on_tablets.call(tablets)
// wait for full compaction done
wait_full_compaction_done.call(tablets)
trigger_and_wait_compaction(tableName, "full")
// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets);
@ -210,10 +164,7 @@ suite("test_index_compaction_dup_keys", "nonConcurrent") {
assert (rowsetCount == 7 * replicaNum)
// trigger full compactions for all tablets in ${tableName}
trigger_full_compaction_on_tablets.call(tablets)
// wait for full compaction done
wait_full_compaction_done.call(tablets)
trigger_and_wait_compaction(tableName, "full")
// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets);

View File

@ -24,7 +24,7 @@ suite("test_index_compaction_unique_keys", "nonConcurrent") {
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
boolean disableAutoCompaction = false
def set_be_config = { key, value ->
for (String backend_id: backendId_to_backendIP.keySet()) {
def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
@ -32,49 +32,6 @@ suite("test_index_compaction_unique_keys", "nonConcurrent") {
}
}
def trigger_full_compaction_on_tablets = { tablets ->
for (def tablet : tablets) {
String tablet_id = tablet.TabletId
String backend_id = tablet.BackendId
int times = 1
String compactionStatus;
do{
def (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
++times
sleep(2000)
compactionStatus = parseJson(out.trim()).status.toLowerCase();
} while (compactionStatus!="success" && times<=10)
if (compactionStatus == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactionStatus)
}
}
}
def wait_full_compaction_done = { tablets ->
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
String backend_id = tablet.BackendId
def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
}
def get_rowset_count = { tablets ->
int rowsetCount = 0
for (def tablet in tablets) {
@ -110,7 +67,7 @@ suite("test_index_compaction_unique_keys", "nonConcurrent") {
String backend_id;
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
@ -146,7 +103,7 @@ suite("test_index_compaction_unique_keys", "nonConcurrent") {
UNIQUE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
PROPERTIES (
"replication_num" = "1",
"disable_auto_compaction" = "true",
"enable_unique_key_merge_on_write" = "true",
@ -184,10 +141,7 @@ suite("test_index_compaction_unique_keys", "nonConcurrent") {
assert (rowsetCount == 7 * replicaNum)
// trigger full compactions for all tablets in ${tableName}
trigger_full_compaction_on_tablets.call(tablets)
// wait for full compaction done
wait_full_compaction_done.call(tablets)
trigger_and_wait_compaction(tableName, "full")
// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets);
@ -215,10 +169,7 @@ suite("test_index_compaction_unique_keys", "nonConcurrent") {
assert (rowsetCount == 7 * replicaNum)
// trigger full compactions for all tablets in ${tableName}
trigger_full_compaction_on_tablets.call(tablets)
// wait for full compaction done
wait_full_compaction_done.call(tablets)
trigger_and_wait_compaction(tableName, "full")
// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets);

View File

@ -24,7 +24,7 @@ suite("test_index_compaction_with_multi_index_segments", "nonConcurrent") {
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
boolean disableAutoCompaction = false
def set_be_config = { key, value ->
for (String backend_id: backendId_to_backendIP.keySet()) {
def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
@ -32,49 +32,6 @@ suite("test_index_compaction_with_multi_index_segments", "nonConcurrent") {
}
}
def trigger_full_compaction_on_tablets = { tablets ->
for (def tablet : tablets) {
String tablet_id = tablet.TabletId
String backend_id = tablet.BackendId
int times = 1
String compactionStatus;
do{
def (code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
++times
sleep(2000)
compactionStatus = parseJson(out.trim()).status.toLowerCase();
} while (compactionStatus!="success" && times<=10)
if (compactionStatus == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactionStatus)
}
}
}
def wait_full_compaction_done = { tablets ->
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
String backend_id = tablet.BackendId
def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
}
def get_rowset_count = { tablets ->
int rowsetCount = 0
for (def tablet in tablets) {
@ -111,7 +68,7 @@ suite("test_index_compaction_with_multi_index_segments", "nonConcurrent") {
try {
String backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
@ -210,10 +167,7 @@ suite("test_index_compaction_with_multi_index_segments", "nonConcurrent") {
assert (rowsetCount == 3 * replicaNum)
// trigger full compactions for all tablets in ${tableName}
trigger_full_compaction_on_tablets.call(tablets)
// wait for full compaction done
wait_full_compaction_done.call(tablets)
trigger_and_wait_compaction(tableName, "full")
// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets)
@ -243,10 +197,7 @@ suite("test_index_compaction_with_multi_index_segments", "nonConcurrent") {
assert (rowsetCount == 2 * replicaNum)
// trigger full compactions for all tablets in ${tableName}
trigger_full_compaction_on_tablets.call(tablets)
// wait for full compaction done
wait_full_compaction_done.call(tablets)
trigger_and_wait_compaction(tableName, "full")
// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets)
@ -322,11 +273,7 @@ suite("test_index_compaction_with_multi_index_segments", "nonConcurrent") {
assert (rowsetCount == 3 * replicaNum)
// trigger full compactions for all tablets in ${tableName}
trigger_full_compaction_on_tablets.call(tablets)
// wait for full compaction done
wait_full_compaction_done.call(tablets)
trigger_and_wait_compaction.call(tableName, "full")
// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets)
assert (rowsetCount == 1 * replicaNum)
@ -355,10 +302,7 @@ suite("test_index_compaction_with_multi_index_segments", "nonConcurrent") {
assert (rowsetCount == 2 * replicaNum)
// trigger full compactions for all tablets in ${tableName}
trigger_full_compaction_on_tablets.call(tablets)
// wait for full compaction done
wait_full_compaction_done.call(tablets)
trigger_and_wait_compaction(tableName, "full")
// after full compaction, there is only 1 rowset.
rowsetCount = get_rowset_count.call(tablets)

View File

@ -40,7 +40,7 @@ suite("test_cumulative_compaction_with_format_v2", "inverted_index_format_v2") {
assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish timeout")
}
def calc_segment_count = { tablet ->
def calc_segment_count = { tablet ->
int segment_count = 0
String tablet_id = tablet.TabletId
StringBuilder sb = new StringBuilder();
@ -163,69 +163,11 @@ suite("test_cumulative_compaction_with_format_v2", "inverted_index_format_v2") {
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
String ip = backendId_to_backendIP.get(backend_id)
String port = backendId_to_backendHttpPort.get(backend_id)
int segment_count = calc_segment_count(tablet)
logger.info("TabletId: " + tablet_id + ", segment_count: " + segment_count)
check_nested_index_file(ip, port, tablet_id, 9, 3, "V2")
StringBuilder sb = new StringBuilder();
sb.append("curl -X POST http://")
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run?tablet_id=")
sb.append(tablet_id)
sb.append("&compact_type=cumulative")
String command = sb.toString()
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
trigger_and_wait_compaction(tableName, "cumulative")
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
do {
Thread.sleep(1000)
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://")
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run_status?tablet_id=")
sb.append(tablet_id)
String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
String ip = backendId_to_backendIP.get(backend_id)
String port = backendId_to_backendHttpPort.get(backend_id)
int segment_count = calc_segment_count(tablet)

View File

@ -40,7 +40,7 @@ suite("test_mor_table_with_format_v2", "inverted_index_format_v2") {
assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish timeout")
}
def calc_segment_count = { tablet ->
def calc_segment_count = { tablet ->
int segment_count = 0
String tablet_id = tablet.TabletId
StringBuilder sb = new StringBuilder();
@ -165,69 +165,12 @@ suite("test_mor_table_with_format_v2", "inverted_index_format_v2") {
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
String ip = backendId_to_backendIP.get(backend_id)
String port = backendId_to_backendHttpPort.get(backend_id)
int segment_count = calc_segment_count(tablet)
logger.info("TabletId: " + tablet_id + ", segment_count: " + segment_count)
check_nested_index_file(ip, port, tablet_id, 10, 3, "V2")
StringBuilder sb = new StringBuilder();
sb.append("curl -X POST http://")
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run?tablet_id=")
sb.append(tablet_id)
sb.append("&compact_type=full")
String command = sb.toString()
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
trigger_and_wait_compaction(tableName, "full")
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
do {
Thread.sleep(1000)
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://")
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run_status?tablet_id=")
sb.append(tablet_id)
String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
String ip = backendId_to_backendIP.get(backend_id)
String port = backendId_to_backendHttpPort.get(backend_id)
// after compaction, there are 1 rwoset in local mode and 2 rowsets in cloud mode.

View File

@ -40,7 +40,7 @@ suite("test_mow_table_with_format_v2", "inverted_index_format_v2") {
assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish timeout")
}
def calc_segment_count = { tablet ->
def calc_segment_count = { tablet ->
int segment_count = 0
String tablet_id = tablet.TabletId
StringBuilder sb = new StringBuilder();
@ -163,69 +163,12 @@ suite("test_mow_table_with_format_v2", "inverted_index_format_v2") {
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
String ip = backendId_to_backendIP.get(backend_id)
String port = backendId_to_backendHttpPort.get(backend_id)
int segment_count = calc_segment_count(tablet)
logger.info("TabletId: " + tablet_id + ", segment_count: " + segment_count)
check_nested_index_file(ip, port, tablet_id, 9, 3, "V2")
StringBuilder sb = new StringBuilder();
sb.append("curl -X POST http://")
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run?tablet_id=")
sb.append(tablet_id)
sb.append("&compact_type=cumulative")
String command = sb.toString()
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for all compactions done
trigger_and_wait_compaction(tableName, "cumulative")
// check indexes
for (def tablet in tablets) {
boolean running = true
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
do {
Thread.sleep(1000)
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://")
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run_status?tablet_id=")
sb.append(tablet_id)
String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
String ip = backendId_to_backendIP.get(backend_id)
String port = backendId_to_backendHttpPort.get(backend_id)
check_nested_index_file(ip, port, tablet_id, 2, 3, "V2")

View File

@ -46,7 +46,7 @@ suite("test_single_replica_compaction_with_format_v2", "inverted_index_format_v2
assertTrue(useTime <= OpTimeout, "wait_for_latest_op_on_table_finish timeout")
}
def calc_segment_count = { tablet ->
def calc_segment_count = { tablet ->
int segment_count = 0
String tablet_id = tablet.TabletId
StringBuilder sb = new StringBuilder();
@ -167,67 +167,10 @@ suite("test_single_replica_compaction_with_format_v2", "inverted_index_format_v2
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
trigger_and_wait_compaction(tableName, "cumulative")
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
String ip = backendId_to_backendIP.get(backend_id)
String port = backendId_to_backendHttpPort.get(backend_id)
check_nested_index_file(ip, port, tablet_id, 9, 3, "V2")
StringBuilder sb = new StringBuilder();
sb.append("curl -X POST http://")
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run?tablet_id=")
sb.append(tablet_id)
sb.append("&compact_type=cumulative")
String command = sb.toString()
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
do {
Thread.sleep(1000)
StringBuilder sb = new StringBuilder();
sb.append("curl -X GET http://")
sb.append(backendId_to_backendIP.get(backend_id))
sb.append(":")
sb.append(backendId_to_backendHttpPort.get(backend_id))
sb.append("/api/compaction/run_status?tablet_id=")
sb.append(tablet_id)
String command = sb.toString()
logger.info(command)
process = command.execute()
code = process.waitFor()
err = IOGroovyMethods.getText(new BufferedReader(new InputStreamReader(process.getErrorStream())));
out = process.getText()
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
String ip = backendId_to_backendIP.get(backend_id)
String port = backendId_to_backendHttpPort.get(backend_id)
check_nested_index_file(ip, port, tablet_id, 2, 3, "V2")

View File

@ -16,7 +16,7 @@
// under the License.
suite("test_dup_table_inverted_index", "p1") {
// load data
def load_data = { loadTableName, fileName ->
streamLoad {
@ -59,51 +59,6 @@ suite("test_dup_table_inverted_index", "p1") {
}
}
def run_compaction = { compactionTableName ->
String backend_id;
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
def tablets = sql_return_maparray """ show tablets from ${compactionTableName}; """
// run
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
times = 1
do{
(code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
++times
sleep(2000)
} while (parseJson(out.trim()).status.toLowerCase()!="success" && times<=10)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
logger.info("Compaction was done automatically!")
}
}
// wait
for (def tablet : tablets) {
boolean running = true
do {
Thread.sleep(1000)
def tablet_id = tablet.TabletId
backend_id = tablet.BackendId
def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
}
// def generate_dup_mow_sql = { tableName ->
// List<String> list = new ArrayList<>()
// // FULLTEXT
@ -174,7 +129,7 @@ suite("test_dup_table_inverted_index", "p1") {
// <=
list.add("SELECT COUNT(*) FROM ${tableName} WHERE request <= 'POST'");
list.add("SELECT id FROM ${tableName} WHERE request <= 'POST' ORDER BY id LIMIT 2");
// >
list.add("SELECT COUNT(*) FROM ${tableName} WHERE request > 'POST'");
list.add("SELECT id FROM ${tableName} WHERE request > 'POST' ORDER BY id LIMIT 2");
@ -418,11 +373,11 @@ suite("test_dup_table_inverted_index", "p1") {
list.add("SELECT COUNT(*) FROM ${tableName} WHERE request MATCH_REGEXP 'GET' AND request <= 'POST';");
list.add("SELECT id FROM ${tableName} WHERE request MATCH_REGEXP 'GET' AND request <= 'POST' ORDER BY id LIMIT 2;");
list.add("SELECT COUNT(*) FROM ${tableName} WHERE request MATCH_REGEXP 'GET' AND request > 'POST' ")
list.add("SELECT id FROM ${tableName} WHERE request MATCH_REGEXP 'GET' AND request > 'POST' ORDER BY id LIMIT 2 ")
list.add("SELECT COUNT(*) FROM ${tableName} WHERE request MATCH_REGEXP 'GET' AND request > 'POST' ")
list.add("SELECT id FROM ${tableName} WHERE request MATCH_REGEXP 'GET' AND request > 'POST' ORDER BY id LIMIT 2 ")
list.add("SELECT COUNT(*) FROM ${tableName} WHERE request MATCH_REGEXP 'GET' AND request >= 'POST' ")
list.add("SELECT id FROM ${tableName} WHERE request MATCH_REGEXP 'GET' AND request >= 'POST' ORDER BY id LIMIT 2 ")
list.add("SELECT COUNT(*) FROM ${tableName} WHERE request MATCH_REGEXP 'GET' AND request >= 'POST' ")
list.add("SELECT id FROM ${tableName} WHERE request MATCH_REGEXP 'GET' AND request >= 'POST' ORDER BY id LIMIT 2 ")
// FULLTEXT MATCH_PHRASE_EDGE with others
list.add("SELECT COUNT(*) FROM ${tableName} WHERE request MATCH_PHRASE_EDGE 'GET' AND request LIKE '%GET%' ");
@ -713,8 +668,8 @@ suite("test_dup_table_inverted_index", "p1") {
list.add("SELECT `@timestamp` FROM ${tableName} WHERE id != 200 OR request LIKE '%GET%' OR NOT (size > 100 AND size LIKE '%0%') OR clientip > '17.0.0.0' ORDER BY id LIMIT 2;");
return list
}
try {
sql """ set enable_match_without_inverted_index = true """
sql """ set enable_common_expr_pushdown = true """
@ -723,7 +678,7 @@ suite("test_dup_table_inverted_index", "p1") {
// create table
sql """
CREATE TABLE IF NOT EXISTS dup_httplogs
(
(
`id` bigint NOT NULL AUTO_INCREMENT(100),
`@timestamp` int(11) NULL,
`clientip` varchar(20) NULL,
@ -787,7 +742,7 @@ suite("test_dup_table_inverted_index", "p1") {
logger.info("dup_result4 is {}", dup_result4);
compare_result(dup_result3, dup_result4, all_dup_sql)
run_compaction.call(dupTableName)
trigger_and_wait_compaction(dupTableName, "full")
def dup_result5 = execute_sql.call("enable_no_need_read_data_opt", "true", all_dup_sql)
logger.info("dup_result5 is {}", dup_result5);
def dup_result6 = execute_sql.call("enable_no_need_read_data_opt", "false", all_dup_sql)
@ -800,7 +755,7 @@ suite("test_dup_table_inverted_index", "p1") {
// create table
sql """
CREATE TABLE IF NOT EXISTS mow_httplogs
(
(
`@timestamp` int(11) NULL,
`clientip` varchar(20) NULL,
`request` text NULL,
@ -847,7 +802,7 @@ suite("test_dup_table_inverted_index", "p1") {
sql """ INSERT INTO ${mowTable} (`@timestamp`, clientip, request, status, size) VALUES (100, '1zyvBkWVAy5H0DDDaQnrp9MmAhfo0UNB9bOGyvSEX9MW66eymeDElVzVmsvUKHORwWg2hRLN7yd253zhXGs6k7PVPHy6uqtYTaxvHWm7njZYWtlqraDGE1fvrtnyUvlrGFPJZzkuj5FQfpYl6dV2bJHV0A3gpzogKSXJSyfH02ryb2ObKaJC6dnkMic00P6R3rUCBotrU7KAaGieALbFUXBGTvjsFKUvLgJexqAEJcKwiioTp0JH9Y3NUWgi2y5kclPmUG4xVrKHWXu7bI1MYJ1DCL1eCQCuqXmUf7eFyKcR6pzTFpkurcYq5R3SjprK13EkuLmVcDJMS8DNiLVCcCIOpHQMNgVFNLI7SPCl461FPOrL1xSuULAsLNjP5xgjjpn5Bu2dAug906fSVcnJwfHuuCly0sqYfNEI0Bd1IMiQOyoqA1pwdJMYMa6hig6imR3bJcnPptA6Fo1rooqzzt6gFnloqXeo9Hd9UB1F7QhfZO21QOZho19A5d12wcnOZCb3sRzomQqcPKSyvb17SxzoP9coAEpfXZEBrds60iuPjZaez79zeGP8X4KxuK1WwVDFw661zB6nvKCtNKFQqeKVMSFWAazw735TkQRGkjlif31f3uspvmBrLagvtjlfMoT138NnNxc2FbsK5wmssNfKFRk9zNg629b46rX7qLnC3ItPYgXyPSFqSF7snjqOUHJpzvcPhyY7tuDZVW2VTd3OtRdjdlAwHbSUrI5jWI1BCeP8cObIsOjd5', '1zyvBkWVAy5H0DDDaQnrp9MmAhfo0UNB9bOGyvSEX9MW66eymeDElVzVmsvUKHORwWg2hRLN7yd253zhXGs6k7PVPHy6uqtYTaxvHWm7njZYWtlqraDGE1fvrtnyUvlrGFPJZzkuj5FQfpYl6dV2bJHV0A3gpzogKSXJSyfH02ryb2ObKaJC6dnkMic00P6R3rUCBotrU7KAaGieALbFUXBGTvjsFKUvLgJexqAEJcKwiioTp0JH9Y3NUWgi2y5kclPmUG4xVrKHWXu7bI1MYJ1DCL1eCQCuqXmUf7eFyKcR6pzTFpkurcYq5R3SjprK13EkuLmVcDJMS8DNiLVCcCIOpHQMNgVFNLI7SPCl461FPOrL1xSuULAsLNjP5xgjjpn5Bu2dAug906fSVcnJwfHuuCly0sqYfNEI0Bd1IMiQOyoqA1pwdJMYMa6hig6imR3bJcnPptA6Fo1rooqzzt6gFnloqXeo9Hd9UB1F7QhfZO21QOZho19A5d12wcnOZCb3sRzomQqcPKSyvb17SxzoP9coAEpfXZEBrds60iuPjZaez79zeGP8X4KxuK1WwVDFw661zB6nvKCtNKFQqeKVMSFWAazw735TkQRGkjlif31f3uspvmBrLagvtjlfMoT138NnNxc2FbsK5wmssNfKFRk9zNg629b46rX7qLnC3ItPYgXyPSFqSF7snjqOUHJpzvcPhyY7tuDZVW2VTd3OtRdjdlAwHbSUrI5jWI1BCeP8cObIsOjd5', -2, -3) """
sql """ INSERT INTO ${mowTable} (`@timestamp`, clientip, request, status, size) VALUES (100, '1zyvBkWVAy5H0DDDaQnrp9MmAhfo0UNB9bOGyvSEX9MW66eymeDElVzVmsvUKHORwWg2hRLN7yd253zhXGs6k7PVPHy6uqtYTaxvHWm7njZYWtlqraDGE1fvrtnyUvlrGFPJZzkuj5FQfpYl6dV2bJHV0A3gpzogKSXJSyfH02ryb2ObKaJC6dnkMic00P6R3rUCBotrU7KAaGieALbFUXBGTvjsFKUvLgJexqAEJcKwiioTp0JH9Y3NUWgi2y5kclPmUG4xVrKHWXu7bI1MYJ1DCL1eCQCuqXmUf7eFyKcR6pzTFpkurcYq5R3SjprK13EkuLmVcDJMS8DNiLVCcCIOpHQMNgVFNLI7SPCl461FPOrL1xSuULAsLNjP5xgjjpn5Bu2dAug906fSVcnJwfHuuCly0sqYfNEI0Bd1IMiQOyoqA1pwdJMYMa6hig6imR3bJcnPptA6Fo1rooqzzt6gFnloqXeo9Hd9UB1F7QhfZO21QOZho19A5d12wcnOZCb3sRzomQqcPKSyvb17SxzoP9coAEpfXZEBrds60iuPjZaez79zeGP8X4KxuK1WwVDFw661zB6nvKCtNKFQqeKVMSFWAazw735TkQRGkjlif31f3uspvmBrLagvtjlfMoT138NnNxc2FbsK5wmssNfKFRk9zNg629b46rX7qLnC3ItPYgXyPSFqSF7snjqOUHJpzvcPhyY7tuDZVW2VTd3OtRdjdlAwHbSUrI5jWI1BCeP8cObIsOjd5', '1zyvBkWVAy5H0DDDaQnrp9MmAhfo0UNB9bOGyvSEX9MW66eymeDElVzVmsvUKHORwWg2hRLN7yd253zhXGs6k7PVPHy6uqtYTaxvHWm7njZYWtlqraDGE1fvrtnyUvlrGFPJZzkuj5FQfpYl6dV2bJHV0A3gpzogKSXJSyfH02ryb2ObKaJC6dnkMic00P6R3rUCBotrU7KAaGieALbFUXBGTvjsFKUvLgJexqAEJcKwiioTp0JH9Y3NUWgi2y5kclPmUG4xVrKHWXu7bI1MYJ1DCL1eCQCuqXmUf7eFyKcR6pzTFpkurcYq5R3SjprK13EkuLmVcDJMS8DNiLVCcCIOpHQMNgVFNLI7SPCl461FPOrL1xSuULAsLNjP5xgjjpn5Bu2dAug906fSVcnJwfHuuCly0sqYfNEI0Bd1IMiQOyoqA1pwdJMYMa6hig6imR3bJcnPptA6Fo1rooqzzt6gFnloqXeo9Hd9UB1F7QhfZO21QOZho19A5d12wcnOZCb3sRzomQqcPKSyvb17SxzoP9coAEpfXZEBrds60iuPjZaez79zeGP8X4KxuK1WwVDFw661zB6nvKCtNKFQqeKVMSFWAazw735TkQRGkjlif31f3uspvmBrLagvtjlfMoT138NnNxc2FbsK5wmssNfKFRk9zNg629b46rX7qLnC3ItPYgXyPSFqSF7snjqOUHJpzvcPhyY7tuDZVW2VTd3OtRdjdlAwHbSUrI5jWI1BCeP8cObIsOjd5', -2, -3) """
sql """ sync """
def all_mow_sql = generate_dup_mow_sql.call(mowTable)
def mow_result1 = execute_sql.call("enable_no_need_read_data_opt", "true", all_mow_sql)
logger.info("mow_result1 is {}", mow_result1);
@ -868,7 +823,7 @@ suite("test_dup_table_inverted_index", "p1") {
logger.info("mow_result4 is {}", mow_result4);
compare_result(mow_result3, mow_result4, all_mow_sql)
run_compaction.call(mowTable)
trigger_and_wait_compaction(mowTable, "full")
def mow_result5 = execute_sql.call("enable_no_need_read_data_opt", "true", all_mow_sql)
logger.info("mow_result5 is {}", mow_result5);
def mow_result6 = execute_sql.call("enable_no_need_read_data_opt", "false", all_mow_sql)
@ -894,4 +849,4 @@ suite("test_dup_table_inverted_index", "p1") {
} finally {
sql """ set enable_match_without_inverted_index = true """
}
}
}

View File

@ -551,7 +551,7 @@ suite("test_show_data_with_compaction", "p1") {
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
@ -664,42 +664,6 @@ suite("test_show_data_with_compaction", "p1") {
return "wait_timeout"
}
def run_compaction_and_wait = { tableName ->
//TabletId,ReplicaId,BackendId,SchemaHash,Version,LstSuccessVersion,LstFailedVersion,LstFailedTime,LocalDataSize,RemoteDataSize,RowCount,State,LstConsistencyCheckTime,CheckVersion,VersionCount,QueryHits,PathHash,MetaUrl,CompactionStatus
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_full_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
logger.info("Compaction was done automatically!")
} else {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
}
def create_table_run_compaction_and_wait = { test_name ->
sql """ DROP TABLE IF EXISTS ${test_name}; """
sql """
@ -726,14 +690,14 @@ suite("test_show_data_with_compaction", "p1") {
sql """ INSERT INTO ${test_name} VALUES (3, "bason", "bason hate pear", 99); """
def data_size = wait_for_show_data_finish(test_name, 60000, 0)
assertTrue(data_size != "wait_timeout")
run_compaction_and_wait(test_name)
trigger_and_wait_compaction(test_name, "full")
data_size = wait_for_show_data_finish(test_name, 60000, data_size)
assertTrue(data_size != "wait_timeout")
return data_size
}
try {
set_be_config.call("inverted_index_compaction_enable", "true")
sql "DROP TABLE IF EXISTS ${tableWithIndexCompaction}"
create_table_with_index.call(tableWithIndexCompaction)
@ -748,7 +712,7 @@ suite("test_show_data_with_compaction", "p1") {
def with_index_size = wait_for_show_data_finish(tableWithIndexCompaction, 60000, 0)
assertTrue(with_index_size != "wait_timeout")
run_compaction_and_wait(tableWithIndexCompaction)
trigger_and_wait_compaction(tableWithIndexCompaction, "full")
with_index_size = wait_for_show_data_finish(tableWithIndexCompaction, 60000, with_index_size)
assertTrue(with_index_size != "wait_timeout")
@ -764,7 +728,7 @@ suite("test_show_data_with_compaction", "p1") {
def another_with_index_size = wait_for_show_data_finish(tableWithOutIndexCompaction, 60000, 0)
assertTrue(another_with_index_size != "wait_timeout")
run_compaction_and_wait(tableWithOutIndexCompaction)
trigger_and_wait_compaction(tableWithOutIndexCompaction, "full")
another_with_index_size = wait_for_show_data_finish(tableWithOutIndexCompaction, 60000, another_with_index_size)
assertTrue(another_with_index_size != "wait_timeout")

View File

@ -89,7 +89,7 @@ suite("test_map_load_and_compaction", "p0") {
for (int i = 0; i < 5; ++i) {
streamLoadJson.call(4063, dataFile1)
}
sql """sync"""
// check result
@ -105,31 +105,7 @@ suite("test_map_load_and_compaction", "p0") {
checkCompactionStatus.call(compactionStatus, 6)
// trigger compaction
String backend_id;
def backendId_to_backendIP = [:]
def backendId_to_backendHttpPort = [:]
getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
assertEquals("success", compactJson.status.toLowerCase())
def running = false
// wait compactions done
do {
Thread.sleep(1000)
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def cs = parseJson(out.trim())
assertEquals("success", cs.status.toLowerCase())
running = cs.run_status
} while (running)
trigger_and_wait_compaction(testTable, "cumulative")
checkCompactionStatus.call(compactionStatus, 1)
// finally check backend alive

View File

@ -31,7 +31,7 @@ suite("test_agg_keys_schema_change_datev2") {
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
@ -108,7 +108,7 @@ suite("test_agg_keys_schema_change_datev2") {
}
sql """sync"""
qt_sql """select * from ${tbName} ORDER BY `datek1`;"""
do_compact(tbName)
trigger_and_wait_compaction(tbName, "cumulative")
sql """sync"""
qt_sql """select * from ${tbName} ORDER BY `datek1`;"""
sql """delete from ${tbName} where `datev3` = '2022-01-01';"""
@ -159,7 +159,7 @@ suite("test_agg_keys_schema_change_datev2") {
}
sql """sync"""
qt_sql """select * from ${tbName} ORDER BY `datek1`;"""
do_compact(tbName)
trigger_and_wait_compaction(tbName, "cumulative")
sql """sync"""
qt_sql """select * from ${tbName} ORDER BY `datek1`;"""
sql """delete from ${tbName} where `datev3` = '2022-01-01 11:11:11';"""
@ -210,7 +210,7 @@ suite("test_agg_keys_schema_change_datev2") {
}
sql """sync"""
qt_sql """select * from ${tbName} ORDER BY `datek1`;"""
do_compact(tbName)
trigger_and_wait_compaction(tbName, "cumulative")
sql """sync"""
qt_sql """select * from ${tbName} ORDER BY `datek1`;"""
sql """delete from ${tbName} where `datev3` = '2022-01-01 11:11:11';"""

View File

@ -100,7 +100,7 @@ suite("test_schema_change_varchar_to_datev2") {
sql """sync"""
qt_sql_2 """select * from ${tbName} ORDER BY `k1`;"""
do_compact(tbName)
trigger_and_wait_compaction(tbName, "cumulative")
sql """sync"""
qt_sql_3 """select * from ${tbName} ORDER BY `k1`;"""
sql """delete from ${tbName} where `k3` = '2020-01-02';"""

View File

@ -113,7 +113,7 @@ suite("test_agg_keys_schema_change_decimalv2", "nonConcurrent") {
}
sql """sync"""
qt_sql2 """select * from ${tbName} ORDER BY 1,2,3,4;"""
do_compact(tbName)
trigger_and_wait_compaction(tbName, "cumulative")
sql """sync"""
qt_sql3 """select * from ${tbName} ORDER BY 1,2,3,4;"""

View File

@ -97,7 +97,7 @@ suite("test_agg_keys_schema_change_decimalv3") {
}
sql """sync"""
qt_sql """select * from ${tbName} ORDER BY `decimalv3k1`;"""
do_compact(tbName)
trigger_and_wait_compaction(tbName, "cumulative")
sql """sync"""
qt_sql """select * from ${tbName} ORDER BY `decimalv3k1`;"""
sql """ alter table ${tbName} drop column `decimalv3v3` """

View File

@ -22,7 +22,7 @@ suite ("test_agg_keys_schema_change") {
def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
return jobStateResult[0][9]
}
def tableName = "schema_change_agg_keys_regression_test"
try {
@ -72,7 +72,7 @@ suite ("test_agg_keys_schema_change") {
// add key column case 1, not light schema change
sql """
ALTER table ${tableName} ADD COLUMN new_key_column INT default "2"
ALTER table ${tableName} ADD COLUMN new_key_column INT default "2"
"""
int max_try_time = 3000
@ -171,31 +171,8 @@ suite ("test_agg_keys_schema_change") {
"""
// compaction
String[][] tablets = sql """ show tablets from ${tableName}; """
for (String[] tablet in tablets) {
String tablet_id = tablet[0]
def backend_id = tablet[2]
logger.info("run compaction:" + tablet_id)
def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
}
trigger_and_wait_compaction(tableName, "cumulative")
// wait for all compactions done
for (String[] tablet in tablets) {
boolean running = true
do {
Thread.sleep(100)
String tablet_id = tablet[0]
def backend_id = tablet[2]
def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
qt_sc """ select count(*) from ${tableName} """
qt_sc """ SELECT * FROM schema_change_agg_keys_regression_test WHERE user_id=2 """

View File

@ -1,4 +1,4 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@ -146,32 +146,8 @@ suite ("test_agg_mv_schema_change") {
"""
// compaction
String[][] tablets = sql """ show tablets from ${tableName}; """
for (String[] tablet in tablets) {
String tablet_id = tablet[0]
def backend_id = tablet[2]
logger.info("run compaction:" + tablet_id)
def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
//assertEquals(code, 0)
}
trigger_and_wait_compaction(tableName, "cumulative")
// wait for all compactions done
for (String[] tablet in tablets) {
boolean running = true
do {
Thread.sleep(100)
String tablet_id = tablet[0]
def backend_id = tablet[2]
def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
qt_sc """ select count(*) from ${tableName} """
qt_sc """ SELECT * FROM ${tableName} WHERE user_id=2 """
@ -182,4 +158,4 @@ suite ("test_agg_mv_schema_change") {
}

View File

@ -1,4 +1,4 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@ -154,32 +154,8 @@ suite ("test_agg_rollup_schema_change") {
"""
// compaction
String[][] tablets = sql """ show tablets from ${tableName}; """
for (String[] tablet in tablets) {
String tablet_id = tablet[0]
def backend_id = tablet[2]
logger.info("run compaction:" + tablet_id)
def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
//assertEquals(code, 0)
}
trigger_and_wait_compaction(tableName, "cumulative")
// wait for all compactions done
for (String[] tablet in tablets) {
boolean running = true
do {
Thread.sleep(100)
String tablet_id = tablet[0]
def backend_id = tablet[2]
def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
qt_sc """ select count(*) from ${tableName} """
qt_sc """ SELECT * FROM ${tableName} WHERE user_id=2 """

View File

@ -78,7 +78,7 @@ suite ("test_agg_vals_schema_change") {
// add column
sql """
ALTER table ${tableName} ADD COLUMN new_column INT MAX default "1"
ALTER table ${tableName} ADD COLUMN new_column INT MAX default "1"
"""
qt_sc """ SELECT * FROM ${tableName} WHERE user_id=2 """
@ -136,31 +136,7 @@ suite ("test_agg_vals_schema_change") {
"""
// compaction
String[][] tablets = sql """ show tablets from ${tableName}; """
for (String[] tablet in tablets) {
String tablet_id = tablet[0]
def backend_id = tablet[2]
logger.info("run compaction:" + tablet_id)
def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
//assertEquals(code, 0)
}
// wait for all compactions done
for (String[] tablet in tablets) {
boolean running = true
do {
Thread.sleep(100)
String tablet_id = tablet[0]
def backend_id = tablet[2]
def (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "cumulative")
qt_sc """ select count(*) from ${tableName} """
qt_sc """ SELECT * FROM ${tableName} WHERE user_id=2 """

View File

@ -71,7 +71,7 @@ suite ("test_dup_keys_schema_change") {
// add column
sql """
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
"""
sql """ SELECT * FROM ${tableName} WHERE user_id=2 order by min_dwell_time """

View File

@ -94,7 +94,7 @@ suite ("test_dup_mv_schema_change") {
// add column
sql """
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
"""
waitForJob(tableName, 3000)

View File

@ -103,7 +103,7 @@ suite ("test_dup_rollup_schema_change") {
// add column
sql """
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
"""
sql """ SELECT * FROM ${tableName} WHERE user_id=2 order by min_dwell_time """

View File

@ -70,7 +70,7 @@ suite ("test_dup_vals_schema_change") {
// add column
sql """
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
"""
sql """ SELECT * FROM ${tableName} WHERE user_id=2 order by min_dwell_time """

View File

@ -73,7 +73,7 @@ suite ("test_uniq_keys_schema_change") {
// add column
sql """
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
"""
sql """ SELECT * FROM ${tableName} WHERE user_id=2 """

View File

@ -104,7 +104,7 @@ suite ("test_uniq_mv_schema_change") {
// add column
sql """
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
"""
sql """ SELECT * FROM ${tableName} WHERE user_id=2 """
@ -165,15 +165,7 @@ suite ("test_uniq_mv_schema_change") {
"""
// compaction
String[][] tablets = sql """ show tablets from ${tableName}; """
for (String[] tablet in tablets) {
String tablet_id = tablet[0]
backend_id = tablet[2]
logger.info("run compaction:" + tablet_id)
def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
//assertEquals(code, 0)
}
trigger_and_wait_compaction(tableName, "cumulative")
// wait for all compactions done
for (String[] tablet in tablets) {

View File

@ -97,7 +97,7 @@ suite ("test_uniq_rollup_schema_change") {
// add column
sql """
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
"""
sql """ SELECT * FROM ${tableName} WHERE user_id=2 """
@ -173,15 +173,7 @@ suite ("test_uniq_rollup_schema_change") {
"""
// compaction
String[][] tablets = sql """ show tablets from ${tableName}; """
for (String[] tablet in tablets) {
String tablet_id = tablet[0]
backend_id = tablet[2]
logger.info("run compaction:" + tablet_id)
def (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
//assertEquals(code, 0)
}
trigger_and_wait_compaction(tableName, "cumulative")
// wait for all compactions done
for (String[] tablet in tablets) {

View File

@ -73,7 +73,7 @@ suite ("test_uniq_vals_schema_change") {
// add column
sql """
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
ALTER table ${tableName} ADD COLUMN new_column INT default "1"
"""
sql """ SELECT * FROM ${tableName} WHERE user_id=2 """

View File

@ -22,7 +22,7 @@ suite ("test_varchar_schema_change") {
def jobStateResult = sql """ SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
return jobStateResult[0][9]
}
def tableName = "varchar_schema_change_regression_test"
try {
@ -55,7 +55,7 @@ suite ("test_varchar_schema_change") {
"""
exception "Cannot shorten string length"
}
// test {//为什么第一次改没发生Nothing is changed错误?查看branch-1.2-lts代码
// sql """ alter table ${tableName} modify column c2 varchar(20)
// """
@ -146,8 +146,8 @@ suite ("test_varchar_schema_change") {
qt_sc " select min(c2),max(c2) from ${tableName} group by c0 order by 1,2; "
sleep(5000)
sql """ alter table ${tableName}
modify column c2 varchar(40),
sql """ alter table ${tableName}
modify column c2 varchar(40),
modify column c3 varchar(6) DEFAULT '0'
"""
max_try_time = 1200

View File

@ -23,7 +23,7 @@ suite("regression_test_variant_github_events_p2", "nonConcurrent,p2"){
def delta_time = 1000
def alter_res = "null"
def useTime = 0
def wait_for_latest_op_on_table_finish = { table_name, OpTimeout ->
for(int t = delta_time; t <= OpTimeout; t += delta_time){
alter_res = sql """SHOW ALTER TABLE COLUMN WHERE TableName = "${table_name}" ORDER BY CreateTime DESC LIMIT 1;"""
@ -106,7 +106,7 @@ suite("regression_test_variant_github_events_p2", "nonConcurrent,p2"){
logger.info("wait_for_last_build_index_on_table_running debug: " + alter_res)
assertTrue(useTime <= OpTimeout, "wait_for_last_build_index_on_table_running timeout")
return "wait_timeout"
}
}
def backendId_to_backendIP = [:]
@ -125,8 +125,8 @@ suite("regression_test_variant_github_events_p2", "nonConcurrent,p2"){
table "${table_name}"
// set http request header params
set 'read_json_by_line', 'true'
set 'format', 'json'
set 'read_json_by_line', 'true'
set 'format', 'json'
set 'max_filter_ratio', '0.1'
file file_name // import json file
time 10000 // limit inflight 10s
@ -187,32 +187,9 @@ suite("regression_test_variant_github_events_p2", "nonConcurrent,p2"){
def tablets = sql_return_maparray """ show tablets from github_events; """
// trigger compactions for all tablets in github_events
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
}
trigger_and_wait_compaction("github_events", "cumulative")
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
sql """set enable_match_without_inverted_index = false"""
sql """set enable_match_without_inverted_index = false"""
sql """ set enable_common_expr_pushdown = true """
// filter by bloom filter
qt_sql """select cast(v["payload"]["pull_request"]["additions"] as int) from github_events where cast(v["repo"]["name"] as string) = 'xpressengine/xe-core' order by 1;"""
@ -220,4 +197,4 @@ suite("regression_test_variant_github_events_p2", "nonConcurrent,p2"){
// query with inverted index
qt_sql """select cast(v["payload"]["pull_request"]["additions"] as int) from github_events where v["repo"]["name"] match 'xpressengine' order by 1;"""
qt_sql """select count() from github_events where v["repo"]["name"] match 'apache' order by 1;"""
}
}

View File

@ -26,7 +26,7 @@ suite("test_compaction_variant") {
backend_id = backendId_to_backendIP.keySet()[0]
def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def configList = parseJson(out.trim())
@ -48,7 +48,7 @@ suite("test_compaction_variant") {
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
k bigint,
v ${var_def}
v ${var_def}
)
${key_type} KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS ${buckets}
@ -136,7 +136,7 @@ suite("test_compaction_variant") {
qt_sql_11 "SELECT * FROM ${tableName} ORDER BY k, cast(v as string); "
qt_sql_22 "select k, cast(v['a'] as array<int>) from ${tableName} where size(cast(v['a'] as array<int>)) > 0 order by k"
qt_sql_33 "select k, v['a'], cast(v['b'] as string) from ${tableName} where length(cast(v['b'] as string)) > 4 order by k"
qt_sql_55 "select cast(v['b'] as string), cast(v['b']['c'] as string) from ${tableName} where cast(v['b'] as string) != 'null' and cast(v['b'] as string) != '{}' order by k desc limit 10;"
qt_sql_55 "select cast(v['b'] as string), cast(v['b']['c'] as string) from ${tableName} where cast(v['b'] as string) != 'null' and cast(v['b'] as string) != '{}' order by k desc limit 10;"
}
} finally {

View File

@ -55,7 +55,7 @@ suite("test_compaction_sparse_column", "p1,nonConcurrent") {
k bigint,
v variant
)
DUPLICATE KEY(`k`)
DUPLICATE KEY(`k`)
DISTRIBUTED BY HASH(`k`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
@ -66,26 +66,26 @@ suite("test_compaction_sparse_column", "p1,nonConcurrent") {
sql """insert into ${tableName} select 0, '{"a": 11245, "b" : 42000}' as json_str
union all select 0, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "aaaaa"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
sql """insert into ${tableName} select 1, '{"a": 11245, "b" : 42001}' as json_str
union all select 1, '{"a": 1123}' as json_str union all select 1, '{"a" : 1234, "xxxx" : "bbbbb"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
sql """insert into ${tableName} select 2, '{"a": 11245, "b" : 42002}' as json_str
union all select 2, '{"a": 1123}' as json_str union all select 2, '{"a" : 1234, "xxxx" : "ccccc"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
sql """insert into ${tableName} select 3, '{"a" : 1234, "point" : 1, "xxxx" : "ddddd"}' as json_str
union all select 3, '{"a": 1123}' as json_str union all select 3, '{"a": 11245, "b" : 42003}' as json_str from numbers("number" = "4096") limit 4096 ;"""
sql """insert into ${tableName} select 4, '{"a" : 1234, "xxxx" : "eeeee", "point" : 5}' as json_str
union all select 4, '{"a": 1123}' as json_str union all select 4, '{"a": 11245, "b" : 42004}' as json_str from numbers("number" = "4096") limit 4096 ;"""
sql """insert into ${tableName} select 5, '{"a" : 1234, "xxxx" : "fffff", "point" : 42000}' as json_str
union all select 5, '{"a": 1123}' as json_str union all select 5, '{"a": 11245, "b" : 42005}' as json_str from numbers("number" = "4096") limit 4096 ;"""
qt_select_b_bfcompact """ SELECT count(cast(v['b'] as int)) FROM ${tableName};"""
qt_select_xxxx_bfcompact """ SELECT count(cast(v['xxxx'] as string)) FROM ${tableName};"""
qt_select_point_bfcompact """ SELECT count(cast(v['point'] as bigint)) FROM ${tableName};"""

View File

@ -51,7 +51,7 @@ suite("test_compaction_extract_root", "p1,nonConcurrent") {
k bigint,
v variant
)
DUPLICATE KEY(`k`)
DUPLICATE KEY(`k`)
DISTRIBUTED BY HASH(`k`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
@ -64,22 +64,22 @@ suite("test_compaction_extract_root", "p1,nonConcurrent") {
sql """insert into ${tableName} select 0, '{"a": 11245, "b" : {"state" : "open", "code" : 2}}' as json_str
union all select 8, '{"a": 1123}' as json_str union all select 0, '{"a" : 1234, "xxxx" : "aaaaa"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
sql """insert into ${tableName} select 1, '{"a": 11245, "b" : {"state" : "colse", "code" : 2}}' as json_str
union all select 1, '{"a": 1123}' as json_str union all select 1, '{"a" : 1234, "xxxx" : "bbbbb"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
sql """insert into ${tableName} select 2, '{"a": 11245, "b" : {"state" : "flat", "code" : 3}}' as json_str
union all select 2, '{"a": 1123}' as json_str union all select 2, '{"a" : 1234, "xxxx" : "ccccc"}' as json_str from numbers("number" = "4096") limit 4096 ;"""
sql """insert into ${tableName} select 3, '{"a" : 1234, "xxxx" : 4, "point" : 5}' as json_str
union all select 3, '{"a": 1123}' as json_str union all select 3, '{"a": 11245, "b" : 42003}' as json_str from numbers("number" = "4096") limit 4096 ;"""
sql """insert into ${tableName} select 4, '{"a" : 1234, "xxxx" : "eeeee", "point" : 5}' as json_str
union all select 4, '{"a": 1123}' as json_str union all select 4, '{"a": 11245, "b" : 42004}' as json_str from numbers("number" = "4096") limit 4096 ;"""
sql """insert into ${tableName} select 5, '{"a" : 1234, "xxxx" : "fffff", "point" : 42000}' as json_str
union all select 5, '{"a": 1123}' as json_str union all select 5, '{"a": 11245, "b" : 42005}' as json_str from numbers("number" = "4096") limit 4096 ;"""
@ -96,37 +96,7 @@ suite("test_compaction_extract_root", "p1,nonConcurrent") {
def tablets = sql_return_maparray """ show tablets from ${tableName}; """
// trigger compactions for all tablets in ${tableName}
for (def tablet in tablets) {
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactJson = parseJson(out.trim())
if (compactJson.status.toLowerCase() == "fail") {
assertEquals(disableAutoCompaction, false)
logger.info("Compaction was done automatically!")
}
if (disableAutoCompaction) {
assertEquals("success", compactJson.status.toLowerCase())
}
}
// wait for all compactions done
for (def tablet in tablets) {
boolean running = true
do {
Thread.sleep(1000)
String tablet_id = tablet.TabletId
backend_id = tablet.BackendId
(code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
assertEquals(code, 0)
def compactionStatus = parseJson(out.trim())
assertEquals("success", compactionStatus.status.toLowerCase())
running = compactionStatus.run_status
} while (running)
}
trigger_and_wait_compaction(tableName, "cumulative")
int rowCount = 0
for (def tablet in tablets) {