[fix](routine-load) update partition offset cache timely to avoid negative lag #30455
This commit is contained in:
committed by
yiguolei
parent
7f0d3d9dcb
commit
7935dc9fae
@ -707,6 +707,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
|
||||
// check if given partitions has more data to consume.
|
||||
// 'partitionIdToOffset' to the offset to be consumed.
|
||||
public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> partitionIdToOffset) throws UserException {
|
||||
boolean needUpdateCache = false;
|
||||
// it is need check all partitions, for some partitions offset may be out of time
|
||||
for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
|
||||
if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey())
|
||||
&& entry.getValue() < cachedPartitionWithLatestOffsets.get(entry.getKey())) {
|
||||
@ -717,9 +719,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
|
||||
// query_watermark_offsets() will return 4.)
|
||||
LOG.debug("has more data to consume. offsets to be consumed: {}, latest offsets: {}, task {}, job {}",
|
||||
partitionIdToOffset, cachedPartitionWithLatestOffsets, taskId, id);
|
||||
return true;
|
||||
} else {
|
||||
needUpdateCache = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (needUpdateCache == false) {
|
||||
return true;
|
||||
}
|
||||
|
||||
try {
|
||||
// all offsets to be consumed are newer than offsets in cachedPartitionWithLatestOffsets,
|
||||
|
||||
Reference in New Issue
Block a user