[test] support a lot of actions (#8632)

Support a lot of actions for regression testing framework.
e.g. thread, lazyCheck, onSuccess, connect, selectUnionAll, timer

Demo exists in ${DORIS_HOME}/regression-test/suites/demo
This commit is contained in:
924060929
2022-03-24 20:22:24 +08:00
committed by GitHub
parent c69dd54116
commit 9db2a96af1
23 changed files with 607 additions and 96 deletions

View File

@ -36,7 +36,7 @@ under the License.
1. 需要预先安装好集群
2. 修改配置文件`${DORIS_HOME}/conf/regression-conf.groovy`,设置jdbc url、用户等配置项
3. 创建测试用例文件并编写用例
4. 如果用例文件包含`qt` Action,则需要创建关联的data文件,比如`suites/demo/qt_action.groovy`这个例子,需要用到`data/demo/qt_action.out`这个TSV文件来校验输出是否一致
4. 如果用例文件包含`qt` Action,则需要创建关联的data文件,比如`suites/demo/qt_action.groovy`这个例子,需要用到`data/demo/qt_action.out`这个TSV文件来校验输出是否一致
5. 运行`${DORIS_HOME}/run-regression-test.sh`测试全部用例,或运行`${DORIS_HOME}/run-regression-test.sh --run <suiteName>` 测试若干用例,更多例子见"启动脚本例子"章节
## 目录结构
@ -114,7 +114,7 @@ customConf1 = "test_custom_conf_value"
## 编写用例的步骤
1. 进入`${DORIS_HOME}/regression-test`目录
2. 根据测试的目的来选择用例的目录,正确性测试存在`suites/correctness`,而性能测试存在`suites/performance`
3. 新建一个groovy用例文件,增加若干`Action`用于测试,Action在后续章节具体说明
3. 新建一个groovy用例文件,增加若干`Action`用于测试,Action在后续章节具体说明
## Action
Action是一个测试框架默认提供的测试行为,使用DSL来定义。
@ -178,6 +178,7 @@ try {
* return xxx(args)
* } catch (Throwable t) {
* // do nothing
* return null
* }
*/
try_sql("DROP TABLE IF EXISTS ${testTable}")
@ -447,6 +448,10 @@ streamLoad {
}
```
### 其他Action
thread, lazyCheck, events, connect, selectUnionAll
具体可以在这个目录找到例子: `${DORIS_HOME}/regression-test/suites/demo`
## 启动脚本例子
```shell
# 查看脚本参数说明

View File

@ -6,7 +6,7 @@
-- !select2 --
2
-- !union --
-- !union_all --
\N
1
15

View File

@ -0,0 +1,10 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_union_all1 --
1
10
3
-- !select_union_all2 --
0 abc
1 123
2 \N

View File

@ -0,0 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !diffrent_tag1 --
100
-- !diffrent_tag2 --
100

View File

@ -57,6 +57,7 @@ class Config {
public Set<String> groups = new HashSet<>()
public InetSocketAddress feHttpInetSocketAddress
public Integer parallel
public Integer actionParallel
public Integer times
public boolean withOutLoadData
@ -124,6 +125,7 @@ class Config {
config.generateOutputFile = cmd.hasOption(genOutOpt)
config.forceGenerateOutputFile = cmd.hasOption(forceGenOutOpt)
config.parallel = Integer.parseInt(cmd.getOptionValue(parallelOpt, "1"))
config.actionParallel = Integer.parseInt(cmd.getOptionValue(actionParallelOpt, "10"))
config.times = Integer.parseInt(cmd.getOptionValue(timesOpt, "1"))
config.randomOrder = cmd.hasOption(randomOrderOpt)
config.withOutLoadData = cmd.hasOption(withOutLoadDataOpt)
@ -226,6 +228,11 @@ class Config {
log.info("Set parallel to 1 because not specify.".toString())
}
if (config.actionParallel == null) {
config.actionParallel = 10
log.info("Set actionParallel to 10 because not specify.".toString())
}
if (config.randomOrder == null) {
config.randomOrder = false
log.info("set randomOrder to false because not specify.".toString())
@ -263,15 +270,15 @@ class Config {
String urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3)
if (urlWithoutSchema.indexOf("/") >= 0) {
if (jdbcUrl.contains("?")) {
// e.g: jdbc:mysql://locahost:8080/?a=b
// e.g: jdbc:mysql://localhost:8080/?a=b
urlWithDb = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"))
urlWithDb += ("/" + defaultDb) + jdbcUrl.substring(jdbcUrl.lastIndexOf("?"))
} else {
// e.g: jdbc:mysql://locahost:8080/
// e.g: jdbc:mysql://localhost:8080/
urlWithDb += defaultDb
}
} else {
// e.g: jdbc:mysql://locahost:8080
// e.g: jdbc:mysql://localhost:8080
urlWithDb += ("/" + defaultDb)
}
this.jdbcUrl = urlWithDb

View File

@ -43,6 +43,7 @@ class ConfigOptions {
static Option genOutOpt
static Option forceGenOutOpt
static Option parallelOpt
static Option actionParallelOpt
static Option randomOrderOpt
static Option timesOpt
static Option withOutLoadDataOpt
@ -180,6 +181,14 @@ class ConfigOptions {
.longOpt("parallel")
.desc("the num of threads running test")
.build()
actionParallelOpt = Option.builder("actionParallel")
.argName("parallel")
.required(false)
.hasArg(true)
.type(String.class)
.longOpt("actionParallel")
.desc("the num of threads running for thread action")
.build()
randomOrderOpt = Option.builder("randomOrder")
.required(false)
.hasArg(false)
@ -219,6 +228,7 @@ class ConfigOptions {
.addOption(confFileOpt)
.addOption(forceGenOutOpt)
.addOption(parallelOpt)
.addOption(actionParallelOpt)
.addOption(randomOrderOpt)
.addOption(withOutLoadDataOpt)

View File

@ -38,7 +38,8 @@ class RegressionTest {
static ClassLoader classloader
static CompilerConfiguration compileConfig
static GroovyShell shell
static ExecutorService executorService;
static ExecutorService executorService
static ExecutorService actionExecutorService
static void main(String[] args) {
CommandLine cmd = ConfigOptions.initCommands(args)
@ -53,6 +54,7 @@ class RegressionTest {
Recorder recorder = runSuites(config)
printResult(config, recorder)
}
actionExecutorService.shutdown()
executorService.shutdown()
}
@ -63,7 +65,8 @@ class RegressionTest {
compileConfig.setScriptBaseClass((Suite as Class).name)
shell = new GroovyShell(classloader, new Binding(), compileConfig)
log.info("starting ${config.parallel} threads")
executorService = Executors.newFixedThreadPool(config.parallel);
executorService = Executors.newFixedThreadPool(config.parallel)
actionExecutorService = Executors.newFixedThreadPool(config.actionParallel)
}
static List<File> findSuiteFiles(String root) {
@ -117,23 +120,34 @@ class RegressionTest {
return groups;
}
static Integer runSuite(Config config, SuiteFile sf, Recorder recorder) {
static Integer runSuite(Config config, SuiteFile sf, ExecutorService executorService, Recorder recorder) {
File file = sf.file
String suiteName = sf.suiteName
String group = sf.group
def suiteConn = config.getConnection()
new SuiteContext(file, suiteConn, config, recorder).withCloseable { context ->
new SuiteContext(file, suiteConn, executorService, config, recorder).withCloseable { context ->
Suite suite = null
try {
log.info("Run ${suiteName} in $file".toString())
Suite suite = shell.parse(file) as Suite
suite = shell.parse(file) as Suite
suite.init(suiteName, group, context)
suite.run()
suite.doLazyCheck()
suite.successCallbacks.each { it() }
recorder.onSuccess(new SuiteInfo(file, group, suiteName))
log.info("Run ${suiteName} in ${file.absolutePath} succeed".toString())
} catch (Throwable t) {
if (suite != null) {
suite.failCallbacks.each { it() }
}
recorder.onFailure(new SuiteInfo(file, group, suiteName))
log.error("Run ${suiteName} in ${file.absolutePath} failed".toString(), t)
} finally {
if (suite != null) {
suite.finishCallbacks.each { it() }
}
}
shell.resetLoadedClasses()
}
return 0
@ -146,7 +160,7 @@ class RegressionTest {
String group = parseGroup(config, file)
return new SuiteFile(file, suiteName, group)
}).filter({ sf ->
{ suiteNameMatch(sf.suiteName) && canRun(config, sf.suiteName, sf.group) }
suiteNameMatch(sf.suiteName) && canRun(config, sf.suiteName, sf.group)
}).collect(Collectors.toList())
if (config.randomOrder) {
@ -157,11 +171,9 @@ class RegressionTest {
def futures = new ArrayList<Future>()
runScripts.eachWithIndex { sf, i ->
log.info("[${i + 1}/${totalFile}] Run ${sf.suiteName} in ${sf.file}".toString())
Future future = executorService.submit(
()-> {
runSuite(config, sf, recorder)
}
)
Future future = executorService.submit {
runSuite(config, sf, actionExecutorService, recorder)
}
futures.add(future)
}
@ -178,9 +190,9 @@ class RegressionTest {
static Recorder runSuites(Config config) {
def recorder = new Recorder()
if (!config.withOutLoadData) {
runSuites(config, recorder, suiteName -> { suiteName == "load" })
runSuites(config, recorder, {suiteName -> suiteName == "load" })
}
runSuites(config, recorder, suiteName -> { suiteName != "load" })
runSuites(config, recorder, {suiteName -> suiteName != "load" })
return recorder
}
@ -189,9 +201,7 @@ class RegressionTest {
Set<String> suiteGroups = group.split(',').collect { g -> g.trim() }.toSet()
if (config.suiteWildcard.size() == 0 ||
(suiteName != null && (config.suiteWildcard.any {
suiteWildcard -> {
Wildcard.match(suiteName, suiteWildcard)
}
suiteWildcard -> Wildcard.match(suiteName, suiteWildcard)
}))) {
if (config.groups == null || config.groups.isEmpty()
|| !config.groups.intersect(suiteGroups).isEmpty()) {

View File

@ -17,6 +17,8 @@
package org.apache.doris.regression.action
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.FromString
import org.apache.doris.regression.suite.SuiteContext
import org.apache.doris.regression.util.JdbcUtils
import groovy.util.logging.Slf4j
@ -51,7 +53,7 @@ class ExplainAction implements SuiteAction {
notContainsStrings.add(subString)
}
void check(Closure<Boolean> checkFunction) {
void check(@ClosureParams(value = FromString, options = ["String", "String,Throwable,Long,Long"]) Closure<Boolean> checkFunction) {
this.checkFunction = checkFunction
}
@ -119,7 +121,7 @@ class ExplainAction implements SuiteAction {
long startTime = System.currentTimeMillis()
String explainString = null
try {
explainString = JdbcUtils.executeToList(context.conn, explainSql).stream()
explainString = JdbcUtils.executeToList(context.getConnection(), explainSql).stream()
.map({row -> row.get(0).toString()})
.collect(Collectors.joining("\n"))
return new ActionResult(explainString, null, startTime, System.currentTimeMillis())

View File

@ -18,6 +18,8 @@
package org.apache.doris.regression.action
import com.google.common.collect.Iterators
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.FromString
import org.apache.doris.regression.suite.SuiteContext
import org.apache.doris.regression.util.BytesInputStream
import org.apache.doris.regression.util.OutputUtils
@ -115,7 +117,7 @@ class StreamLoadAction implements SuiteAction {
this.time = time.call()
}
void check(Closure check) {
void check(@ClosureParams(value = FromString, options = ["String,Throwable,Long,Long"]) Closure check) {
this.check = check
}

View File

@ -17,6 +17,8 @@
package org.apache.doris.regression.action
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.FromString
import groovy.util.logging.Slf4j
import java.sql.Connection
@ -41,7 +43,7 @@ class TestAction implements SuiteAction {
@Override
void run() {
try{
def result = doRun(context.conn)
def result = doRun(context.getConnection())
if (check != null) {
check.call(result.result, result.exception, result.startTime, result.endTime)
} else {
@ -131,7 +133,7 @@ class TestAction implements SuiteAction {
this.exception = exceptionMsgSupplier.call()
}
void check(Closure check) {
void check(@ClosureParams(value = FromString, options = ["String,Throwable,Long,Long"]) Closure check) {
this.check = check
}

View File

@ -17,9 +17,10 @@
package org.apache.doris.regression.suite
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import groovy.json.JsonSlurper
import groovy.util.logging.Slf4j
import com.google.common.collect.ImmutableList
import org.apache.doris.regression.util.DataUtils
import org.apache.doris.regression.util.OutputUtils
@ -29,14 +30,28 @@ import org.apache.doris.regression.action.SuiteAction
import org.apache.doris.regression.action.TestAction
import org.apache.doris.regression.util.JdbcUtils
import org.junit.jupiter.api.Assertions
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.Callable
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicBoolean
import java.util.stream.Collectors
import java.util.stream.LongStream
import static org.apache.doris.regression.util.DataUtils.sortByToString
@Slf4j
abstract class Suite extends Script implements GroovyInterceptable {
SuiteContext context
String name
String group
final Logger logger = LoggerFactory.getLogger(getClass())
final List<Closure> successCallbacks = new Vector<>()
final List<Closure> failCallbacks = new Vector<>()
final List<Closure> finishCallbacks = new Vector<>()
final List<Throwable> lazyCheckExceptions = new Vector<>()
final List<Future> lazyCheckFutures = new Vector<>()
void init(String name, String group, SuiteContext context) {
this.name = name
@ -59,6 +74,26 @@ abstract class Suite extends Script implements GroovyInterceptable {
return p
}
void onSuccess(Closure callback) {
successCallbacks.add(callback)
}
void onFail(Closure callback) {
failCallbacks.add(callback)
}
void onFinish(Closure callback) {
finishCallbacks.add(callback)
}
LongStream range(long startInclusive, long endExclusive) {
return LongStream.range(startInclusive, endExclusive)
}
LongStream rangeClosed(long startInclusive, long endInclusive) {
return LongStream.rangeClosed(startInclusive, endInclusive)
}
String toCsv(List<Object> rows) {
StringBuilder sb = new StringBuilder()
for (int i = 0; i < rows.size(); ++i) {
@ -76,16 +111,79 @@ abstract class Suite extends Script implements GroovyInterceptable {
return jsonSlurper.parseText(str)
}
Object sql(String sqlStr, boolean isOrder = false) {
log.info("Execute sql: ${sqlStr}".toString())
def result = JdbcUtils.executeToList(context.conn, sqlStr)
public <T> T lazyCheck(Closure<T> closure) {
try {
T result = closure.call()
if (result instanceof Future) {
lazyCheckFutures.add(result)
}
return result
} catch (Throwable t) {
lazyCheckExceptions.add(t)
return null
}
}
void doLazyCheck() {
if (!lazyCheckExceptions.isEmpty()) {
throw lazyCheckExceptions.get(0)
}
lazyCheckFutures.forEach { it.get() }
}
public <T> Tuple2<T, Long> timer(Closure<T> actionSupplier) {
long startTime = System.currentTimeMillis()
T result = actionSupplier.call()
long endTime = System.currentTimeMillis()
return [result, endTime - startTime]
}
public <T> ListenableFuture<T> thread(String threadName = null, Closure<T> actionSupplier) {
return MoreExecutors.listeningDecorator(context.executorService).submit((Callable<T>) {
def originThreadName = Thread.currentThread().name
try {
Thread.currentThread().setName(threadName == null ? originThreadName : threadName)
return actionSupplier.call()
} finally {
try {
context.closeThreadLocal()
} catch (Throwable t) {
logger.warn("Close thread local context failed", t)
}
Thread.currentThread().setName(originThreadName)
}
})
}
public <T> ListenableFuture<T> lazyCheckThread(String threadName = null, Closure<T> actionSupplier) {
return lazyCheck {
thread(threadName, actionSupplier)
}
}
public <T> ListenableFuture<T> combineFutures(ListenableFuture<T> ... futures) {
return Futures.allAsList(futures)
}
public <T> ListenableFuture<List<T>> combineFutures(Iterable<? extends ListenableFuture<? extends T>> futures) {
return Futures.allAsList(futures)
}
public <T> T connect(String user = context.config.jdbcUser, String password = context.config.jdbcPassword,
String url = context.config.jdbcUrl, Closure<T> actionSupplier) {
return context.connect(user, password, url, actionSupplier)
}
List<List<Object>> sql(String sqlStr, boolean isOrder = false) {
logger.info("Execute sql: ${sqlStr}".toString())
def result = JdbcUtils.executeToList(context.getConnection(), sqlStr)
if (isOrder) {
result = DataUtils.sortByToString(result)
}
return result
}
Object order_sql(String sqlStr) {
List<List<Object>> order_sql(String sqlStr) {
return sql(sqlStr, true)
}
@ -96,6 +194,46 @@ abstract class Suite extends Script implements GroovyInterceptable {
return DataUtils.sortByToString(result)
}
String selectUnionAll(List list) {
def toSelectString = { Object value ->
if (value == null) {
return "null"
} else if (value instanceof Number) {
return value.toString()
} else {
return "'${value.toString()}'".toString()
}
}
AtomicBoolean isFirst = new AtomicBoolean(true)
String sql = list.stream()
.map({ row ->
StringBuilder sb = new StringBuilder("SELECT ")
if (row instanceof List) {
if (isFirst.get()) {
String columns = row.withIndex().collect({ column, index ->
"${toSelectString(column)} AS c${index + 1}"
}).join(", ")
sb.append(columns)
isFirst.set(false)
} else {
String columns = row.collect({ column ->
"${toSelectString(column)}"
}).join(", ")
sb.append(columns)
}
} else {
if (isFirst.get()) {
sb.append(toSelectString(row)).append(" AS c1")
isFirst.set(false)
} else {
sb.append(toSelectString(row))
}
}
return sb.toString()
}).collect(Collectors.joining("\nUNION ALL\n"))
return sql
}
void explain(Closure actionSupplier) {
runAction(new ExplainAction(context), actionSupplier)
}
@ -116,10 +254,10 @@ abstract class Suite extends Script implements GroovyInterceptable {
}
void quickTest(String tag, String sql, boolean order = false) {
log.info("Execute tag: ${tag}, sql: ${sql}".toString())
logger.info("Execute tag: ${tag}, sql: ${sql}".toString())
if (context.config.generateOutputFile || context.config.forceGenerateOutputFile) {
def result = JdbcUtils.executorToStringList(context.conn, sql)
def result = JdbcUtils.executorToStringList(context.getConnection(), sql)
if (order) {
result = sortByToString(result)
}
@ -128,40 +266,37 @@ abstract class Suite extends Script implements GroovyInterceptable {
def writer = context.getOutputWriter(context.config.forceGenerateOutputFile)
writer.write(realResults, tag)
} else {
if (context.outputIterator == null) {
if (!context.outputFile.exists()) {
String res = "Missing outputFile: ${context.outputFile.getAbsolutePath()}"
List excelContentList = [context.file.getName(), context.file, context.file, res]
context.recorder.reportDiffResult(excelContentList)
throw new IllegalStateException("Missing outputFile: ${context.outputFile.getAbsolutePath()}")
}
if (!context.outputIterator.hasNext()) {
if (!context.getOutputIterator().hasNextTagBlock(tag)) {
String res = "Missing output block for tag '${tag}': ${context.outputFile.getAbsolutePath()}"
List excelContentList = [context.file.getName(), tag, context.file, res]
context.recorder.reportDiffResult(excelContentList)
throw new IllegalStateException("Missing output block for tag '${tag}': ${context.outputFile.getAbsolutePath()}")
}
OutputUtils.TagBlockIterator expectCsvResults = context.getOutputIterator().next()
List<List<Object>> realResults = JdbcUtils.executorToStringList(context.getConnection(), sql)
if (order) {
realResults = sortByToString(realResults)
}
String errorMsg = null
try {
Iterator<List<Object>> expectCsvResults = context.outputIterator.next() as Iterator
List<List<Object>> realResults = JdbcUtils.executorToStringList(context.conn, sql)
if (order) {
realResults = sortByToString(realResults)
}
def res = OutputUtils.assertEquals(expectCsvResults, realResults.iterator(), "Tag '${tag}' wrong")
if (res) {
List excelContentList = [context.file.getName(), tag, sql.trim(), res]
context.recorder.reportDiffResult(excelContentList)
throw new IllegalStateException("'${tag}' line not match . Detailed results is : '${res}'")
}
errorMsg = OutputUtils.checkOutput(expectCsvResults, realResults.iterator(), "Check tag '${tag}' failed")
} catch (Throwable t) {
if (t.toString().contains('line not match . Detailed results is')) {
throw t
} else {
List excelContentList = [context.file.getName(), tag, sql.trim(), t]
context.recorder.reportDiffResult(excelContentList)
throw new IllegalStateException("'${tag}' run failed . Detailed failure information is : '${t}'", t)
}
List excelContentList = [context.file.getName(), tag, sql.trim(), t]
context.recorder.reportDiffResult(excelContentList)
throw new IllegalStateException("Check tag '${tag}' failed", t)
}
if (errorMsg != null) {
List excelContentList = [context.file.getName(), tag, sql.trim(), errorMsg]
context.recorder.reportDiffResult(excelContentList)
throw new IllegalStateException(errorMsg)
}
}
}
@ -182,15 +317,12 @@ abstract class Suite extends Script implements GroovyInterceptable {
return this."$realMethod"(*args)
} catch (Throwable t) {
// do nothing
return null
}
} else {
// invoke origin method
return metaClass.invokeMethod(this, name, args)
}
}
private Object invokeAssertions(String name, Object args) {
}
}

View File

@ -22,42 +22,77 @@ import org.apache.doris.regression.Config
import org.apache.doris.regression.util.OutputUtils
import org.apache.doris.regression.util.Recorder
import groovy.util.logging.Slf4j
import org.apache.doris.regression.util.CloseableIterator
import java.sql.Connection
import java.sql.DriverManager
import java.util.concurrent.ExecutorService
@Slf4j
@CompileStatic
class SuiteContext implements Closeable {
public final File file
public final Connection conn
private final Connection conn
public final ThreadLocal<Connection> threadLocalConn = new ThreadLocal<>()
public final Config config
public final File dataPath
public final File outputFile
public final ThreadLocal<OutputUtils.OutputBlocksIterator> threadLocalOutputIterator = new ThreadLocal<>()
public final ExecutorService executorService
public final Recorder recorder
// public final File tmpOutputPath
public final CloseableIterator<Iterator<List<String>>> outputIterator
private volatile OutputUtils.OutputBlocksWriter outputBlocksWriter
SuiteContext(File file, Connection conn, Config config, Recorder recorder) {
SuiteContext(File file, Connection conn, ExecutorService executorService, Config config, Recorder recorder) {
this.file = file
this.conn = conn
this.config = config
this.executorService = executorService
this.recorder = recorder
def path = new File(config.suitePath).relativePath(file)
def outputRelativePath = path.substring(0, path.lastIndexOf(".")) + ".out"
this.outputFile = new File(new File(config.dataPath), outputRelativePath)
this.dataPath = this.outputFile.getParentFile().getCanonicalFile()
if (!config.otherConfigs.getProperty("qt.generate.out", "false").toBoolean()
&& outputFile.exists()) {
this.outputIterator = OutputUtils.iterator(outputFile)
}
// def dataParentPath = new File(config.dataPath).parentFile.absolutePath
// def tmpOutputPath = "${dataParentPath}/tmp_output/${outputRelativePath}".toString()
// this.tmpOutputPath = new File(tmpOutputPath)
}
Connection getConnection() {
def threadConn = threadLocalConn.get()
if (threadConn != null) {
return threadConn
}
return this.conn
}
public <T> T connect(String user, String password, String url, Closure<T> actionSupplier) {
def originConnection = threadLocalConn.get()
try {
log.info("Create new connection for user '${user}'")
return DriverManager.getConnection(url, user, password).withCloseable { newConn ->
threadLocalConn.set(newConn)
return actionSupplier.call()
}
} finally {
log.info("Recover original connection")
if (originConnection == null) {
threadLocalConn.remove()
} else {
threadLocalConn.set(originConnection)
}
}
}
OutputUtils.OutputBlocksIterator getOutputIterator() {
def outputIt = threadLocalOutputIterator.get()
if (outputIt == null) {
outputIt = OutputUtils.iterator(outputFile)
threadLocalOutputIterator.set(outputIt)
}
return outputIt
}
OutputUtils.OutputBlocksWriter getOutputWriter(boolean deleteIfExist) {
if (outputBlocksWriter != null) {
return outputBlocksWriter
@ -84,15 +119,17 @@ class SuiteContext implements Closeable {
}
}
void closeThreadLocal() {
def outputIterator = threadLocalOutputIterator.get()
if (outputIterator != null) {
outputIterator.close()
threadLocalOutputIterator.remove()
}
}
@Override
void close() {
if (outputIterator != null) {
try {
outputIterator.close()
} catch (Throwable t) {
log.warn("Close outputFile failed", t)
}
}
closeThreadLocal()
if (outputBlocksWriter != null) {
outputBlocksWriter.close()

View File

@ -26,7 +26,7 @@ import org.apache.commons.io.LineIterator
@CompileStatic
class OutputUtils {
static toCsvString(List<Object> row) {
static String toCsvString(List<Object> row) {
StringWriter writer = new StringWriter()
def printer = new CSVPrinter(new PrintWriter(writer), CSVFormat.MYSQL)
for (int i = 0; i < row.size(); ++i) {
@ -35,17 +35,13 @@ class OutputUtils {
return writer.toString()
}
static assertEquals(Iterator<List<String>> expect, Iterator<List<Object>> real, String info) {
static String checkOutput(Iterator<List<String>> expect, Iterator<List<Object>> real, String info) {
while (true) {
if (expect.hasNext() && !real.hasNext()) {
def res = "${info}, line not match, real line is empty, but expect is ${expect.next()}"
return res
// throw new IllegalStateException("${info}, line not match, real line is empty, but expect is ${expect.next()}")
return "${info}, result mismatch, real line is empty, but expect is ${expect.next()}"
}
if (!expect.hasNext() && real.hasNext()) {
def res = "${info}, line not match, expect line is empty, but real is ${toCsvString(real.next())}"
return res
// throw new IllegalStateException("${info}, line not match, expect line is empty, but real is ${toCsvString(real.next())}")
return "${info}, result mismatch, expect line is empty, but real is ${toCsvString(real.next())}"
}
if (!expect.hasNext() && !real.hasNext()) {
break
@ -54,14 +50,12 @@ class OutputUtils {
def expectCsvString = toCsvString(expect.next() as List<Object>)
def realCsvString = toCsvString(real.next())
if (!expectCsvString.equals(realCsvString)) {
def res = "${info}, line not match.\nExpect line is: ${expectCsvString}\nBut real is : ${realCsvString}"
return res
// throw new IllegalStateException("${info}, line not match.\nExpect line is: ${expectCsvString}\nBut real is : ${realCsvString}")
return "${info}, result mismatch.\nExpect line is: ${expectCsvString}\nBut real is : ${realCsvString}"
}
}
}
static CloseableIterator<Iterator<List<String>>> iterator(File file) {
static OutputBlocksIterator iterator(File file) {
def it = new ReusableIterator<String>(new LineIteratorAdaptor(new LineIterator(new FileReader(file))))
return new OutputBlocksIterator(it)
}
@ -103,7 +97,7 @@ class OutputUtils {
}
}
void write(Iterator<List<String>> real, String comment) {
synchronized void write(Iterator<List<String>> real, String comment) {
if (writer != null) {
writer.println("-- !${comment} --")
while (real.hasNext()) {
@ -113,16 +107,40 @@ class OutputUtils {
}
}
void close() {
synchronized void close() {
if (writer != null) {
writer.close()
}
}
}
static class OutputBlocksIterator implements CloseableIterator<Iterator<List<String>>> {
static class TagBlockIterator implements Iterator<List<String>> {
private final String tag
private Iterator<List<String>> it
TagBlockIterator(String tag, Iterator<List<String>> it) {
this.tag = tag
this.it = it
}
String getTag() {
return tag
}
@Override
boolean hasNext() {
return it.hasNext()
}
@Override
List<String> next() {
return it.next()
}
}
static class OutputBlocksIterator implements CloseableIterator<TagBlockIterator> {
private ReusableIterator<String> lineIt
private CsvParserIterator cache
private TagBlockIterator cache
private boolean cached
OutputBlocksIterator(ReusableIterator<String> lineIt) {
@ -146,17 +164,21 @@ class OutputUtils {
return false
}
String tag = null
// find next comment block
while (true) {
String blockComment = lineIt.next() // skip block comment, e.g. -- !qt_sql_1 --
if (blockComment.startsWith("-- !") && blockComment.endsWith(" --")) {
if (blockComment.startsWith("-- !")) {
tag = blockComment.substring("-- !".length(), blockComment.length() - " --".length()).trim()
}
break
}
if (!lineIt.hasNext()) {
return false
}
}
cache = new CsvParserIterator(new SkipLastEmptyLineIterator(new OutputBlockIterator(lineIt)))
cache = new TagBlockIterator(tag, new CsvParserIterator(new SkipLastEmptyLineIterator(new OutputBlockIterator(lineIt))))
cached = true
return true
} else {
@ -164,8 +186,23 @@ class OutputUtils {
}
}
boolean hasNextTagBlock(String tag) {
while (hasNext()) {
if (Objects.equals(tag, cache.tag)) {
return true
}
// drain out
def it = next()
while (it.hasNext()) {
it.next()
}
}
return false
}
@Override
Iterator<List<String>> next() {
TagBlockIterator next() {
if (hasNext()) {
cached = false
return cache

View File

@ -21,8 +21,8 @@ import groovy.transform.CompileStatic
@CompileStatic
class Recorder {
public final List<SuiteInfo> successList = new ArrayList<>()
public final List<SuiteInfo> failureList = new ArrayList<>()
public final List<SuiteInfo> successList = new Vector<>()
public final List<SuiteInfo> failureList = new Vector<>()
void onSuccess(SuiteInfo suiteInfo) {
successList.add(suiteInfo)

View File

@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// GDSL guide: https://confluence.jetbrains.com/display/GRVY/Scripting+IDE+for+DSL+awareness
def suiteContext = context(
pathRegexp: ".*/regression-test/suites/.*",
filetypes: ["groovy"]
)
def suiteClassName = "org.apache.doris.regression.suite.Suite"
def bindAction = { actionName, actionClassName ->
def closureBody = context(scope: closureScope(isArg: false))
contributor([closureBody]) {
if (enclosingCall(actionName)) {
def actionClass = findClass(actionClassName)
delegatesTo(actionClass)
}
}
}
bindAction("test", "org.apache.doris.regression.action.TestAction")
bindAction("explain", "org.apache.doris.regression.action.ExplainAction")
bindAction("streamLoad", "org.apache.doris.regression.action.StreamLoadAction")
// bind qt_xxx and order_qt_xxx methods
contributor([suiteContext]) {
def place = getPlace()
if (place == null || !place.getClass().getName().contains("GrReferenceExpressionImpl")) {
return
}
def invokeMethodName = place.getQualifiedReferenceName()
if (invokeMethodName == null) {
return
}
if (invokeMethodName.startsWith("qt_") || invokeMethodName.startsWith("order_qt_")) {
def suiteClass = findClass(suiteClassName)
def quickTestMethods = suiteClass.findMethodsByName("quickTest")
method(name: invokeMethodName, bindsTo: quickTestMethods[0])
}
}
contributor([suiteContext]) {
// bind assertXxx
def assertionsClass = findClass("org.junit.jupiter.api.Assertions")
delegatesTo(assertionsClass)
if (enclosingCall("check") ||
(!enclosingCall("test") &&
!enclosingCall("explain") &&
!enclosingCall("streamLoad"))) {
// bind other suite method and field
def suiteClass = findClass(suiteClassName)
delegatesTo(suiteClass)
// bind try_xxx
suiteClass.methods.each { m ->
if (m.isConstructor()) {
return
}
def parameters = m.getParameterList().getParameters().collectEntries { p ->
[p.name, p.getType().getPresentableText()]
}
def returnType = m.returnType.getPresentableText()
method(name: "try_${m.name}", bindsTo: m, params: parameters, type: returnType)
}
}
}

View File

@ -0,0 +1,16 @@
def result1 = connect(user = 'admin', password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
// execute sql with admin user
sql 'select 99 + 1'
}
// if not specify <user, password, url>, it will be set to context.config.jdbc<User, Password, Url>
//
// user: 'root'
// password: context.config.jdbcPassword
// url: context.config.jdbcUrl
def result2 = connect('root') {
// execute sql with root user
sql 'select 50 + 50'
}
assertEquals(result1, result2)

View File

@ -0,0 +1,40 @@
def createTable = { tableName ->
sql """
create table ${tableName}
(id int)
distributed by hash(id)
properties
(
"replication_num"="1"
)
"""
}
def tableName = "test_events_table1"
createTable(tableName)
// lazy drop table when execute this suite finished
onFinish {
try_sql "drop table if exists ${tableName}"
}
// all event: success, fail, finish
// and you can listen event multiple times
onSuccess {
try_sql "drop table if exists ${tableName}"
}
onSuccess {
try_sql "drop table if exists ${tableName}_not_exist"
}
onFail {
try_sql "drop table if exists ${tableName}"
}
onFail {
try_sql "drop table if exists ${tableName}_not_exist"
}

View File

@ -0,0 +1,33 @@
/***** 1. lazy check exceptions *****/
// will not throw exception immediately
def result = lazyCheck {
sql "a b c d e d" // syntax error
}
assertTrue(result == null)
result = lazyCheck {
sql "select 100"
}
assertEquals(result[0][0], 100)
logger.info("You will see this log")
// if you not clear the lazyCheckExceptions, and then,
// after this suite execute finished, the syntax error in the lazyCheck action will be thrown.
lazyCheckExceptions.clear()
/***** 2. lazy check futures *****/
// start new thread and lazy check future
def futureResult = lazyCheckThread {
sql "a b c d e d"
}
assertTrue(futureResult instanceof java.util.concurrent.Future)
logger.info("You will see this log too")
// if you not clear the lazyCheckFutures, and then,
// after this suite execute finished, the syntax error in the lazyCheckThread action will be thrown.
lazyCheckFutures.clear()

View File

@ -0,0 +1,19 @@
// 3 rows and 1 column
def rows = [3, 1, 10]
order_qt_select_union_all1 """
select c1
from
(
${selectUnionAll(rows)}
) a
"""
// 3 rows and 2 columns
rows = [[1, "123"], [2, null], [0, "abc"]]
order_qt_select_union_all2 """
select c1, c2
from
(
${selectUnionAll(rows)}
) b
"""

View File

@ -48,6 +48,7 @@ try {
* return xxx(args)
* } catch (Throwable t) {
* // do nothing
* return null
* }
*/
try_sql("DROP TABLE IF EXISTS ${testTable}")
@ -74,6 +75,7 @@ def list = order_sql """
union all
select 3
"""
assertEquals(null, list[0][0])
assertEquals(1, list[1][0])
assertEquals(15, list[2][0])

View File

@ -35,7 +35,9 @@ streamLoad {
// stream load 100 rows
def rowCount = 100
def rowIt = java.util.stream.LongStream.range(0, rowCount) // [0, rowCount)
// range: [0, rowCount)
// or rangeClosed: [0, rowCount]
def rowIt = range(0, rowCount)
.mapToObj({i -> [i, "a_" + i]}) // change Long to List<Long, String>
.iterator()

View File

@ -0,0 +1,48 @@
def (_, elapsedMillis) = timer {
/**
* the default max thread num is 10, you can specify by 'actionParallel' param.
* e.g. ./run-regression-test.sh --run someSuite -actionParallel 10
*/
def future1 = thread("threadName1") {
sleep(200)
sql"select 1"
}
// create new thread but not specify name
def future2 = thread {
sleep(200)
sql "select 2"
}
def future3 = thread("threadName3") {
sleep(200)
sql "select 3"
}
def future4 = thread {
sleep(200)
sql "select 4"
}
// equals to combineFutures([future1, future2, future3, future4]), which [] is a Iterable<ListenableFuture>
def combineFuture = combineFutures(future1, future2, future3, future4)
// or you can use lazyCheckThread action(see lazyCheck_action.groovy), and not have to check exception from futures.
List<List<List<Object>>> result = combineFuture.get()
assertEquals(result[0][0][0], 1)
assertEquals(result[1][0][0], 2)
assertEquals(result[2][0][0], 3)
assertEquals(result[3][0][0], 4)
}
assertTrue(elapsedMillis < 600)
// you can use qt action in thread action, and you **MUST** specify different tag,
// testing framework can compare different qt result in different order.
lazyCheckThread {
sleep(100)
qt_diffrent_tag1 "select 100"
}
lazyCheckThread("lazyCheckThread2") {
qt_diffrent_tag2 "select 100"
}

View File

@ -0,0 +1,7 @@
def (sumResult, elapsedMillis) = timer {
long sum = 0
(1..10000).each {sum += it}
sum // return value
}
logger.info("sum: ${sumResult}, elapsed: ${elapsedMillis} ms")