[branch-2.1](routine-load) dealing with the high watermark of Kafka may fallback (#37372)
pick #35901
This commit is contained in:
@ -765,7 +765,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
|
||||
List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id, taskId, getBrokerList(),
|
||||
getTopic(), getConvertedCustomProperties(), Lists.newArrayList(partitionIdToOffset.keySet()));
|
||||
for (Pair<Integer, Long> pair : tmp) {
|
||||
cachedPartitionWithLatestOffsets.put(pair.first, pair.second);
|
||||
if (pair.second >= cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE)) {
|
||||
cachedPartitionWithLatestOffsets.put(pair.first, pair.second);
|
||||
} else {
|
||||
LOG.warn("Kafka offset fallback. partition: {}, cache offset: {}"
|
||||
+ " get latest offset: {}, task {}, job {}",
|
||||
pair.first, cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE),
|
||||
pair.second, taskId, id);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// It needs to pause job when can not get partition meta.
|
||||
|
||||
Reference in New Issue
Block a user