[improve](routine-load) optimize error msg when failed to fetch Kafka info #30298
This commit is contained in:
committed by
yiguolei
parent
0f81ecf415
commit
92d4ce31ae
@ -325,13 +325,15 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
|
||||
try {
|
||||
this.newCurrentKafkaPartition = getAllKafkaPartitions();
|
||||
} catch (Exception e) {
|
||||
String msg = e.getMessage()
|
||||
+ " may be Kafka properties set in job is error"
|
||||
+ " or no partition in this topic that should check Kafka";
|
||||
LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
|
||||
.add("error_msg", "Job failed to fetch all current partition with error " + e.getMessage())
|
||||
.add("error_msg", msg)
|
||||
.build(), e);
|
||||
if (this.state == JobState.NEED_SCHEDULE) {
|
||||
unprotectUpdateState(JobState.PAUSED,
|
||||
new ErrorReason(InternalErrorCode.PARTITIONS_ERR,
|
||||
"Job failed to fetch all current partition with error " + e.getMessage()),
|
||||
new ErrorReason(InternalErrorCode.PARTITIONS_ERR, msg),
|
||||
false /* not replay */);
|
||||
}
|
||||
}
|
||||
|
||||
@ -248,4 +248,47 @@ suite("test_routine_load_error","p0") {
|
||||
sql """ DROP TABLE IF EXISTS ${tableName} """
|
||||
}
|
||||
}
|
||||
|
||||
// test failed to fetch all current partition
|
||||
if (enabled != null && enabled.equalsIgnoreCase("true")) {
|
||||
def jobName = "invalid_topic"
|
||||
try {
|
||||
sql """
|
||||
CREATE ROUTINE LOAD ${jobName}
|
||||
COLUMNS TERMINATED BY "|"
|
||||
PROPERTIES
|
||||
(
|
||||
"max_batch_interval" = "5",
|
||||
"max_batch_rows" = "300000",
|
||||
"max_batch_size" = "209715200"
|
||||
)
|
||||
FROM KAFKA
|
||||
(
|
||||
"kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
|
||||
"kafka_topic" = "invalid_topic",
|
||||
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
|
||||
);
|
||||
"""
|
||||
sql "sync"
|
||||
|
||||
def count = 0
|
||||
while (true) {
|
||||
sleep(1000)
|
||||
def res = sql "show routine load for ${jobName}"
|
||||
def state = res[0][8].toString()
|
||||
if (state != "PAUSED") {
|
||||
count++
|
||||
if (count > 60) {
|
||||
assertEquals(1, 2)
|
||||
}
|
||||
continue;
|
||||
}
|
||||
log.info("reason of state changed: ${res[0][17].toString()}".toString())
|
||||
assertTrue(res[0][17].toString().contains("may be Kafka properties set in job is error or no partition in this topic that should check Kafka"))
|
||||
break;
|
||||
}
|
||||
} finally {
|
||||
sql "stop routine load for ${jobName}"
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user