From af960f7c70f8569d856b30be36515417bf8dbc67 Mon Sep 17 00:00:00 2001
From: hui lai <1353307710@qq.com>
Date: Sun, 7 Jul 2024 18:15:54 +0800
Subject: [PATCH] [branch-2.1](routine-load) dealing with the high watermark of Kafka may fallback (#37372)

pick #35901
---
 .../doris/load/routineload/KafkaRoutineLoadJob.java | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 43ae98d8f7..201412027a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -765,7 +765,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
             List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id, taskId, getBrokerList(),
                     getTopic(), getConvertedCustomProperties(), Lists.newArrayList(partitionIdToOffset.keySet()));
             for (Pair<Integer, Long> pair : tmp) {
-                cachedPartitionWithLatestOffsets.put(pair.first, pair.second);
+                if (pair.second >= cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE)) {
+                    cachedPartitionWithLatestOffsets.put(pair.first, pair.second);
+                } else {
+                    LOG.warn("Kafka offset fallback. partition: {}, cache offset: {}"
+                                + " get latest offset: {}, task {}, job {}",
+                                pair.first, cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE),
+                                pair.second, taskId, id);
+                }
             }
         } catch (Exception e) {
             // It needs to pause job when can not get partition meta.