diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 43ae98d8f7..201412027a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -765,7 +765,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { List> tmp = KafkaUtil.getLatestOffsets(id, taskId, getBrokerList(), getTopic(), getConvertedCustomProperties(), Lists.newArrayList(partitionIdToOffset.keySet())); for (Pair pair : tmp) { - cachedPartitionWithLatestOffsets.put(pair.first, pair.second); + if (pair.second >= cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE)) { + cachedPartitionWithLatestOffsets.put(pair.first, pair.second); + } else { + LOG.warn("Kafka offset fallback. partition: {}, cache offset: {}" + + " get latest offset: {}, task {}, job {}", + pair.first, cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE), + pair.second, taskId, id); + } } } catch (Exception e) { // It needs to pause job when can not get partition meta.