diff --git a/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
index 17a8e5da71..d81bddee12 100644
--- a/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_multi_table_load_error.groovy
@@ -59,9 +59,35 @@ suite("test_multi_table_load_eror","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so metadata fetches fail fast
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
 
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot fetch partition metadata from Kafka")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy b/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
index 1ae74b7330..12205801f9 100644
--- a/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_out_of_range_error.groovy
@@ -36,9 +36,35 @@ suite("test_out_of_range","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so metadata fetches fail fast
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
 
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot fetch partition metadata from Kafka")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
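The two hunks above, and every hunk below, apply the same guard: producer timeouts plus an eager metadata probe, so a dead broker fails the suite within roughly ten seconds instead of hanging. As a standalone sketch (the broker address is hypothetical, not part of this patch), the pattern reduces to:

```groovy
// Standalone sketch of the guard each suite now applies.
// "127.0.0.1:9092" stands in for the test environment's broker.
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig

def props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// Cap how long a metadata fetch may block (max.block.ms defaults to 60s).
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")

def producer = new KafkaProducer<String, String>(props)
try {
    // partitionsFor() forces a metadata round-trip and throws
    // org.apache.kafka.common.errors.TimeoutException if no broker
    // answers within max.block.ms.
    assert producer.partitionsFor("__connection_verification_topic") != null
} catch (Exception e) {
    producer.close()
    throw new IllegalStateException("Kafka unreachable: ${e.message}", e)
}
```

Without `max.block.ms`, `partitionsFor()` would block for its 60-second default before the first suite even reported a problem, which is the hang this patch is defending against.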
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
index c8aef50bc7..b8eb7fa8ae 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load.groovy
@@ -182,9 +182,35 @@ suite("test_routine_load","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so metadata fetches fail fast
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
 
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot fetch partition metadata from Kafka")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
index 7735867c74..f53a44cb1d 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_condition.groovy
@@ -36,9 +36,35 @@ suite("test_routine_load_condition","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so metadata fetches fail fast
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
 
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot fetch partition metadata from Kafka")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
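For context, the `for (String kafkaCsvTopic in kafkaCsvTpoics)` loops visible in the trailing context lines are untouched by this patch; the guard simply runs before them. A minimal sketch of that pre-existing send pattern (topic name and file path are illustrative; `logger` and `producer` come from the suite, as above):

```groovy
// Pre-existing send loop, shown for context only.
import org.apache.kafka.clients.producer.ProducerRecord

def kafkaCsvTopic = "example_topic"                         // hypothetical
def lines = new File("data/${kafkaCsvTopic}.csv").text.readLines()
lines.each { line ->
    logger.info("Sending to Kafka: ${line}")
    def record = new ProducerRecord<String, String>(kafkaCsvTopic, null, line)
    // send() is asynchronous; get() blocks so a mid-test broker
    // failure surfaces at the offending line rather than later.
    producer.send(record).get()
}
```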
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
index ac0b08248e..b289545211 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
@@ -37,9 +37,35 @@ suite("test_routine_load_eof","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so metadata fetches fail fast
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
 
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot fetch partition metadata from Kafka")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         def count = 0
         while(true) {
             Thread.sleep(1000)
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
index 825752941d..7ddac94924 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error.groovy
@@ -40,9 +40,35 @@ suite("test_routine_load_error","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so metadata fetches fail fast
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
 
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot fetch partition metadata from Kafka")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
index 2f018a9372..0b92612543 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_error_info.groovy
@@ -36,7 +36,35 @@ suite("test_routine_load_error_info","nonConcurrent") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so metadata fetches fail fast
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
+
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+
+        try {
+            logger.info("connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot fetch partition metadata from Kafka")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
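The next file additionally imports `AdminClient`, although the guard itself still probes through the producer's metadata fetch. An `AdminClient`-based probe would be a plausible alternative design; a hedged sketch, not what this patch does (broker address hypothetical):

```groovy
// Alternative guard: probe the cluster with AdminClient instead of
// the producer's partitionsFor(). Illustrative only.
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import java.util.concurrent.TimeUnit

def adminProps = new Properties()
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092") // hypothetical
adminProps.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")

def admin = AdminClient.create(adminProps)
try {
    // describeCluster() round-trips to the broker; the bounded get()
    // turns an unreachable cluster into a fast, explicit failure.
    def nodes = admin.describeCluster().nodes().get(10, TimeUnit.SECONDS)
    assert !nodes.isEmpty()
} finally {
    admin.close()
}
```

One advantage of this variant is that it never touches (or implicitly auto-creates) a probe topic; the producer-based check in the patch keeps the suites dependency-free beyond the producer they already build.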
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
index fed1236ca2..8763d61572 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_jsonpath_dollar.groovy
@@ -17,6 +17,8 @@
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.ProducerConfig
 
 suite("test_routine_load_jsonpath_dollar", "p0") {
     def tableName = "test_routine_load_jsonpath_dollar"
@@ -25,6 +27,7 @@ suite("test_routine_load_jsonpath_dollar", "p0") {
     String enabled = context.config.otherConfigs.get("enableKafkaTest")
     String kafka_port = context.config.otherConfigs.get("kafka_port")
     String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    def kafka_broker = "${externalEnvIp}:${kafka_port}"
 
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         // Send test data to Kafka
@@ -32,8 +35,35 @@ suite("test_routine_load_jsonpath_dollar", "p0") {
         props.put("bootstrap.servers", "${externalEnvIp}:${kafka_port}".toString())
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so metadata fetches fail fast
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
+
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot fetch partition metadata from Kafka")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         def kafkaJson = new File("""${context.file.parent}/data/${jobName}.json""").text
         def lines = kafkaJson.readLines()
         lines.each { line ->
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
index bb1afb6dd3..1e26248708 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_metrics.groovy
@@ -38,9 +38,35 @@ suite("test_routine_load_metrics","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so metadata fetches fail fast
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
 
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot fetch partition metadata from Kafka")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
index 84d0509cea..ebbd73127d 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_offset.groovy
@@ -34,9 +34,35 @@ suite("test_routine_load_offset","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so metadata fetches fail fast
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
 
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot fetch partition metadata from Kafka")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
index 9cc1fa0d2d..40c8f40da3 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_property.groovy
@@ -36,9 +36,35 @@ suite("test_routine_load_property","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so metadata fetches fail fast
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
 
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot fetch partition metadata from Kafka")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
index c8044ad140..4c75979d64 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_schedule.groovy
@@ -37,9 +37,35 @@ suite("test_routine_load_schedule","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so metadata fetches fail fast
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
 
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot fetch partition metadata from Kafka")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
index 25bf9933d1..24b8a533b8 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_topic_change.groovy
@@ -36,9 +36,35 @@ suite("test_routine_load_topic_change","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so metadata fetches fail fast
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
 
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot fetch partition metadata from Kafka")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
index 33c047062d..5e2dab7b37 100644
--- a/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_with_sc.groovy
@@ -36,9 +36,35 @@ suite("test_routine_load_with_sc","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so metadata fetches fail fast
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
 
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot fetch partition metadata from Kafka")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
diff --git a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
index 6075dc20db..0a97f84c6e 100644
--- a/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
+++ b/regression-test/suites/load_p0/routine_load/test_show_routine_load.groovy
@@ -34,9 +34,35 @@ suite("test_show_routine_load","p0") {
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        // add timeout config so metadata fetches fail fast
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
+        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
+
+        // check connection
+        def verifyKafkaConnection = { prod ->
+            try {
+                logger.info("===== try to connect Kafka =====")
+                def partitions = prod.partitionsFor("__connection_verification_topic")
+                return partitions != null
+            } catch (Exception e) {
+                throw new Exception("Kafka connection failed: ${e.message}".toString())
+            }
+        }
 
         // Create kafka producer
         def producer = new KafkaProducer<>(props)
+        try {
+            logger.info("connecting to Kafka: ${kafka_broker}")
+            if (!verifyKafkaConnection(producer)) {
+                throw new Exception("cannot fetch partition metadata from Kafka")
+            }
+        } catch (Exception e) {
+            logger.error("FATAL: " + e.getMessage())
+            producer.close()
+            throw e
+        }
+        logger.info("Kafka connection succeeded")
+
         for (String kafkaCsvTopic in kafkaCsvTpoics) {
             def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
             def lines = txt.readLines()
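Since the identical guard is now pasted into fifteen suites, a reasonable follow-up (not attempted in this patch) would be to hoist it into a shared helper that the suites call. Every name below is hypothetical, sketched only to show the shape such a refactor could take:

```groovy
// Hypothetical shared helper; illustrative names, not part of this patch.
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig

def createVerifiedProducer = { String broker ->
    def props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000")
    props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")
    def producer = new KafkaProducer<String, String>(props)
    try {
        if (producer.partitionsFor("__connection_verification_topic") == null) {
            throw new IllegalStateException("cannot fetch partition metadata from Kafka")
        }
        return producer
    } catch (Exception e) {
        producer.close()   // don't leak the client when the probe fails
        throw e
    }
}

// Usage inside a suite body:
// def producer = createVerifiedProducer("${externalEnvIp}:${kafka_port}")
```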