[fix](routine-load) fix routine load lag is negative (#34113)
* [fix](routine-load) fix routine load lag is negative (#33846) * fix merge error
This commit is contained in:
committed by
GitHub
parent
e3ed861e4b
commit
b873be6588
@ -85,6 +85,10 @@ public class KafkaProgress extends RoutineLoadProgress {
|
||||
return partitionIdToOffset.get(kafkaPartition);
|
||||
}
|
||||
|
||||
public Map<Integer, Long> getOffsetByPartition() {
|
||||
return partitionIdToOffset;
|
||||
}
|
||||
|
||||
public boolean containsPartition(Integer kafkaPartition) {
|
||||
return partitionIdToOffset.containsKey(kafkaPartition);
|
||||
}
|
||||
|
||||
@ -285,16 +285,27 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
|
||||
return false;
|
||||
}
|
||||
|
||||
private void updateProgressAndOffsetsCache(RLTaskTxnCommitAttachment attachment) {
|
||||
((KafkaProgress) attachment.getProgress()).getOffsetByPartition().entrySet().stream()
|
||||
.forEach(entity -> {
|
||||
if (cachedPartitionWithLatestOffsets.containsKey(entity.getKey())
|
||||
&& cachedPartitionWithLatestOffsets.get(entity.getKey()) < entity.getValue() + 1) {
|
||||
cachedPartitionWithLatestOffsets.put(entity.getKey(), entity.getValue() + 1);
|
||||
}
|
||||
});
|
||||
this.progress.update(attachment);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws UserException {
|
||||
super.updateProgress(attachment);
|
||||
this.progress.update(attachment);
|
||||
updateProgressAndOffsetsCache(attachment);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void replayUpdateProgress(RLTaskTxnCommitAttachment attachment) {
|
||||
super.replayUpdateProgress(attachment);
|
||||
this.progress.update(attachment);
|
||||
updateProgressAndOffsetsCache(attachment);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user