branch-2.1: (fix)[case] add fast fail to avoid routine load case hang (#53970)

### What problem does this PR solve?

Issue Number: close #xxx

Related PR: #53214

Routine load regression cases could hang when the Kafka broker was unreachable. Each affected case now sets producer timeouts (`max.block.ms` and `request.timeout.ms`, both 10 s) and verifies the broker connection with `partitionsFor()` before producing test data, so the case fails fast with a clear error instead of hanging. A condensed sketch of the repeated guard follows the commit metadata below.
MoanasDaddyXu authored on 2025-08-01 11:26:42 +08:00 · committed by GitHub
parent 3e0543a490 · commit 72bde71602
15 changed files with 396 additions and 0 deletions
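
All 15 cases add the same guard before producing to Kafka. A minimal, self-contained sketch of that pattern is below; the broker address is a placeholder, since the real suites build it from `externalEnvIp` and `kafka_port`:

```groovy
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig

// Placeholder broker address; the suites derive it from externalEnvIp/kafka_port.
def props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// Bound how long the producer may block on metadata fetches and requests.
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")

// partitionsFor() forces a metadata fetch; with the timeouts above it throws
// within ~10 s when the broker is unreachable instead of hanging the case.
def verifyKafkaConnection = { prod ->
    prod.partitionsFor("__connection_verification_topic") != null
}

def producer = new KafkaProducer<String, String>(props)
try {
    if (!verifyKafkaConnection(producer)) {
        throw new IllegalStateException("cannot fetch Kafka metadata")
    }
} catch (Exception e) {
    producer.close()   // fail fast: release the client and surface the error
    throw e
}
```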

@@ -59,9 +59,35 @@ suite("test_multi_table_load_eror","nonConcurrent") {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // add timeout config so the producer fails fast instead of blocking
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
        // check connection
        def verifyKafkaConnection = { prod ->
            try {
                logger.info("===== try to connect Kafka =====")
                def partitions = prod.partitionsFor("__connection_verification_topic")
                return partitions != null
            } catch (Exception e) {
                throw new Exception("Kafka connection failed: ${e.message}".toString())
            }
        }
        // Create Kafka producer
        def producer = new KafkaProducer<>(props)
        try {
            logger.info("Connecting to Kafka: ${kafka_broker}")
            if (!verifyKafkaConnection(producer)) {
                throw new Exception("cannot get any Kafka metadata")
            }
        } catch (Exception e) {
            logger.error("FATAL: " + e.getMessage())
            producer.close()
            throw e
        }
        logger.info("Kafka connected successfully")
        for (String kafkaCsvTopic in kafkaCsvTpoics) {
            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
            def lines = txt.readLines()

@@ -36,9 +36,35 @@ suite("test_out_of_range","nonConcurrent") {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // add timeout config so the producer fails fast instead of blocking
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
        // check connection
        def verifyKafkaConnection = { prod ->
            try {
                logger.info("===== try to connect Kafka =====")
                def partitions = prod.partitionsFor("__connection_verification_topic")
                return partitions != null
            } catch (Exception e) {
                throw new Exception("Kafka connection failed: ${e.message}".toString())
            }
        }
        // Create Kafka producer
        def producer = new KafkaProducer<>(props)
        try {
            logger.info("Connecting to Kafka: ${kafka_broker}")
            if (!verifyKafkaConnection(producer)) {
                throw new Exception("cannot get any Kafka metadata")
            }
        } catch (Exception e) {
            logger.error("FATAL: " + e.getMessage())
            producer.close()
            throw e
        }
        logger.info("Kafka connected successfully")
        for (String kafkaCsvTopic in kafkaCsvTpoics) {
            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
            def lines = txt.readLines()

@@ -182,9 +182,35 @@ suite("test_routine_load","p0") {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // add timeout config so the producer fails fast instead of blocking
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
        // check connection
        def verifyKafkaConnection = { prod ->
            try {
                logger.info("===== try to connect Kafka =====")
                def partitions = prod.partitionsFor("__connection_verification_topic")
                return partitions != null
            } catch (Exception e) {
                throw new Exception("Kafka connection failed: ${e.message}".toString())
            }
        }
        // Create Kafka producer
        def producer = new KafkaProducer<>(props)
        try {
            logger.info("Connecting to Kafka: ${kafka_broker}")
            if (!verifyKafkaConnection(producer)) {
                throw new Exception("cannot get any Kafka metadata")
            }
        } catch (Exception e) {
            logger.error("FATAL: " + e.getMessage())
            producer.close()
            throw e
        }
        logger.info("Kafka connected successfully")
        for (String kafkaCsvTopic in kafkaCsvTpoics) {
            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
            def lines = txt.readLines()

@@ -36,9 +36,35 @@ suite("test_routine_load_condition","p0") {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // add timeout config so the producer fails fast instead of blocking
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
        // check connection
        def verifyKafkaConnection = { prod ->
            try {
                logger.info("===== try to connect Kafka =====")
                def partitions = prod.partitionsFor("__connection_verification_topic")
                return partitions != null
            } catch (Exception e) {
                throw new Exception("Kafka connection failed: ${e.message}".toString())
            }
        }
        // Create Kafka producer
        def producer = new KafkaProducer<>(props)
        try {
            logger.info("Connecting to Kafka: ${kafka_broker}")
            if (!verifyKafkaConnection(producer)) {
                throw new Exception("cannot get any Kafka metadata")
            }
        } catch (Exception e) {
            logger.error("FATAL: " + e.getMessage())
            producer.close()
            throw e
        }
        logger.info("Kafka connected successfully")
        for (String kafkaCsvTopic in kafkaCsvTpoics) {
            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
            def lines = txt.readLines()

@@ -37,9 +37,35 @@ suite("test_routine_load_eof","nonConcurrent") {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // add timeout config so the producer fails fast instead of blocking
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
        // check connection
        def verifyKafkaConnection = { prod ->
            try {
                logger.info("===== try to connect Kafka =====")
                def partitions = prod.partitionsFor("__connection_verification_topic")
                return partitions != null
            } catch (Exception e) {
                throw new Exception("Kafka connection failed: ${e.message}".toString())
            }
        }
        // Create Kafka producer
        def producer = new KafkaProducer<>(props)
        try {
            logger.info("Connecting to Kafka: ${kafka_broker}")
            if (!verifyKafkaConnection(producer)) {
                throw new Exception("cannot get any Kafka metadata")
            }
        } catch (Exception e) {
            logger.error("FATAL: " + e.getMessage())
            producer.close()
            throw e
        }
        logger.info("Kafka connected successfully")
        def count = 0
        while(true) {
            Thread.sleep(1000)

@@ -40,9 +40,35 @@ suite("test_routine_load_error","p0") {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // add timeout config so the producer fails fast instead of blocking
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
        // check connection
        def verifyKafkaConnection = { prod ->
            try {
                logger.info("===== try to connect Kafka =====")
                def partitions = prod.partitionsFor("__connection_verification_topic")
                return partitions != null
            } catch (Exception e) {
                throw new Exception("Kafka connection failed: ${e.message}".toString())
            }
        }
        // Create Kafka producer
        def producer = new KafkaProducer<>(props)
        try {
            logger.info("Connecting to Kafka: ${kafka_broker}")
            if (!verifyKafkaConnection(producer)) {
                throw new Exception("cannot get any Kafka metadata")
            }
        } catch (Exception e) {
            logger.error("FATAL: " + e.getMessage())
            producer.close()
            throw e
        }
        logger.info("Kafka connected successfully")
        for (String kafkaCsvTopic in kafkaCsvTpoics) {
            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
            def lines = txt.readLines()

@@ -36,7 +36,35 @@ suite("test_routine_load_error_info","nonConcurrent") {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // add timeout config so the producer fails fast instead of blocking
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
        // check connection
        def verifyKafkaConnection = { prod ->
            try {
                logger.info("===== try to connect Kafka =====")
                def partitions = prod.partitionsFor("__connection_verification_topic")
                return partitions != null
            } catch (Exception e) {
                throw new Exception("Kafka connection failed: ${e.message}".toString())
            }
        }
        // Create Kafka producer
        def producer = new KafkaProducer<>(props)
        try {
            logger.info("Connecting to Kafka: ${kafka_broker}")
            if (!verifyKafkaConnection(producer)) {
                throw new Exception("cannot get any Kafka metadata")
            }
        } catch (Exception e) {
            logger.error("FATAL: " + e.getMessage())
            producer.close()
            throw e
        }
        logger.info("Kafka connected successfully")
        for (String kafkaCsvTopic in kafkaCsvTpoics) {
            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
            def lines = txt.readLines()

@@ -17,6 +17,8 @@
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.producer.ProducerConfig
suite("test_routine_load_jsonpath_dollar", "p0") {
    def tableName = "test_routine_load_jsonpath_dollar"
@@ -25,6 +27,7 @@ suite("test_routine_load_jsonpath_dollar", "p0") {
    String enabled = context.config.otherConfigs.get("enableKafkaTest")
    String kafka_port = context.config.otherConfigs.get("kafka_port")
    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
    def kafka_broker = "${externalEnvIp}:${kafka_port}"
    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        // Send test data to Kafka
@@ -32,8 +35,35 @@ suite("test_routine_load_jsonpath_dollar", "p0") {
        props.put("bootstrap.servers", "${externalEnvIp}:${kafka_port}".toString())
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        // add timeout config so the producer fails fast instead of blocking
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
        // check connection
        def verifyKafkaConnection = { prod ->
            try {
                logger.info("===== try to connect Kafka =====")
                def partitions = prod.partitionsFor("__connection_verification_topic")
                return partitions != null
            } catch (Exception e) {
                throw new Exception("Kafka connection failed: ${e.message}".toString())
            }
        }
        // Create Kafka producer
        def producer = new KafkaProducer<>(props)
        try {
            logger.info("Connecting to Kafka: ${kafka_broker}")
            if (!verifyKafkaConnection(producer)) {
                throw new Exception("cannot get any Kafka metadata")
            }
        } catch (Exception e) {
            logger.error("FATAL: " + e.getMessage())
            producer.close()
            throw e
        }
        logger.info("Kafka connected successfully")
        def kafkaJson = new File("""${context.file.parent}/data/${jobName}.json""").text
        def lines = kafkaJson.readLines()
        lines.each { line ->

@@ -38,9 +38,35 @@ suite("test_routine_load_metrics","p0") {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // add timeout config so the producer fails fast instead of blocking
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
        // check connection
        def verifyKafkaConnection = { prod ->
            try {
                logger.info("===== try to connect Kafka =====")
                def partitions = prod.partitionsFor("__connection_verification_topic")
                return partitions != null
            } catch (Exception e) {
                throw new Exception("Kafka connection failed: ${e.message}".toString())
            }
        }
        // Create Kafka producer
        def producer = new KafkaProducer<>(props)
        try {
            logger.info("Connecting to Kafka: ${kafka_broker}")
            if (!verifyKafkaConnection(producer)) {
                throw new Exception("cannot get any Kafka metadata")
            }
        } catch (Exception e) {
            logger.error("FATAL: " + e.getMessage())
            producer.close()
            throw e
        }
        logger.info("Kafka connected successfully")
        for (String kafkaCsvTopic in kafkaCsvTpoics) {
            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
            def lines = txt.readLines()

@@ -34,9 +34,35 @@ suite("test_routine_load_offset","p0") {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // add timeout config so the producer fails fast instead of blocking
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
        // check connection
        def verifyKafkaConnection = { prod ->
            try {
                logger.info("===== try to connect Kafka =====")
                def partitions = prod.partitionsFor("__connection_verification_topic")
                return partitions != null
            } catch (Exception e) {
                throw new Exception("Kafka connection failed: ${e.message}".toString())
            }
        }
        // Create Kafka producer
        def producer = new KafkaProducer<>(props)
        try {
            logger.info("Connecting to Kafka: ${kafka_broker}")
            if (!verifyKafkaConnection(producer)) {
                throw new Exception("cannot get any Kafka metadata")
            }
        } catch (Exception e) {
            logger.error("FATAL: " + e.getMessage())
            producer.close()
            throw e
        }
        logger.info("Kafka connected successfully")
        for (String kafkaCsvTopic in kafkaCsvTpoics) {
            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
            def lines = txt.readLines()

@@ -36,9 +36,35 @@ suite("test_routine_load_property","p0") {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // add timeout config so the producer fails fast instead of blocking
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
        // check connection
        def verifyKafkaConnection = { prod ->
            try {
                logger.info("===== try to connect Kafka =====")
                def partitions = prod.partitionsFor("__connection_verification_topic")
                return partitions != null
            } catch (Exception e) {
                throw new Exception("Kafka connection failed: ${e.message}".toString())
            }
        }
        // Create Kafka producer
        def producer = new KafkaProducer<>(props)
        try {
            logger.info("Connecting to Kafka: ${kafka_broker}")
            if (!verifyKafkaConnection(producer)) {
                throw new Exception("cannot get any Kafka metadata")
            }
        } catch (Exception e) {
            logger.error("FATAL: " + e.getMessage())
            producer.close()
            throw e
        }
        logger.info("Kafka connected successfully")
        for (String kafkaCsvTopic in kafkaCsvTpoics) {
            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
            def lines = txt.readLines()

@@ -37,9 +37,35 @@ suite("test_routine_load_schedule","p0") {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // add timeout config so the producer fails fast instead of blocking
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
        // check connection
        def verifyKafkaConnection = { prod ->
            try {
                logger.info("===== try to connect Kafka =====")
                def partitions = prod.partitionsFor("__connection_verification_topic")
                return partitions != null
            } catch (Exception e) {
                throw new Exception("Kafka connection failed: ${e.message}".toString())
            }
        }
        // Create Kafka producer
        def producer = new KafkaProducer<>(props)
        try {
            logger.info("Connecting to Kafka: ${kafka_broker}")
            if (!verifyKafkaConnection(producer)) {
                throw new Exception("cannot get any Kafka metadata")
            }
        } catch (Exception e) {
            logger.error("FATAL: " + e.getMessage())
            producer.close()
            throw e
        }
        logger.info("Kafka connected successfully")
        for (String kafkaCsvTopic in kafkaCsvTpoics) {
            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
            def lines = txt.readLines()

@@ -36,9 +36,35 @@ suite("test_routine_load_topic_change","p0") {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // add timeout config so the producer fails fast instead of blocking
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
        // check connection
        def verifyKafkaConnection = { prod ->
            try {
                logger.info("===== try to connect Kafka =====")
                def partitions = prod.partitionsFor("__connection_verification_topic")
                return partitions != null
            } catch (Exception e) {
                throw new Exception("Kafka connection failed: ${e.message}".toString())
            }
        }
        // Create Kafka producer
        def producer = new KafkaProducer<>(props)
        try {
            logger.info("Connecting to Kafka: ${kafka_broker}")
            if (!verifyKafkaConnection(producer)) {
                throw new Exception("cannot get any Kafka metadata")
            }
        } catch (Exception e) {
            logger.error("FATAL: " + e.getMessage())
            producer.close()
            throw e
        }
        logger.info("Kafka connected successfully")
        for (String kafkaCsvTopic in kafkaCsvTpoics) {
            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
            def lines = txt.readLines()

@@ -36,9 +36,35 @@ suite("test_routine_load_with_sc","p0") {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // add timeout config so the producer fails fast instead of blocking
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
        // check connection
        def verifyKafkaConnection = { prod ->
            try {
                logger.info("===== try to connect Kafka =====")
                def partitions = prod.partitionsFor("__connection_verification_topic")
                return partitions != null
            } catch (Exception e) {
                throw new Exception("Kafka connection failed: ${e.message}".toString())
            }
        }
        // Create Kafka producer
        def producer = new KafkaProducer<>(props)
        try {
            logger.info("Connecting to Kafka: ${kafka_broker}")
            if (!verifyKafkaConnection(producer)) {
                throw new Exception("cannot get any Kafka metadata")
            }
        } catch (Exception e) {
            logger.error("FATAL: " + e.getMessage())
            producer.close()
            throw e
        }
        logger.info("Kafka connected successfully")
        for (String kafkaCsvTopic in kafkaCsvTpoics) {
            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
            def lines = txt.readLines()

@@ -34,9 +34,35 @@ suite("test_show_routine_load","p0") {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
        // add timeout config so the producer fails fast instead of blocking
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
        // check connection
        def verifyKafkaConnection = { prod ->
            try {
                logger.info("===== try to connect Kafka =====")
                def partitions = prod.partitionsFor("__connection_verification_topic")
                return partitions != null
            } catch (Exception e) {
                throw new Exception("Kafka connection failed: ${e.message}".toString())
            }
        }
        // Create Kafka producer
        def producer = new KafkaProducer<>(props)
        try {
            logger.info("Connecting to Kafka: ${kafka_broker}")
            if (!verifyKafkaConnection(producer)) {
                throw new Exception("cannot get any Kafka metadata")
            }
        } catch (Exception e) {
            logger.error("FATAL: " + e.getMessage())
            producer.close()
            throw e
        }
        logger.info("Kafka connected successfully")
        for (String kafkaCsvTopic in kafkaCsvTpoics) {
            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
            def lines = txt.readLines()