From 7935dc9fae4bca7852c9b3ca7f3670f2459a657c Mon Sep 17 00:00:00 2001 From: HHoflittlefish777 <77738092+HHoflittlefish777@users.noreply.github.com> Date: Thu, 1 Feb 2024 18:35:11 +0800 Subject: [PATCH] [fix](routine-load) update partition offset cache timely to avoid negative lag #30455 --- .../doris/load/routineload/KafkaRoutineLoadJob.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java index 77171b6a4c..2f4d250930 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java @@ -707,6 +707,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { // check if given partitions has more data to consume. // 'partitionIdToOffset' to the offset to be consumed. public boolean hasMoreDataToConsume(UUID taskId, Map partitionIdToOffset) throws UserException { + boolean needUpdateCache = false; + // every partition must be checked, because the cached latest offsets of some partitions may be out of date for (Map.Entry entry : partitionIdToOffset.entrySet()) { if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey()) && entry.getValue() < cachedPartitionWithLatestOffsets.get(entry.getKey())) { @@ -717,9 +719,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob { // query_watermark_offsets() will return 4.) LOG.debug("has more data to consume. offsets to be consumed: {}, latest offsets: {}, task {}, job {}", partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id); - return true; + } else { + needUpdateCache = true; + break; } } + if (needUpdateCache == false) { + return true; + } try { // all offsets to be consumed are newer than offsets in cachedPartitionWithLatestOffsets,