[improve](routine-load) pause the job promptly when the Kafka cluster throws an exception during consumption (#33372)

This commit is contained in:
HHoflittlefish777
2024-04-10 14:53:51 +08:00
committed by yiguolei
parent 26d9082b9a
commit d4a67d93f3

View File

@ -746,7 +746,13 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
cachedPartitionWithLatestOffsets.put(pair.first, pair.second);
}
} catch (Exception e) {
LOG.warn("failed to get latest partition offset. {}", e.getMessage(), e);
// Pause the job when the partition metadata cannot be fetched.
// To keep routine load stable, the scheduler automatically
// resumes the routine load job in this scenario, so that network and
// Kafka exceptions do not cause the routine load job to stop.
updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.PARTITIONS_ERR,
"failed to get latest partition offset. {}" + e.getMessage()),
false /* not replay */);
return false;
}