branch-2.1: [fix](regress) new thread should connect to cluster.jdbcUrl in docker regression suite #53234 (#53316)
cherry pick from #53234
This commit is contained in:
@ -706,13 +706,6 @@ class Config {
|
||||
return DriverManager.getConnection(jdbcUrl, 'root', '')
|
||||
}
|
||||
|
||||
Connection getConnectionByDbName(String dbName) {
|
||||
String dbUrl = buildUrlWithDb(jdbcUrl, dbName)
|
||||
tryCreateDbIfNotExist(dbName)
|
||||
log.info("connect to ${dbUrl}".toString())
|
||||
return DriverManager.getConnection(dbUrl, jdbcUser, jdbcPassword)
|
||||
}
|
||||
|
||||
Connection getConnectionByArrowFlightSqlDbName(String dbName) {
|
||||
Class.forName("org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver")
|
||||
String arrowFlightSqlHost = otherConfigs.get("extArrowFlightSqlHost")
|
||||
|
||||
@ -62,8 +62,12 @@ class WaitForAction implements SuiteAction{
|
||||
if (forRollUp) {
|
||||
num = 8
|
||||
}
|
||||
Awaitility.await().atMost(time, TimeUnit.SECONDS).with().pollDelay(100, TimeUnit.MILLISECONDS).and()
|
||||
.pollInterval(100, TimeUnit.MILLISECONDS).await().until(() -> {
|
||||
Awaitility
|
||||
.with().pollInSameThread()
|
||||
.await()
|
||||
.atMost(time, TimeUnit.SECONDS)
|
||||
.with().pollDelay(100, TimeUnit.MILLISECONDS).and()
|
||||
.pollInterval(100, TimeUnit.MILLISECONDS).await().until({
|
||||
log.info("sql is :\n${sql}")
|
||||
def (result, meta) = JdbcUtils.executeToList(context.getConnection(), sql)
|
||||
String res = result.get(0).get(num)
|
||||
|
||||
@ -262,6 +262,25 @@ class Suite implements GroovyInterceptable {
|
||||
return context.connect(user, password, url, actionSupplier)
|
||||
}
|
||||
|
||||
// delete 'dockerAwaitUntil', should call 'Awaitility.await()...' directly or use 'awaitUntil(..., f)'
|
||||
// public void dockerAwaitUntil(int atMostSeconds, int intervalSecond = 1, Closure actionSupplier) {
|
||||
// def connInfo = context.threadLocalConn.get()
|
||||
// Awaitility.await().atMost(atMostSeconds, SECONDS).pollInterval(intervalSecond, SECONDS).until(
|
||||
// {
|
||||
// connect(connInfo.username, connInfo.password, connInfo.conn.getMetaData().getURL(), actionSupplier)
|
||||
// }
|
||||
// )
|
||||
// }
|
||||
public void awaitUntil(int atMostSeconds, double intervalSecond = 1, Closure actionSupplier) {
|
||||
Awaitility
|
||||
.with().pollInSameThread()
|
||||
.await()
|
||||
.atMost(atMostSeconds, TimeUnit.SECONDS)
|
||||
.pollInterval((int) (1000 * intervalSecond), TimeUnit.MILLISECONDS)
|
||||
.until(actionSupplier)
|
||||
}
|
||||
|
||||
// more explaination can see example file: demo_p0/docker_action.groovy
|
||||
public void docker(ClusterOptions options = new ClusterOptions(), Closure actionSupplier) throws Exception {
|
||||
if (context.config.excludeDockerTest) {
|
||||
return
|
||||
@ -283,6 +302,8 @@ class Suite implements GroovyInterceptable {
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("=== start run suite {} in not_cloud mode. ===", name)
|
||||
def originConnection = context.threadLocalConn.get()
|
||||
try {
|
||||
cluster.destroy(true)
|
||||
cluster.init(options, dockerIsCloud)
|
||||
@ -308,18 +329,18 @@ class Suite implements GroovyInterceptable {
|
||||
|
||||
// wait be report
|
||||
Thread.sleep(5000)
|
||||
def url = String.format(
|
||||
"jdbc:mysql://%s:%s/?useLocalSessionState=false&allowLoadLocalInfile=false",
|
||||
def jdbcUrl = String.format(
|
||||
"jdbc:mysql://%s:%s/?useLocalSessionState=true&allowLoadLocalInfile=false",
|
||||
fe.host, fe.queryPort)
|
||||
def conn = DriverManager.getConnection(url, user, password)
|
||||
def sql = "CREATE DATABASE IF NOT EXISTS " + context.dbName
|
||||
logger.info("try create database if not exists {}", context.dbName)
|
||||
JdbcUtils.executeToList(conn, sql)
|
||||
url = Config.buildUrlWithDb(url, context.dbName)
|
||||
|
||||
logger.info("connect to docker cluster: suite={}, url={}", name, url)
|
||||
connect(user, password, url, actionSupplier)
|
||||
cluster.jdbcUrl = jdbcUrl
|
||||
context.threadLocalConn.remove()
|
||||
actionSupplier.call()
|
||||
} finally {
|
||||
if (originConnection == null) {
|
||||
context.threadLocalConn.remove()
|
||||
} else {
|
||||
context.threadLocalConn.set(originConnection)
|
||||
}
|
||||
if (!context.config.dockerEndNoKill) {
|
||||
cluster.destroy(context.config.dockerEndDeleteFiles)
|
||||
}
|
||||
@ -603,6 +624,10 @@ class Suite implements GroovyInterceptable {
|
||||
return context.dbName
|
||||
}
|
||||
|
||||
String getCurDbConnectUrl() {
|
||||
return Config.buildUrlWithDb(context.getJdbcUrl(), getCurDbName())
|
||||
}
|
||||
|
||||
long getDbId() {
|
||||
def dbInfo = sql "show proc '/dbs'"
|
||||
for(List<Object> row : dbInfo) {
|
||||
|
||||
@ -266,6 +266,8 @@ class SuiteCluster {
|
||||
|
||||
static final Logger logger = LoggerFactory.getLogger(this.class)
|
||||
|
||||
// dockerImpl() will set jdbcUrl
|
||||
String jdbcUrl = ""
|
||||
final String name
|
||||
final Config config
|
||||
private boolean running
|
||||
|
||||
@ -145,7 +145,7 @@ class SuiteContext implements Closeable {
|
||||
def threadConnInfo = threadLocalConn.get()
|
||||
if (threadConnInfo == null) {
|
||||
threadConnInfo = new ConnectionInfo()
|
||||
threadConnInfo.conn = config.getConnectionByDbName(dbName)
|
||||
threadConnInfo.conn = getConnectionByDbName(dbName)
|
||||
threadConnInfo.username = config.jdbcUser
|
||||
threadConnInfo.password = config.jdbcPassword
|
||||
threadLocalConn.set(threadConnInfo)
|
||||
@ -153,6 +153,31 @@ class SuiteContext implements Closeable {
|
||||
return threadConnInfo.conn
|
||||
}
|
||||
|
||||
Connection getConnectionByDbName(String dbName) {
|
||||
def jdbcUrl = getJdbcUrl()
|
||||
def jdbcConn = DriverManager.getConnection(jdbcUrl, config.jdbcUser, config.jdbcPassword)
|
||||
try {
|
||||
String sql = "CREATE DATABASE IF NOT EXISTS ${dbName}"
|
||||
log.info("Try to create db, sql: ${sql}".toString())
|
||||
if (!config.dryRun) {
|
||||
jdbcConn.withCloseable { conn -> JdbcUtils.executeToList(conn, sql) }
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
throw new IllegalStateException("Create database failed, jdbcUrl: ${jdbcUrl}", t)
|
||||
}
|
||||
def dbUrl = Config.buildUrlWithDb(jdbcUrl, dbName)
|
||||
log.info("connect to ${dbUrl}".toString())
|
||||
return DriverManager.getConnection(dbUrl, config.jdbcUser, config.jdbcPassword)
|
||||
}
|
||||
|
||||
String getJdbcUrl() {
|
||||
if (cluster.isRunning()) {
|
||||
return cluster.jdbcUrl
|
||||
} else {
|
||||
return config.jdbcUrl
|
||||
}
|
||||
}
|
||||
|
||||
// like getConnection, but connect to FE master
|
||||
Connection getMasterConnection() {
|
||||
def threadConnInfo = threadLocalMasterConn.get()
|
||||
|
||||
@ -17,6 +17,13 @@
|
||||
|
||||
import org.apache.doris.regression.suite.ClusterOptions
|
||||
|
||||
// Run docker suite steps:
|
||||
// 1. Read 'docker/runtime/doris-compose/Readme.md', make sure you can setup a doris docker cluster;
|
||||
// 2. update regression-conf-custom.groovy with config:
|
||||
// image = "xxxx" // your doris docker image
|
||||
// excludeDockerTest = false // do run docker suite, default is true
|
||||
// dockerEndDeleteFiles = false // after run docker suite, whether delete contains's log and data in directory '/tmp/doris/<suite-name>'
|
||||
|
||||
suite('docker_action', 'docker') {
|
||||
// run a new docker
|
||||
docker {
|
||||
|
||||
@ -155,7 +155,7 @@ suite("test_local_multi_segments_re_calc_in_publish", "docker") {
|
||||
|
||||
Thread.sleep(1000)
|
||||
do_streamload_2pc_commit(txnId)
|
||||
dockerAwaitUntil(30) {
|
||||
awaitUntil(30) {
|
||||
def result = sql_return_maparray "show transaction from ${dbName} where id = ${txnId}"
|
||||
result[0].TransactionStatus as String == "VISIBLE"
|
||||
}
|
||||
|
||||
@ -40,7 +40,7 @@ suite("test_clean_trash", "docker") {
|
||||
|
||||
def checkFunc = { boolean trashZero ->
|
||||
def succ = false
|
||||
dockerAwaitUntil(300) {
|
||||
awaitUntil(300) {
|
||||
def bes = sql_return_maparray """show backends"""
|
||||
succ = bes.every {
|
||||
if (trashZero) {
|
||||
|
||||
Reference in New Issue
Block a user